From 6bafcb244bd6cd031ac9b65fce8a2a939698ecaa Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 2 Nov 2022 16:31:02 +0100 Subject: ... --- hosts/vidhar/borg/copy.py | 291 ------------- hosts/vidhar/borg/copy/copy_borg/__main__.py | 556 +++++++++++++++++++++++++ hosts/vidhar/borg/copy/setup.py | 10 + hosts/vidhar/borg/default.nix | 54 ++- modules/borgsnap/borgsnap/borgsnap/__main__.py | 6 +- 5 files changed, 594 insertions(+), 323 deletions(-) delete mode 100755 hosts/vidhar/borg/copy.py create mode 100755 hosts/vidhar/borg/copy/copy_borg/__main__.py create mode 100644 hosts/vidhar/borg/copy/setup.py diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py deleted file mode 100755 index c648c4f6..00000000 --- a/hosts/vidhar/borg/copy.py +++ /dev/null @@ -1,291 +0,0 @@ -#!@python@/bin/python - -import json -import os -import subprocess -import re -import sys -from sys import stderr -from humanize import naturalsize - -from tempfile import TemporaryDirectory - -from datetime import (datetime, timedelta) -from dateutil.tz import (tzlocal, tzutc) -import dateutil.parser -import argparse - -from tqdm import tqdm - -from xdg import xdg_runtime_dir -import pathlib - -import unshare -from pyprctl import cap_permitted, cap_inheritable, cap_effective, cap_ambient, Cap -from pwd import getpwnam - -import signal -from time import sleep - -from halo import Halo - -from collections import deque - - -parser = argparse.ArgumentParser() -parser.add_argument('source', metavar='REPO_OR_ARCHIVE') -parser.add_argument('target', metavar='REPO_OR_ARCHIVE') -args = parser.parse_args() - -halo_args = { - 'stream': stderr, - 'enabled': stderr.isatty(), - 'spinner': 'arc' -} - -borg_pwd = getpwnam('borg') - -def as_borg(caps=set(), cwd=None): - if caps: - cap_permitted.add(*caps) - cap_inheritable.add(*caps) - cap_effective.add(*caps) - cap_ambient.add(*caps) - - os.setgid(borg_pwd.pw_gid) - os.setuid(borg_pwd.pw_uid) - - if cwd is not None: - os.chdir(cwd) - -def read_repo(path): - with Halo(text=f'Listing {path}', **halo_args) as sp: - res = None - with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: - res = json.load(proc.stdout)['archives'] - if sp.enabled: - sp.succeed(f'{len(res)} archives in {path}') - else: - print(f'{len(res)} archives in {path}', file=stderr) - return res - -class ToSync: - to_sync = deque() - - def __iter__(self): - return self - - def __next__(self): - if self.to_sync: - return self.to_sync.popleft() - - while True: - try: - src = read_repo(args.source) - dst = read_repo(args.target) - except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: - print(err, file=stderr) - continue - - self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')]) - - if self.to_sync: - return self.to_sync.popleft() - - raise StopIteration - -def copy_archive(src_repo_path, dst_repo_path, entry): - cache_suffix = None - with Halo(text=f'Determine archive parameters', **halo_args) as sp: - match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name']) - if match: - repo_id = None - with subprocess.Popen(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: - repo_id = json.load(proc.stdout)['repository']['id'] - if repo_id: - cache_suffix = f'{repo_id}_{match.group(1)}' - if sp.enabled: - sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})') - else: - print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr) - with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: - child = os.fork() - if child == 0: - # print('unshare/chroot', file=stderr) - unshare.unshare(unshare.CLONE_NEWNS) - subprocess.run(['mount', '--make-rprivate', '/'], check=True) - chroot = pathlib.Path(tmpdir) / 'chroot' - upper = pathlib.Path(tmpdir) / 'upper' - work = pathlib.Path(tmpdir) / 'work' - for path in [chroot,upper,work]: - path.mkdir() - subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) - bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] - if os.environ.get('BORG_BASE_DIR'): - bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) - if not ":" in src_repo_path: - bindMounts.append(pathlib.Path(src_repo_path).relative_to('/')) - if 'SSH_AUTH_SOCK' in os.environ: - bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) - for bindMount in bindMounts: - (chroot / bindMount).mkdir(parents=True,exist_ok=True) - # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr) - subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) - os.chroot(chroot) - os.chdir('/') - try: - os.unlink('/etc/fuse.conf') - except FileNotFoundError: - pass - pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True) - with open('/etc/fuse.conf', 'w') as fuse_conf: - fuse_conf.write('user_allow_other\nmount_max = 1000\n') - dir = pathlib.Path('/borg') - dir.mkdir(parents=True,exist_ok=True,mode=0o0750) - os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) - - total_size = None - total_files = None - if stderr.isatty(): - with Halo(text=f'Determine size', **halo_args) as sp: - with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: - stats = json.load(proc.stdout)['archives'][0]['stats'] - total_size = stats['original_size'] - total_files = stats['nfiles'] - if sp.enabled: - sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') - else: - print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) - # print(f'Mounting to {dir}', file=stderr) - with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: - with Halo(text='Waiting for mount', **halo_args) as sp: - wait_start = datetime.now() - while True: - if os.path.ismount(dir): - break - elif datetime.now() - wait_start > timedelta(minutes=15): - ret.check_returncode() - sleep(0.1) - if sp.enabled: - sp.succeed('Mounted') - else: - print('Mounted', file=stderr) - while True: - with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: - seen = 0 - env = os.environ.copy() - create_args = ['borg', - 'create', - '--lock-wait=600', - '--one-file-system', - '--compression=auto,zstd,10', - '--chunker-params=10,23,16,4095', - '--files-cache=ctime,size', - '--show-rc', - '--upload-buffer=100', - '--upload-ratelimit=20480', - '--log-json', - '--progress', - '--list', - '--filter=AMEi-x?', - '--stats' - ] - archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) - create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] - if cache_suffix: - env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix - else: - create_args += ['--files-cache=disabled'] - create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] - - with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}, cwd=dir)) as proc: - last_list = None - last_list_time = None - for line in proc.stderr: - try: - json_line = json.loads(line) - except json.decoder.JSONDecodeError: - tqdm.write(line) - continue - - t = '' - if 'time' in json_line and stderr.isatty(): - ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) - t = f'{ts.isoformat(timespec="minutes")} ' - if json_line['type'] == 'archive_progress': - if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: - if 'path' in json_line and json_line['path']: - progress.set_description(f'… {json_line["path"]}', refresh=False) - else: - progress.set_description(None, refresh=False) - elif last_list is not None: - progress.set_description(last_list, refresh=False) - nfiles=json_line["nfiles"] - if total_files is not None: - nfiles=f'{json_line["nfiles"]}/{total_files}' - progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) - progress.update(json_line["original_size"] - seen) - seen = json_line["original_size"] - elif json_line['type'] == 'file_status': - # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') - last_list = f'{json_line["status"]} {json_line["path"]}' - last_list_time = datetime.now() - progress.set_description(last_list, refresh=False) - if not stderr.isatty(): - print(last_list, file=stderr) - elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): - if 'message' in json_line: - tqdm.write(t + json_line['message']) - elif 'msgid' in json_line: - tqdm.write(t + json_line['msgid']) - else: - tqdm.write(t + line) - progress.set_description(None) - if proc.wait() != 0: - dst = None - try: - dst = read_repo(args.target) - except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: - print(err, file=stderr) - continue - else: - if any(map(lambda other: entry['name'] == other['name'], dst)): - break - - continue - mount_proc.terminate() - os._exit(0) - else: - while True: - waitpid, waitret = os.wait() - if waitret != 0: - sys.exit(waitret) - if waitpid == child: - break - -def sigterm(signum, frame): - raise SystemExit(128 + signum) - -def main(): - signal.signal(signal.SIGTERM, sigterm) - - if "::" in args.source: - (src_repo_path, _, src_archive) = args.source.partition("::") - entry = None - for candidate_entry in read_repo(src_repo_path): - if entry['name'] != src_archive: - continue - entry = candidate_entry - break - - if entry is None: - print("Did not find archive", file=stderr) - os.exit(1) - - copy_archive(src_repo_path, args.target, entry) - else: - for entry in ToSync(): - copy_archive(args.source, args.target, entry) - -if __name__ == "__main__": - sys.exit(main()) diff --git a/hosts/vidhar/borg/copy/copy_borg/__main__.py b/hosts/vidhar/borg/copy/copy_borg/__main__.py new file mode 100755 index 00000000..5b374d99 --- /dev/null +++ b/hosts/vidhar/borg/copy/copy_borg/__main__.py @@ -0,0 +1,556 @@ +#!@python@/bin/python + +import json +import os +import subprocess +import re +import sys +import io +from sys import stderr +from humanize import naturalsize + +from tempfile import TemporaryDirectory + +from datetime import (datetime, timedelta) +from dateutil.tz import (tzlocal, tzutc) +import dateutil.parser +import argparse + +from tqdm import tqdm + +from xdg import xdg_runtime_dir +import pathlib + +import unshare +from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps +from pwd import getpwnam + +import logging + +import signal +import time +import math + +from halo import Halo + +from collections import deque + +import select +import fcntl + +from multiprocessing import Process, Manager +from contextlib import closing + + +halo_args = { + 'stream': stderr, + 'enabled': stderr.isatty(), + 'spinner': 'arc' +} + +borg_pwd = getpwnam('borg') + +def as_borg(caps=set()): + global logger + + try: + if caps: + c_state = CapState.get_current() + c_state.permitted.add(*caps) + c_state.set_current() + + # logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) + + set_keepcaps(True) + + os.setgid(borg_pwd.pw_gid) + os.setuid(borg_pwd.pw_uid) + + if caps: + # logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) + + c_state = CapState.get_current() + c_state.permitted = caps.copy() + c_state.inheritable.add(*caps) + c_state.set_current() + + # logger.debug("cap_permitted=%s", CapState.get_current().permitted) + # logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) + + for cap in caps: + cap_ambient_raise(cap) + # logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) + except Exception: + logger.error(format_exc()) + raise + +def borg_json(*args, **kwargs): + global logger + + with subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs) as proc: + stdout_buffer = io.BytesIO() + + proc_logger = logger.getChild('borg') + stdout_logger = proc_logger.getChild('stdout') + stderr_logger = proc_logger.getChild('stderr') + + fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + + poll = select.poll() + poll.register(proc.stdout, select.POLLIN | select.POLLHUP) + poll.register(proc.stderr, select.POLLIN | select.POLLHUP) + pollc = 2 + events = poll.poll() + stderr_linebuf = bytearray() + + while pollc > 0 and len(events) > 0: + for rfd, event in events: + if event & select.POLLIN: + if rfd == proc.stdout.fileno(): + try: + buf = os.read(proc.stdout.fileno(), 8192) + # stdout_logger.debug(buf) + stdout_buffer.write(buf) + except BlockingIOError: + pass + if rfd == proc.stderr.fileno(): + try: + stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192)) + except BlockingIOError: + pass + + while stderr_linebuf: + line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n') + if not sep: + stderr_linebuf = line + break + + stderr_logger.info(line.decode()) + if event == select.POLLHUP: + poll.unregister(rfd) + pollc -= 1 + + if pollc > 0: + events = poll.poll() + + for handler in proc_logger.handlers: + handler.flush() + + ret = proc.wait() + if ret != 0: + raise Exception(f'borg subprocess exited with returncode {ret}') + + stdout_buffer.seek(0) + return json.load(stdout_buffer) + +def read_repo(path): + global logger + + with Halo(text=f'Listing {path}', **halo_args) as sp: + if not sp.enabled: + logger.debug('Listing %s...', path) + res = borg_json(['borg', 'list', '--info', '--lock-wait=600', '--json', path], preexec_fn=lambda: as_borg())['archives'] + if sp.enabled: + sp.succeed(f'{len(res)} archives in {path}') + else: + logger.info('%d archives in ‘%s’', len(res), path) + return res + +class ToSync: + to_sync = deque() + + def __init__(self, source, target): + self.source = source + self.target = target + + def __iter__(self): + return self + + def __next__(self): + global logger + + if self.to_sync: + return self.to_sync.popleft() + + while True: + try: + src = read_repo(self.source) + dst = read_repo(self.target) + except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: + logger.error(err) + continue + + self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')]) + + if self.to_sync: + return self.to_sync.popleft() + + raise StopIteration + +def copy_archive(src_repo_path, dst_repo_path, entry): + global logger + + def do_copy(tmpdir_q): + global logger + + nonlocal src_repo_path, dst_repo_path, entry + + tmpdir = tmpdir_q.get() + + cache_suffix = None + with Halo(text=f'Determine archive parameters', **halo_args) as sp: + if not sp.enabled: + logger.debug('Determining archive parameters...') + match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name']) + if match: + repo_id = borg_json(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], preexec_fn=lambda: as_borg())['repository']['id'] + + if repo_id: + cache_suffix = f'{repo_id}_{match.group(1)}' + if sp.enabled: + sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})') + else: + logger.info('Will process ‘%s’ (%s, cache_suffix=%s)', entry['name'], dateutil.parser.isoparse(entry['start']), cache_suffix) + + logger.debug('Setting up environment...') + unshare.unshare(unshare.CLONE_NEWNS) + subprocess.run(['mount', '--make-rprivate', '/'], check=True) + chroot = pathlib.Path(tmpdir) / 'chroot' + upper = pathlib.Path(tmpdir) / 'upper' + work = pathlib.Path(tmpdir) / 'work' + for path in [chroot,upper,work]: + path.mkdir() + subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) + bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] + if os.environ.get('BORG_BASE_DIR'): + bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) + if not ":" in src_repo_path: + bindMounts.append(pathlib.Path(src_repo_path).relative_to('/')) + if 'SSH_AUTH_SOCK' in os.environ: + bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) + for bindMount in bindMounts: + (chroot / bindMount).mkdir(parents=True,exist_ok=True) + subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) + os.chroot(chroot) + os.chdir('/') + try: + os.unlink('/etc/fuse.conf') + except FileNotFoundError: + pass + pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True) + with open('/etc/fuse.conf', 'w') as fuse_conf: + fuse_conf.write('user_allow_other\nmount_max = 1000\n') + dir = pathlib.Path('/borg') + dir.mkdir(parents=True,exist_ok=True,mode=0o0750) + os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) + + total_size = None + total_files = None + if stderr.isatty(): + with Halo(text=f'Determine size', **halo_args) as sp: + stats = borg_json(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], preexec_fn=lambda: as_borg())['archives'][0]['stats'] + total_size = stats['original_size'] + total_files = stats['nfiles'] + if sp.enabled: + sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') + else: + logger.info('%d files, %s', total_files, naturalsize(total_size, binary=True)) + with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: + with Halo(text='Waiting for mount', **halo_args) as sp: + if not sp.enabled: + logger.debug('Waiting for mount...') + wait_start = datetime.now() + while True: + if os.path.ismount(dir): + break + elif datetime.now() - wait_start > timedelta(minutes=15): + ret.check_returncode() + time.sleep(0.1) + if sp.enabled: + sp.succeed('Mounted') + else: + logger.info('Mounted %s', f'{src_repo_path}::{entry["name"]}') + + while True: + with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: + seen = 0 + env = os.environ.copy() + create_args = ['borg', + 'create', + '--lock-wait=600', + '--one-file-system', + '--compression=auto,zstd,10', + '--chunker-params=10,23,16,4095', + '--files-cache=ctime,size', + '--show-rc', + '--upload-buffer=100', + '--upload-ratelimit=20480', + '--log-json', + '--progress', + '--list', + '--filter=AMEi-x?', + '--stats' + ] + archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) + create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] + if cache_suffix: + env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix + else: + create_args += ['--files-cache=disabled'] + create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] + + with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir) as proc: + last_list = None + last_list_time = time.monotonic_ns() + logger.info('Creating...') + + proc_logger = logger.getChild('borg') + stdout_logger = proc_logger.getChild('stdout') + stderr_logger = proc_logger.getChild('stderr') + + fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + + poll = select.poll() + poll.register(proc.stdout, select.POLLIN | select.POLLHUP) + poll.register(proc.stderr, select.POLLIN | select.POLLHUP) + pollc = 2 + events = poll.poll() + stdout_linebuf = bytearray() + stderr_linebuf = bytearray() + + while pollc > 0 and len(events) > 0: + # logger.debug('%d events', len(events)) + for rfd, event in events: + # logger.debug('event %s', event) + if event & select.POLLIN: + if rfd == proc.stdout.fileno(): + try: + # logger.debug('reading stdout...') + stdout_linebuf.extend(os.read(proc.stdout.fileno(), 8192)) + # logger.debug('read stdout, len(stdout_linebuf)=%d', len(stdout_linebuf)) + except BlockingIOError: + pass + + while stdout_linebuf: + # logger.debug('stdout line...') + line, sep, stdout_linebuf = stdout_linebuf.partition(b'\n') + if not sep: + stdout_linebuf = line + break + + stdout_logger.info(line.decode()) + # logger.debug('handled stdout lines, %d leftover', len(stdout_linebuf)) + if rfd == proc.stderr.fileno(): + try: + # logger.debug('reading stderr...') + stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192)) + # logger.debug('read stderr, len(stderr_linebuf)=%d', len(stderr_linebuf)) + except BlockingIOError: + pass + + while stderr_linebuf: + # logger.debug('stderr line...') + line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n') + if not sep: + stderr_linebuf = line + break + + try: + json_line = json.loads(line) + except json.decoder.JSONDecodeError: + if progress.disable: + stderr_logger.error(line.decode()) + else: + tqdm.write(line.decode()) + continue + + # logger.debug('stderr line decoded: %s', json_line['type'] if 'type' in json_line else None) + + t = '' + if 'time' in json_line and not progress.disable: + ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) + t = f'{ts.isoformat(timespec="minutes")} ' + if json_line['type'] == 'archive_progress' and not progress.disable: + now = time.monotonic_ns() + if last_list_time is None or now - last_list_time >= 3e9: + last_list_time = now + if 'path' in json_line and json_line['path']: + progress.set_description(f'… {json_line["path"]}', refresh=False) + else: + progress.set_description(None, refresh=False) + elif last_list is not None: + progress.set_description(last_list, refresh=False) + nfiles=json_line["nfiles"] + if total_files is not None: + nfiles=f'{json_line["nfiles"]}/{total_files}' + progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) + progress.update(json_line["original_size"] - seen) + seen = json_line["original_size"] + elif json_line['type'] == 'archive_progress': + now = time.monotonic_ns() + if last_list_time is None or now - last_list_time >= 3e9: + last_list_time = now + if 'path' in json_line and json_line['path']: + stderr_logger.debug('… %s (%s)', json_line["path"], naturalsize(json_line["original_size"])) + else: + stderr_logger.debug('… (%s)', naturalsize(json_line["original_size"])) + elif json_line['type'] == 'file_status': + # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') + last_list = f'{json_line["status"]} {json_line["path"]}' + last_list_time = time.monotonic_ns() + progress.set_description(last_list, refresh=False) + if progress.disable: + stderr_logger.info(last_list) + elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): + if 'message' in json_line: + if progress.disable: + stderr_logger.info(t + json_line['message']) + else: + tqdm.write(t + json_line['message']) + elif 'msgid' in json_line: + if progress.disable: + stderr_logger.info(t + json_line['msgid']) + else: + tqdm.write(t + json_line['msgid']) + else: + if progress.disable: + stderr_logger.info(t + line.decode()) + else: + tqdm.write(t + line.decode()) + # logger.debug('handled stderr lines, %d leftover', len(stderr_linebuf)) + if event == select.POLLHUP: + poll.unregister(rfd) + pollc -= 1 + + if pollc > 0: + # logger.debug('polling %d fds...', pollc) + events = poll.poll() + # logger.debug('done polling') + + # logger.debug('borg create closed stdout/stderr') + if stdout_linebuf: + logger.error('unterminated line leftover in stdout: %s', stdout_linebuf) + if stderr_linebuf: + logger.error('unterminated line leftover in stdout: %s', stderr_linebuf) + progress.set_description(None) + ret = proc.wait() + # logger.debug('borg create terminated; ret=%d', ret) + if ret != 0: + dst = None + try: + dst = read_repo(dst_repo_path) + except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: + logger.error(err) + continue + else: + if any(map(lambda other: entry['name'] == other['name'], dst)): + logger.info('destination exists, terminating') + break + + logger.warn('destination does not exist, retrying') + continue + else: + # logger.debug('terminating') + break + mount_proc.terminate() + + with Manager() as manager: + tmpdir_q = manager.Queue(1) + + with closing(Process(target=do_copy, args=(tmpdir_q,), name='do_copy')) as p: + p.start() + + with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: + tmpdir_q.put(tmpdir) + p.join() + return p.exitcode + +def sigterm(signum, frame): + raise SystemExit(128 + signum) + +def main(): + signal.signal(signal.SIGTERM, sigterm) + + global logger + logger = logging.getLogger(__name__) + console_handler = logging.StreamHandler() + console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) + if sys.stderr.isatty(): + console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) + + burst_max = 1000 + burst = burst_max + last_use = None + inv_rate = 1e7 + def consume_filter(record): + nonlocal burst, burst_max, inv_rate, last_use + + delay = None + while True: + now = time.monotonic_ns() + burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max + last_use = now + + if burst > 0: + burst -= 1 + if delay: + delay = now - delay + + return True + + if delay is None: + delay = now + time.sleep(inv_rate / 1e9) + console_handler.addFilter(consume_filter) + + logging.getLogger().addHandler(console_handler) + + # log uncaught exceptions + def log_exceptions(type, value, tb): + global logger + + logger.error(value) + sys.__excepthook__(type, value, tb) # calls default excepthook + + sys.excepthook = log_exceptions + + parser = argparse.ArgumentParser(prog='copy') + parser.add_argument('--verbosity', dest='log_level', action='append', type=int) + parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1) + parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1) + parser.add_argument('source', metavar='REPO_OR_ARCHIVE') + parser.add_argument('target', metavar='REPO_OR_ARCHIVE') + args = parser.parse_args() + + + LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] + DEFAULT_LOG_LEVEL = logging.ERROR + log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) + + for adjustment in args.log_level or (): + log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0)) + logger.setLevel(LOG_LEVELS[log_level]) + + + if "::" in args.source: + (src_repo_path, _, src_archive) = args.source.partition("::") + entry = None + for candidate_entry in read_repo(src_repo_path): + if entry['name'] != src_archive: + continue + entry = candidate_entry + break + + if entry is None: + logger.critical("Did not find archive ‘%s’", src_archive) + os.exit(1) + + copy_archive(src_repo_path, args.target, entry) + else: + for entry in ToSync(args.source, args.target): + copy_archive(args.source, args.target, entry) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/hosts/vidhar/borg/copy/setup.py b/hosts/vidhar/borg/copy/setup.py new file mode 100644 index 00000000..f77d9560 --- /dev/null +++ b/hosts/vidhar/borg/copy/setup.py @@ -0,0 +1,10 @@ +from setuptools import setup + +setup(name='copy_borg', + packages=['copy_borg'], + entry_points={ + 'console_scripts': [ + 'copy_borg=copy_borg.__main__:main', + ], + } +) diff --git a/hosts/vidhar/borg/default.nix b/hosts/vidhar/borg/default.nix index 8d0b46ef..7672de18 100644 --- a/hosts/vidhar/borg/default.nix +++ b/hosts/vidhar/borg/default.nix @@ -26,7 +26,7 @@ let in nameValuePair serviceName { serviceConfig = { Type = "oneshot"; - ExecStart = "${copyBorg}/bin/copy ${escapeShellArg repo} yggdrasil.borgbase:repo"; + ExecStart = "${copyBorg}/bin/copy_borg --verbosity 3 ${escapeShellArg repo} yggdrasil.borgbase:repo"; TimeoutStartSec = "8h"; # User = "borg"; # Group = "borg"; @@ -43,40 +43,38 @@ let "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" "BORG_HOSTNAME_IS_UNIQUE=yes" ]; + + LogRateLimitIntervalSec = 0; }; }; - copyBorg = pkgs.stdenv.mkDerivation (let - packageOverrides = pkgs.callPackage ./pyprctl-packages.nix {}; - inpPython = pkgs.python39.override { inherit packageOverrides; }; - in rec { - name = "copy"; - src = ./copy.py; - - phases = ["buildPhase" "checkPhase" "installPhase"]; - - buildInputs = with pkgs; [makeWrapper]; - - python = inpPython.withPackages (ps: with ps; [humanize tqdm python-dateutil xdg python-unshare pyprctl halo]); - - buildPhase = '' - substitute $src copy \ - --subst-var-by python ${escapeShellArg python} + copyBorg = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec { + pname = "copy-borg"; + src = ./copy; + version = "0.0.0"; + ignoreDataOutdated = true; + + requirements = '' + humanize + tqdm + python-dateutil + xdg + python-unshare + pyprctl + halo ''; - - doCheck = true; - checkPhase = '' - ${python}/bin/python -m py_compile copy + postInstall = '' + wrapProgram $out/bin/copy_borg \ + --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} ''; - installPhase = '' - install -m 0755 -D -t $out/bin \ - copy + providers.python-unshare = "nixpkgs"; + overridesPre = [ + (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); }) + ]; - wrapProgram $out/bin/copy \ - --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} - ''; - }); + # _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ]; + }; in { config = { services.borgsnap = { diff --git a/modules/borgsnap/borgsnap/borgsnap/__main__.py b/modules/borgsnap/borgsnap/borgsnap/__main__.py index 80bf511d..91144780 100644 --- a/modules/borgsnap/borgsnap/borgsnap/__main__.py +++ b/modules/borgsnap/borgsnap/borgsnap/__main__.py @@ -274,12 +274,10 @@ def create(*, snapshot, target, archive_prefix, dry_run): for rfd, event in events: if event & select.POLLIN: if rfd == proc.stdout.fileno(): - line = proc.stdout.readline() - if len(line) > 0: + if line := proc.stdout.readline(): stdout_logger.info(line[:-1]) if rfd == proc.stderr.fileno(): - line = proc.stderr.readline() - if len(line) > 0: + if line := proc.stderr.readline(): stderr_logger.info(line[:-1]) if event & select.POLLHUP: poll.unregister(rfd) -- cgit v1.2.3