#!@python@/bin/python
"""Copy borg archives from one repository into another.

Each source archive is mounted with ``borg mount`` inside a forked child that
runs in its own mount and user namespace (overlay root plus bind mounts), and
is then re-created in the target repository with ``borg create`` run from the
mounted tree.

Usage: ``SCRIPT SOURCE TARGET`` where ``SOURCE`` is either a repository (all
archives missing from ``TARGET`` are copied) or ``REPO::ARCHIVE`` (only that
archive is copied).
"""

import json
import os
import subprocess
import re
import sys
import traceback
from sys import stderr
from humanize import naturalsize
from tempfile import TemporaryDirectory
from datetime import (datetime, timedelta)
from dateutil.tz import (tzlocal, tzutc)
import dateutil.parser
import argparse
from tqdm import tqdm
from xdg import xdg_runtime_dir
import pathlib
import unshare
import pyprctl
import signal
from time import sleep
from halo import Halo


parser = argparse.ArgumentParser()
parser.add_argument('source', metavar='REPO_OR_ARCHIVE')
parser.add_argument('target', metavar='REPO_OR_ARCHIVE')
args = parser.parse_args()

# Keyword arguments shared by every Halo spinner; the spinner is only
# animated when stderr is a terminal.
halo_args = {
    'stream': stderr,
    'enabled': stderr.isatty(),
    'spinner': 'arc'
}

# Archive names of the form `<prefix>-YYYY-MM-DDTHH:MM:SS`, optionally followed
# by `.checkpoint`/`.recreate` and a numeric suffix.  Group 1 is the prefix.
_ARCHIVE_NAME_RE = re.compile(
    r'^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}'
    r'(\.(checkpoint|recreate)(\.[0-9]+)?)?'
)

# Fields of an `archive_progress` record needed to update the progress bar.
_PROGRESS_KEYS = ('original_size', 'compressed_size', 'deduplicated_size', 'nfiles')


def _report(spinner, message):
    """Finish *spinner* with *message*, or print it to stderr if the spinner is disabled."""
    if spinner.enabled:
        spinner.succeed(message)
    else:
        print(message, file=stderr)


def read_repo(path):
    """Return the ``archives`` list reported by ``borg list --json`` for *path*.

    Raises ``json.decoder.JSONDecodeError`` when borg's stdout is not valid
    JSON.  The exit status of borg itself is not checked.
    """
    with Halo(text=f'Listing {path}', **halo_args) as sp:
        res = None
        with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', path], stdout=subprocess.PIPE) as proc:
            res = json.load(proc.stdout)['archives']
        _report(sp, f'{len(res)} archives in {path}')
    return res


class ToSync:
    """Iterator over source archives that are missing from the target.

    When the current batch is exhausted both repositories are listed again;
    iteration stops once a fresh listing yields nothing left to copy.
    Archives whose name ends in ``.checkpoint`` are never yielded.
    """

    # Class-level default; it is never mutated in place because `__next__`
    # rebinds `self.to_sync` before anything is popped.
    to_sync = []

    def __iter__(self):
        return self

    def __next__(self):
        if self.to_sync:
            return self.to_sync.pop()

        while True:
            try:
                src = read_repo(args.source)
                dst = read_repo(args.target)
            except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
                # Listing failed; report and try again.
                print(err, file=stderr)
                continue

            # Build the lookup set once instead of once per source entry.
            dst_names = {dst_entry['name'] for dst_entry in dst}
            self.to_sync = [
                entry for entry in src
                if entry['name'] not in dst_names
                and not entry['name'].endswith('.checkpoint')
            ]

            if self.to_sync:
                return self.to_sync.pop()

            raise StopIteration


def _exit_code_from_status(status):
    """Translate a raw ``os.wait()`` status into a process exit code.

    Passing the raw status to ``sys.exit`` would truncate e.g. status 256
    (exit code 1) to exit code 0.
    """
    if os.WIFSIGNALED(status):
        return 128 + os.WTERMSIG(status)
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    return 1


