From 7448d3431fcfc05f9b7991e337b02083300a99db Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 1 Nov 2022 22:43:25 +0100 Subject: ... --- hosts/vidhar/borg/borgsnap/borgsnap/__main__.py | 394 ----------------------- hosts/vidhar/borg/borgsnap/setup.py | 10 - hosts/vidhar/borg/default.nix | 61 +--- modules/borgsnap/borgsnap/borgsnap/__main__.py | 395 ++++++++++++++++++++++++ modules/borgsnap/borgsnap/setup.py | 10 + modules/borgsnap/default.nix | 106 +++++++ modules/zfssnap/default.nix | 18 +- modules/zfssnap/zfssnap/zfssnap/__main__.py | 3 + 8 files changed, 539 insertions(+), 458 deletions(-) delete mode 100644 hosts/vidhar/borg/borgsnap/borgsnap/__main__.py delete mode 100644 hosts/vidhar/borg/borgsnap/setup.py create mode 100644 modules/borgsnap/borgsnap/borgsnap/__main__.py create mode 100644 modules/borgsnap/borgsnap/setup.py create mode 100644 modules/borgsnap/default.nix diff --git a/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py b/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py deleted file mode 100644 index 15bf50c8..00000000 --- a/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py +++ /dev/null @@ -1,394 +0,0 @@ -import argparse -import os, sys, signal, io -from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps -from pwd import getpwnam - -from datetime import datetime, timezone -from dateutil.parser import isoparse - -import unshare -from tempfile import TemporaryDirectory - -import logging - -import json -import subprocess -import csv -from collections import namedtuple -from distutils.util import strtobool - -import pathlib -from pathlib import Path - -from atomicwrites import atomic_write - -from traceback import format_exc - -from multiprocessing import Process, Manager -from contextlib import closing - -from enum import Enum, auto - -import select -import time -import math - - -PROP_DO_BORGSNAP = 'li.yggdrasil:borgsnap' - - -class DoValue(Enum): - BORGSNAP_DO = auto() - BORGSNAP_KEEP = auto() - BORGSNAP_DONT = auto() - - @classmethod - def from_prop(cls, v: str): - if v.lower() == 'keep': - return cls.BORGSNAP_KEEP - - return cls.BORGSNAP_DO if not v or bool(strtobool(v)) else cls.BORGSNAP_DONT - - @classmethod - def merge(cls, v1, v2): - match (v1, v2): - case (cls.BORGSNAP_DONT, _): - return cls.BORGSNAP_DONT - case (_, cls.BORGSNAP_DONT): - return cls.BORGSNAP_DONT - case (cls.BORGSNAP_KEEP, _): - return cls.BORGSNAP_KEEP - case (_, cls.BORGSNAP_KEEP): - return cls.BORGSNAP_KEEP - case other: - return cls.BORGSNAP_DO - - def returncode(self): - match self: - case self.__class__.BORGSNAP_DO: - return 126 - case self.__class__.BORGSNAP_KEEP: - return 125 - case self.__class__.BORGSNAP_DONT: - return 124 - -borg_pwd = getpwnam('borg') - -def as_borg(caps=set()): - global logger - - try: - if caps: - c_state = CapState.get_current() - c_state.permitted.add(*caps) - c_state.set_current() - - logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) - - set_keepcaps(True) - - os.setgid(borg_pwd.pw_gid) - os.setuid(borg_pwd.pw_uid) - - if caps: - logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) - - c_state = CapState.get_current() - c_state.permitted = caps.copy() - c_state.inheritable.add(*caps) - c_state.set_current() - - logger.debug("cap_permitted=%s", CapState.get_current().permitted) - logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) - - for cap in caps: - cap_ambient_raise(cap) - logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) - except Exception: - logger.error(format_exc()) - raise - - -def _archive_name(snapshot, target, archive_prefix): - _, _, ts = snapshot.rpartition('@') - creation_time = isoparse(ts).astimezone(timezone.utc) - archive_name = _archive_basename(snapshot, archive_prefix) - return f'{target}::{archive_name}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}' - -def _archive_basename(snapshot, archive_prefix): - base_name, _, _ = snapshot.rpartition('@') - return archive_prefix + base_name.replace('-', '--').replace('/', '-') - -def check(*, snapshot, target, archive_prefix, cache_file): - global logger - - archives = None - if cache_file: - logger.debug('Trying cache...') - try: - with open(cache_file, mode='r', encoding='utf-8') as fp: - archives = set(json.load(fp)) - logger.debug('Loaded archive list from cache') - except FileNotFoundError: - pass - - if not archives: - logger.info('Loading archive list from remote...') - with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', target], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: - archives = set([archive['barchive'] for archive in json.load(proc.stdout)['archives']]) - if cache_file: - logger.debug('Saving archive list to cache...') - with atomic_write(cache_file, mode='w', encoding='utf-8', overwrite=True) as fp: - json.dump(list(archives), fp) - - # logger.debug(f'archives: {archives}') - _, _, archive_name = _archive_name(snapshot, target, archive_prefix).partition('::') - if archive_name in archives: - logger.info('‘%s’ found', archive_name) - return 0 - else: - logger.info('‘%s’ not found', archive_name) - - logger.debug('Checking %s for ‘%s’...', PROP_DO_BORGSNAP, snapshot) - intent = DoValue.BORGSNAP_DO - p = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'name,value', PROP_DO_BORGSNAP, snapshot], stdout=subprocess.PIPE, text=True, check=True) - reader = csv.DictReader(io.StringIO(p.stdout), fieldnames=['name', 'value'], delimiter='\t', quoting=csv.QUOTE_NONE) - Row = namedtuple('Row', reader.fieldnames) - for row in [Row(**data) for data in reader]: - if not row.value or row.value == '-': - continue - - logger.debug('%s=%s (parsed as %s) for ‘%s’...', PROP_DO_BORGSNAP, row.value, DoValue.from_prop(row.value), row.name) - intent = DoValue.merge(intent, DoValue.from_prop(row.value)) - - match intent: - case DoValue.BORGSNAP_DONT: - logger.warn('%s specifies to ignore, returning accordingly...', PROP_DO_BORGSNAP) - case DoValue.BORGSNAP_KEEP: - logger.info('%s specifies to ignore but keep, returning accordingly...', PROP_DO_BORGSNAP) - case other: - pass - - return intent.returncode() - -def create(*, snapshot, target, archive_prefix, dry_run): - global logger - - basename = _archive_basename(snapshot, archive_prefix) - - def do_create(tmpdir_q): - global logger - nonlocal basename, snapshot, target, archive_prefix, dry_run - - tmpdir = tmpdir_q.get() - - unshare.unshare(unshare.CLONE_NEWNS) - subprocess.run(['mount', '--make-rprivate', '/'], check=True) - chroot = pathlib.Path(tmpdir) / 'chroot' - upper = pathlib.Path(tmpdir) / 'upper' - work = pathlib.Path(tmpdir) / 'work' - for path in [chroot,upper,work]: - path.mkdir() - subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) - bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] - if borg_base_dir := os.getenv('BORG_BASE_DIR'): - bindMounts.append(pathlib.Path(borg_base_dir).relative_to('/')) - if ssh_auth_sock := os.getenv('SSH_AUTH_SOCK'): - bindMounts.append(pathlib.Path(ssh_auth_sock).parent.relative_to('/')) - for bindMount in bindMounts: - (chroot / bindMount).mkdir(parents=True,exist_ok=True) - subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) - - os.chroot(chroot) - os.chdir('/') - dir = pathlib.Path('/borg') - dir.mkdir(parents=True,exist_ok=True,mode=0o0750) - os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) - - base_name, _, _ = snapshot.rpartition('@') - type_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'type', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() - match type_val: - case 'filesystem': - subprocess.run(['mount', '-t', 'zfs', '-o', 'ro', snapshot, dir], check=True) - case 'volume': - snapdev_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'snapdev', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() - try: - if snapdev_val == 'hidden': - subprocess.run(['zfs', 'set', 'snapdev=visible', base_name], check=True) - subprocess.run(['mount', '-t', 'auto', '-o', 'ro', Path('/dev/zvol') / snapshot, dir], check=True) - finally: - if snapdev_val == 'hidden': - subprocess.run(['zfs', 'inherit', 'snapdev', base_name], check=True) - case other: - raise ValueError(f'‘{base_name}’ is of type ‘{type_val}’') - - env = os.environ.copy() - create_args = ['borg', - 'create', - '--lock-wait=600', - '--one-file-system', - '--exclude-caches', - '--keep-exclude-tags', - '--compression=auto,zstd,10', - '--chunker-params=10,23,16,4095', - '--files-cache=ctime,size', - '--show-rc', - '--upload-buffer=100', - '--upload-ratelimit=20480', - '--progress', - '--list', - '--filter=AMEi-x?', - '--stats' if not dry_run else '--dry-run', - ] - _, _, ts = snapshot.rpartition('@') - creation_time = isoparse(ts).astimezone(timezone.utc) - create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'] - env['BORG_FILES_CACHE_SUFFIX'] = basename - archive_name = _archive_name(snapshot, target, archive_prefix) - target_host, _, target_path = target.rpartition(':') - *parents_init, _ = list(Path(target_path).parents) - backup_patterns = [*(map(lambda p: Path('.backup') / f'{target_host}:{p}', [Path(target_path), *parents_init])), Path('.backup') / target_host, Path('.backup')] - for pattern_file in backup_patterns: - if (dir / pattern_file).is_file(): - logger.debug('Found backup patterns at ‘%s’', dir / pattern_file) - create_args += [f'--patterns-from={pattern_file}', archive_name] - break - elif (dir / pattern_file).exists(): - logger.warn('‘%s’ exists but is no file', dir / pattern_file) - else: - logger.debug('No backup patterns exist, checked %s', list(map(lambda pattern_file: str(dir / pattern_file), backup_patterns))) - create_args += [archive_name, '.'] - logger.debug('%s', {'create_args': create_args, 'cwd': dir, 'env': env}) - - with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir, text=True) as proc: - proc_logger = logger.getChild('borg') - stdout_logger = proc_logger.getChild('stdout') - stderr_logger = proc_logger.getChild('stderr') - - poll = select.poll() - poll.register(proc.stdout, select.POLLIN | select.POLLHUP) - poll.register(proc.stderr, select.POLLIN | select.POLLHUP) - pollc = 2 - events = poll.poll() - while pollc > 0 and len(events) > 0: - for rfd, event in events: - if event & select.POLLIN: - if rfd == proc.stdout.fileno(): - line = proc.stdout.readline() - if len(line) > 0: - stdout_logger.info(line[:-1]) - if rfd == proc.stderr.fileno(): - line = proc.stderr.readline() - if len(line) > 0: - stderr_logger.info(line[:-1]) - if event & select.POLLHUP: - poll.unregister(rfd) - pollc -= 1 - - if pollc > 0: - events = poll.poll() - - for handler in proc_logger.handlers: - handler.flush() - - ret = proc.wait() - if ret != 0: - raise Exception(f'borg subprocess exited with returncode {ret}') - - with Manager() as manager: - tmpdir_q = manager.Queue(1) - with closing(Process(target=do_create, args=(tmpdir_q,), name='do_create')) as p: - p.start() - - with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.getenv('RUNTIME_DIRECTORY')) as tmpdir: - tmpdir_q.put(tmpdir) - p.join() - if p.exitcode == 0 and dry_run: - return 125 - return p.exitcode - -def sigterm(signum, frame): - raise SystemExit(128 + signum) - -def main(): - signal.signal(signal.SIGTERM, sigterm) - - global logger - logger = logging.getLogger(__name__) - console_handler = logging.StreamHandler() - console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) - if sys.stderr.isatty(): - console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) - - burst_max = 10000 - burst = burst_max - last_use = None - inv_rate = 1e6 - def consume_filter(record): - nonlocal burst, burst_max, inv_rate, last_use - - delay = None - while True: - now = time.monotonic_ns() - burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max - last_use = now - - if burst > 0: - burst -= 1 - if delay: - delay = now - delay - - return True - - if delay is None: - delay = now - time.sleep(inv_rate / 1e9) - console_handler.addFilter(consume_filter) - - logger.addHandler(console_handler) - - # log uncaught exceptions - def log_exceptions(type, value, tb): - global logger - - logger.error(value) - sys.__excepthook__(type, value, tb) # calls default excepthook - - sys.excepthook = log_exceptions - - parser = argparse.ArgumentParser(prog='borgsnap') - parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1) - parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1) - parser.add_argument('--target', metavar='REPO', default='yggdrasil.borgbase:repo') - parser.add_argument('--archive-prefix', metavar='REPO', default='yggdrasil.vidhar.') - subparsers = parser.add_subparsers() - subparsers.required = True - parser.set_defaults(cmd=None) - check_parser = subparsers.add_parser('check') - check_parser.add_argument('--cache-file', type=lambda p: Path(p).absolute(), default=None) - check_parser.add_argument('snapshot') - check_parser.set_defaults(cmd=check) - create_parser = subparsers.add_parser('create') - create_parser.add_argument('--dry-run', '-n', action='store_true', default=False) - create_parser.add_argument('snapshot') - create_parser.set_defaults(cmd=create) - args = parser.parse_args() - - LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] - DEFAULT_LOG_LEVEL = logging.ERROR - log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) - - for adjustment in args.log_level or (): - log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0)) - logger.setLevel(LOG_LEVELS[log_level]) - - cmdArgs = {} - for copy in {'target', 'archive_prefix', 'snapshot', 'cache_file', 'dry_run'}: - if copy in vars(args): - cmdArgs[copy] = vars(args)[copy] - - return args.cmd(**cmdArgs) - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/hosts/vidhar/borg/borgsnap/setup.py b/hosts/vidhar/borg/borgsnap/setup.py deleted file mode 100644 index 76356bfc..00000000 --- a/hosts/vidhar/borg/borgsnap/setup.py +++ /dev/null @@ -1,10 +0,0 @@ -from setuptools import setup - -setup(name='borgsnap', - packages=['borgsnap'], - entry_points={ - 'console_scripts': [ - 'borgsnap=borgsnap.__main__:main', - ], - } -) diff --git a/hosts/vidhar/borg/default.nix b/hosts/vidhar/borg/default.nix index fe62c956..8d0b46ef 100644 --- a/hosts/vidhar/borg/default.nix +++ b/hosts/vidhar/borg/default.nix @@ -3,7 +3,7 @@ with lib; let - sshConfig = pkgs.writeText "config" '' + sshConfig = '' Include /etc/ssh/ssh_config ControlMaster auto @@ -33,7 +33,7 @@ let # StateDirectory = "borg"; RuntimeDirectory = "copy-borg"; Environment = [ - "BORG_RSH=\"${pkgs.openssh}/bin/ssh -F ${sshConfig}\"" + "BORG_RSH=\"${pkgs.openssh}/bin/ssh -F ${pkgs.writeText "config" sshConfig}\"" "BORG_BASE_DIR=/var/lib/borg" "BORG_CONFIG_DIR=/var/lib/borg/config" "BORG_CACHE_DIR=/var/lib/borg/cache" @@ -74,63 +74,20 @@ let copy wrapProgram $out/bin/copy \ - --prefix PATH : ${makeBinPath (with pkgs; [config.boot.zfs.package util-linux borgbackup])}:${config.security.wrapperDir} - ''; - }); - - borgsnap = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec { - pname = "borgsnap"; - src = ./borgsnap; - version = "0.0.0"; - ignoreDataOutdated = true; - - requirements = '' - atomicwrites - pyprctl - python-unshare - python-dateutil - ''; - postInstall = '' - wrapProgram $out/bin/borgsnap \ --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} ''; - - providers.python-unshare = "nixpkgs"; - overridesPre = [ - (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); }) - ]; - - _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ]; - }; + }); in { config = { - services.zfssnap.config.exec = { - check = "${borgsnap}/bin/borgsnap -vv --target yggdrasil.borgbase:repo --archive-prefix yggdrasil.vidhar. check --cache-file /run/zfssnap-prune/archives-cache.json"; - cmd = "${borgsnap}/bin/borgsnap -vv --target yggdrasil.borgbase:repo --archive-prefix yggdrasil.vidhar. create"; + services.borgsnap = { + enable = true; - halfweekly = "8"; - monthly = "-1"; + target = "yggdrasil.borgbase:repo"; + inherit sshConfig; + keyfile = config.sops.secrets."yggdrasil.borgkey".path; }; - systemd.services = { - "zfssnap-prune" = { - serviceConfig = { - Environment = [ - "BORG_RSH=\"${pkgs.openssh}/bin/ssh -F ${sshConfig}\"" - "BORG_BASE_DIR=/var/lib/borg" - "BORG_CONFIG_DIR=/var/lib/borg/config" - "BORG_CACHE_DIR=/var/lib/borg/cache" - "BORG_SECURITY_DIR=/var/lib/borg/security" - "BORG_KEYS_DIR=/var/lib/borg/keys" - "BORG_KEY_FILE=${config.sops.secrets."yggdrasil.borgkey".path}" - "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" - "BORG_HOSTNAME_IS_UNIQUE=yes" - ]; - RuntimeDirectory = "zfssnap-prune"; - }; - }; - } // listToAttrs (map copyService [{ repo = "/srv/backup/borg/jotnar"; repoEscaped = "srv-backup-borg-jotnar"; }]); - + systemd.services = listToAttrs (map copyService [{ repo = "/srv/backup/borg/jotnar"; repoEscaped = "srv-backup-borg-jotnar"; }]); services.borgbackup.repos.jotnar = { path = "/srv/backup/borg/jotnar"; diff --git a/modules/borgsnap/borgsnap/borgsnap/__main__.py b/modules/borgsnap/borgsnap/borgsnap/__main__.py new file mode 100644 index 00000000..4b50cf80 --- /dev/null +++ b/modules/borgsnap/borgsnap/borgsnap/__main__.py @@ -0,0 +1,395 @@ +import argparse +import os, sys, signal, io +from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps +from pwd import getpwnam + +from datetime import datetime, timezone +from dateutil.parser import isoparse + +import unshare +from tempfile import TemporaryDirectory + +import logging + +import json +import subprocess +import csv +from collections import namedtuple +from distutils.util import strtobool + +import pathlib +from pathlib import Path + +from atomicwrites import atomic_write + +from traceback import format_exc + +from multiprocessing import Process, Manager +from contextlib import closing + +from enum import Enum, auto + +import select +import time +import math + + +PROP_DO_BORGSNAP = 'li.yggdrasil:borgsnap' + + +class DoValue(Enum): + BORGSNAP_DO = auto() + BORGSNAP_KEEP = auto() + BORGSNAP_DONT = auto() + + @classmethod + def from_prop(cls, v: str): + if v.lower() == 'keep': + return cls.BORGSNAP_KEEP + + return cls.BORGSNAP_DO if not v or bool(strtobool(v)) else cls.BORGSNAP_DONT + + @classmethod + def merge(cls, v1, v2): + match (v1, v2): + case (cls.BORGSNAP_DONT, _): + return cls.BORGSNAP_DONT + case (_, cls.BORGSNAP_DONT): + return cls.BORGSNAP_DONT + case (cls.BORGSNAP_KEEP, _): + return cls.BORGSNAP_KEEP + case (_, cls.BORGSNAP_KEEP): + return cls.BORGSNAP_KEEP + case other: + return cls.BORGSNAP_DO + + def returncode(self): + match self: + case self.__class__.BORGSNAP_DO: + return 126 + case self.__class__.BORGSNAP_KEEP: + return 125 + case self.__class__.BORGSNAP_DONT: + return 124 + +borg_pwd = getpwnam('borg') + +def as_borg(caps=set()): + global logger + + try: + if caps: + c_state = CapState.get_current() + c_state.permitted.add(*caps) + c_state.set_current() + + logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) + + set_keepcaps(True) + + os.setgid(borg_pwd.pw_gid) + os.setuid(borg_pwd.pw_uid) + + if caps: + logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) + + c_state = CapState.get_current() + c_state.permitted = caps.copy() + c_state.inheritable.add(*caps) + c_state.set_current() + + logger.debug("cap_permitted=%s", CapState.get_current().permitted) + logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) + + for cap in caps: + cap_ambient_raise(cap) + logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) + except Exception: + logger.error(format_exc()) + raise + + +def _archive_name(snapshot, target, archive_prefix): + _, _, ts = snapshot.rpartition('@') + creation_time = isoparse(ts).astimezone(timezone.utc) + archive_name = _archive_basename(snapshot, archive_prefix) + return f'{target}::{archive_name}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}' + +def _archive_basename(snapshot, archive_prefix): + base_name, _, _ = snapshot.rpartition('@') + return archive_prefix + base_name.replace('-', '--').replace('/', '-') + +def check(*, snapshot, target, archive_prefix, cache_file): + global logger + + archives = None + if cache_file: + logger.debug('Trying cache...') + try: + with open(cache_file, mode='r', encoding='utf-8') as fp: + archives = set(json.load(fp)) + logger.debug('Loaded archive list from cache') + except FileNotFoundError: + pass + + if not archives: + logger.info('Loading archive list from remote...') + with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', target], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: + archives = set([archive['barchive'] for archive in json.load(proc.stdout)['archives']]) + if cache_file: + logger.debug('Saving archive list to cache...') + with atomic_write(cache_file, mode='w', encoding='utf-8', overwrite=True) as fp: + json.dump(list(archives), fp) + + # logger.debug(f'archives: {archives}') + _, _, archive_name = _archive_name(snapshot, target, archive_prefix).partition('::') + if archive_name in archives: + logger.info('‘%s’ found', archive_name) + return 0 + else: + logger.info('‘%s’ not found', archive_name) + + logger.debug('Checking %s for ‘%s’...', PROP_DO_BORGSNAP, snapshot) + intent = DoValue.BORGSNAP_DO + p = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'name,value', PROP_DO_BORGSNAP, snapshot], stdout=subprocess.PIPE, text=True, check=True) + reader = csv.DictReader(io.StringIO(p.stdout), fieldnames=['name', 'value'], delimiter='\t', quoting=csv.QUOTE_NONE) + Row = namedtuple('Row', reader.fieldnames) + for row in [Row(**data) for data in reader]: + if not row.value or row.value == '-': + continue + + logger.debug('%s=%s (parsed as %s) for ‘%s’...', PROP_DO_BORGSNAP, row.value, DoValue.from_prop(row.value), row.name) + intent = DoValue.merge(intent, DoValue.from_prop(row.value)) + + match intent: + case DoValue.BORGSNAP_DONT: + logger.warn('%s specifies to ignore, returning accordingly...', PROP_DO_BORGSNAP) + case DoValue.BORGSNAP_KEEP: + logger.info('%s specifies to ignore but keep, returning accordingly...', PROP_DO_BORGSNAP) + case other: + pass + + return intent.returncode() + +def create(*, snapshot, target, archive_prefix, dry_run): + global logger + + basename = _archive_basename(snapshot, archive_prefix) + + def do_create(tmpdir_q): + global logger + nonlocal basename, snapshot, target, archive_prefix, dry_run + + tmpdir = tmpdir_q.get() + + unshare.unshare(unshare.CLONE_NEWNS) + subprocess.run(['mount', '--make-rprivate', '/'], check=True) + chroot = pathlib.Path(tmpdir) / 'chroot' + upper = pathlib.Path(tmpdir) / 'upper' + work = pathlib.Path(tmpdir) / 'work' + for path in [chroot,upper,work]: + path.mkdir() + subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) + bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] + if borg_base_dir := os.getenv('BORG_BASE_DIR'): + bindMounts.append(pathlib.Path(borg_base_dir).relative_to('/')) + if ssh_auth_sock := os.getenv('SSH_AUTH_SOCK'): + bindMounts.append(pathlib.Path(ssh_auth_sock).parent.relative_to('/')) + for bindMount in bindMounts: + (chroot / bindMount).mkdir(parents=True,exist_ok=True) + subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) + + os.chroot(chroot) + os.chdir('/') + dir = pathlib.Path('/borg') + dir.mkdir(parents=True,exist_ok=True,mode=0o0750) + os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) + + base_name, _, _ = snapshot.rpartition('@') + type_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'type', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() + match type_val: + case 'filesystem': + subprocess.run(['mount', '-t', 'zfs', '-o', 'ro', snapshot, dir], check=True) + case 'volume': + snapdev_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'snapdev', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() + try: + if snapdev_val == 'hidden': + subprocess.run(['zfs', 'set', 'snapdev=visible', base_name], check=True) + subprocess.run(['mount', '-t', 'auto', '-o', 'ro', Path('/dev/zvol') / snapshot, dir], check=True) + finally: + if snapdev_val == 'hidden': + subprocess.run(['zfs', 'inherit', 'snapdev', base_name], check=True) + case other: + raise ValueError(f'‘{base_name}’ is of type ‘{type_val}’') + + env = os.environ.copy() + create_args = ['borg', + 'create', + '--lock-wait=600', + '--one-file-system', + '--exclude-caches', + '--keep-exclude-tags', + '--compression=auto,zstd,10', + '--chunker-params=10,23,16,4095', + '--files-cache=ctime,size', + '--show-rc', + '--upload-buffer=100', + '--upload-ratelimit=20480', + '--progress', + '--list', + '--filter=AMEi-x?', + '--stats' if not dry_run else '--dry-run', + ] + _, _, ts = snapshot.rpartition('@') + creation_time = isoparse(ts).astimezone(timezone.utc) + create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'] + env['BORG_FILES_CACHE_SUFFIX'] = basename + archive_name = _archive_name(snapshot, target, archive_prefix) + target_host, _, target_path = target.rpartition(':') + *parents_init, _ = list(Path(target_path).parents) + backup_patterns = [*(map(lambda p: Path('.backup') / f'{target_host}:{p}', [Path(target_path), *parents_init])), Path('.backup') / target_host, Path('.backup')] + for pattern_file in backup_patterns: + if (dir / pattern_file).is_file(): + logger.debug('Found backup patterns at ‘%s’', dir / pattern_file) + create_args += [f'--patterns-from={pattern_file}', archive_name] + break + elif (dir / pattern_file).exists(): + logger.warn('‘%s’ exists but is no file', dir / pattern_file) + else: + logger.debug('No backup patterns exist, checked %s', list(map(lambda pattern_file: str(dir / pattern_file), backup_patterns))) + create_args += [archive_name, '.'] + logger.debug('%s', {'create_args': create_args, 'cwd': dir, 'env': env}) + + with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir, text=True) as proc: + proc_logger = logger.getChild('borg') + stdout_logger = proc_logger.getChild('stdout') + stderr_logger = proc_logger.getChild('stderr') + + poll = select.poll() + poll.register(proc.stdout, select.POLLIN | select.POLLHUP) + poll.register(proc.stderr, select.POLLIN | select.POLLHUP) + pollc = 2 + events = poll.poll() + while pollc > 0 and len(events) > 0: + for rfd, event in events: + if event & select.POLLIN: + if rfd == proc.stdout.fileno(): + line = proc.stdout.readline() + if len(line) > 0: + stdout_logger.info(line[:-1]) + if rfd == proc.stderr.fileno(): + line = proc.stderr.readline() + if len(line) > 0: + stderr_logger.info(line[:-1]) + if event & select.POLLHUP: + poll.unregister(rfd) + pollc -= 1 + + if pollc > 0: + events = poll.poll() + + for handler in proc_logger.handlers: + handler.flush() + + ret = proc.wait() + if ret != 0: + raise Exception(f'borg subprocess exited with returncode {ret}') + + with Manager() as manager: + tmpdir_q = manager.Queue(1) + with closing(Process(target=do_create, args=(tmpdir_q,), name='do_create')) as p: + p.start() + + with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.getenv('RUNTIME_DIRECTORY')) as tmpdir: + tmpdir_q.put(tmpdir) + p.join() + if p.exitcode == 0 and dry_run: + return 125 + return p.exitcode + +def sigterm(signum, frame): + raise SystemExit(128 + signum) + +def main(): + signal.signal(signal.SIGTERM, sigterm) + + global logger + logger = logging.getLogger(__name__) + console_handler = logging.StreamHandler() + console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) + if sys.stderr.isatty(): + console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) + + burst_max = 10000 + burst = burst_max + last_use = None + inv_rate = 1e6 + def consume_filter(record): + nonlocal burst, burst_max, inv_rate, last_use + + delay = None + while True: + now = time.monotonic_ns() + burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max + last_use = now + + if burst > 0: + burst -= 1 + if delay: + delay = now - delay + + return True + + if delay is None: + delay = now + time.sleep(inv_rate / 1e9) + console_handler.addFilter(consume_filter) + + logger.addHandler(console_handler) + + # log uncaught exceptions + def log_exceptions(type, value, tb): + global logger + + logger.error(value) + sys.__excepthook__(type, value, tb) # calls default excepthook + + sys.excepthook = log_exceptions + + parser = argparse.ArgumentParser(prog='borgsnap') + parser.add_argument('--verbosity', dest='log_level', action='append') + parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1) + parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1) + parser.add_argument('--target', metavar='REPO', default='yggdrasil.borgbase:repo') + parser.add_argument('--archive-prefix', metavar='REPO', default='yggdrasil.vidhar.') + subparsers = parser.add_subparsers() + subparsers.required = True + parser.set_defaults(cmd=None) + check_parser = subparsers.add_parser('check') + check_parser.add_argument('--cache-file', type=lambda p: Path(p).absolute(), default=None) + check_parser.add_argument('snapshot') + check_parser.set_defaults(cmd=check) + create_parser = subparsers.add_parser('create') + create_parser.add_argument('--dry-run', '-n', action='store_true', default=False) + create_parser.add_argument('snapshot') + create_parser.set_defaults(cmd=create) + args = parser.parse_args() + + LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] + DEFAULT_LOG_LEVEL = logging.ERROR + log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) + + for adjustment in args.log_level or (): + log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0)) + logger.setLevel(LOG_LEVELS[log_level]) + + cmdArgs = {} + for copy in {'target', 'archive_prefix', 'snapshot', 'cache_file', 'dry_run'}: + if copy in vars(args): + cmdArgs[copy] = vars(args)[copy] + + return args.cmd(**cmdArgs) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/modules/borgsnap/borgsnap/setup.py b/modules/borgsnap/borgsnap/setup.py new file mode 100644 index 00000000..76356bfc --- /dev/null +++ b/modules/borgsnap/borgsnap/setup.py @@ -0,0 +1,10 @@ +from setuptools import setup + +setup(name='borgsnap', + packages=['borgsnap'], + entry_points={ + 'console_scripts': [ + 'borgsnap=borgsnap.__main__:main', + ], + } +) diff --git a/modules/borgsnap/default.nix b/modules/borgsnap/default.nix new file mode 100644 index 00000000..f4c0eec4 --- /dev/null +++ b/modules/borgsnap/default.nix @@ -0,0 +1,106 @@ +{ config, pkgs, lib, flakeInputs, hostName, ... }: + +with lib; + +let + borgsnap = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec { + pname = "borgsnap"; + src = ./borgsnap; + version = "0.0.0"; + ignoreDataOutdated = true; + + requirements = '' + atomicwrites + pyprctl + python-unshare + python-dateutil + ''; + postInstall = '' + wrapProgram $out/bin/borgsnap \ + --prefix PATH : ${makeBinPath (with pkgs; [config.boot.zfs.package util-linux borgbackup])}:${config.security.wrapperDir} + ''; + + providers.python-unshare = "nixpkgs"; + overridesPre = [ + (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); }) + ]; + + _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ]; + }; + + cfg = config.services.borgsnap; +in { + options = { + services.borgsnap = { + enable = mkEnableOption "borgsnap service"; + + target = mkOption { + type = types.str; + }; + + archive-prefix = mkOption { + type = types.str; + default = "yggdrasil.${hostName}."; + }; + + extraConfig = mkOption { + type = with types; attrsOf str; + default = { + halfweekly = "8"; + monthly = "-1"; + }; + }; + + verbosity = mkOption { + type = types.int; + default = config.services.zfssnap.verbosity; + }; + + sshConfig = mkOption { + type = with types; nullOr str; + default = null; + }; + + keyfile = mkOption { + type = with types; nullOr str; + default = null; + }; + + extraCreateArgs = mkOption { + type = with types; listOf str; + default = []; + }; + extraCheckArgs = mkOption { + type = with types; listOf str; + default = []; + }; + }; + }; + + config = mkIf cfg.enable { + warnings = mkIf (!config.services.zfssnap.enable) [ + "borgsnap will do nothing if zfssnap is not enabled" + ]; + + services.zfssnap.config.exec = { + check = "${borgsnap}/bin/borgsnap --verbosity=${toString cfg.verbosity} --target ${escapeShellArg cfg.target} --archive-prefix ${escapeShellArg cfg.archive-prefix} check --cache-file /run/zfssnap-prune/archives-cache.json ${escapeShellArgs cfg.extraCheckArgs}"; + cmd = "${borgsnap}/bin/borgsnap --verbosity=${toString cfg.verbosity} --target ${escapeShellArg cfg.target} --archive-prefix ${escapeShellArg cfg.archive-prefix} create ${escapeShellArgs cfg.extraCreateArgs}"; + } // cfg.extraConfig; + + systemd.services."zfssnap-prune" = { + serviceConfig = { + Environment = [ + "BORG_BASE_DIR=/var/lib/borg" + "BORG_CONFIG_DIR=/var/lib/borg/config" + "BORG_CACHE_DIR=/var/lib/borg/cache" + "BORG_SECURITY_DIR=/var/lib/borg/security" + "BORG_KEYS_DIR=/var/lib/borg/keys" + "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" + "BORG_HOSTNAME_IS_UNIQUE=yes" + ] ++ optional (!(isNull cfg.sshConfig)) "BORG_RSH=\"${pkgs.openssh}/bin/ssh -F ${pkgs.writeText "config" cfg.sshConfig}\"" + ++ optional (!(isNull cfg.keyfile)) "BORG_KEY_FILE=${cfg.keyfile}"; + RuntimeDirectory = "zfssnap-prune"; + }; + }; + }; +} diff --git a/modules/zfssnap/default.nix b/modules/zfssnap/default.nix index f46cd510..f6f32852 100644 --- a/modules/zfssnap/default.nix +++ b/modules/zfssnap/default.nix @@ -48,6 +48,20 @@ in { type = types.str; default = "*-*-* *:00/5:00"; }; + + verbosity = mkOption { + type = types.int; + default = 2; + }; + + extraPruneArgs = mkOption { + type = with types; listOf str; + default = []; + }; + extraAutosnapArgs = mkOption { + type = with types; listOf str; + default = []; + }; }; }; @@ -59,7 +73,7 @@ in { before = [ "zfssnap-prune.service" ]; serviceConfig = { Type = "oneshot"; - ExecStart = "${zfssnap}/bin/zfssnap -vv"; + ExecStart = "${zfssnap}/bin/zfssnap --verbosity=${toString cfg.verbosity} autosnap ${escapeShellArgs cfg.extraAutosnapArgs}"; LogRateLimitIntervalSec = 0; }; @@ -72,7 +86,7 @@ in { ExecStart = let mkSectionName = name: strings.escape [ "[" "]" ] (strings.toUpper name); zfssnapConfig = generators.toINI { inherit mkSectionName; } cfg.config; - in "${zfssnap}/bin/zfssnap -vv prune --config=${pkgs.writeText "zfssnap.ini" zfssnapConfig}"; + in "${zfssnap}/bin/zfssnap --verbosity=${toString cfg.verbosity} prune --config=${pkgs.writeText "zfssnap.ini" zfssnapConfig} ${escapeShellArgs cfg.extraPruneArgs}"; LogRateLimitIntervalSec = 0; }; diff --git a/modules/zfssnap/zfssnap/zfssnap/__main__.py b/modules/zfssnap/zfssnap/zfssnap/__main__.py index 9d07401b..59478c71 100644 --- a/modules/zfssnap/zfssnap/zfssnap/__main__.py +++ b/modules/zfssnap/zfssnap/zfssnap/__main__.py @@ -375,10 +375,13 @@ def main(): sys.excepthook = log_exceptions parser = argparse.ArgumentParser(prog='zfssnap') + parser.add_argument('--verbosity', dest='log_level', action='append') parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1) parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1) subparsers = parser.add_subparsers() parser.set_defaults(cmd=autosnap) + autosnap_parser = subparsers.add_parser('autosnap') + autosnap_parser.set_defaults(cmd=autosnap) rename_parser = subparsers.add_parser('rename') rename_parser.add_argument('snapshots', nargs='+') rename_parser.add_argument('--destroy', action='store_true', default=False) -- cgit v1.2.3