diff options
-rwxr-xr-x | hosts/vidhar/borg/copy.py | 214 |
1 files changed, 113 insertions, 101 deletions
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py index f685a490..e7d0becc 100755 --- a/hosts/vidhar/borg/copy.py +++ b/hosts/vidhar/borg/copy.py | |||
@@ -104,9 +104,9 @@ def copy_archive(src_repo_path, dst_repo_path, entry): | |||
104 | else: | 104 | else: |
105 | print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr) | 105 | print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr) |
106 | with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: | 106 | with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: |
107 | dir = pathlib.Path('/borg') | 107 | child = os.fork() |
108 | 108 | if child == 0: | |
109 | def preexec_fn(): | 109 | # print('unshare/chroot', file=stderr) |
110 | unshare.unshare(unshare.CLONE_NEWNS) | 110 | unshare.unshare(unshare.CLONE_NEWNS) |
111 | subprocess.run(['mount', '--make-rprivate', '/'], check=True) | 111 | subprocess.run(['mount', '--make-rprivate', '/'], check=True) |
112 | chroot = pathlib.Path(tmpdir) / 'chroot' | 112 | chroot = pathlib.Path(tmpdir) / 'chroot' |
@@ -128,111 +128,123 @@ def copy_archive(src_repo_path, dst_repo_path, entry): | |||
128 | subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) | 128 | subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) |
129 | os.chroot(chroot) | 129 | os.chroot(chroot) |
130 | os.chdir('/') | 130 | os.chdir('/') |
131 | dir = pathlib.Path('/borg') | ||
131 | dir.mkdir(parents=True,exist_ok=True,mode=750) | 132 | dir.mkdir(parents=True,exist_ok=True,mode=750) |
132 | os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) | 133 | os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) |
133 | as_borg(caps={Cap.DAC_READ_SEARCH}) | ||
134 | 134 | ||
135 | total_size = None | 135 | total_size = None |
136 | total_files = None | 136 | total_files = None |
137 | with Halo(text=f'Determine size', **halo_args) as sp: | 137 | if stderr.isatty(): |
138 | with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: | 138 | with Halo(text=f'Determine size', **halo_args) as sp: |
139 | stats = json.load(proc.stdout)['archives'][0]['stats'] | 139 | with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: |
140 | total_size = stats['original_size'] | 140 | stats = json.load(proc.stdout)['archives'][0]['stats'] |
141 | total_files = stats['nfiles'] | 141 | total_size = stats['original_size'] |
142 | if sp.enabled: | 142 | total_files = stats['nfiles'] |
143 | sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') | 143 | if sp.enabled: |
144 | else: | 144 | sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') |
145 | print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) | 145 | else: |
146 | # print(f'Mounting to {dir}', file=stderr) | 146 | print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) |
147 | with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: | 147 | # print(f'Mounting to {dir}', file=stderr) |
148 | with Halo(text='Waiting for mount', **halo_args) as sp: | 148 | with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: |
149 | wait_start = datetime.now() | 149 | with Halo(text='Waiting for mount', **halo_args) as sp: |
150 | wait_start = datetime.now() | ||
151 | while True: | ||
152 | ret = subprocess.run(['mountpoint', '-q', dir]) | ||
153 | if ret.returncode == 0: | ||
154 | break | ||
155 | elif datetime.now() - wait_start > timedelta(minutes=10): | ||
156 | ret.check_returncode() | ||
157 | sleep(0.1) | ||
158 | if sp.enabled: | ||
159 | sp.succeed('Mounted') | ||
160 | else: | ||
161 | print('Mounted', file=stderr) | ||
150 | while True: | 162 | while True: |
151 | ret = subprocess.run(['mountpoint', '-q', dir]) | 163 | try: |
152 | if ret.returncode == 0: | 164 | with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: |
153 | break | 165 | seen = 0 |
154 | elif datetime.now() - wait_start > timedelta(minutes=10): | 166 | env = os.environ.copy() |
155 | ret.check_returncode() | 167 | create_args = ['borg', |
156 | sleep(0.1) | 168 | 'create', |
157 | if sp.enabled: | 169 | '--lock-wait=600', |
158 | sp.succeed('Mounted') | 170 | '--one-file-system', |
159 | else: | 171 | '--compression=auto,zstd,10', |
160 | print('Mounted', file=stderr) | 172 | '--chunker-params=10,23,16,4095', |
161 | while True: | 173 | '--files-cache=ctime,size', |
162 | try: | 174 | '--show-rc', |
163 | with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: | 175 | # '--remote-ratelimit=20480', |
164 | seen = 0 | 176 | '--log-json', |
165 | env = os.environ.copy() | 177 | '--progress', |
166 | create_args = ['borg', | 178 | '--list', |
167 | 'create', | 179 | '--filter=AMEi-x?', |
168 | '--lock-wait=600', | 180 | '--stats' |
169 | '--one-file-system', | 181 | ] |
170 | '--compression=auto,zstd,10', | 182 | archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) |
171 | '--chunker-params=10,23,16,4095', | 183 | create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] |
172 | '--files-cache=ctime,size', | 184 | if cache_suffix: |
173 | '--show-rc', | 185 | env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix |
174 | # '--remote-ratelimit=20480', | 186 | else: |
175 | '--log-json', | 187 | create_args += ['--files-cache=disabled'] |
176 | '--progress', | 188 | create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] |
177 | '--list', | 189 | with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH})) as proc: |
178 | '--filter=AMEi-x?', | 190 | last_list = None |
179 | '--stats' | 191 | last_list_time = None |
180 | ] | 192 | for line in proc.stderr: |
181 | archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) | 193 | try: |
182 | create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] | 194 | json_line = json.loads(line) |
183 | if cache_suffix: | 195 | except json.decoder.JSONDecodeError: |
184 | env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix | 196 | tqdm.write(line) |
185 | else: | 197 | continue |
186 | create_args += ['--files-cache=disabled'] | ||
187 | create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] | ||
188 | with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=preexec_fn) as proc: | ||
189 | last_list = None | ||
190 | last_list_time = None | ||
191 | for line in proc.stderr: | ||
192 | try: | ||
193 | json_line = json.loads(line) | ||
194 | except json.decoder.JSONDecodeError: | ||
195 | tqdm.write(line) | ||
196 | continue | ||
197 | 198 | ||
198 | t = '' | 199 | t = '' |
199 | if 'time' in json_line and stderr.isatty(): | 200 | if 'time' in json_line and stderr.isatty(): |
200 | ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) | 201 | ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) |
201 | t = f'{ts.isoformat(timespec="minutes")} ' | 202 | t = f'{ts.isoformat(timespec="minutes")} ' |
202 | if json_line['type'] == 'archive_progress': | 203 | if json_line['type'] == 'archive_progress': |
203 | if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: | 204 | if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: |
204 | if 'path' in json_line and json_line['path']: | 205 | if 'path' in json_line and json_line['path']: |
205 | progress.set_description(f'… {json_line["path"]}', refresh=False) | 206 | progress.set_description(f'… {json_line["path"]}', refresh=False) |
206 | else: | 207 | else: |
207 | progress.set_description(None, refresh=False) | 208 | progress.set_description(None, refresh=False) |
208 | elif last_list is not None: | 209 | elif last_list is not None: |
210 | progress.set_description(last_list, refresh=False) | ||
211 | nfiles=json_line["nfiles"] | ||
212 | if total_files is not None: | ||
213 | nfiles=f'{json_line["nfiles"]}/{total_files}' | ||
214 | progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) | ||
215 | progress.update(json_line["original_size"] - seen) | ||
216 | seen = json_line["original_size"] | ||
217 | elif json_line['type'] == 'file_status': | ||
218 | # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') | ||
219 | last_list = f'{json_line["status"]} {json_line["path"]}' | ||
220 | last_list_time = datetime.now() | ||
209 | progress.set_description(last_list, refresh=False) | 221 | progress.set_description(last_list, refresh=False) |
210 | progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False) | 222 | if not stderr.isatty(): |
211 | progress.update(json_line["original_size"] - seen) | 223 | print(last_list, file=stderr) |
212 | seen = json_line["original_size"] | 224 | elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): |
213 | elif json_line['type'] == 'file_status': | 225 | if 'message' in json_line: |
214 | # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') | 226 | tqdm.write(t + json_line['message']) |
215 | last_list = f'{json_line["status"]} {json_line["path"]}' | 227 | elif 'msgid' in json_line: |
216 | last_list_time = datetime.now() | 228 | tqdm.write(t + json_line['msgid']) |
217 | progress.set_description(last_list, refresh=False) | 229 | else: |
218 | if not stderr.isatty(): | 230 | tqdm.write(t + line) |
219 | print(last_list, file=stderr) | 231 | progress.set_description(None) |
220 | elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): | 232 | if proc.wait() != 0: |
221 | if 'message' in json_line: | 233 | continue |
222 | tqdm.write(t + json_line['message']) | 234 | except subprocess.CalledProcessError as err: |
223 | elif 'msgid' in json_line: | 235 | print(err, file=stderr) |
224 | tqdm.write(t + json_line['msgid']) | 236 | continue |
225 | else: | 237 | else: |
226 | tqdm.write(t + line) | 238 | break |
227 | progress.set_description(None) | 239 | mount_proc.terminate() |
228 | if proc.wait() != 0: | 240 | os._exit(0) |
229 | continue | 241 | else: |
230 | except subprocess.CalledProcessError as err: | 242 | while True: |
231 | print(err, file=stderr) | 243 | waitpid, waitret = os.wait() |
232 | continue | 244 | if waitret != 0: |
233 | else: | 245 | sys.exit(waitret) |
246 | if waitpid == child: | ||
234 | break | 247 | break |
235 | mount_proc.terminate() | ||
236 | 248 | ||
237 | def sigterm(signum, frame): | 249 | def sigterm(signum, frame): |
238 | raise SystemExit(128 + signum) | 250 | raise SystemExit(128 + signum) |