diff options
author | Gregor Kleen <gkleen@yggdrasil.li> | 2022-11-07 20:51:39 +0100 |
---|---|---|
committer | Gregor Kleen <gkleen@yggdrasil.li> | 2022-11-07 20:51:39 +0100 |
commit | 0e9f1e85cd8c6f9d546ef88e971043b909017170 (patch) | |
tree | 5cb4d14df7594ef123f20d82cb2ec423b6bca744 /modules/borgcopy/copy/copy_borg | |
parent | f563ddece04adfd8d80d4e984405f5c70a6c94f3 (diff) | |
download | nixos-0e9f1e85cd8c6f9d546ef88e971043b909017170.tar nixos-0e9f1e85cd8c6f9d546ef88e971043b909017170.tar.gz nixos-0e9f1e85cd8c6f9d546ef88e971043b909017170.tar.bz2 nixos-0e9f1e85cd8c6f9d546ef88e971043b909017170.tar.xz nixos-0e9f1e85cd8c6f9d546ef88e971043b909017170.zip |
...
Diffstat (limited to 'modules/borgcopy/copy/copy_borg')
-rwxr-xr-x | modules/borgcopy/copy/copy_borg/__main__.py | 556 |
1 files changed, 556 insertions, 0 deletions
diff --git a/modules/borgcopy/copy/copy_borg/__main__.py b/modules/borgcopy/copy/copy_borg/__main__.py new file mode 100755 index 00000000..5b374d99 --- /dev/null +++ b/modules/borgcopy/copy/copy_borg/__main__.py | |||
@@ -0,0 +1,556 @@ | |||
1 | #!@python@/bin/python | ||
2 | |||
3 | import json | ||
4 | import os | ||
5 | import subprocess | ||
6 | import re | ||
7 | import sys | ||
8 | import io | ||
9 | from sys import stderr | ||
10 | from humanize import naturalsize | ||
11 | |||
12 | from tempfile import TemporaryDirectory | ||
13 | |||
14 | from datetime import (datetime, timedelta) | ||
15 | from dateutil.tz import (tzlocal, tzutc) | ||
16 | import dateutil.parser | ||
17 | import argparse | ||
18 | |||
19 | from tqdm import tqdm | ||
20 | |||
21 | from xdg import xdg_runtime_dir | ||
22 | import pathlib | ||
23 | |||
24 | import unshare | ||
25 | from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps | ||
26 | from pwd import getpwnam | ||
27 | |||
28 | import logging | ||
29 | |||
30 | import signal | ||
31 | import time | ||
32 | import math | ||
33 | |||
34 | from halo import Halo | ||
35 | |||
36 | from collections import deque | ||
37 | |||
38 | import select | ||
39 | import fcntl | ||
40 | |||
41 | from multiprocessing import Process, Manager | ||
42 | from contextlib import closing | ||
43 | |||
44 | |||
45 | halo_args = { | ||
46 | 'stream': stderr, | ||
47 | 'enabled': stderr.isatty(), | ||
48 | 'spinner': 'arc' | ||
49 | } | ||
50 | |||
51 | borg_pwd = getpwnam('borg') | ||
52 | |||
53 | def as_borg(caps=set()): | ||
54 | global logger | ||
55 | |||
56 | try: | ||
57 | if caps: | ||
58 | c_state = CapState.get_current() | ||
59 | c_state.permitted.add(*caps) | ||
60 | c_state.set_current() | ||
61 | |||
62 | # logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) | ||
63 | |||
64 | set_keepcaps(True) | ||
65 | |||
66 | os.setgid(borg_pwd.pw_gid) | ||
67 | os.setuid(borg_pwd.pw_uid) | ||
68 | |||
69 | if caps: | ||
70 | # logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) | ||
71 | |||
72 | c_state = CapState.get_current() | ||
73 | c_state.permitted = caps.copy() | ||
74 | c_state.inheritable.add(*caps) | ||
75 | c_state.set_current() | ||
76 | |||
77 | # logger.debug("cap_permitted=%s", CapState.get_current().permitted) | ||
78 | # logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) | ||
79 | |||
80 | for cap in caps: | ||
81 | cap_ambient_raise(cap) | ||
82 | # logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) | ||
83 | except Exception: | ||
84 | logger.error(format_exc()) | ||
85 | raise | ||
86 | |||
87 | def borg_json(*args, **kwargs): | ||
88 | global logger | ||
89 | |||
90 | with subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs) as proc: | ||
91 | stdout_buffer = io.BytesIO() | ||
92 | |||
93 | proc_logger = logger.getChild('borg') | ||
94 | stdout_logger = proc_logger.getChild('stdout') | ||
95 | stderr_logger = proc_logger.getChild('stderr') | ||
96 | |||
97 | fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
98 | fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
99 | |||
100 | poll = select.poll() | ||
101 | poll.register(proc.stdout, select.POLLIN | select.POLLHUP) | ||
102 | poll.register(proc.stderr, select.POLLIN | select.POLLHUP) | ||
103 | pollc = 2 | ||
104 | events = poll.poll() | ||
105 | stderr_linebuf = bytearray() | ||
106 | |||
107 | while pollc > 0 and len(events) > 0: | ||
108 | for rfd, event in events: | ||
109 | if event & select.POLLIN: | ||
110 | if rfd == proc.stdout.fileno(): | ||
111 | try: | ||
112 | buf = os.read(proc.stdout.fileno(), 8192) | ||
113 | # stdout_logger.debug(buf) | ||
114 | stdout_buffer.write(buf) | ||
115 | except BlockingIOError: | ||
116 | pass | ||
117 | if rfd == proc.stderr.fileno(): | ||
118 | try: | ||
119 | stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192)) | ||
120 | except BlockingIOError: | ||
121 | pass | ||
122 | |||
123 | while stderr_linebuf: | ||
124 | line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n') | ||
125 | if not sep: | ||
126 | stderr_linebuf = line | ||
127 | break | ||
128 | |||
129 | stderr_logger.info(line.decode()) | ||
130 | if event == select.POLLHUP: | ||
131 | poll.unregister(rfd) | ||
132 | pollc -= 1 | ||
133 | |||
134 | if pollc > 0: | ||
135 | events = poll.poll() | ||
136 | |||
137 | for handler in proc_logger.handlers: | ||
138 | handler.flush() | ||
139 | |||
140 | ret = proc.wait() | ||
141 | if ret != 0: | ||
142 | raise Exception(f'borg subprocess exited with returncode {ret}') | ||
143 | |||
144 | stdout_buffer.seek(0) | ||
145 | return json.load(stdout_buffer) | ||
146 | |||
147 | def read_repo(path): | ||
148 | global logger | ||
149 | |||
150 | with Halo(text=f'Listing {path}', **halo_args) as sp: | ||
151 | if not sp.enabled: | ||
152 | logger.debug('Listing %s...', path) | ||
153 | res = borg_json(['borg', 'list', '--info', '--lock-wait=600', '--json', path], preexec_fn=lambda: as_borg())['archives'] | ||
154 | if sp.enabled: | ||
155 | sp.succeed(f'{len(res)} archives in {path}') | ||
156 | else: | ||
157 | logger.info('%d archives in ‘%s’', len(res), path) | ||
158 | return res | ||
159 | |||
160 | class ToSync: | ||
161 | to_sync = deque() | ||
162 | |||
163 | def __init__(self, source, target): | ||
164 | self.source = source | ||
165 | self.target = target | ||
166 | |||
167 | def __iter__(self): | ||
168 | return self | ||
169 | |||
170 | def __next__(self): | ||
171 | global logger | ||
172 | |||
173 | if self.to_sync: | ||
174 | return self.to_sync.popleft() | ||
175 | |||
176 | while True: | ||
177 | try: | ||
178 | src = read_repo(self.source) | ||
179 | dst = read_repo(self.target) | ||
180 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
181 | logger.error(err) | ||
182 | continue | ||
183 | |||
184 | self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')]) | ||
185 | |||
186 | if self.to_sync: | ||
187 | return self.to_sync.popleft() | ||
188 | |||
189 | raise StopIteration | ||
190 | |||
191 | def copy_archive(src_repo_path, dst_repo_path, entry): | ||
192 | global logger | ||
193 | |||
194 | def do_copy(tmpdir_q): | ||
195 | global logger | ||
196 | |||
197 | nonlocal src_repo_path, dst_repo_path, entry | ||
198 | |||
199 | tmpdir = tmpdir_q.get() | ||
200 | |||
201 | cache_suffix = None | ||
202 | with Halo(text=f'Determine archive parameters', **halo_args) as sp: | ||
203 | if not sp.enabled: | ||
204 | logger.debug('Determining archive parameters...') | ||
205 | match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name']) | ||
206 | if match: | ||
207 | repo_id = borg_json(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], preexec_fn=lambda: as_borg())['repository']['id'] | ||
208 | |||
209 | if repo_id: | ||
210 | cache_suffix = f'{repo_id}_{match.group(1)}' | ||
211 | if sp.enabled: | ||
212 | sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})') | ||
213 | else: | ||
214 | logger.info('Will process ‘%s’ (%s, cache_suffix=%s)', entry['name'], dateutil.parser.isoparse(entry['start']), cache_suffix) | ||
215 | |||
216 | logger.debug('Setting up environment...') | ||
217 | unshare.unshare(unshare.CLONE_NEWNS) | ||
218 | subprocess.run(['mount', '--make-rprivate', '/'], check=True) | ||
219 | chroot = pathlib.Path(tmpdir) / 'chroot' | ||
220 | upper = pathlib.Path(tmpdir) / 'upper' | ||
221 | work = pathlib.Path(tmpdir) / 'work' | ||
222 | for path in [chroot,upper,work]: | ||
223 | path.mkdir() | ||
224 | subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) | ||
225 | bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] | ||
226 | if os.environ.get('BORG_BASE_DIR'): | ||
227 | bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) | ||
228 | if not ":" in src_repo_path: | ||
229 | bindMounts.append(pathlib.Path(src_repo_path).relative_to('/')) | ||
230 | if 'SSH_AUTH_SOCK' in os.environ: | ||
231 | bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) | ||
232 | for bindMount in bindMounts: | ||
233 | (chroot / bindMount).mkdir(parents=True,exist_ok=True) | ||
234 | subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) | ||
235 | os.chroot(chroot) | ||
236 | os.chdir('/') | ||
237 | try: | ||
238 | os.unlink('/etc/fuse.conf') | ||
239 | except FileNotFoundError: | ||
240 | pass | ||
241 | pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True) | ||
242 | with open('/etc/fuse.conf', 'w') as fuse_conf: | ||
243 | fuse_conf.write('user_allow_other\nmount_max = 1000\n') | ||
244 | dir = pathlib.Path('/borg') | ||
245 | dir.mkdir(parents=True,exist_ok=True,mode=0o0750) | ||
246 | os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) | ||
247 | |||
248 | total_size = None | ||
249 | total_files = None | ||
250 | if stderr.isatty(): | ||
251 | with Halo(text=f'Determine size', **halo_args) as sp: | ||
252 | stats = borg_json(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], preexec_fn=lambda: as_borg())['archives'][0]['stats'] | ||
253 | total_size = stats['original_size'] | ||
254 | total_files = stats['nfiles'] | ||
255 | if sp.enabled: | ||
256 | sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') | ||
257 | else: | ||
258 | logger.info('%d files, %s', total_files, naturalsize(total_size, binary=True)) | ||
259 | with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: | ||
260 | with Halo(text='Waiting for mount', **halo_args) as sp: | ||
261 | if not sp.enabled: | ||
262 | logger.debug('Waiting for mount...') | ||
263 | wait_start = datetime.now() | ||
264 | while True: | ||
265 | if os.path.ismount(dir): | ||
266 | break | ||
267 | elif datetime.now() - wait_start > timedelta(minutes=15): | ||
268 | ret.check_returncode() | ||
269 | time.sleep(0.1) | ||
270 | if sp.enabled: | ||
271 | sp.succeed('Mounted') | ||
272 | else: | ||
273 | logger.info('Mounted %s', f'{src_repo_path}::{entry["name"]}') | ||
274 | |||
275 | while True: | ||
276 | with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: | ||
277 | seen = 0 | ||
278 | env = os.environ.copy() | ||
279 | create_args = ['borg', | ||
280 | 'create', | ||
281 | '--lock-wait=600', | ||
282 | '--one-file-system', | ||
283 | '--compression=auto,zstd,10', | ||
284 | '--chunker-params=10,23,16,4095', | ||
285 | '--files-cache=ctime,size', | ||
286 | '--show-rc', | ||
287 | '--upload-buffer=100', | ||
288 | '--upload-ratelimit=20480', | ||
289 | '--log-json', | ||
290 | '--progress', | ||
291 | '--list', | ||
292 | '--filter=AMEi-x?', | ||
293 | '--stats' | ||
294 | ] | ||
295 | archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) | ||
296 | create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] | ||
297 | if cache_suffix: | ||
298 | env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix | ||
299 | else: | ||
300 | create_args += ['--files-cache=disabled'] | ||
301 | create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] | ||
302 | |||
303 | with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir) as proc: | ||
304 | last_list = None | ||
305 | last_list_time = time.monotonic_ns() | ||
306 | logger.info('Creating...') | ||
307 | |||
308 | proc_logger = logger.getChild('borg') | ||
309 | stdout_logger = proc_logger.getChild('stdout') | ||
310 | stderr_logger = proc_logger.getChild('stderr') | ||
311 | |||
312 | fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
313 | fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
314 | |||
315 | poll = select.poll() | ||
316 | poll.register(proc.stdout, select.POLLIN | select.POLLHUP) | ||
317 | poll.register(proc.stderr, select.POLLIN | select.POLLHUP) | ||
318 | pollc = 2 | ||
319 | events = poll.poll() | ||
320 | stdout_linebuf = bytearray() | ||
321 | stderr_linebuf = bytearray() | ||
322 | |||
323 | while pollc > 0 and len(events) > 0: | ||
324 | # logger.debug('%d events', len(events)) | ||
325 | for rfd, event in events: | ||
326 | # logger.debug('event %s', event) | ||
327 | if event & select.POLLIN: | ||
328 | if rfd == proc.stdout.fileno(): | ||
329 | try: | ||
330 | # logger.debug('reading stdout...') | ||
331 | stdout_linebuf.extend(os.read(proc.stdout.fileno(), 8192)) | ||
332 | # logger.debug('read stdout, len(stdout_linebuf)=%d', len(stdout_linebuf)) | ||
333 | except BlockingIOError: | ||
334 | pass | ||
335 | |||
336 | while stdout_linebuf: | ||
337 | # logger.debug('stdout line...') | ||
338 | line, sep, stdout_linebuf = stdout_linebuf.partition(b'\n') | ||
339 | if not sep: | ||
340 | stdout_linebuf = line | ||
341 | break | ||
342 | |||
343 | stdout_logger.info(line.decode()) | ||
344 | # logger.debug('handled stdout lines, %d leftover', len(stdout_linebuf)) | ||
345 | if rfd == proc.stderr.fileno(): | ||
346 | try: | ||
347 | # logger.debug('reading stderr...') | ||
348 | stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192)) | ||
349 | # logger.debug('read stderr, len(stderr_linebuf)=%d', len(stderr_linebuf)) | ||
350 | except BlockingIOError: | ||
351 | pass | ||
352 | |||
353 | while stderr_linebuf: | ||
354 | # logger.debug('stderr line...') | ||
355 | line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n') | ||
356 | if not sep: | ||
357 | stderr_linebuf = line | ||
358 | break | ||
359 | |||
360 | try: | ||
361 | json_line = json.loads(line) | ||
362 | except json.decoder.JSONDecodeError: | ||
363 | if progress.disable: | ||
364 | stderr_logger.error(line.decode()) | ||
365 | else: | ||
366 | tqdm.write(line.decode()) | ||
367 | continue | ||
368 | |||
369 | # logger.debug('stderr line decoded: %s', json_line['type'] if 'type' in json_line else None) | ||
370 | |||
371 | t = '' | ||
372 | if 'time' in json_line and not progress.disable: | ||
373 | ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) | ||
374 | t = f'{ts.isoformat(timespec="minutes")} ' | ||
375 | if json_line['type'] == 'archive_progress' and not progress.disable: | ||
376 | now = time.monotonic_ns() | ||
377 | if last_list_time is None or now - last_list_time >= 3e9: | ||
378 | last_list_time = now | ||
379 | if 'path' in json_line and json_line['path']: | ||
380 | progress.set_description(f'… {json_line["path"]}', refresh=False) | ||
381 | else: | ||
382 | progress.set_description(None, refresh=False) | ||
383 | elif last_list is not None: | ||
384 | progress.set_description(last_list, refresh=False) | ||
385 | nfiles=json_line["nfiles"] | ||
386 | if total_files is not None: | ||
387 | nfiles=f'{json_line["nfiles"]}/{total_files}' | ||
388 | progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) | ||
389 | progress.update(json_line["original_size"] - seen) | ||
390 | seen = json_line["original_size"] | ||
391 | elif json_line['type'] == 'archive_progress': | ||
392 | now = time.monotonic_ns() | ||
393 | if last_list_time is None or now - last_list_time >= 3e9: | ||
394 | last_list_time = now | ||
395 | if 'path' in json_line and json_line['path']: | ||
396 | stderr_logger.debug('… %s (%s)', json_line["path"], naturalsize(json_line["original_size"])) | ||
397 | else: | ||
398 | stderr_logger.debug('… (%s)', naturalsize(json_line["original_size"])) | ||
399 | elif json_line['type'] == 'file_status': | ||
400 | # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') | ||
401 | last_list = f'{json_line["status"]} {json_line["path"]}' | ||
402 | last_list_time = time.monotonic_ns() | ||
403 | progress.set_description(last_list, refresh=False) | ||
404 | if progress.disable: | ||
405 | stderr_logger.info(last_list) | ||
406 | elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): | ||
407 | if 'message' in json_line: | ||
408 | if progress.disable: | ||
409 | stderr_logger.info(t + json_line['message']) | ||
410 | else: | ||
411 | tqdm.write(t + json_line['message']) | ||
412 | elif 'msgid' in json_line: | ||
413 | if progress.disable: | ||
414 | stderr_logger.info(t + json_line['msgid']) | ||
415 | else: | ||
416 | tqdm.write(t + json_line['msgid']) | ||
417 | else: | ||
418 | if progress.disable: | ||
419 | stderr_logger.info(t + line.decode()) | ||
420 | else: | ||
421 | tqdm.write(t + line.decode()) | ||
422 | # logger.debug('handled stderr lines, %d leftover', len(stderr_linebuf)) | ||
423 | if event == select.POLLHUP: | ||
424 | poll.unregister(rfd) | ||
425 | pollc -= 1 | ||
426 | |||
427 | if pollc > 0: | ||
428 | # logger.debug('polling %d fds...', pollc) | ||
429 | events = poll.poll() | ||
430 | # logger.debug('done polling') | ||
431 | |||
432 | # logger.debug('borg create closed stdout/stderr') | ||
433 | if stdout_linebuf: | ||
434 | logger.error('unterminated line leftover in stdout: %s', stdout_linebuf) | ||
435 | if stderr_linebuf: | ||
436 | logger.error('unterminated line leftover in stdout: %s', stderr_linebuf) | ||
437 | progress.set_description(None) | ||
438 | ret = proc.wait() | ||
439 | # logger.debug('borg create terminated; ret=%d', ret) | ||
440 | if ret != 0: | ||
441 | dst = None | ||
442 | try: | ||
443 | dst = read_repo(dst_repo_path) | ||
444 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
445 | logger.error(err) | ||
446 | continue | ||
447 | else: | ||
448 | if any(map(lambda other: entry['name'] == other['name'], dst)): | ||
449 | logger.info('destination exists, terminating') | ||
450 | break | ||
451 | |||
452 | logger.warn('destination does not exist, retrying') | ||
453 | continue | ||
454 | else: | ||
455 | # logger.debug('terminating') | ||
456 | break | ||
457 | mount_proc.terminate() | ||
458 | |||
459 | with Manager() as manager: | ||
460 | tmpdir_q = manager.Queue(1) | ||
461 | |||
462 | with closing(Process(target=do_copy, args=(tmpdir_q,), name='do_copy')) as p: | ||
463 | p.start() | ||
464 | |||
465 | with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: | ||
466 | tmpdir_q.put(tmpdir) | ||
467 | p.join() | ||
468 | return p.exitcode | ||
469 | |||
470 | def sigterm(signum, frame): | ||
471 | raise SystemExit(128 + signum) | ||
472 | |||
473 | def main(): | ||
474 | signal.signal(signal.SIGTERM, sigterm) | ||
475 | |||
476 | global logger | ||
477 | logger = logging.getLogger(__name__) | ||
478 | console_handler = logging.StreamHandler() | ||
479 | console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) | ||
480 | if sys.stderr.isatty(): | ||
481 | console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) | ||
482 | |||
483 | burst_max = 1000 | ||
484 | burst = burst_max | ||
485 | last_use = None | ||
486 | inv_rate = 1e7 | ||
487 | def consume_filter(record): | ||
488 | nonlocal burst, burst_max, inv_rate, last_use | ||
489 | |||
490 | delay = None | ||
491 | while True: | ||
492 | now = time.monotonic_ns() | ||
493 | burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max | ||
494 | last_use = now | ||
495 | |||
496 | if burst > 0: | ||
497 | burst -= 1 | ||
498 | if delay: | ||
499 | delay = now - delay | ||
500 | |||
501 | return True | ||
502 | |||
503 | if delay is None: | ||
504 | delay = now | ||
505 | time.sleep(inv_rate / 1e9) | ||
506 | console_handler.addFilter(consume_filter) | ||
507 | |||
508 | logging.getLogger().addHandler(console_handler) | ||
509 | |||
510 | # log uncaught exceptions | ||
511 | def log_exceptions(type, value, tb): | ||
512 | global logger | ||
513 | |||
514 | logger.error(value) | ||
515 | sys.__excepthook__(type, value, tb) # calls default excepthook | ||
516 | |||
517 | sys.excepthook = log_exceptions | ||
518 | |||
519 | parser = argparse.ArgumentParser(prog='copy') | ||
520 | parser.add_argument('--verbosity', dest='log_level', action='append', type=int) | ||
521 | parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1) | ||
522 | parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1) | ||
523 | parser.add_argument('source', metavar='REPO_OR_ARCHIVE') | ||
524 | parser.add_argument('target', metavar='REPO_OR_ARCHIVE') | ||
525 | args = parser.parse_args() | ||
526 | |||
527 | |||
528 | LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] | ||
529 | DEFAULT_LOG_LEVEL = logging.ERROR | ||
530 | log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) | ||
531 | |||
532 | for adjustment in args.log_level or (): | ||
533 | log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0)) | ||
534 | logger.setLevel(LOG_LEVELS[log_level]) | ||
535 | |||
536 | |||
537 | if "::" in args.source: | ||
538 | (src_repo_path, _, src_archive) = args.source.partition("::") | ||
539 | entry = None | ||
540 | for candidate_entry in read_repo(src_repo_path): | ||
541 | if entry['name'] != src_archive: | ||
542 | continue | ||
543 | entry = candidate_entry | ||
544 | break | ||
545 | |||
546 | if entry is None: | ||
547 | logger.critical("Did not find archive ‘%s’", src_archive) | ||
548 | os.exit(1) | ||
549 | |||
550 | copy_archive(src_repo_path, args.target, entry) | ||
551 | else: | ||
552 | for entry in ToSync(args.source, args.target): | ||
553 | copy_archive(args.source, args.target, entry) | ||
554 | |||
555 | if __name__ == "__main__": | ||
556 | sys.exit(main()) | ||