def _enter_sandbox(tmpdir, src_repo_path):
    """Move the calling process into a private overlay-root chroot under *tmpdir*.

    Unshares the mount and user namespaces, maps the current uid/gid to 0,
    mounts an overlay of ``/`` and bind-mounts the directories the copy needs,
    then chroots into it.  Raises ``subprocess.CalledProcessError`` when one
    of the ``mount``/``stat`` calls fails.
    """
    uid, gid = os.geteuid(), os.getegid()
    unshare.unshare(unshare.CLONE_NEWNS | unshare.CLONE_NEWUSER)

    # Must be a set (not `{}`, which is a dict) so it can be unioned below.
    ps_effective = set()  # {pyprctl.Cap.SETUID, pyprctl.Cap.SETGID}
    ps_ambient = {pyprctl.Cap.SYS_ADMIN}
    pyprctl.cap_permitted.add(*(ps_effective | ps_ambient))
    pyprctl.cap_effective.add(*(ps_effective | ps_ambient))
    pyprctl.cap_inheritable.add(*ps_ambient)
    pyprctl.cap_ambient.add(*ps_ambient)

    with open('/proc/self/setgroups', 'w') as setgroups:
        setgroups.write('deny')
    with open('/proc/self/uid_map', 'w') as uid_map:
        uid_map.write(f'0 {uid} 1')
    with open('/proc/self/gid_map', 'w') as gid_map:
        gid_map.write(f'0 {gid} 1')

    subprocess.run(['mount', '--make-rprivate', '/'], check=True)

    chroot = pathlib.Path(tmpdir) / 'chroot'
    upper = pathlib.Path(tmpdir) / 'upper'
    work = pathlib.Path(tmpdir) / 'work'
    for path in [chroot, upper, work]:
        path.mkdir()

    print(f'euid={os.getuid()}', file=stderr)
    subprocess.run(['stat', '/', upper, work, chroot], check=True)
    subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)

    bind_mounts = ['nix', 'run', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
    # A source path without ':' is bind-mounted as a local directory.
    if ":" not in src_repo_path:
        bind_mounts.append(pathlib.Path(src_repo_path).relative_to('/'))
    if 'SSH_AUTH_SOCK' in os.environ:
        bind_mounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
    for bind_mount in bind_mounts:
        (chroot / bind_mount).mkdir(parents=True, exist_ok=True)
        subprocess.run(['mount', '--bind', pathlib.Path('/') / bind_mount, chroot / bind_mount], check=True)

    os.chroot(chroot)
    os.chdir('/')


def _archive_stats(archive):
    """Return ``(original_size, nfiles)`` of *archive* as reported by ``borg info --json``."""
    with Halo(text='Determine size', **halo_args) as sp:
        total_size = None
        total_files = None
        with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', archive], stdout=subprocess.PIPE, text=True) as proc:
            stats = json.load(proc.stdout)['archives'][0]['stats']
            total_size = stats['original_size']
            total_files = stats['nfiles']
        _report(sp, f'{total_files} files, {naturalsize(total_size, binary=True)}')
    return total_size, total_files


def _wait_for_mount(mount_dir):
    """Poll ``mountpoint -q`` until *mount_dir* is mounted.

    Raises ``subprocess.CalledProcessError`` if it is still not a mountpoint
    after ten minutes.
    """
    with Halo(text='Waiting for mount', **halo_args) as sp:
        wait_start = datetime.now()
        while True:
            ret = subprocess.run(['mountpoint', '-q', mount_dir])
            if ret.returncode == 0:
                break
            elif datetime.now() - wait_start > timedelta(minutes=10):
                ret.check_returncode()
            sleep(0.1)
        _report(sp, 'Mounted')


def _run_borg_create(mount_dir, dst_repo_path, entry, cache_suffix, total_size, total_files):
    """Run one ``borg create`` of *mount_dir* into *dst_repo_path* and return its exit code.

    borg's ``--log-json`` output on stderr is parsed line by line and rendered
    as a tqdm progress bar; lines that are not JSON objects are echoed as-is.
    """
    with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
        seen = 0
        env = os.environ.copy()
        create_args = ['borg',
                       'create',
                       '--lock-wait=600',
                       '--one-file-system',
                       '--compression=auto,zstd,10',
                       '--chunker-params=10,23,16,4095',
                       '--files-cache=ctime,size',
                       '--show-rc',
                       # '--remote-ratelimit=20480',
                       '--log-json',
                       '--progress',
                       '--list',
                       '--filter=AMEi-x?',
                       '--stats'
                       ]
        # The listed time is interpreted as local time and passed to borg in UTC.
        archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
        create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
        if cache_suffix:
            env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
        else:
            create_args += ['--files-cache=disabled']
        create_args += [f'{dst_repo_path}::{entry["name"]}', '.']

        with subprocess.Popen(create_args, cwd=mount_dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env) as proc:
            last_list = None
            last_list_time = None
            for line in proc.stderr:
                try:
                    json_line = json.loads(line)
                except json.decoder.JSONDecodeError:
                    json_line = None
                if not isinstance(json_line, dict):
                    tqdm.write(line)
                    continue

                t = ''
                if 'time' in json_line:
                    ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
                    t = f'{ts.isoformat(timespec="minutes")} '

                msg_type = json_line.get('type')
                if msg_type == 'archive_progress':
                    # Alternate every three seconds between the path borg is
                    # currently processing and the last listed file status.
                    if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
                        if json_line.get('path'):
                            progress.set_description(f'… {json_line["path"]}', refresh=False)
                        else:
                            progress.set_description(None, refresh=False)
                    elif last_list is not None:
                        progress.set_description(last_list, refresh=False)
                    # A record without the size fields (presumably borg's
                    # final "finished" record) must not raise KeyError.
                    if all(key in json_line for key in _PROGRESS_KEYS):
                        progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False)
                        progress.update(json_line["original_size"] - seen)
                        seen = json_line["original_size"]
                elif msg_type == 'file_status':
                    last_list = f'{json_line["status"]} {json_line["path"]}'
                    last_list_time = datetime.now()
                    progress.set_description(last_list, refresh=False)
                elif msg_type in ('log_message', 'progress_message', 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
                    if 'message' in json_line:
                        tqdm.write(t + json_line['message'])
                    else:
                        tqdm.write(t + json_line['msgid'])
                else:
                    tqdm.write(t + line)
            progress.set_description(None)
        return proc.wait()


def _copy_in_child(tmpdir, src_repo_path, dst_repo_path, entry, cache_suffix):
    """Sandbox the process, mount the source archive and re-create it in the target."""
    _enter_sandbox(tmpdir, src_repo_path)

    mount_dir = pathlib.Path('/borg')
    mount_dir.mkdir(parents=True, exist_ok=True)

    archive = f'{src_repo_path}::{entry["name"]}'
    total_size, total_files = _archive_stats(archive)

    with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', archive, mount_dir]) as mount_proc:
        try:
            _wait_for_mount(mount_dir)
            # NOTE(review): any non-zero exit status of `borg create` is
            # retried indefinitely and without delay — confirm this is
            # intended for warning-level exit codes as well.
            while True:
                try:
                    returncode = _run_borg_create(mount_dir, dst_repo_path, entry, cache_suffix, total_size, total_files)
                except subprocess.CalledProcessError as err:
                    print(err, file=stderr)
                    continue
                if returncode == 0:
                    break
        finally:
            # Always stop the foreground mount; otherwise leaving the `with`
            # block would wait forever on a process that never exits.
            mount_proc.terminate()


def _run_child(tmpdir, src_repo_path, dst_repo_path, entry, cache_suffix):
    """Run the copy in the forked child and terminate the process; never returns.

    The child must not fall back into the parent's control flow (temporary
    directory cleanup, sync loop), so every outcome ends in ``os._exit``.
    """
    exit_code = 1
    try:
        _copy_in_child(tmpdir, src_repo_path, dst_repo_path, entry, cache_suffix)
        exit_code = 0
    except SystemExit as exc:
        if exc.code is None:
            exit_code = 0
        elif isinstance(exc.code, int):
            exit_code = exc.code
        else:
            print(exc.code, file=stderr)
    except BaseException:
        traceback.print_exc()
    finally:
        os._exit(exit_code)


def copy_archive(src_repo_path, dst_repo_path, entry):
    """Copy the archive described by *entry* from *src_repo_path* to *dst_repo_path*.

    *entry* is one element of the ``archives`` list returned by
    :func:`read_repo`; its ``name``, ``start`` and ``time`` keys are used.
    The actual copy runs in a forked child; if any child terminates with a
    non-zero status this process exits with the corresponding exit code.
    """
    cache_suffix = None
    with Halo(text='Determine archive parameters', **halo_args) as sp:
        match = _ARCHIVE_NAME_RE.fullmatch(entry['name'])
        if match:
            repo_id = None
            with subprocess.Popen(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], stdout=subprocess.PIPE) as proc:
                repo_id = json.load(proc.stdout)['repository']['id']
            if repo_id:
                # Files-cache suffix derived from the repository id and the archive name prefix.
                cache_suffix = f'{repo_id}_{match.group(1)}'
        _report(sp, f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})')

    with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
        child = os.fork()
        if child == 0:
            _run_child(tmpdir, src_repo_path, dst_repo_path, entry, cache_suffix)

        while True:
            waitpid, waitret = os.wait()
            if waitret != 0:
                sys.exit(_exit_code_from_status(waitret))
            if waitpid == child:
                break


def sigterm(signum, frame):
    """Signal handler: turn the signal into a ``SystemExit`` with code ``128 + signum``."""
    raise SystemExit(128 + signum)


def main():
    """Copy a single ``REPO::ARCHIVE`` or every archive missing from the target."""
    signal.signal(signal.SIGTERM, sigterm)

    if "::" in args.source:
        (src_repo_path, _, src_archive) = args.source.partition("::")
        entry = next(
            (candidate_entry for candidate_entry in read_repo(src_repo_path)
             if candidate_entry['name'] == src_archive),
            None,
        )

        if entry is None:
            print("Did not find archive", file=stderr)
            sys.exit(1)

        copy_archive(src_repo_path, args.target, entry)
    else:
        for entry in ToSync():
            copy_archive(args.source, args.target, entry)


if __name__ == "__main__":
    sys.exit(main())