From a73133a7c629c76a7a328f0e8d2bb693c46ef45d Mon Sep 17 00:00:00 2001
From: Gregor Kleen
Date: Tue, 1 Nov 2022 21:05:33 +0100
Subject: fix backups

---
 hosts/vidhar/borg/borgsnap/borgsnap/__main__.py | 355 ++++++++++++++++++------
 1 file changed, 272 insertions(+), 83 deletions(-)

(limited to 'hosts/vidhar/borg/borgsnap')

diff --git a/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py b/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py
index e93e6a60..cd2f1ad2 100644
--- a/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py
+++ b/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py
@@ -1,12 +1,11 @@
 import argparse
-import os, sys, signal
-from pyprctl import cap_permitted, cap_inheritable, cap_effective, cap_ambient, Cap
+import os, sys, signal, io
+from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps
 from pwd import getpwnam
 
 from datetime import datetime, timezone
 from dateutil.parser import isoparse
 
-from xdg import xdg_runtime_dir
 import unshare
 from tempfile import TemporaryDirectory
 
@@ -14,6 +13,9 @@
 import logging
 import json
 import subprocess
+import csv
+from collections import namedtuple
+from distutils.util import strtobool
 
 import pathlib
 from pathlib import Path
@@ -22,21 +24,89 @@
 from atomicwrites import atomic_write
 
 from traceback import format_exc
+from multiprocessing import Process, Manager
+from contextlib import closing
+
+from enum import Enum, auto
+
+import select
+import time
+import math
+
+
+PROP_DO_BORGSNAP = 'li.yggdrasil:borgsnap'
+
+
+class DoValue(Enum):
+  BORGSNAP_DO = auto()
+  BORGSNAP_KEEP = auto()
+  BORGSNAP_DONT = auto()
+
+  @classmethod
+  def from_prop(cls, v: str):
+    if v.lower() == 'keep':
+      return cls.BORGSNAP_KEEP
+
+    return cls.BORGSNAP_DO if not v or bool(strtobool(v)) else cls.BORGSNAP_DONT
+
+  @classmethod
+  def merge(cls, v1, v2):
+    match (v1, v2):
+      case (cls.BORGSNAP_DONT, _):
+        return cls.BORGSNAP_DONT
+      case (_, cls.BORGSNAP_DONT):
+        return cls.BORGSNAP_DONT
+      case (cls.BORGSNAP_KEEP, _):
+        return cls.BORGSNAP_KEEP
+      case (_, cls.BORGSNAP_KEEP):
+        return cls.BORGSNAP_KEEP
+      case other:
+        return cls.BORGSNAP_DO
+
+  def returncode(self):
+    match self:
+      case self.__class__.BORGSNAP_DO:
+        return 126
+      case self.__class__.BORGSNAP_KEEP:
+        return 125
+      case self.__class__.BORGSNAP_DONT:
+        return 124
 
 borg_pwd = getpwnam('borg')
 
-def as_borg(caps=set(), cwd=None):
-  if caps:
-    cap_permitted.add(*caps)
-    cap_inheritable.add(*caps)
-    cap_effective.add(*caps)
-    cap_ambient.add(*caps)
+def as_borg(caps=set()):
+  global logger
+
+  try:
+    if caps:
+      c_state = CapState.get_current()
+      c_state.permitted.add(*caps)
+      c_state.set_current()
+
+      logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)
+
+      set_keepcaps(True)
+
+    os.setgid(borg_pwd.pw_gid)
+    os.setuid(borg_pwd.pw_uid)
 
-  os.setgid(borg_pwd.pw_gid)
-  os.setuid(borg_pwd.pw_uid)
+    if caps:
+      logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)
 
-  if cwd is not None:
-    os.chdir(cwd)
+      c_state = CapState.get_current()
+      c_state.permitted = caps.copy()
+      c_state.inheritable.add(*caps)
+      c_state.set_current()
+
+      logger.debug("cap_permitted=%s", CapState.get_current().permitted)
+      logger.debug("cap_inheritable=%s", CapState.get_current().inheritable)
+
+      for cap in caps:
+        cap_ambient_raise(cap)
+        logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap))
+  except Exception:
+    logger.error(format_exc())
+    raise
 
 
 def _archive_name(snapshot, target, archive_prefix):
@@ -50,13 +120,15 @@ def _archive_basename(snapshot, archive_prefix):
   return archive_prefix + base_name.replace('-', '--').replace('/', '-')
 
 def check(*, snapshot, target, archive_prefix, cache_file):
+  global logger
+
   archives = None
   if cache_file:
     logger.debug('Trying cache...')
     try:
       with open(cache_file, mode='r', encoding='utf-8') as fp:
         archives = set(json.load(fp))
-        logger.info('Loaded archive list from cache')
+        logger.debug('Loaded archive list from cache')
     except FileNotFoundError:
       pass
 
@@ -72,76 +144,165 @@ def check(*, snapshot, target, archive_prefix, cache_file):
   # logger.debug(f'archives: {archives}')
   _, _, archive_name = _archive_name(snapshot, target, archive_prefix).partition('::')
   if archive_name in archives:
-    logger.info(f'{archive_name} found')
+    logger.info('‘%s’ found', archive_name)
     return 0
   else:
-    logger.info(f'{archive_name} not found')
-    return 126
+    logger.info('‘%s’ not found', archive_name)
+
+    logger.debug('Checking %s for ‘%s’...', PROP_DO_BORGSNAP, snapshot)
+    intent = DoValue.BORGSNAP_DO
+    p = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'name,value', PROP_DO_BORGSNAP, snapshot], stdout=subprocess.PIPE, text=True, check=True)
+    reader = csv.DictReader(io.StringIO(p.stdout), fieldnames=['name', 'value'], delimiter='\t', quoting=csv.QUOTE_NONE)
+    Row = namedtuple('Row', reader.fieldnames)
+    for row in [Row(**data) for data in reader]:
+      if not row.value or row.value == '-':
+        continue
+
+      logger.debug('%s=%s (parsed as %s) for ‘%s’...', PROP_DO_BORGSNAP, row.value, DoValue.from_prop(row.value), row.name)
+      intent = DoValue.merge(intent, DoValue.from_prop(row.value))
+
+    match intent:
+      case DoValue.BORGSNAP_DONT:
+        logger.warn('%s specifies to ignore, returning accordingly...', PROP_DO_BORGSNAP)
+      case DoValue.BORGSNAP_KEEP:
+        logger.info('%s specifies to ignore but keep, returning accordingly...', PROP_DO_BORGSNAP)
+      case other:
+        pass
+
+    return intent.returncode()
 
 def create(*, snapshot, target, archive_prefix, dry_run):
+  global logger
+
   basename = _archive_basename(snapshot, archive_prefix)
-  with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
-    child = os.fork()
-    if child == 0:
-      unshare.unshare(unshare.CLONE_NEWNS)
-      subprocess.run(['mount', '--make-rprivate', '/'], check=True)
-      chroot = pathlib.Path(tmpdir) / 'chroot'
-      upper = pathlib.Path(tmpdir) / 'upper'
-      work = pathlib.Path(tmpdir) / 'work'
-      for path in [chroot,upper,work]:
-        path.mkdir()
-      subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
-      bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
-      if os.environ.get('BORG_BASE_DIR'):
-        bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/'))
-      if 'SSH_AUTH_SOCK' in os.environ:
-        bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
-      for bindMount in bindMounts:
-        (chroot / bindMount).mkdir(parents=True,exist_ok=True)
-        # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr)
-        subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
-      os.chroot(chroot)
-      os.chdir('/')
-      dir = pathlib.Path('/borg')
-      dir.mkdir(parents=True,exist_ok=True,mode=0o0750)
-      os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
-      try:
+  def do_create(tmpdir_q):
+    global logger
+    nonlocal basename, snapshot, target, archive_prefix, dry_run
+
+    tmpdir = tmpdir_q.get()
+
+    unshare.unshare(unshare.CLONE_NEWNS)
+    subprocess.run(['mount', '--make-rprivate', '/'], check=True)
+    chroot = pathlib.Path(tmpdir) / 'chroot'
+    upper = pathlib.Path(tmpdir) / 'upper'
+    work = pathlib.Path(tmpdir) / 'work'
+    for path in [chroot,upper,work]:
+      path.mkdir()
+    subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
+    bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
+    if borg_base_dir := os.getenv('BORG_BASE_DIR'):
+      bindMounts.append(pathlib.Path(borg_base_dir).relative_to('/'))
+    if ssh_auth_sock := os.getenv('SSH_AUTH_SOCK'):
+      bindMounts.append(pathlib.Path(ssh_auth_sock).parent.relative_to('/'))
+    for bindMount in bindMounts:
+      (chroot / bindMount).mkdir(parents=True,exist_ok=True)
+      subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
+
+    os.chroot(chroot)
+    os.chdir('/')
+    dir = pathlib.Path('/borg')
+    dir.mkdir(parents=True,exist_ok=True,mode=0o0750)
+    os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
+
+    base_name, _, _ = snapshot.rpartition('@')
+    type_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'type', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip()
+    match type_val:
+      case 'filesystem':
         subprocess.run(['mount', '-t', 'zfs', '-o', 'ro', snapshot, dir], check=True)
-        env = os.environ.copy()
-        create_args = ['borg',
-          'create',
-          '--lock-wait=600',
-          '--one-file-system',
-          '--compression=auto,zstd,10',
-          '--chunker-params=10,23,16,4095',
-          '--files-cache=ctime,size',
-          '--show-rc',
-          # '--remote-ratelimit=20480',
-          '--progress',
-          '--list',
-          '--filter=AMEi-x?',
-          '--stats' if not dry_run else '--dry-run'
-        ]
-        _, _, ts = snapshot.rpartition('@')
-        creation_time = isoparse(ts).astimezone(timezone.utc)
-        create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}']
-        env['BORG_FILES_CACHE_SUFFIX'] = basename
-        create_args += [_archive_name(snapshot, target, archive_prefix), '.']
-        print({'create_args': create_args, 'cwd': dir, 'env': env}, file=sys.stderr)
-        subprocess.run(create_args, stdin=subprocess.DEVNULL, env=env, preexec_fn=lambda: as_borg(caps={CAP.DAC_READ_SEARCH}, cwd=dir), check=True)
-        # subprocess.run(create_args, stdin=subprocess.DEVNULL, env=env, preexec_fn=lambda: None, cwd=dir, check=True)
-      finally:
-        subprocess.run(['umount', dir], check=True)
-      os._exit(0)
+      case 'volume':
+        snapdev_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'snapdev', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip()
+        try:
+          if snapdev_val == 'hidden':
+            subprocess.run(['zfs', 'set', 'snapdev=visible', base_name], check=True)
+          subprocess.run(['mount', '-t', 'auto', '-o', 'ro', Path('/dev/zvol') / snapshot, dir], check=True)
+        finally:
+          if snapdev_val == 'hidden':
+            subprocess.run(['zfs', 'inherit', 'snapdev', base_name], check=True)
+      case other:
+        raise ValueError(f'‘{base_name}’ is of type ‘{type_val}’')
+
+    env = os.environ.copy()
+    create_args = ['borg',
+      'create',
+      '--lock-wait=600',
+      '--one-file-system',
+      '--exclude-caches',
+      '--keep-exclude-tags',
+      '--compression=auto,zstd,10',
+      '--chunker-params=10,23,16,4095',
+      '--files-cache=ctime,size',
+      '--show-rc',
+      # '--remote-ratelimit=20480',
+      '--progress',
+      '--list',
+      '--filter=AMEi-x?',
+      '--stats' if not dry_run else '--dry-run',
+    ]
+    _, _, ts = snapshot.rpartition('@')
+    creation_time = isoparse(ts).astimezone(timezone.utc)
+    create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}']
+    env['BORG_FILES_CACHE_SUFFIX'] = basename
+    archive_name = _archive_name(snapshot, target, archive_prefix)
+    target_host, _, target_path = target.rpartition(':')
+    *parents_init, _ = list(Path(target_path).parents)
+    backup_patterns = [*(map(lambda p: Path('.backup') / f'{target_host}:{p}', [Path(target_path), *parents_init])), Path('.backup') / target_host, Path('.backup')]
+    for pattern_file in backup_patterns:
+      if (dir / pattern_file).is_file():
+        logger.debug('Found backup patterns at ‘%s’', dir / pattern_file)
+        create_args += [f'--patterns-from={pattern_file}', archive_name]
+        break
+      elif (dir / pattern_file).exists():
+        logger.warn('‘%s’ exists but is no file', dir / pattern_file)
     else:
-      while True:
-        waitpid, waitret = os.wait()
-        if waitret != 0:
-          sys.exit(waitret)
-        if waitpid == child:
-          break
-  return 0
+      logger.debug('No backup patterns exist, checked %s', list(map(lambda pattern_file: str(dir / pattern_file), backup_patterns)))
+      create_args += [archive_name, '.']
+    logger.debug('%s', {'create_args': create_args, 'cwd': dir, 'env': env})
+
+    with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir, text=True) as proc:
+      proc_logger = logger.getChild('borg')
+
+      poll = select.poll()
+      poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
+      poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
+      pollc = 2
+      events = poll.poll()
+      while pollc > 0 and len(events) > 0:
+        for rfd, event in events:
+          if event & select.POLLIN:
+            if rfd == proc.stdout.fileno():
+              line = proc.stdout.readline()
+              if len(line) > 0:
+                proc_logger.info(line[:-1])
+            if rfd == proc.stderr.fileno():
+              line = proc.stderr.readline()
+              if len(line) > 0:
+                proc_logger.info(line[:-1])
+          if event & select.POLLHUP:
+            poll.unregister(rfd)
+            pollc -= 1
+
+        if pollc > 0:
+          events = poll.poll()
+
+      for handler in proc_logger.handlers:
+        handler.flush()
+
+      ret = proc.wait()
+      if ret != 0:
+        raise Exception(f'borg subprocess exited with returncode {ret}')
+
+  with Manager() as manager:
+    tmpdir_q = manager.Queue(1)
+    with closing(Process(target=do_create, args=(tmpdir_q,), name='do_create')) as p:
+      p.start()
+
+      with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.getenv('RUNTIME_DIRECTORY')) as tmpdir:
+        tmpdir_q.put(tmpdir)
+        p.join()
+      if p.exitcode == 0 and dry_run:
+        return 125
+      return p.exitcode
 
 
 def sigterm(signum, frame):
   raise SystemExit(128 + signum)
@@ -155,6 +316,32 @@ def main():
   console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') )
   if sys.stderr.isatty():
     console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') )
+
+  burst_max = 10000
+  burst = burst_max
+  last_use = None
+  inv_rate = 1e6
+  def consume_filter(record):
+    nonlocal burst, burst_max, inv_rate, last_use
+
+    delay = None
+    while True:
+      now = time.monotonic_ns()
+      burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max
+      last_use = now
+
+      if burst > 0:
+        burst -= 1
+        if delay:
+          delay = now - delay
+
+        return True
+
+      if delay is None:
+        delay = now
+      time.sleep(inv_rate / 1e9)
+  console_handler.addFilter(consume_filter)
+
   logger.addHandler(console_handler)
 
   # log uncaught exceptions
@@ -167,7 +354,8 @@ def main():
   sys.excepthook = log_exceptions
 
   parser = argparse.ArgumentParser(prog='borgsnap')
-  parser.add_argument('--verbose', '-v', action='count', default=0)
+  parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1)
+  parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1)
   parser.add_argument('--target', metavar='REPO', default='yggdrasil.borgbase:repo')
   parser.add_argument('--archive-prefix', metavar='REPO', default='yggdrasil.vidhar.')
   subparsers = parser.add_subparsers()
@@ -183,12 +371,13 @@ def main():
   create_parser.set_defaults(cmd=create)
 
   args = parser.parse_args()
-  if args.verbose <= 0:
-    logger.setLevel(logging.WARNING)
-  elif args.verbose <= 1:
-    logger.setLevel(logging.INFO)
-  else:
-    logger.setLevel(logging.DEBUG)
+  LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
+  DEFAULT_LOG_LEVEL = logging.ERROR
+  log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)
+
+  for adjustment in args.log_level or ():
+    log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0))
+  logger.setLevel(LOG_LEVELS[log_level])
 
   cmdArgs = {}
   for copy in {'target', 'archive_prefix', 'snapshot', 'cache_file', 'dry_run'}:
--
cgit v1.2.3
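
Note (not part of the commit): a minimal sketch of how the new --verbose/--quiet handling in main() resolves to a logging level, using only the LOG_LEVELS list and the two argparse flags introduced by this patch; the '-vv' invocation below is a hypothetical example.

  import argparse, logging

  LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
  DEFAULT_LOG_LEVEL = logging.ERROR

  parser = argparse.ArgumentParser(prog='borgsnap')
  # each -v appends -1 (more verbose), each -q appends +1 (quieter), as in the patch
  parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1)
  parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1)
  args = parser.parse_args(['-vv'])  # hypothetical invocation

  # clamp the adjusted index into LOG_LEVELS, mirroring main()
  log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)
  for adjustment in args.log_level or ():
    log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0))
  print(logging.getLevelName(LOG_LEVELS[log_level]))  # ERROR -> WARNING -> INFO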