From 7448d3431fcfc05f9b7991e337b02083300a99db Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 1 Nov 2022 22:43:25 +0100 Subject: ... --- hosts/vidhar/borg/borgsnap/borgsnap/__main__.py | 394 ------------------------ hosts/vidhar/borg/borgsnap/setup.py | 10 - 2 files changed, 404 deletions(-) delete mode 100644 hosts/vidhar/borg/borgsnap/borgsnap/__main__.py delete mode 100644 hosts/vidhar/borg/borgsnap/setup.py (limited to 'hosts/vidhar/borg/borgsnap') diff --git a/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py b/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py deleted file mode 100644 index 15bf50c8..00000000 --- a/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py +++ /dev/null @@ -1,394 +0,0 @@ -import argparse -import os, sys, signal, io -from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps -from pwd import getpwnam - -from datetime import datetime, timezone -from dateutil.parser import isoparse - -import unshare -from tempfile import TemporaryDirectory - -import logging - -import json -import subprocess -import csv -from collections import namedtuple -from distutils.util import strtobool - -import pathlib -from pathlib import Path - -from atomicwrites import atomic_write - -from traceback import format_exc - -from multiprocessing import Process, Manager -from contextlib import closing - -from enum import Enum, auto - -import select -import time -import math - - -PROP_DO_BORGSNAP = 'li.yggdrasil:borgsnap' - - -class DoValue(Enum): - BORGSNAP_DO = auto() - BORGSNAP_KEEP = auto() - BORGSNAP_DONT = auto() - - @classmethod - def from_prop(cls, v: str): - if v.lower() == 'keep': - return cls.BORGSNAP_KEEP - - return cls.BORGSNAP_DO if not v or bool(strtobool(v)) else cls.BORGSNAP_DONT - - @classmethod - def merge(cls, v1, v2): - match (v1, v2): - case (cls.BORGSNAP_DONT, _): - return cls.BORGSNAP_DONT - case (_, cls.BORGSNAP_DONT): - return cls.BORGSNAP_DONT - case (cls.BORGSNAP_KEEP, _): - return cls.BORGSNAP_KEEP - case (_, cls.BORGSNAP_KEEP): - return cls.BORGSNAP_KEEP - case other: - return cls.BORGSNAP_DO - - def returncode(self): - match self: - case self.__class__.BORGSNAP_DO: - return 126 - case self.__class__.BORGSNAP_KEEP: - return 125 - case self.__class__.BORGSNAP_DONT: - return 124 - -borg_pwd = getpwnam('borg') - -def as_borg(caps=set()): - global logger - - try: - if caps: - c_state = CapState.get_current() - c_state.permitted.add(*caps) - c_state.set_current() - - logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) - - set_keepcaps(True) - - os.setgid(borg_pwd.pw_gid) - os.setuid(borg_pwd.pw_uid) - - if caps: - logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) - - c_state = CapState.get_current() - c_state.permitted = caps.copy() - c_state.inheritable.add(*caps) - c_state.set_current() - - logger.debug("cap_permitted=%s", CapState.get_current().permitted) - logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) - - for cap in caps: - cap_ambient_raise(cap) - logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) - except Exception: - logger.error(format_exc()) - raise - - -def _archive_name(snapshot, target, archive_prefix): - _, _, ts = snapshot.rpartition('@') - creation_time = isoparse(ts).astimezone(timezone.utc) - archive_name = _archive_basename(snapshot, archive_prefix) - return f'{target}::{archive_name}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}' - -def _archive_basename(snapshot, archive_prefix): - base_name, _, _ = snapshot.rpartition('@') - return archive_prefix + base_name.replace('-', '--').replace('/', '-') - -def check(*, snapshot, target, archive_prefix, cache_file): - global logger - - archives = None - if cache_file: - logger.debug('Trying cache...') - try: - with open(cache_file, mode='r', encoding='utf-8') as fp: - archives = set(json.load(fp)) - logger.debug('Loaded archive list from cache') - except FileNotFoundError: - pass - - if not archives: - logger.info('Loading archive list from remote...') - with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', target], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: - archives = set([archive['barchive'] for archive in json.load(proc.stdout)['archives']]) - if cache_file: - logger.debug('Saving archive list to cache...') - with atomic_write(cache_file, mode='w', encoding='utf-8', overwrite=True) as fp: - json.dump(list(archives), fp) - - # logger.debug(f'archives: {archives}') - _, _, archive_name = _archive_name(snapshot, target, archive_prefix).partition('::') - if archive_name in archives: - logger.info('‘%s’ found', archive_name) - return 0 - else: - logger.info('‘%s’ not found', archive_name) - - logger.debug('Checking %s for ‘%s’...', PROP_DO_BORGSNAP, snapshot) - intent = DoValue.BORGSNAP_DO - p = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'name,value', PROP_DO_BORGSNAP, snapshot], stdout=subprocess.PIPE, text=True, check=True) - reader = csv.DictReader(io.StringIO(p.stdout), fieldnames=['name', 'value'], delimiter='\t', quoting=csv.QUOTE_NONE) - Row = namedtuple('Row', reader.fieldnames) - for row in [Row(**data) for data in reader]: - if not row.value or row.value == '-': - continue - - logger.debug('%s=%s (parsed as %s) for ‘%s’...', PROP_DO_BORGSNAP, row.value, DoValue.from_prop(row.value), row.name) - intent = DoValue.merge(intent, DoValue.from_prop(row.value)) - - match intent: - case DoValue.BORGSNAP_DONT: - logger.warn('%s specifies to ignore, returning accordingly...', PROP_DO_BORGSNAP) - case DoValue.BORGSNAP_KEEP: - logger.info('%s specifies to ignore but keep, returning accordingly...', PROP_DO_BORGSNAP) - case other: - pass - - return intent.returncode() - -def create(*, snapshot, target, archive_prefix, dry_run): - global logger - - basename = _archive_basename(snapshot, archive_prefix) - - def do_create(tmpdir_q): - global logger - nonlocal basename, snapshot, target, archive_prefix, dry_run - - tmpdir = tmpdir_q.get() - - unshare.unshare(unshare.CLONE_NEWNS) - subprocess.run(['mount', '--make-rprivate', '/'], check=True) - chroot = pathlib.Path(tmpdir) / 'chroot' - upper = pathlib.Path(tmpdir) / 'upper' - work = pathlib.Path(tmpdir) / 'work' - for path in [chroot,upper,work]: - path.mkdir() - subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) - bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] - if borg_base_dir := os.getenv('BORG_BASE_DIR'): - bindMounts.append(pathlib.Path(borg_base_dir).relative_to('/')) - if ssh_auth_sock := os.getenv('SSH_AUTH_SOCK'): - bindMounts.append(pathlib.Path(ssh_auth_sock).parent.relative_to('/')) - for bindMount in bindMounts: - (chroot / bindMount).mkdir(parents=True,exist_ok=True) - subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) - - os.chroot(chroot) - os.chdir('/') - dir = pathlib.Path('/borg') - dir.mkdir(parents=True,exist_ok=True,mode=0o0750) - os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) - - base_name, _, _ = snapshot.rpartition('@') - type_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'type', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() - match type_val: - case 'filesystem': - subprocess.run(['mount', '-t', 'zfs', '-o', 'ro', snapshot, dir], check=True) - case 'volume': - snapdev_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'snapdev', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() - try: - if snapdev_val == 'hidden': - subprocess.run(['zfs', 'set', 'snapdev=visible', base_name], check=True) - subprocess.run(['mount', '-t', 'auto', '-o', 'ro', Path('/dev/zvol') / snapshot, dir], check=True) - finally: - if snapdev_val == 'hidden': - subprocess.run(['zfs', 'inherit', 'snapdev', base_name], check=True) - case other: - raise ValueError(f'‘{base_name}’ is of type ‘{type_val}’') - - env = os.environ.copy() - create_args = ['borg', - 'create', - '--lock-wait=600', - '--one-file-system', - '--exclude-caches', - '--keep-exclude-tags', - '--compression=auto,zstd,10', - '--chunker-params=10,23,16,4095', - '--files-cache=ctime,size', - '--show-rc', - '--upload-buffer=100', - '--upload-ratelimit=20480', - '--progress', - '--list', - '--filter=AMEi-x?', - '--stats' if not dry_run else '--dry-run', - ] - _, _, ts = snapshot.rpartition('@') - creation_time = isoparse(ts).astimezone(timezone.utc) - create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'] - env['BORG_FILES_CACHE_SUFFIX'] = basename - archive_name = _archive_name(snapshot, target, archive_prefix) - target_host, _, target_path = target.rpartition(':') - *parents_init, _ = list(Path(target_path).parents) - backup_patterns = [*(map(lambda p: Path('.backup') / f'{target_host}:{p}', [Path(target_path), *parents_init])), Path('.backup') / target_host, Path('.backup')] - for pattern_file in backup_patterns: - if (dir / pattern_file).is_file(): - logger.debug('Found backup patterns at ‘%s’', dir / pattern_file) - create_args += [f'--patterns-from={pattern_file}', archive_name] - break - elif (dir / pattern_file).exists(): - logger.warn('‘%s’ exists but is no file', dir / pattern_file) - else: - logger.debug('No backup patterns exist, checked %s', list(map(lambda pattern_file: str(dir / pattern_file), backup_patterns))) - create_args += [archive_name, '.'] - logger.debug('%s', {'create_args': create_args, 'cwd': dir, 'env': env}) - - with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir, text=True) as proc: - proc_logger = logger.getChild('borg') - stdout_logger = proc_logger.getChild('stdout') - stderr_logger = proc_logger.getChild('stderr') - - poll = select.poll() - poll.register(proc.stdout, select.POLLIN | select.POLLHUP) - poll.register(proc.stderr, select.POLLIN | select.POLLHUP) - pollc = 2 - events = poll.poll() - while pollc > 0 and len(events) > 0: - for rfd, event in events: - if event & select.POLLIN: - if rfd == proc.stdout.fileno(): - line = proc.stdout.readline() - if len(line) > 0: - stdout_logger.info(line[:-1]) - if rfd == proc.stderr.fileno(): - line = proc.stderr.readline() - if len(line) > 0: - stderr_logger.info(line[:-1]) - if event & select.POLLHUP: - poll.unregister(rfd) - pollc -= 1 - - if pollc > 0: - events = poll.poll() - - for handler in proc_logger.handlers: - handler.flush() - - ret = proc.wait() - if ret != 0: - raise Exception(f'borg subprocess exited with returncode {ret}') - - with Manager() as manager: - tmpdir_q = manager.Queue(1) - with closing(Process(target=do_create, args=(tmpdir_q,), name='do_create')) as p: - p.start() - - with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.getenv('RUNTIME_DIRECTORY')) as tmpdir: - tmpdir_q.put(tmpdir) - p.join() - if p.exitcode == 0 and dry_run: - return 125 - return p.exitcode - -def sigterm(signum, frame): - raise SystemExit(128 + signum) - -def main(): - signal.signal(signal.SIGTERM, sigterm) - - global logger - logger = logging.getLogger(__name__) - console_handler = logging.StreamHandler() - console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) - if sys.stderr.isatty(): - console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) - - burst_max = 10000 - burst = burst_max - last_use = None - inv_rate = 1e6 - def consume_filter(record): - nonlocal burst, burst_max, inv_rate, last_use - - delay = None - while True: - now = time.monotonic_ns() - burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max - last_use = now - - if burst > 0: - burst -= 1 - if delay: - delay = now - delay - - return True - - if delay is None: - delay = now - time.sleep(inv_rate / 1e9) - console_handler.addFilter(consume_filter) - - logger.addHandler(console_handler) - - # log uncaught exceptions - def log_exceptions(type, value, tb): - global logger - - logger.error(value) - sys.__excepthook__(type, value, tb) # calls default excepthook - - sys.excepthook = log_exceptions - - parser = argparse.ArgumentParser(prog='borgsnap') - parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1) - parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1) - parser.add_argument('--target', metavar='REPO', default='yggdrasil.borgbase:repo') - parser.add_argument('--archive-prefix', metavar='REPO', default='yggdrasil.vidhar.') - subparsers = parser.add_subparsers() - subparsers.required = True - parser.set_defaults(cmd=None) - check_parser = subparsers.add_parser('check') - check_parser.add_argument('--cache-file', type=lambda p: Path(p).absolute(), default=None) - check_parser.add_argument('snapshot') - check_parser.set_defaults(cmd=check) - create_parser = subparsers.add_parser('create') - create_parser.add_argument('--dry-run', '-n', action='store_true', default=False) - create_parser.add_argument('snapshot') - create_parser.set_defaults(cmd=create) - args = parser.parse_args() - - LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] - DEFAULT_LOG_LEVEL = logging.ERROR - log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) - - for adjustment in args.log_level or (): - log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0)) - logger.setLevel(LOG_LEVELS[log_level]) - - cmdArgs = {} - for copy in {'target', 'archive_prefix', 'snapshot', 'cache_file', 'dry_run'}: - if copy in vars(args): - cmdArgs[copy] = vars(args)[copy] - - return args.cmd(**cmdArgs) - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/hosts/vidhar/borg/borgsnap/setup.py b/hosts/vidhar/borg/borgsnap/setup.py deleted file mode 100644 index 76356bfc..00000000 --- a/hosts/vidhar/borg/borgsnap/setup.py +++ /dev/null @@ -1,10 +0,0 @@ -from setuptools import setup - -setup(name='borgsnap', - packages=['borgsnap'], - entry_points={ - 'console_scripts': [ - 'borgsnap=borgsnap.__main__:main', - ], - } -) -- cgit v1.2.3