From 7448d3431fcfc05f9b7991e337b02083300a99db Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 1 Nov 2022 22:43:25 +0100 Subject: ... --- modules/borgsnap/borgsnap/borgsnap/__main__.py | 395 +++++++++++++++++++++++++ modules/borgsnap/borgsnap/setup.py | 10 + modules/borgsnap/default.nix | 106 +++++++ modules/zfssnap/default.nix | 18 +- modules/zfssnap/zfssnap/zfssnap/__main__.py | 3 + 5 files changed, 530 insertions(+), 2 deletions(-) create mode 100644 modules/borgsnap/borgsnap/borgsnap/__main__.py create mode 100644 modules/borgsnap/borgsnap/setup.py create mode 100644 modules/borgsnap/default.nix (limited to 'modules') diff --git a/modules/borgsnap/borgsnap/borgsnap/__main__.py b/modules/borgsnap/borgsnap/borgsnap/__main__.py new file mode 100644 index 00000000..4b50cf80 --- /dev/null +++ b/modules/borgsnap/borgsnap/borgsnap/__main__.py @@ -0,0 +1,395 @@ +import argparse +import os, sys, signal, io +from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps +from pwd import getpwnam + +from datetime import datetime, timezone +from dateutil.parser import isoparse + +import unshare +from tempfile import TemporaryDirectory + +import logging + +import json +import subprocess +import csv +from collections import namedtuple +from distutils.util import strtobool + +import pathlib +from pathlib import Path + +from atomicwrites import atomic_write + +from traceback import format_exc + +from multiprocessing import Process, Manager +from contextlib import closing + +from enum import Enum, auto + +import select +import time +import math + + +PROP_DO_BORGSNAP = 'li.yggdrasil:borgsnap' + + +class DoValue(Enum): + BORGSNAP_DO = auto() + BORGSNAP_KEEP = auto() + BORGSNAP_DONT = auto() + + @classmethod + def from_prop(cls, v: str): + if v.lower() == 'keep': + return cls.BORGSNAP_KEEP + + return cls.BORGSNAP_DO if not v or bool(strtobool(v)) else cls.BORGSNAP_DONT + + @classmethod + def merge(cls, v1, v2): + match (v1, v2): + case (cls.BORGSNAP_DONT, _): + return cls.BORGSNAP_DONT + case (_, cls.BORGSNAP_DONT): + return cls.BORGSNAP_DONT + case (cls.BORGSNAP_KEEP, _): + return cls.BORGSNAP_KEEP + case (_, cls.BORGSNAP_KEEP): + return cls.BORGSNAP_KEEP + case other: + return cls.BORGSNAP_DO + + def returncode(self): + match self: + case self.__class__.BORGSNAP_DO: + return 126 + case self.__class__.BORGSNAP_KEEP: + return 125 + case self.__class__.BORGSNAP_DONT: + return 124 + +borg_pwd = getpwnam('borg') + +def as_borg(caps=set()): + global logger + + try: + if caps: + c_state = CapState.get_current() + c_state.permitted.add(*caps) + c_state.set_current() + + logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) + + set_keepcaps(True) + + os.setgid(borg_pwd.pw_gid) + os.setuid(borg_pwd.pw_uid) + + if caps: + logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) + + c_state = CapState.get_current() + c_state.permitted = caps.copy() + c_state.inheritable.add(*caps) + c_state.set_current() + + logger.debug("cap_permitted=%s", CapState.get_current().permitted) + logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) + + for cap in caps: + cap_ambient_raise(cap) + logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) + except Exception: + logger.error(format_exc()) + raise + + +def _archive_name(snapshot, target, archive_prefix): + _, _, ts = snapshot.rpartition('@') + creation_time = isoparse(ts).astimezone(timezone.utc) + archive_name = _archive_basename(snapshot, archive_prefix) 
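+ # Editor's note (hypothetical dataset/timestamp, defaults taken from the argparse flags below): a snapshot like
+ # 'tank/my-data@2022-11-01T21:43:25+01:00' with the default prefix 'yggdrasil.vidhar.' and target 'yggdrasil.borgbase:repo'
+ # becomes 'yggdrasil.borgbase:repo::yggdrasil.vidhar.tank-my--data-2022-11-01T20:43:25'
+ # (existing dashes are doubled, '/' maps to '-', and the timestamp is normalised to UTC).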
+ return f'{target}::{archive_name}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}' + +def _archive_basename(snapshot, archive_prefix): + base_name, _, _ = snapshot.rpartition('@') + return archive_prefix + base_name.replace('-', '--').replace('/', '-') + +def check(*, snapshot, target, archive_prefix, cache_file): + global logger + + archives = None + if cache_file: + logger.debug('Trying cache...') + try: + with open(cache_file, mode='r', encoding='utf-8') as fp: + archives = set(json.load(fp)) + logger.debug('Loaded archive list from cache') + except FileNotFoundError: + pass + + if not archives: + logger.info('Loading archive list from remote...') + with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', target], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: + archives = set([archive['barchive'] for archive in json.load(proc.stdout)['archives']]) + if cache_file: + logger.debug('Saving archive list to cache...') + with atomic_write(cache_file, mode='w', encoding='utf-8', overwrite=True) as fp: + json.dump(list(archives), fp) + + # logger.debug(f'archives: {archives}') + _, _, archive_name = _archive_name(snapshot, target, archive_prefix).partition('::') + if archive_name in archives: + logger.info('‘%s’ found', archive_name) + return 0 + else: + logger.info('‘%s’ not found', archive_name) + + logger.debug('Checking %s for ‘%s’...', PROP_DO_BORGSNAP, snapshot) + intent = DoValue.BORGSNAP_DO + p = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'name,value', PROP_DO_BORGSNAP, snapshot], stdout=subprocess.PIPE, text=True, check=True) + reader = csv.DictReader(io.StringIO(p.stdout), fieldnames=['name', 'value'], delimiter='\t', quoting=csv.QUOTE_NONE) + Row = namedtuple('Row', reader.fieldnames) + for row in [Row(**data) for data in reader]: + if not row.value or row.value == '-': + continue + + logger.debug('%s=%s (parsed as %s) for ‘%s’...', PROP_DO_BORGSNAP, row.value, DoValue.from_prop(row.value), row.name) + intent = DoValue.merge(intent, DoValue.from_prop(row.value)) + + match intent: + case DoValue.BORGSNAP_DONT: + logger.warn('%s specifies to ignore, returning accordingly...', PROP_DO_BORGSNAP) + case DoValue.BORGSNAP_KEEP: + logger.info('%s specifies to ignore but keep, returning accordingly...', PROP_DO_BORGSNAP) + case other: + pass + + return intent.returncode() + +def create(*, snapshot, target, archive_prefix, dry_run): + global logger + + basename = _archive_basename(snapshot, archive_prefix) + + def do_create(tmpdir_q): + global logger + nonlocal basename, snapshot, target, archive_prefix, dry_run + + tmpdir = tmpdir_q.get() + + unshare.unshare(unshare.CLONE_NEWNS) + subprocess.run(['mount', '--make-rprivate', '/'], check=True) + chroot = pathlib.Path(tmpdir) / 'chroot' + upper = pathlib.Path(tmpdir) / 'upper' + work = pathlib.Path(tmpdir) / 'work' + for path in [chroot,upper,work]: + path.mkdir() + subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) + bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] + if borg_base_dir := os.getenv('BORG_BASE_DIR'): + bindMounts.append(pathlib.Path(borg_base_dir).relative_to('/')) + if ssh_auth_sock := os.getenv('SSH_AUTH_SOCK'): + bindMounts.append(pathlib.Path(ssh_auth_sock).parent.relative_to('/')) + for bindMount in bindMounts: + (chroot / bindMount).mkdir(parents=True,exist_ok=True) + subprocess.run(['mount', '--bind', pathlib.Path('/') 
/ bindMount, chroot / bindMount], check=True) + + os.chroot(chroot) + os.chdir('/') + dir = pathlib.Path('/borg') + dir.mkdir(parents=True,exist_ok=True,mode=0o0750) + os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) + + base_name, _, _ = snapshot.rpartition('@') + type_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'type', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() + match type_val: + case 'filesystem': + subprocess.run(['mount', '-t', 'zfs', '-o', 'ro', snapshot, dir], check=True) + case 'volume': + snapdev_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'snapdev', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() + try: + if snapdev_val == 'hidden': + subprocess.run(['zfs', 'set', 'snapdev=visible', base_name], check=True) + subprocess.run(['mount', '-t', 'auto', '-o', 'ro', Path('/dev/zvol') / snapshot, dir], check=True) + finally: + if snapdev_val == 'hidden': + subprocess.run(['zfs', 'inherit', 'snapdev', base_name], check=True) + case other: + raise ValueError(f'‘{base_name}’ is of type ‘{type_val}’') + + env = os.environ.copy() + create_args = ['borg', + 'create', + '--lock-wait=600', + '--one-file-system', + '--exclude-caches', + '--keep-exclude-tags', + '--compression=auto,zstd,10', + '--chunker-params=10,23,16,4095', + '--files-cache=ctime,size', + '--show-rc', + '--upload-buffer=100', + '--upload-ratelimit=20480', + '--progress', + '--list', + '--filter=AMEi-x?', + '--stats' if not dry_run else '--dry-run', + ] + _, _, ts = snapshot.rpartition('@') + creation_time = isoparse(ts).astimezone(timezone.utc) + create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'] + env['BORG_FILES_CACHE_SUFFIX'] = basename + archive_name = _archive_name(snapshot, target, archive_prefix) + target_host, _, target_path = target.rpartition(':') + *parents_init, _ = list(Path(target_path).parents) + backup_patterns = [*(map(lambda p: Path('.backup') / f'{target_host}:{p}', [Path(target_path), *parents_init])), Path('.backup') / target_host, Path('.backup')] + for pattern_file in backup_patterns: + if (dir / pattern_file).is_file(): + logger.debug('Found backup patterns at ‘%s’', dir / pattern_file) + create_args += [f'--patterns-from={pattern_file}', archive_name] + break + elif (dir / pattern_file).exists(): + logger.warn('‘%s’ exists but is no file', dir / pattern_file) + else: + logger.debug('No backup patterns exist, checked %s', list(map(lambda pattern_file: str(dir / pattern_file), backup_patterns))) + create_args += [archive_name, '.'] + logger.debug('%s', {'create_args': create_args, 'cwd': dir, 'env': env}) + + with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir, text=True) as proc: + proc_logger = logger.getChild('borg') + stdout_logger = proc_logger.getChild('stdout') + stderr_logger = proc_logger.getChild('stderr') + + poll = select.poll() + poll.register(proc.stdout, select.POLLIN | select.POLLHUP) + poll.register(proc.stderr, select.POLLIN | select.POLLHUP) + pollc = 2 + events = poll.poll() + while pollc > 0 and len(events) > 0: + for rfd, event in events: + if event & select.POLLIN: + if rfd == proc.stdout.fileno(): + line = proc.stdout.readline() + if len(line) > 0: + stdout_logger.info(line[:-1]) + if rfd == proc.stderr.fileno(): + line = proc.stderr.readline() + if len(line) > 0: + stderr_logger.info(line[:-1]) + if event & select.POLLHUP: + 
poll.unregister(rfd) + pollc -= 1 + + if pollc > 0: + events = poll.poll() + + for handler in proc_logger.handlers: + handler.flush() + + ret = proc.wait() + if ret != 0: + raise Exception(f'borg subprocess exited with returncode {ret}') + + with Manager() as manager: + tmpdir_q = manager.Queue(1) + with closing(Process(target=do_create, args=(tmpdir_q,), name='do_create')) as p: + p.start() + + with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.getenv('RUNTIME_DIRECTORY')) as tmpdir: + tmpdir_q.put(tmpdir) + p.join() + if p.exitcode == 0 and dry_run: + return 125 + return p.exitcode + +def sigterm(signum, frame): + raise SystemExit(128 + signum) + +def main(): + signal.signal(signal.SIGTERM, sigterm) + + global logger + logger = logging.getLogger(__name__) + console_handler = logging.StreamHandler() + console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) + if sys.stderr.isatty(): + console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) + + burst_max = 10000 + burst = burst_max + last_use = None + inv_rate = 1e6 + def consume_filter(record): + nonlocal burst, burst_max, inv_rate, last_use + + delay = None + while True: + now = time.monotonic_ns() + burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max + last_use = now + + if burst > 0: + burst -= 1 + if delay: + delay = now - delay + + return True + + if delay is None: + delay = now + time.sleep(inv_rate / 1e9) + console_handler.addFilter(consume_filter) + + logger.addHandler(console_handler) + + # log uncaught exceptions + def log_exceptions(type, value, tb): + global logger + + logger.error(value) + sys.__excepthook__(type, value, tb) # calls default excepthook + + sys.excepthook = log_exceptions + + parser = argparse.ArgumentParser(prog='borgsnap') + parser.add_argument('--verbosity', dest='log_level', action='append') + parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1) + parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1) + parser.add_argument('--target', metavar='REPO', default='yggdrasil.borgbase:repo') + parser.add_argument('--archive-prefix', metavar='REPO', default='yggdrasil.vidhar.') + subparsers = parser.add_subparsers() + subparsers.required = True + parser.set_defaults(cmd=None) + check_parser = subparsers.add_parser('check') + check_parser.add_argument('--cache-file', type=lambda p: Path(p).absolute(), default=None) + check_parser.add_argument('snapshot') + check_parser.set_defaults(cmd=check) + create_parser = subparsers.add_parser('create') + create_parser.add_argument('--dry-run', '-n', action='store_true', default=False) + create_parser.add_argument('snapshot') + create_parser.set_defaults(cmd=create) + args = parser.parse_args() + + LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] + DEFAULT_LOG_LEVEL = logging.ERROR + log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) + + for adjustment in args.log_level or (): + log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0)) + logger.setLevel(LOG_LEVELS[log_level]) + + cmdArgs = {} + for copy in {'target', 'archive_prefix', 'snapshot', 'cache_file', 'dry_run'}: + if copy in vars(args): + cmdArgs[copy] = vars(args)[copy] + + return args.cmd(**cmdArgs) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/modules/borgsnap/borgsnap/setup.py b/modules/borgsnap/borgsnap/setup.py new file mode 100644 
index 00000000..76356bfc --- /dev/null +++ b/modules/borgsnap/borgsnap/setup.py @@ -0,0 +1,10 @@ +from setuptools import setup + +setup(name='borgsnap', + packages=['borgsnap'], + entry_points={ + 'console_scripts': [ + 'borgsnap=borgsnap.__main__:main', + ], + } +) diff --git a/modules/borgsnap/default.nix b/modules/borgsnap/default.nix new file mode 100644 index 00000000..f4c0eec4 --- /dev/null +++ b/modules/borgsnap/default.nix @@ -0,0 +1,106 @@ +{ config, pkgs, lib, flakeInputs, hostName, ... }: + +with lib; + +let + borgsnap = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec { + pname = "borgsnap"; + src = ./borgsnap; + version = "0.0.0"; + ignoreDataOutdated = true; + + requirements = '' + atomicwrites + pyprctl + python-unshare + python-dateutil + ''; + postInstall = '' + wrapProgram $out/bin/borgsnap \ + --prefix PATH : ${makeBinPath (with pkgs; [config.boot.zfs.package util-linux borgbackup])}:${config.security.wrapperDir} + ''; + + providers.python-unshare = "nixpkgs"; + overridesPre = [ + (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); }) + ]; + + _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ]; + }; + + cfg = config.services.borgsnap; +in { + options = { + services.borgsnap = { + enable = mkEnableOption "borgsnap service"; + + target = mkOption { + type = types.str; + }; + + archive-prefix = mkOption { + type = types.str; + default = "yggdrasil.${hostName}."; + }; + + extraConfig = mkOption { + type = with types; attrsOf str; + default = { + halfweekly = "8"; + monthly = "-1"; + }; + }; + + verbosity = mkOption { + type = types.int; + default = config.services.zfssnap.verbosity; + }; + + sshConfig = mkOption { + type = with types; nullOr str; + default = null; + }; + + keyfile = mkOption { + type = with types; nullOr str; + default = null; + }; + + extraCreateArgs = mkOption { + type = with types; listOf str; + default = []; + }; + extraCheckArgs = mkOption { + type = with types; listOf str; + default = []; + }; + }; + }; + + config = mkIf cfg.enable { + warnings = mkIf (!config.services.zfssnap.enable) [ + "borgsnap will do nothing if zfssnap is not enabled" + ]; + + services.zfssnap.config.exec = { + check = "${borgsnap}/bin/borgsnap --verbosity=${toString cfg.verbosity} --target ${escapeShellArg cfg.target} --archive-prefix ${escapeShellArg cfg.archive-prefix} check --cache-file /run/zfssnap-prune/archives-cache.json ${escapeShellArgs cfg.extraCheckArgs}"; + cmd = "${borgsnap}/bin/borgsnap --verbosity=${toString cfg.verbosity} --target ${escapeShellArg cfg.target} --archive-prefix ${escapeShellArg cfg.archive-prefix} create ${escapeShellArgs cfg.extraCreateArgs}"; + } // cfg.extraConfig; + + systemd.services."zfssnap-prune" = { + serviceConfig = { + Environment = [ + "BORG_BASE_DIR=/var/lib/borg" + "BORG_CONFIG_DIR=/var/lib/borg/config" + "BORG_CACHE_DIR=/var/lib/borg/cache" + "BORG_SECURITY_DIR=/var/lib/borg/security" + "BORG_KEYS_DIR=/var/lib/borg/keys" + "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" + "BORG_HOSTNAME_IS_UNIQUE=yes" + ] ++ optional (!(isNull cfg.sshConfig)) "BORG_RSH=\"${pkgs.openssh}/bin/ssh -F ${pkgs.writeText "config" cfg.sshConfig}\"" + ++ optional (!(isNull cfg.keyfile)) "BORG_KEY_FILE=${cfg.keyfile}"; + RuntimeDirectory = "zfssnap-prune"; + }; + }; + }; +} diff --git a/modules/zfssnap/default.nix b/modules/zfssnap/default.nix index f46cd510..f6f32852 100644 --- a/modules/zfssnap/default.nix +++ 
b/modules/zfssnap/default.nix @@ -48,6 +48,20 @@ in { type = types.str; default = "*-*-* *:00/5:00"; }; + + verbosity = mkOption { + type = types.int; + default = 2; + }; + + extraPruneArgs = mkOption { + type = with types; listOf str; + default = []; + }; + extraAutosnapArgs = mkOption { + type = with types; listOf str; + default = []; + }; }; }; @@ -59,7 +73,7 @@ in { before = [ "zfssnap-prune.service" ]; serviceConfig = { Type = "oneshot"; - ExecStart = "${zfssnap}/bin/zfssnap -vv"; + ExecStart = "${zfssnap}/bin/zfssnap --verbosity=${toString cfg.verbosity} autosnap ${escapeShellArgs cfg.extraAutosnapArgs}"; LogRateLimitIntervalSec = 0; }; @@ -72,7 +86,7 @@ in { ExecStart = let mkSectionName = name: strings.escape [ "[" "]" ] (strings.toUpper name); zfssnapConfig = generators.toINI { inherit mkSectionName; } cfg.config; - in "${zfssnap}/bin/zfssnap -vv prune --config=${pkgs.writeText "zfssnap.ini" zfssnapConfig}"; + in "${zfssnap}/bin/zfssnap --verbosity=${toString cfg.verbosity} prune --config=${pkgs.writeText "zfssnap.ini" zfssnapConfig} ${escapeShellArgs cfg.extraPruneArgs}"; LogRateLimitIntervalSec = 0; }; diff --git a/modules/zfssnap/zfssnap/zfssnap/__main__.py b/modules/zfssnap/zfssnap/zfssnap/__main__.py index 9d07401b..59478c71 100644 --- a/modules/zfssnap/zfssnap/zfssnap/__main__.py +++ b/modules/zfssnap/zfssnap/zfssnap/__main__.py @@ -375,10 +375,13 @@ def main(): sys.excepthook = log_exceptions parser = argparse.ArgumentParser(prog='zfssnap') + parser.add_argument('--verbosity', dest='log_level', action='append') parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1) parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1) subparsers = parser.add_subparsers() parser.set_defaults(cmd=autosnap) + autosnap_parser = subparsers.add_parser('autosnap') + autosnap_parser.set_defaults(cmd=autosnap) rename_parser = subparsers.add_parser('rename') rename_parser.add_argument('snapshots', nargs='+') rename_parser.add_argument('--destroy', action='store_true', default=False) -- cgit v1.2.3
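Usage note (editor's sketch, not part of the commit): a minimal example of how the new services.borgsnap module might be enabled on a host. Only the option names come from the module added above; the repository target, host name and secret paths are hypothetical placeholders, and the module only takes effect together with services.zfssnap, since it merely registers check/cmd hooks under services.zfssnap.config.exec.

  { ... }:
  {
    services.zfssnap.enable = true;

    services.borgsnap = {
      enable = true;
      # hypothetical borg repository reachable over ssh
      target = "backup@backup.example.org:borg-repo";
      # default is "yggdrasil.${hostName}."
      archive-prefix = "yggdrasil.myhost.";
      sshConfig = ''
        Host backup.example.org
          IdentityFile /run/secrets/borgsnap-ssh-key
      '';
      keyfile = "/run/secrets/borgsnap-borg-key";
      # extra keys merged into the zfssnap exec section (module defaults shown)
      extraConfig = {
        halfweekly = "8";
        monthly = "-1";
      };
    };
  }

The check hook exits 0 when a snapshot's archive already exists in the repository and otherwise signals intent through its exit code (126 = back up, 125 = keep without backing up, 124 = ignore, per the li.yggdrasil:borgsnap property), which presumably feeds into the zfssnap pruner's decision whether a snapshot may be destroyed; create likewise returns 125 instead of 0 after a successful --dry-run.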