From 42620945a92775ecab4f47280083ec8e0169317d Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Fri, 18 Feb 2022 11:57:23 +0100 Subject: vidhar: ... --- hosts/vidhar/borg/copy.py | 219 +++++++++++++++++++++++----------------------- 1 file changed, 108 insertions(+), 111 deletions(-) diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py index 1714e2e8..168fa967 100755 --- a/hosts/vidhar/borg/copy.py +++ b/hosts/vidhar/borg/copy.py @@ -45,10 +45,17 @@ halo_args = { borg_pwd = getpwnam('borg') -def as_borg(caps=set()): +def as_borg(caps=set(), newns=True): if caps: for capset in [cap_permitted, cap_inheritable, cap_effective, cap_ambient]: capset.add(*caps) + + if newns: + unshare.unshare(unshare.CLONE_NEWNS) + subprocess.run(['mount', '--make-rprivate', '/'], check=True) + if os.environ.get('CREDENTIALS_DIRECTORY'): + creds_path = pathlib.Path(os.environ['CREDENTIALS_DIRECTORY']).relative_to('/') + subprocess.run(['bindfs', '-r', f'--force-user={borg_pwd.pw_uid}', f'--force-group={borg_pwd.pw_gid}', pathlib.Path('/') / creds_path, chroot / creds_path], check=True) os.setgid(borg_pwd.pw_gid) os.setuid(borg_pwd.pw_uid) @@ -104,8 +111,9 @@ def copy_archive(src_repo_path, dst_repo_path, entry): else: print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr) with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: - child = os.fork() - if child == 0: + dir = pathlib.Path('/borg') + + def preexec_fn(): # print('unshare/chroot', file=stderr) unshare.unshare(unshare.CLONE_NEWNS) subprocess.run(['mount', '--make-rprivate', '/'], check=True) @@ -126,123 +134,112 @@ def copy_archive(src_repo_path, dst_repo_path, entry): (chroot / bindMount).mkdir(parents=True,exist_ok=True) # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr) subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) - if os.environ.get('CREDENTIALS_DIRECTORY'): - creds_path = pathlib.Path(os.environ['CREDENTIALS_DIRECTORY']).relative_to('/') - subprocess.run(['binfs', '-r', f'--force-user={borg_pwd.pw_uid}', f'--force-group={borg_pwd.pw_gid}', pathlib.Path('/') / creds_path, chroot / creds_path], check=True) os.chroot(chroot) os.chdir('/') - dir = pathlib.Path('/borg') dir.mkdir(parents=True,exist_ok=True,mode=750) os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) - with Halo(text=f'Determine size', **halo_args) as sp: - total_size = None - total_files = None - with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: - stats = json.load(proc.stdout)['archives'][0]['stats'] - total_size = stats['original_size'] - total_files = stats['nfiles'] + as_borg(newns=False, caps={Cap.DAC_READ_SEARCH}) + with Halo(text=f'Determine size', **halo_args) as sp: + total_size = None + total_files = None + with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: + stats = json.load(proc.stdout)['archives'][0]['stats'] + total_size = stats['original_size'] + total_files = stats['nfiles'] + if sp.enabled: + sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') + else: + print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) + # print(f'Mounting to {dir}', file=stderr) + with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: + with Halo(text='Waiting for mount', **halo_args) as sp: + wait_start = datetime.now() + while True: + ret = subprocess.run(['mountpoint', '-q', dir]) + if ret.returncode == 0: + break + elif datetime.now() - wait_start > timedelta(minutes=10): + ret.check_returncode() + sleep(0.1) if sp.enabled: - sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') + sp.succeed('Mounted') else: - print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) - # print(f'Mounting to {dir}', file=stderr) - with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: - with Halo(text='Waiting for mount', **halo_args) as sp: - wait_start = datetime.now() - while True: - ret = subprocess.run(['mountpoint', '-q', dir]) - if ret.returncode == 0: - break - elif datetime.now() - wait_start > timedelta(minutes=10): - ret.check_returncode() - sleep(0.1) - if sp.enabled: - sp.succeed('Mounted') - else: - print('Mounted', file=stderr) - while True: - try: - with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: - seen = 0 - env = os.environ.copy() - create_args = ['borg', - 'create', - '--lock-wait=600', - '--one-file-system', - '--compression=auto,zstd,10', - '--chunker-params=10,23,16,4095', - '--files-cache=ctime,size', - '--show-rc', - # '--remote-ratelimit=20480', - '--log-json', - '--progress', - '--list', - '--filter=AMEi-x?', - '--stats' - ] - archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) - create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] - if cache_suffix: - env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix - else: - create_args += ['--files-cache=disabled'] - create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] - with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH})) as proc: - last_list = None - last_list_time = None - for line in proc.stderr: - try: - json_line = json.loads(line) - except json.decoder.JSONDecodeError: - tqdm.write(line) - continue + print('Mounted', file=stderr) + while True: + try: + with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: + seen = 0 + env = os.environ.copy() + create_args = ['borg', + 'create', + '--lock-wait=600', + '--one-file-system', + '--compression=auto,zstd,10', + '--chunker-params=10,23,16,4095', + '--files-cache=ctime,size', + '--show-rc', + # '--remote-ratelimit=20480', + '--log-json', + '--progress', + '--list', + '--filter=AMEi-x?', + '--stats' + ] + archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) + create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] + if cache_suffix: + env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix + else: + create_args += ['--files-cache=disabled'] + create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] + with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=preexec_fn) as proc: + last_list = None + last_list_time = None + for line in proc.stderr: + try: + json_line = json.loads(line) + except json.decoder.JSONDecodeError: + tqdm.write(line) + continue - t = '' - if 'time' in json_line and stderr.isatty(): - ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) - t = f'{ts.isoformat(timespec="minutes")} ' - if json_line['type'] == 'archive_progress': - if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: - if 'path' in json_line and json_line['path']: - progress.set_description(f'… {json_line["path"]}', refresh=False) - else: - progress.set_description(None, refresh=False) - elif last_list is not None: - progress.set_description(last_list, refresh=False) - progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False) - progress.update(json_line["original_size"] - seen) - seen = json_line["original_size"] - elif json_line['type'] == 'file_status': - # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') - last_list = f'{json_line["status"]} {json_line["path"]}' - last_list_time = datetime.now() + t = '' + if 'time' in json_line and stderr.isatty(): + ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) + t = f'{ts.isoformat(timespec="minutes")} ' + if json_line['type'] == 'archive_progress': + if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: + if 'path' in json_line and json_line['path']: + progress.set_description(f'… {json_line["path"]}', refresh=False) + else: + progress.set_description(None, refresh=False) + elif last_list is not None: progress.set_description(last_list, refresh=False) - if not stderr.isatty(): - print(last_list, file=stderr) - elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): - if 'message' in json_line: - tqdm.write(t + json_line['message']) - elif 'msgid' in json_line: - tqdm.write(t + json_line['msgid']) - else: - tqdm.write(t + line) - progress.set_description(None) - if proc.wait() != 0: - continue - except subprocess.CalledProcessError as err: - print(err, file=stderr) - continue - else: - break - mount_proc.terminate() - os._exit(0) - else: - while True: - waitpid, waitret = os.wait() - if waitret != 0: - sys.exit(waitret) - if waitpid == child: + progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False) + progress.update(json_line["original_size"] - seen) + seen = json_line["original_size"] + elif json_line['type'] == 'file_status': + # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') + last_list = f'{json_line["status"]} {json_line["path"]}' + last_list_time = datetime.now() + progress.set_description(last_list, refresh=False) + if not stderr.isatty(): + print(last_list, file=stderr) + elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): + if 'message' in json_line: + tqdm.write(t + json_line['message']) + elif 'msgid' in json_line: + tqdm.write(t + json_line['msgid']) + else: + tqdm.write(t + line) + progress.set_description(None) + if proc.wait() != 0: + continue + except subprocess.CalledProcessError as err: + print(err, file=stderr) + continue + else: break + mount_proc.terminate() def sigterm(signum, frame): raise SystemExit(128 + signum) -- cgit v1.2.3