| field | value | date |
|---|---|---|
| author | Gregor Kleen <gkleen@yggdrasil.li> | 2022-11-07 20:51:39 +0100 |
| committer | Gregor Kleen <gkleen@yggdrasil.li> | 2022-11-07 20:51:39 +0100 |
| commit | 0e9f1e85cd8c6f9d546ef88e971043b909017170 | |
| tree | 5cb4d14df7594ef123f20d82cb2ec423b6bca744 /hosts/vidhar/borg/copy/copy_borg | |
| parent | f563ddece04adfd8d80d4e984405f5c70a6c94f3 | |
...
Diffstat (limited to 'hosts/vidhar/borg/copy/copy_borg')

| mode | file | deletions |
|---|---|---|
| -rwxr-xr-x | hosts/vidhar/borg/copy/copy_borg/__main__.py | 556 |

1 file changed, 0 insertions, 556 deletions
```diff
diff --git a/hosts/vidhar/borg/copy/copy_borg/__main__.py b/hosts/vidhar/borg/copy/copy_borg/__main__.py
deleted file mode 100755
index 5b374d99..00000000
--- a/hosts/vidhar/borg/copy/copy_borg/__main__.py
+++ /dev/null
@@ -1,556 +0,0 @@
```

The removed file:

```python
#!@python@/bin/python

import json
import os
import subprocess
import re
import sys
import io
from sys import stderr
from traceback import format_exc
from humanize import naturalsize

from tempfile import TemporaryDirectory

from datetime import (datetime, timedelta)
from dateutil.tz import (tzlocal, tzutc)
import dateutil.parser
import argparse

from tqdm import tqdm

from xdg import xdg_runtime_dir
import pathlib

import unshare
from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps
from pwd import getpwnam

import logging

import signal
import time
import math

from halo import Halo

from collections import deque

import select
import fcntl

from multiprocessing import Process, Manager
from contextlib import closing


halo_args = {
    'stream': stderr,
    'enabled': stderr.isatty(),
    'spinner': 'arc'
}

borg_pwd = getpwnam('borg')

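# as_borg: drop privileges to the dedicated ‘borg’ user (used as a Popen
# preexec_fn throughout). When `caps` is given, those capabilities are kept
# across setuid() via keepcaps and raised into the ambient set so the exec'd
# borg process inherits them.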
def as_borg(caps=set()):
    global logger

    try:
        if caps:
            c_state = CapState.get_current()
            c_state.permitted.update(caps)
            c_state.set_current()

            # logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)

            set_keepcaps(True)

        os.setgid(borg_pwd.pw_gid)
        os.setuid(borg_pwd.pw_uid)

        if caps:
            # logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)

            c_state = CapState.get_current()
            c_state.permitted = caps.copy()
            c_state.inheritable.update(caps)
            c_state.set_current()

            # logger.debug("cap_permitted=%s", CapState.get_current().permitted)
            # logger.debug("cap_inheritable=%s", CapState.get_current().inheritable)

            for cap in caps:
                cap_ambient_raise(cap)
                # logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap))
    except Exception:
        logger.error(format_exc())
        raise

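# borg_json: run a borg command, forward its stderr to the logger line by
# line (non-blocking reads driven by poll), buffer its stdout, and parse the
# buffered stdout as JSON once the process exits.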
def borg_json(*args, **kwargs):
    global logger

    with subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs) as proc:
        stdout_buffer = io.BytesIO()

        proc_logger = logger.getChild('borg')
        stdout_logger = proc_logger.getChild('stdout')
        stderr_logger = proc_logger.getChild('stderr')

        fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
        fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)

        poll = select.poll()
        poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
        poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
        pollc = 2
        events = poll.poll()
        stderr_linebuf = bytearray()

        while pollc > 0 and len(events) > 0:
            for rfd, event in events:
                if event & select.POLLIN:
                    if rfd == proc.stdout.fileno():
                        try:
                            buf = os.read(proc.stdout.fileno(), 8192)
                            # stdout_logger.debug(buf)
                            stdout_buffer.write(buf)
                        except BlockingIOError:
                            pass
                    if rfd == proc.stderr.fileno():
                        try:
                            stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192))
                        except BlockingIOError:
                            pass

                        while stderr_linebuf:
                            line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n')
                            if not sep:
                                stderr_linebuf = line
                                break

                            stderr_logger.info(line.decode())
                if event == select.POLLHUP:
                    poll.unregister(rfd)
                    pollc -= 1

            if pollc > 0:
                events = poll.poll()

        for handler in proc_logger.handlers:
            handler.flush()

        ret = proc.wait()
        if ret != 0:
            raise subprocess.CalledProcessError(ret, proc.args)

        stdout_buffer.seek(0)
        return json.load(stdout_buffer)

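# read_repo: list the archives in a repository via `borg list --json`, with a
# spinner on interactive terminals and plain log lines otherwise.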
def read_repo(path):
    global logger

    with Halo(text=f'Listing {path}', **halo_args) as sp:
        if not sp.enabled:
            logger.debug('Listing %s...', path)
        res = borg_json(['borg', 'list', '--info', '--lock-wait=600', '--json', path], preexec_fn=lambda: as_borg())['archives']
        if sp.enabled:
            sp.succeed(f'{len(res)} archives in {path}')
        else:
            logger.info('%d archives in ‘%s’', len(res), path)
    return res

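# ToSync: iterator yielding archives that exist in `source` but not yet in
# `target` (checkpoint archives are skipped). It re-lists both repositories
# whenever its queue runs dry and stops once nothing is left to copy.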
class ToSync:
    def __init__(self, source, target):
        self.source = source
        self.target = target
        self.to_sync = deque()

    def __iter__(self):
        return self

    def __next__(self):
        global logger

        if self.to_sync:
            return self.to_sync.popleft()

        while True:
            try:
                src = read_repo(self.source)
                dst = read_repo(self.target)
            except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
                logger.error(err)
                continue

            self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')])

            if self.to_sync:
                return self.to_sync.popleft()

            raise StopIteration

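# copy_archive: copy a single archive in a forked worker process. The worker
# builds an overlayfs chroot in a private mount namespace, FUSE-mounts the
# source archive inside it, and re-creates the archive in the destination
# repository with `borg create`.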
def copy_archive(src_repo_path, dst_repo_path, entry):
    global logger

    def do_copy(tmpdir_q):
        global logger

        nonlocal src_repo_path, dst_repo_path, entry

        tmpdir = tmpdir_q.get()

        cache_suffix = None
        with Halo(text='Determining archive parameters', **halo_args) as sp:
            if not sp.enabled:
                logger.debug('Determining archive parameters...')
            match = re.compile(r'^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name'])
            if match:
                repo_id = borg_json(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], preexec_fn=lambda: as_borg())['repository']['id']

                if repo_id:
                    cache_suffix = f'{repo_id}_{match.group(1)}'
            if sp.enabled:
                sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})')
            else:
                logger.info('Will process ‘%s’ (%s, cache_suffix=%s)', entry['name'], dateutil.parser.isoparse(entry['start']), cache_suffix)

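        # Sandbox: a private mount namespace with an overlayfs over /, so the
        # FUSE mount and the /etc/fuse.conf tweak below stay invisible outside
        # this process.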
        logger.debug('Setting up environment...')
        unshare.unshare(unshare.CLONE_NEWNS)
        subprocess.run(['mount', '--make-rprivate', '/'], check=True)
        chroot = pathlib.Path(tmpdir) / 'chroot'
        upper = pathlib.Path(tmpdir) / 'upper'
        work = pathlib.Path(tmpdir) / 'work'
        for path in [chroot, upper, work]:
            path.mkdir()
        subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
        bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
        if os.environ.get('BORG_BASE_DIR'):
            bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/'))
        if ':' not in src_repo_path:
            bindMounts.append(pathlib.Path(src_repo_path).relative_to('/'))
        if 'SSH_AUTH_SOCK' in os.environ:
            bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
        for bindMount in bindMounts:
            (chroot / bindMount).mkdir(parents=True, exist_ok=True)
            subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
        os.chroot(chroot)
        os.chdir('/')
        try:
            os.unlink('/etc/fuse.conf')
        except FileNotFoundError:
            pass
        pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True, exist_ok=True)
        with open('/etc/fuse.conf', 'w') as fuse_conf:
            fuse_conf.write('user_allow_other\nmount_max = 1000\n')
        dir = pathlib.Path('/borg')
        dir.mkdir(parents=True, exist_ok=True, mode=0o0750)
        os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)

        total_size = None
        total_files = None
        if stderr.isatty():
            with Halo(text='Determining size', **halo_args) as sp:
                stats = borg_json(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], preexec_fn=lambda: as_borg())['archives'][0]['stats']
                total_size = stats['original_size']
                total_files = stats['nfiles']
                if sp.enabled:
                    sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
                else:
                    logger.info('%d files, %s', total_files, naturalsize(total_size, binary=True))
        with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
            with Halo(text='Waiting for mount', **halo_args) as sp:
                if not sp.enabled:
                    logger.debug('Waiting for mount...')
                wait_start = datetime.now()
                while True:
                    if os.path.ismount(dir):
                        break
                    elif mount_proc.poll() is not None:
                        raise subprocess.CalledProcessError(mount_proc.returncode, mount_proc.args)
                    elif datetime.now() - wait_start > timedelta(minutes=15):
                        raise TimeoutError('borg mount did not show up within 15 minutes')
                    time.sleep(0.1)
                if sp.enabled:
                    sp.succeed('Mounted')
                else:
                    logger.info('Mounted %s', f'{src_repo_path}::{entry["name"]}')

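            # Re-create the archive from the FUSE mount. If `borg create`
            # fails, re-list the destination: when the archive landed anyway
            # the copy is done, otherwise retry.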
            while True:
                with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
                    seen = 0
                    env = os.environ.copy()
                    create_args = ['borg',
                                   'create',
                                   '--lock-wait=600',
                                   '--one-file-system',
                                   '--compression=auto,zstd,10',
                                   '--chunker-params=10,23,16,4095',
                                   '--files-cache=ctime,size',
                                   '--show-rc',
                                   '--upload-buffer=100',
                                   '--upload-ratelimit=20480',
                                   '--log-json',
                                   '--progress',
                                   '--list',
                                   '--filter=AMEi-x?',
                                   '--stats'
                                   ]
                    archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
                    create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
                    if cache_suffix:
                        env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
                    else:
                        create_args += ['--files-cache=disabled']
                    create_args += [f'{dst_repo_path}::{entry["name"]}', '.']

                    with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir) as proc:
                        last_list = None
                        last_list_time = time.monotonic_ns()
                        logger.info('Creating...')

                        proc_logger = logger.getChild('borg')
                        stdout_logger = proc_logger.getChild('stdout')
                        stderr_logger = proc_logger.getChild('stderr')

                        fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
                        fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)

                        poll = select.poll()
                        poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
                        poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
                        pollc = 2
                        events = poll.poll()
                        stdout_linebuf = bytearray()
                        stderr_linebuf = bytearray()

                        while pollc > 0 and len(events) > 0:
                            # logger.debug('%d events', len(events))
                            for rfd, event in events:
                                # logger.debug('event %s', event)
                                if event & select.POLLIN:
                                    if rfd == proc.stdout.fileno():
                                        try:
                                            # logger.debug('reading stdout...')
                                            stdout_linebuf.extend(os.read(proc.stdout.fileno(), 8192))
                                            # logger.debug('read stdout, len(stdout_linebuf)=%d', len(stdout_linebuf))
                                        except BlockingIOError:
                                            pass

                                        while stdout_linebuf:
                                            # logger.debug('stdout line...')
                                            line, sep, stdout_linebuf = stdout_linebuf.partition(b'\n')
                                            if not sep:
                                                stdout_linebuf = line
                                                break

                                            stdout_logger.info(line.decode())
                                        # logger.debug('handled stdout lines, %d leftover', len(stdout_linebuf))
                                    if rfd == proc.stderr.fileno():
                                        try:
                                            # logger.debug('reading stderr...')
                                            stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192))
                                            # logger.debug('read stderr, len(stderr_linebuf)=%d', len(stderr_linebuf))
                                        except BlockingIOError:
                                            pass

                                        while stderr_linebuf:
                                            # logger.debug('stderr line...')
                                            line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n')
                                            if not sep:
                                                stderr_linebuf = line
                                                break

                                            try:
                                                json_line = json.loads(line)
                                            except json.decoder.JSONDecodeError:
                                                if progress.disable:
                                                    stderr_logger.error(line.decode())
                                                else:
                                                    tqdm.write(line.decode())
                                                continue

                                            # logger.debug('stderr line decoded: %s', json_line['type'] if 'type' in json_line else None)

                                            t = ''
                                            if 'time' in json_line and not progress.disable:
                                                ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
                                                t = f'{ts.isoformat(timespec="minutes")} '
                                            if json_line['type'] == 'archive_progress' and not progress.disable:
                                                now = time.monotonic_ns()
                                                if last_list_time is None or now - last_list_time >= 3e9:
                                                    last_list_time = now
                                                    if 'path' in json_line and json_line['path']:
                                                        progress.set_description(f'… {json_line["path"]}', refresh=False)
                                                    else:
                                                        progress.set_description(None, refresh=False)
                                                elif last_list is not None:
                                                    progress.set_description(last_list, refresh=False)
                                                nfiles = json_line["nfiles"]
                                                if total_files is not None:
                                                    nfiles = f'{json_line["nfiles"]}/{total_files}'
                                                progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False)
                                                progress.update(json_line["original_size"] - seen)
                                                seen = json_line["original_size"]
                                            elif json_line['type'] == 'archive_progress':
                                                now = time.monotonic_ns()
                                                if last_list_time is None or now - last_list_time >= 3e9:
                                                    last_list_time = now
                                                    if 'path' in json_line and json_line['path']:
                                                        stderr_logger.debug('… %s (%s)', json_line["path"], naturalsize(json_line["original_size"]))
                                                    else:
                                                        stderr_logger.debug('… (%s)', naturalsize(json_line["original_size"]))
                                            elif json_line['type'] == 'file_status':
                                                # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
                                                last_list = f'{json_line["status"]} {json_line["path"]}'
                                                last_list_time = time.monotonic_ns()
                                                progress.set_description(last_list, refresh=False)
                                                if progress.disable:
                                                    stderr_logger.info(last_list)
                                            elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
                                                if 'message' in json_line:
                                                    if progress.disable:
                                                        stderr_logger.info(t + json_line['message'])
                                                    else:
                                                        tqdm.write(t + json_line['message'])
                                                elif 'msgid' in json_line:
                                                    if progress.disable:
                                                        stderr_logger.info(t + json_line['msgid'])
                                                    else:
                                                        tqdm.write(t + json_line['msgid'])
                                            else:
                                                if progress.disable:
                                                    stderr_logger.info(t + line.decode())
                                                else:
                                                    tqdm.write(t + line.decode())
                                        # logger.debug('handled stderr lines, %d leftover', len(stderr_linebuf))
                                if event == select.POLLHUP:
                                    poll.unregister(rfd)
                                    pollc -= 1

                            if pollc > 0:
                                # logger.debug('polling %d fds...', pollc)
                                events = poll.poll()
                            # logger.debug('done polling')

                        # logger.debug('borg create closed stdout/stderr')
                        if stdout_linebuf:
                            logger.error('unterminated line leftover in stdout: %s', stdout_linebuf)
                        if stderr_linebuf:
                            logger.error('unterminated line leftover in stderr: %s', stderr_linebuf)
                        progress.set_description(None)
                        ret = proc.wait()
                        # logger.debug('borg create terminated; ret=%d', ret)
                    if ret != 0:
                        dst = None
                        try:
                            dst = read_repo(dst_repo_path)
                        except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
                            logger.error(err)
                            continue
                        else:
                            if any(map(lambda other: entry['name'] == other['name'], dst)):
                                logger.info('destination exists, terminating')
                                break

                            logger.warning('destination does not exist, retrying')
                            continue
                    else:
                        # logger.debug('terminating')
                        break
            mount_proc.terminate()

    with Manager() as manager:
        tmpdir_q = manager.Queue(1)

        with closing(Process(target=do_copy, args=(tmpdir_q,), name='do_copy')) as p:
            p.start()

            with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
                tmpdir_q.put(tmpdir)
                p.join()
                return p.exitcode

def sigterm(signum, frame):
    raise SystemExit(128 + signum)

def main():
    signal.signal(signal.SIGTERM, sigterm)

    global logger
    logger = logging.getLogger(__name__)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter('[%(levelname)s](%(name)s): %(message)s'))
    if sys.stderr.isatty():
        console_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s'))

    burst_max = 1000
    burst = burst_max
    last_use = None
    inv_rate = 1e7
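    # Token bucket rate-limiting the console log: a bucket of `burst_max`
    # records, one token regenerated every `inv_rate` ns (about 100 records/s
    # sustained); when the bucket is empty the filter sleeps instead of
    # dropping records.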
    def consume_filter(record):
        nonlocal burst, burst_max, inv_rate, last_use

        delay = None
        while True:
            now = time.monotonic_ns()
            burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max
            last_use = now

            if burst > 0:
                burst -= 1
                if delay:
                    delay = now - delay

                return True

            if delay is None:
                delay = now
            time.sleep(inv_rate / 1e9)
    console_handler.addFilter(consume_filter)

    logging.getLogger().addHandler(console_handler)

    # log uncaught exceptions
    def log_exceptions(type, value, tb):
        global logger

        logger.error(value)
        sys.__excepthook__(type, value, tb)  # calls default excepthook

    sys.excepthook = log_exceptions

    parser = argparse.ArgumentParser(prog='copy')
    parser.add_argument('--verbosity', dest='log_level', action='append', type=int)
    parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1)
    parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1)
    parser.add_argument('source', metavar='REPO_OR_ARCHIVE')
    parser.add_argument('target', metavar='REPO_OR_ARCHIVE')
    args = parser.parse_args()


    LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
    DEFAULT_LOG_LEVEL = logging.ERROR
    log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)

    for adjustment in args.log_level or ():
        log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0))
    logger.setLevel(LOG_LEVELS[log_level])


    if "::" in args.source:
        (src_repo_path, _, src_archive) = args.source.partition("::")
        entry = None
        for candidate_entry in read_repo(src_repo_path):
            if candidate_entry['name'] != src_archive:
                continue
            entry = candidate_entry
            break

        if entry is None:
            logger.critical("Did not find archive ‘%s’", src_archive)
            sys.exit(1)

        copy_archive(src_repo_path, args.target, entry)
    else:
        for entry in ToSync(args.source, args.target):
            copy_archive(args.source, args.target, entry)

if __name__ == "__main__":
    sys.exit(main())
```
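For context: per the argparse setup above, the script took a source and a target, each either a repository or a `REPO::ARCHIVE` spec, plus `-v`/`-q`/`--verbosity` to adjust the log level. A hypothetical invocation (paths made up for illustration) would be `copy -v ssh://borg@vidhar/srv/backup/repo /srv/mirror/repo` to mirror every missing archive, or `copy -v 'ssh://borg@vidhar/srv/backup/repo::host-2022-11-07T03:00:00' /srv/mirror/repo` for a single archive.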
