From a73133a7c629c76a7a328f0e8d2bb693c46ef45d Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 1 Nov 2022 21:05:33 +0100 Subject: fix backups --- hosts/vidhar/borg/borgsnap/borgsnap/__main__.py | 355 ++++++++++++++----- hosts/vidhar/borg/default.nix | 6 +- hosts/vidhar/network/bifrost/default.nix | 18 +- modules/yggdrasil-wg/default.nix | 4 +- modules/zfssnap/default.nix | 42 +-- modules/zfssnap/zfssnap.py | 405 ---------------------- modules/zfssnap/zfssnap/setup.py | 10 + modules/zfssnap/zfssnap/zfssnap/__main__.py | 435 ++++++++++++++++++++++++ 8 files changed, 754 insertions(+), 521 deletions(-) delete mode 100644 modules/zfssnap/zfssnap.py create mode 100644 modules/zfssnap/zfssnap/setup.py create mode 100644 modules/zfssnap/zfssnap/zfssnap/__main__.py diff --git a/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py b/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py index e93e6a60..cd2f1ad2 100644 --- a/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py +++ b/hosts/vidhar/borg/borgsnap/borgsnap/__main__.py @@ -1,12 +1,11 @@ import argparse -import os, sys, signal -from pyprctl import cap_permitted, cap_inheritable, cap_effective, cap_ambient, Cap +import os, sys, signal, io +from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps from pwd import getpwnam from datetime import datetime, timezone from dateutil.parser import isoparse -from xdg import xdg_runtime_dir import unshare from tempfile import TemporaryDirectory @@ -14,6 +13,9 @@ import logging import json import subprocess +import csv +from collections import namedtuple +from distutils.util import strtobool import pathlib from pathlib import Path @@ -22,21 +24,89 @@ from atomicwrites import atomic_write from traceback import format_exc +from multiprocessing import Process, Manager +from contextlib import closing + +from enum import Enum, auto + +import select +import time +import math + + +PROP_DO_BORGSNAP = 'li.yggdrasil:borgsnap' + + +class DoValue(Enum): + BORGSNAP_DO = auto() + BORGSNAP_KEEP = auto() + BORGSNAP_DONT = auto() + + @classmethod + def from_prop(cls, v: str): + if v.lower() == 'keep': + return cls.BORGSNAP_KEEP + + return cls.BORGSNAP_DO if not v or bool(strtobool(v)) else cls.BORGSNAP_DONT + + @classmethod + def merge(cls, v1, v2): + match (v1, v2): + case (cls.BORGSNAP_DONT, _): + return cls.BORGSNAP_DONT + case (_, cls.BORGSNAP_DONT): + return cls.BORGSNAP_DONT + case (cls.BORGSNAP_KEEP, _): + return cls.BORGSNAP_KEEP + case (_, cls.BORGSNAP_KEEP): + return cls.BORGSNAP_KEEP + case other: + return cls.BORGSNAP_DO + + def returncode(self): + match self: + case self.__class__.BORGSNAP_DO: + return 126 + case self.__class__.BORGSNAP_KEEP: + return 125 + case self.__class__.BORGSNAP_DONT: + return 124 borg_pwd = getpwnam('borg') -def as_borg(caps=set(), cwd=None): - if caps: - cap_permitted.add(*caps) - cap_inheritable.add(*caps) - cap_effective.add(*caps) - cap_ambient.add(*caps) +def as_borg(caps=set()): + global logger + + try: + if caps: + c_state = CapState.get_current() + c_state.permitted.add(*caps) + c_state.set_current() + + logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) + + set_keepcaps(True) + + os.setgid(borg_pwd.pw_gid) + os.setuid(borg_pwd.pw_uid) - os.setgid(borg_pwd.pw_gid) - os.setuid(borg_pwd.pw_uid) + if caps: + logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) - if cwd is not None: - os.chdir(cwd) + c_state = CapState.get_current() + c_state.permitted = caps.copy() + c_state.inheritable.add(*caps) + c_state.set_current() + + logger.debug("cap_permitted=%s", CapState.get_current().permitted) + logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) + + for cap in caps: + cap_ambient_raise(cap) + logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) + except Exception: + logger.error(format_exc()) + raise def _archive_name(snapshot, target, archive_prefix): @@ -50,13 +120,15 @@ def _archive_basename(snapshot, archive_prefix): return archive_prefix + base_name.replace('-', '--').replace('/', '-') def check(*, snapshot, target, archive_prefix, cache_file): + global logger + archives = None if cache_file: logger.debug('Trying cache...') try: with open(cache_file, mode='r', encoding='utf-8') as fp: archives = set(json.load(fp)) - logger.info('Loaded archive list from cache') + logger.debug('Loaded archive list from cache') except FileNotFoundError: pass @@ -72,76 +144,165 @@ def check(*, snapshot, target, archive_prefix, cache_file): # logger.debug(f'archives: {archives}') _, _, archive_name = _archive_name(snapshot, target, archive_prefix).partition('::') if archive_name in archives: - logger.info(f'{archive_name} found') + logger.info('‘%s’ found', archive_name) return 0 else: - logger.info(f'{archive_name} not found') - return 126 + logger.info('‘%s’ not found', archive_name) + + logger.debug('Checking %s for ‘%s’...', PROP_DO_BORGSNAP, snapshot) + intent = DoValue.BORGSNAP_DO + p = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'name,value', PROP_DO_BORGSNAP, snapshot], stdout=subprocess.PIPE, text=True, check=True) + reader = csv.DictReader(io.StringIO(p.stdout), fieldnames=['name', 'value'], delimiter='\t', quoting=csv.QUOTE_NONE) + Row = namedtuple('Row', reader.fieldnames) + for row in [Row(**data) for data in reader]: + if not row.value or row.value == '-': + continue + + logger.debug('%s=%s (parsed as %s) for ‘%s’...', PROP_DO_BORGSNAP, row.value, DoValue.from_prop(row.value), row.name) + intent = DoValue.merge(intent, DoValue.from_prop(row.value)) + + match intent: + case DoValue.BORGSNAP_DONT: + logger.warn('%s specifies to ignore, returning accordingly...', PROP_DO_BORGSNAP) + case DoValue.BORGSNAP_KEEP: + logger.info('%s specifies to ignore but keep, returning accordingly...', PROP_DO_BORGSNAP) + case other: + pass + + return intent.returncode() def create(*, snapshot, target, archive_prefix, dry_run): + global logger + basename = _archive_basename(snapshot, archive_prefix) - with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: - child = os.fork() - if child == 0: - unshare.unshare(unshare.CLONE_NEWNS) - subprocess.run(['mount', '--make-rprivate', '/'], check=True) - chroot = pathlib.Path(tmpdir) / 'chroot' - upper = pathlib.Path(tmpdir) / 'upper' - work = pathlib.Path(tmpdir) / 'work' - for path in [chroot,upper,work]: - path.mkdir() - subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) - bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] - if os.environ.get('BORG_BASE_DIR'): - bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) - if 'SSH_AUTH_SOCK' in os.environ: - bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) - for bindMount in bindMounts: - (chroot / bindMount).mkdir(parents=True,exist_ok=True) - # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr) - subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) - os.chroot(chroot) - os.chdir('/') - dir = pathlib.Path('/borg') - dir.mkdir(parents=True,exist_ok=True,mode=0o0750) - os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) - try: + def do_create(tmpdir_q): + global logger + nonlocal basename, snapshot, target, archive_prefix, dry_run + + tmpdir = tmpdir_q.get() + + unshare.unshare(unshare.CLONE_NEWNS) + subprocess.run(['mount', '--make-rprivate', '/'], check=True) + chroot = pathlib.Path(tmpdir) / 'chroot' + upper = pathlib.Path(tmpdir) / 'upper' + work = pathlib.Path(tmpdir) / 'work' + for path in [chroot,upper,work]: + path.mkdir() + subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) + bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] + if borg_base_dir := os.getenv('BORG_BASE_DIR'): + bindMounts.append(pathlib.Path(borg_base_dir).relative_to('/')) + if ssh_auth_sock := os.getenv('SSH_AUTH_SOCK'): + bindMounts.append(pathlib.Path(ssh_auth_sock).parent.relative_to('/')) + for bindMount in bindMounts: + (chroot / bindMount).mkdir(parents=True,exist_ok=True) + subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) + + os.chroot(chroot) + os.chdir('/') + dir = pathlib.Path('/borg') + dir.mkdir(parents=True,exist_ok=True,mode=0o0750) + os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) + + base_name, _, _ = snapshot.rpartition('@') + type_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'type', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() + match type_val: + case 'filesystem': subprocess.run(['mount', '-t', 'zfs', '-o', 'ro', snapshot, dir], check=True) - env = os.environ.copy() - create_args = ['borg', - 'create', - '--lock-wait=600', - '--one-file-system', - '--compression=auto,zstd,10', - '--chunker-params=10,23,16,4095', - '--files-cache=ctime,size', - '--show-rc', - # '--remote-ratelimit=20480', - '--progress', - '--list', - '--filter=AMEi-x?', - '--stats' if not dry_run else '--dry-run' - ] - _, _, ts = snapshot.rpartition('@') - creation_time = isoparse(ts).astimezone(timezone.utc) - create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'] - env['BORG_FILES_CACHE_SUFFIX'] = basename - create_args += [_archive_name(snapshot, target, archive_prefix), '.'] - print({'create_args': create_args, 'cwd': dir, 'env': env}, file=sys.stderr) - subprocess.run(create_args, stdin=subprocess.DEVNULL, env=env, preexec_fn=lambda: as_borg(caps={CAP.DAC_READ_SEARCH}, cwd=dir), check=True) - # subprocess.run(create_args, stdin=subprocess.DEVNULL, env=env, preexec_fn=lambda: None, cwd=dir, check=True) - finally: - subprocess.run(['umount', dir], check=True) - os._exit(0) + case 'volume': + snapdev_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'snapdev', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() + try: + if snapdev_val == 'hidden': + subprocess.run(['zfs', 'set', 'snapdev=visible', base_name], check=True) + subprocess.run(['mount', '-t', 'auto', '-o', 'ro', Path('/dev/zvol') / snapshot, dir], check=True) + finally: + if snapdev_val == 'hidden': + subprocess.run(['zfs', 'inherit', 'snapdev', base_name], check=True) + case other: + raise ValueError(f'‘{base_name}’ is of type ‘{type_val}’') + + env = os.environ.copy() + create_args = ['borg', + 'create', + '--lock-wait=600', + '--one-file-system', + '--exclude-caches', + '--keep-exclude-tags', + '--compression=auto,zstd,10', + '--chunker-params=10,23,16,4095', + '--files-cache=ctime,size', + '--show-rc', + # '--remote-ratelimit=20480', + '--progress', + '--list', + '--filter=AMEi-x?', + '--stats' if not dry_run else '--dry-run', + ] + _, _, ts = snapshot.rpartition('@') + creation_time = isoparse(ts).astimezone(timezone.utc) + create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'] + env['BORG_FILES_CACHE_SUFFIX'] = basename + archive_name = _archive_name(snapshot, target, archive_prefix) + target_host, _, target_path = target.rpartition(':') + *parents_init, _ = list(Path(target_path).parents) + backup_patterns = [*(map(lambda p: Path('.backup') / f'{target_host}:{p}', [Path(target_path), *parents_init])), Path('.backup') / target_host, Path('.backup')] + for pattern_file in backup_patterns: + if (dir / pattern_file).is_file(): + logger.debug('Found backup patterns at ‘%s’', dir / pattern_file) + create_args += [f'--patterns-from={pattern_file}', archive_name] + break + elif (dir / pattern_file).exists(): + logger.warn('‘%s’ exists but is no file', dir / pattern_file) else: - while True: - waitpid, waitret = os.wait() - if waitret != 0: - sys.exit(waitret) - if waitpid == child: - break - return 0 + logger.debug('No backup patterns exist, checked %s', list(map(lambda pattern_file: str(dir / pattern_file), backup_patterns))) + create_args += [archive_name, '.'] + logger.debug('%s', {'create_args': create_args, 'cwd': dir, 'env': env}) + + with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir, text=True) as proc: + proc_logger = logger.getChild('borg') + + poll = select.poll() + poll.register(proc.stdout, select.POLLIN | select.POLLHUP) + poll.register(proc.stderr, select.POLLIN | select.POLLHUP) + pollc = 2 + events = poll.poll() + while pollc > 0 and len(events) > 0: + for rfd, event in events: + if event & select.POLLIN: + if rfd == proc.stdout.fileno(): + line = proc.stdout.readline() + if len(line) > 0: + proc_logger.info(line[:-1]) + if rfd == proc.stderr.fileno(): + line = proc.stderr.readline() + if len(line) > 0: + proc_logger.info(line[:-1]) + if event & select.POLLHUP: + poll.unregister(rfd) + pollc -= 1 + + if pollc > 0: + events = poll.poll() + + for handler in proc_logger.handlers: + handler.flush() + + ret = proc.wait() + if ret != 0: + raise Exception(f'borg subprocess exited with returncode {ret}') + + with Manager() as manager: + tmpdir_q = manager.Queue(1) + with closing(Process(target=do_create, args=(tmpdir_q,), name='do_create')) as p: + p.start() + + with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.getenv('RUNTIME_DIRECTORY')) as tmpdir: + tmpdir_q.put(tmpdir) + p.join() + if p.exitcode == 0 and dry_run: + return 125 + return p.exitcode def sigterm(signum, frame): raise SystemExit(128 + signum) @@ -155,6 +316,32 @@ def main(): console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) if sys.stderr.isatty(): console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) + + burst_max = 10000 + burst = burst_max + last_use = None + inv_rate = 1e6 + def consume_filter(record): + nonlocal burst, burst_max, inv_rate, last_use + + delay = None + while True: + now = time.monotonic_ns() + burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max + last_use = now + + if burst > 0: + burst -= 1 + if delay: + delay = now - delay + + return True + + if delay is None: + delay = now + time.sleep(inv_rate / 1e9) + console_handler.addFilter(consume_filter) + logger.addHandler(console_handler) # log uncaught exceptions @@ -167,7 +354,8 @@ def main(): sys.excepthook = log_exceptions parser = argparse.ArgumentParser(prog='borgsnap') - parser.add_argument('--verbose', '-v', action='count', default=0) + parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1) + parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1) parser.add_argument('--target', metavar='REPO', default='yggdrasil.borgbase:repo') parser.add_argument('--archive-prefix', metavar='REPO', default='yggdrasil.vidhar.') subparsers = parser.add_subparsers() @@ -183,12 +371,13 @@ def main(): create_parser.set_defaults(cmd=create) args = parser.parse_args() - if args.verbose <= 0: - logger.setLevel(logging.WARNING) - elif args.verbose <= 1: - logger.setLevel(logging.INFO) - else: - logger.setLevel(logging.DEBUG) + LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] + DEFAULT_LOG_LEVEL = logging.ERROR + log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) + + for adjustment in args.log_level or (): + log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0)) + logger.setLevel(LOG_LEVELS[log_level]) cmdArgs = {} for copy in {'target', 'archive_prefix', 'snapshot', 'cache_file', 'dry_run'}: diff --git a/hosts/vidhar/borg/default.nix b/hosts/vidhar/borg/default.nix index 79c75c4d..7e3129f2 100644 --- a/hosts/vidhar/borg/default.nix +++ b/hosts/vidhar/borg/default.nix @@ -74,7 +74,7 @@ let copy wrapProgram $out/bin/copy \ - --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} + --prefix PATH : ${makeBinPath (with pkgs; [config.boot.zfs.package util-linux borgbackup])}:${config.security.wrapperDir} ''; }); @@ -88,7 +88,6 @@ let atomicwrites pyprctl python-unshare - xdg python-dateutil ''; postInstall = '' @@ -101,14 +100,13 @@ let (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); }) ]; - _.xdg.buildInputs.add = with pkgs."python3Packages"; [ poetry ]; _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ]; }; in { config = { services.zfssnap.config.exec = { check = "${borgsnap}/bin/borgsnap -vvv --target yggdrasil.borgbase:repo --archive-prefix yggdrasil.vidhar. check --cache-file /run/zfssnap-prune/archives-cache.json"; - cmd = "${borgsnap}/bin/borgsnap -vvv --target yggdrasil.borgbase:repo --archive-prefix yggdrasil.vidhar. create --dry-run"; + cmd = "${borgsnap}/bin/borgsnap -vvv --target yggdrasil.borgbase:repo --archive-prefix yggdrasil.vidhar. create"; halfweekly = "8"; monthly = "-1"; diff --git a/hosts/vidhar/network/bifrost/default.nix b/hosts/vidhar/network/bifrost/default.nix index 8c2cc1de..ec354f81 100644 --- a/hosts/vidhar/network/bifrost/default.nix +++ b/hosts/vidhar/network/bifrost/default.nix @@ -40,18 +40,30 @@ in { Destination = "2a03:4000:52:ada:4::/80"; }; } - { routeConfig ={ + { routeConfig = { Gateway = "2a03:4000:52:ada:4::"; GatewayOnLink = true; Table = "bifrost"; }; } + { routeConfig = { + Destination = "2a03:4000:52:ada:4::/80"; + GatewayOnLink = true; + Table = "bifrost"; + }; + } + { routeConfig = { + Destination = "2a03:4000:52:ada:4:1::/96"; + GatewayOnLink = true; + Table = "bifrost"; + }; + } ]; routingPolicyRules = [ { routingPolicyRuleConfig = { Table = "bifrost"; From = "2a03:4000:52:ada:4:1::/96"; - Priority = 200; + Priority = 1; }; } ]; @@ -64,6 +76,8 @@ in { }; }; }; + + config.routeTables.bifrost = 1026; }; systemd.services."systemd-networkd".serviceConfig.LoadCredential = [ "bifrost.priv:${config.sops.secrets.bifrost.path}" diff --git a/modules/yggdrasil-wg/default.nix b/modules/yggdrasil-wg/default.nix index c27eb286..8525cea0 100644 --- a/modules/yggdrasil-wg/default.nix +++ b/modules/yggdrasil-wg/default.nix @@ -82,7 +82,7 @@ let mkPrivateKeyPath = family: host: ./hosts + "/${family}" + "/${host}.priv"; kernel = config.boot.kernelPackages; - + publicKeyPath = family: mkPublicKeyPath family hostName; privateKeyPath = family: mkPrivateKeyPath family hostName; inNetwork' = family: pathExists (privateKeyPath family) && pathExists (publicKeyPath family); @@ -221,7 +221,7 @@ in { }; } ] ++ (concatMap (router: map (rAddr: { routeConfig = { Destination = "::/0"; Gateway = stripSubnet rAddr; GatewayOnLink = true; Table = "yggdrasil"; }; }) batHostIPs.${router}) (filter (router: router != hostName) routers)); - routingPolicyRules = map (addr: { routingPolicyRuleConfig = { Table = "yggdrasil"; From = stripSubnet addr; Priority = 1; }; }) batHostIPs.${hostName}; + routingPolicyRules = map (addr: { routingPolicyRuleConfig = { Table = "yggdrasil"; From = addr; Priority = 1; }; }) batHostIPs.${hostName}; linkConfig = { MACAddress = "${batHostMACs.${hostName}}"; RequiredForOnline = false; diff --git a/modules/zfssnap/default.nix b/modules/zfssnap/default.nix index 42cdf46f..735e73ec 100644 --- a/modules/zfssnap/default.nix +++ b/modules/zfssnap/default.nix @@ -1,32 +1,20 @@ -{ config, pkgs, lib, ... }: +{ config, pkgs, lib, flakeInputs, ... }: with lib; let - zfssnap = pkgs.stdenv.mkDerivation rec { - name = "zfssnap"; - src = ./zfssnap.py; + zfssnap = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec { + pname = "zfssnap"; + src = ./zfssnap; + version = "0.0.0"; + ignoreDataOutdated = true; - phases = [ "buildPhase" "checkPhase" "installPhase" ]; - - buildInputs = with pkgs; [makeWrapper]; - - python = pkgs.python39.withPackages (ps: with ps; [pyxdg pytimeparse python-dateutil]); - - buildPhase = '' - substitute $src zfssnap \ - --subst-var-by python ${escapeShellArg python} - ''; - - doCheck = true; - checkPhase = '' - ${python}/bin/python -m py_compile zfssnap + requirements = '' + pyxdg + pytimeparse + python-dateutil ''; - - installPhase = '' - install -m 0755 -D -t $out/bin \ - zfssnap - + postInstall = '' wrapProgram $out/bin/zfssnap \ --prefix PATH : ${makeBinPath [config.boot.zfs.package]} ''; @@ -71,7 +59,9 @@ in { before = [ "zfssnap-prune.service" ]; serviceConfig = { Type = "oneshot"; - ExecStart = "${zfssnap}/bin/zfssnap -v"; + ExecStart = "${zfssnap}/bin/zfssnap -vv"; + + LogRateLimitIntervalSec = 0; }; }; systemd.services."zfssnap-prune" = { @@ -82,7 +72,9 @@ in { ExecStart = let mkSectionName = name: strings.escape [ "[" "]" ] (strings.toUpper name); zfssnapConfig = generators.toINI { inherit mkSectionName; } cfg.config; - in "${zfssnap}/bin/zfssnap -vv prune --config=${pkgs.writeText "zfssnap.ini" zfssnapConfig}"; + in "${zfssnap}/bin/zfssnap -vv prune --exec-newest --config=${pkgs.writeText "zfssnap.ini" zfssnapConfig}"; # DEBUG + + LogRateLimitIntervalSec = 0; }; }; diff --git a/modules/zfssnap/zfssnap.py b/modules/zfssnap/zfssnap.py deleted file mode 100644 index a8dae75f..00000000 --- a/modules/zfssnap/zfssnap.py +++ /dev/null @@ -1,405 +0,0 @@ -#!@python@/bin/python - -import csv -import subprocess -import io -from distutils.util import strtobool -from datetime import datetime, timezone, timedelta -from dateutil.tz import gettz, tzutc -import pytimeparse -import argparse -import re - -import sys - -import logging - -import shlex - -from collections import defaultdict, OrderedDict, deque, namedtuple - -import configparser -from xdg import BaseDirectory - -from functools import cache - -from math import floor - -import asyncio - -from dataclasses import dataclass - - -TIME_PATTERNS = OrderedDict([ - ("secondly", lambda t: t.strftime('%Y-%m-%d %H:%M:%S')), - ("minutely", lambda t: t.strftime('%Y-%m-%d %H:%M')), - ("5m", lambda t: (t.strftime('%Y-%m-%d %H'), floor(t.minute / 5) * 5)), - ("15m", lambda t: (t.strftime('%Y-%m-%d %H'), floor(t.minute / 15) * 15)), - ("hourly", lambda t: t.strftime('%Y-%m-%d %H')), - ("4h", lambda t: (t.strftime('%Y-%m-%d'), floor(t.hour / 4) * 4)), - ("12h", lambda t: (t.strftime('%Y-%m-%d'), floor(t.hour / 12) * 12)), - ("daily", lambda t: t.strftime('%Y-%m-%d')), - ("halfweekly", lambda t: (t.strftime('%G-%V'), floor(int(t.strftime('%u')) / 4) * 4)), - ("weekly", lambda t: t.strftime('%G-%V')), - ("monthly", lambda t: t.strftime('%Y-%m')), - ("yearly", lambda t: t.strftime('%Y')), -]) - -@dataclass(eq=True, order=True, frozen=True) -class Snap: - name: str - creation: datetime - -@dataclass(eq=True, order=True, frozen=True) -class KeptBecause: - rule: str - ix: int - base: str - period: str - - -@cache -def _now(): - return datetime.now(timezone.utc) - -def _snap_name(item, time=_now()): - suffix = re.sub(r'\+00:00$', r'Z', time.isoformat(timespec='seconds')) - return f'{item}@{suffix}' - -def _log_cmd(*args): - fmt_args = ' '.join(map(shlex.quote, args)) - logger.debug(f'Running command: {fmt_args}') - -def _get_items(): - items = {} - - args = ['zfs', 'get', '-H', '-p', '-o', 'name,value', '-t', 'filesystem,volume', '-s', 'local,default,inherited,temporary,received', 'li.yggdrasil:auto-snapshot'] - _log_cmd(*args) - with subprocess.Popen(args, stdout=subprocess.PIPE) as proc: - text_stdout = io.TextIOWrapper(proc.stdout) - reader = csv.DictReader(text_stdout, fieldnames=['name', 'setting'], delimiter='\t', quoting=csv.QUOTE_NONE) - Row = namedtuple('Row', reader.fieldnames) - for row in [Row(**data) for data in reader]: - items[row.name] = bool(strtobool(row.setting)) - - return items - -def _get_snaps(only_auto=True): - snapshots = defaultdict(list) - args = ['zfs', 'list', '-H', '-p', '-t', 'snapshot', '-o', 'name,li.yggdrasil:is-auto-snapshot,creation'] - _log_cmd(*args) - with subprocess.Popen(args, stdout=subprocess.PIPE) as proc: - text_stdout = io.TextIOWrapper(proc.stdout) - reader = csv.DictReader(text_stdout, fieldnames=['name', 'is_auto_snapshot', 'timestamp'], delimiter='\t', quoting=csv.QUOTE_NONE) - Row = namedtuple('Row', reader.fieldnames) - for row in [Row(**data) for data in reader]: - if only_auto and not bool(strtobool(row.is_auto_snapshot)): - continue - - base_name, _, _ = row.name.rpartition('@') - creation = datetime.fromtimestamp(int(row.timestamp), timezone.utc) - snapshots[base_name].append(Snap(name=row.name, creation=creation)) - - return snapshots - -def prune(config, dry_run, keep_newest, do_exec): - do_exec = do_exec and 'EXEC' in config - prune_timezone = config.gettimezone('KEEP', 'timezone', fallback=tzutc()) - logger.debug(f'prune timezone: {prune_timezone}') - - items = _get_snaps() - - exec_candidates = set() - if do_exec: - exec_timezone = config.gettimezone('EXEC', 'timezone', fallback=prune_timezone) - logger.debug(f'exec timezone: {exec_timezone}') - - for rule, pattern in TIME_PATTERNS.items(): - desired_count = config.getint('EXEC', rule, fallback=0) - - for base, snaps in items.items(): - periods = OrderedDict() - - for snap in sorted(snaps, key=lambda snap: snap.creation): - period = pattern(snap.creation.astimezone(exec_timezone)) - if period not in periods: - periods[period] = deque() - periods[period].append(snap) - - to_exec = desired_count - ordered_periods = periods.items() - for period, period_snaps in ordered_periods: - if to_exec == 0: - break - - for snap in period_snaps: - exec_candidates.add(snap) - logger.debug(f'{snap.name} is exec candidate') - to_exec -= 1 - break - - if to_exec > 0: - logger.debug(f'Missing {to_exec} to fulfill exec {rule}={desired_count} for ‘{base}’') - - check_cmd = config.get('EXEC', 'check', fallback=None) - if check_cmd: - already_execed = set() - for snap in exec_candidates: - args = [] - args += shlex.split(check_cmd) - args += [snap.name] - _log_cmd(*args) - check_res = subprocess.run(args) - if check_res.returncode == 0: - already_execed.add(snap) - logger.debug(f'{snap.name} already execed') - exec_candidates -= already_execed - - exec_cmd = config.get('EXEC', 'cmd', fallback=None) - exec_count = config.getint('EXEC', 'count', fallback=1) - if exec_cmd: - execed = set() - for snap in sorted(exec_candidates, key=lambda snap: snap.creation): - if len(execed) >= exec_count: - logger.debug(f'exc_count of {exec_count} reached') - break - - args = [] - args += shlex.split(exec_cmd) - args += [snap.name] - _log_cmd(*args) - subprocess.run(args).check_returncode() - execed.add(snap) - - exec_candidates -= execed - - kept_count = defaultdict(lambda: defaultdict(lambda: 0)) - kept_because = OrderedDict() - def keep_because(base, snap, rule, period=None): - nonlocal kept_count, kept_because - kept_count[rule][base] += 1 - if snap not in kept_because: - kept_because[snap] = deque() - kept_because[snap].append(KeptBecause(rule=rule, ix=kept_count[rule][base], base=base, period=period)) - - for candidate in exec_candidates: - base_name, _, _ = candidate.name.rpartition('@') - keep_because(base_name, candidate.name, 'exec-candidate') - - within = config.gettimedelta('KEEP', 'within') - if within > timedelta(seconds=0): - for base, snaps in items.items(): - time_ref = max(snaps, key=lambda snap: snap.creation, default=None) - if not time_ref: - logger.warn(f'Nothing to keep for ‘{base}’') - continue - - logger.info(f'Using ‘{time_ref.name}’ as time reference for ‘{base}’') - within_cutoff = time_ref.creation - within - - for snap in snaps: - if snap.creation >= within_cutoff: - keep_because(base, snap.name, 'within') - else: - logger.warn('Skipping rule ‘within’ since retention period is zero') - - for rule, pattern in TIME_PATTERNS.items(): - desired_count = config.getint('KEEP', rule, fallback=0) - - for base, snaps in items.items(): - periods = OrderedDict() - - for snap in sorted(snaps, key=lambda snap: snap.creation, reverse=keep_newest): - period = pattern(snap.creation.astimezone(prune_timezone)) - if period not in periods: - periods[period] = deque() - periods[period].append(snap) - - to_keep = desired_count - ordered_periods = periods.items() if keep_newest else reversed(periods.items()) - for period, period_snaps in ordered_periods: - if to_keep == 0: - break - - for snap in period_snaps: - keep_because(base, snap.name, rule, period=period) - to_keep -= 1 - break - - if to_keep > 0: - logger.debug(f'Missing {to_keep} to fulfill prune {rule}={desired_count} for ‘{base}’') - - for snap, reasons in kept_because.items(): - reasons_str = ', '.join(map(str, reasons)) - logger.info(f'Keeping ‘{snap}’ because: {reasons_str}') - all_snaps = {snap.name for _, snaps in items.items() for snap in snaps} - to_destroy = all_snaps - {*kept_because} - if not to_destroy: - logger.info('Nothing to prune') - - for snap in sorted(to_destroy): - args = ['zfs', 'destroy'] - if dry_run: - args += ['-n'] - args += [snap] - _log_cmd(*args) - subprocess.run(args, check=True) - if dry_run: - logger.info(f'Would have pruned ‘{snap}’') - else: - logger.info(f'Pruned ‘{snap}’') - -def rename(snapshots, destroy=False, set_is_auto=False): - args = ['zfs', 'get', '-H', '-p', '-o', 'name,value', 'creation', *snapshots] - _log_cmd(*args) - renamed_to = set() - with subprocess.Popen(args, stdout=subprocess.PIPE) as proc: - text_stdout = io.TextIOWrapper(proc.stdout) - reader = csv.DictReader(text_stdout, fieldnames=['name', 'timestamp'], delimiter='\t', quoting=csv.QUOTE_NONE) - Row = namedtuple('Row', reader.fieldnames) - for row in [Row(**data) for data in reader]: - creation = datetime.fromtimestamp(int(row.timestamp), timezone.utc) - base_name, _, _ = row.name.rpartition('@') - new_name = _snap_name(base_name, time=creation) - if new_name == row.name: - logger.debug(f'Not renaming ‘{row.name}’ since name is already correct') - continue - - if new_name in renamed_to: - if destroy: - logger.warning(f'Destroying ‘{row.name}’ since ‘{new_name}’ was already renamed to') - args = ['zfs', 'destroy', row.name] - _log_cmd(*args) - subprocess.run(args, check=True) - else: - logger.info(f'Skipping ‘{row.name}’ since ‘{new_name}’ was already renamed to') - - continue - - logger.info(f'Renaming ‘{row.name}’ to ‘{new_name}’') - args = ['zfs', 'rename', row.name, new_name] - _log_cmd(*args) - subprocess.run(args, check=True) - renamed_to.add(new_name) - - if set_is_auto: - logger.info(f'Setting is-auto-snapshot on ‘{new_name}’') - args = ['zfs', 'set', 'li.yggdrasil:is-auto-snapshot=true', new_name] - _log_cmd(*args) - subprocess.run(args, check=True) - -def autosnap(): - items = _get_items() - - all_snap_names = set() - async def do_snapshot(*snap_items, recursive=False): - nonlocal items, all_snap_names - snap_names = {_snap_name(item) for item in snap_items if items[item]} - if recursive: - for snap_item in snap_items: - all_snap_names |= {_snap_name(item) for item in items if item.startswith(snap_item)} - else: - all_snap_names |= snap_names - - args = ['zfs', 'snapshot', '-o', 'li.yggdrasil:is-auto-snapshot=true'] - if recursive: - args += ['-r'] - args += snap_names - - _log_cmd(*args) - subprocess.run(args, check=True) - - pool_items = defaultdict(set) - for item in items: - pool, _, _ = item.partition('/') - pool_items[pool].add(item) - - tasks = [] - for snap_items in pool_items.values(): - tasks.append(do_snapshot(*snap_items)) - if not tasks: - logger.warning('No snapshots to create') - else: - async def run_tasks(): - await asyncio.gather(*tasks) - asyncio.run(run_tasks()) - for snap in all_snap_names: - logger.info(f'Created ‘{snap}’') - if all_snap_names: - rename(snapshots=all_snap_names) - -def main(): - global logger - logger = logging.getLogger(__name__) - console_handler = logging.StreamHandler() - console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) - if sys.stderr.isatty(): - console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) - logger.addHandler(console_handler) - - # log uncaught exceptions - def log_exceptions(type, value, tb): - global logger - - logger.error(value) - sys.__excepthook__(type, value, tb) # calls default excepthook - - sys.excepthook = log_exceptions - - parser = argparse.ArgumentParser(prog='zfssnap') - parser.add_argument('--verbose', '-v', action='count', default=0) - subparsers = parser.add_subparsers() - parser.set_defaults(cmd=autosnap) - rename_parser = subparsers.add_parser('rename') - rename_parser.add_argument('snapshots', nargs='+') - rename_parser.add_argument('--destroy', action='store_true', default=False) - rename_parser.add_argument('--set-is-auto', action='store_true', default=False) - rename_parser.set_defaults(cmd=rename) - prune_parser = subparsers.add_parser('prune') - prune_parser.add_argument('--config', '-c', dest='config_files', nargs='*', default=list()) - prune_parser.add_argument('--dry-run', '-n', action='store_true', default=False) - prune_parser.add_argument('--keep-newest', action='store_true', default=False) - prune_parser.add_argument('--no-exec', dest='do_exec', action='store_false', default=True) - prune_parser.set_defaults(cmd=prune) - args = parser.parse_args() - - if args.verbose <= 0: - logger.setLevel(logging.WARNING) - elif args.verbose <= 1: - logger.setLevel(logging.INFO) - else: - logger.setLevel(logging.DEBUG) - - cmdArgs = {} - for copy in {'snapshots', 'dry_run', 'destroy', 'keep_newest', 'set_is_auto', 'do_exec'}: - if copy in vars(args): - cmdArgs[copy] = vars(args)[copy] - if 'config_files' in vars(args): - def convert_timedelta(secs_str): - secs=pytimeparse.parse(secs_str) - if secs is None: - raise ValueError(f'Could not parse timedelta expression ‘{secs_str}’') - return timedelta(seconds=secs) - config = configparser.ConfigParser(converters={ - 'timedelta': convert_timedelta, - 'timezone': gettz - }) - search_files = args.config_files if args.config_files else [*BaseDirectory.load_config_paths('zfssnap.ini')] - read_files = config.read(search_files) - - def format_config_files(files): - if not files: - return 'no files' - return ', '.join(map(lambda file: f'‘{file}’', files)) - - if not read_files: - raise Exception(f'Found no config files. Tried: {format_config_files(search_files)}') - - logger.debug(f'Read following config files: {format_config_files(read_files)}') - - cmdArgs['config'] = config - - args.cmd(**cmdArgs) - -if __name__ == '__main__': - sys.exit(main()) diff --git a/modules/zfssnap/zfssnap/setup.py b/modules/zfssnap/zfssnap/setup.py new file mode 100644 index 00000000..6c58757d --- /dev/null +++ b/modules/zfssnap/zfssnap/setup.py @@ -0,0 +1,10 @@ +from setuptools import setup + +setup(name='zfssnap', + packages=['zfssnap'], + entry_points={ + 'console_scripts': [ + 'zfssnap=zfssnap.__main__:main', + ], + } +) diff --git a/modules/zfssnap/zfssnap/zfssnap/__main__.py b/modules/zfssnap/zfssnap/zfssnap/__main__.py new file mode 100644 index 00000000..a0eade78 --- /dev/null +++ b/modules/zfssnap/zfssnap/zfssnap/__main__.py @@ -0,0 +1,435 @@ +#!@python@/bin/python + +import csv +import subprocess +import io +from distutils.util import strtobool +from datetime import datetime, timezone, timedelta +from dateutil.tz import gettz, tzutc +import pytimeparse +import argparse +import re + +import sys + +import logging + +import shlex + +from collections import defaultdict, OrderedDict, deque, namedtuple + +import configparser +from xdg import BaseDirectory + +from functools import cache + +from math import floor + +import asyncio + +from dataclasses import dataclass + + +TIME_PATTERNS = OrderedDict([ + ("secondly", lambda t: t.strftime('%Y-%m-%d %H:%M:%S')), + ("minutely", lambda t: t.strftime('%Y-%m-%d %H:%M')), + ("5m", lambda t: (t.strftime('%Y-%m-%d %H'), floor(t.minute / 5) * 5)), + ("15m", lambda t: (t.strftime('%Y-%m-%d %H'), floor(t.minute / 15) * 15)), + ("hourly", lambda t: t.strftime('%Y-%m-%d %H')), + ("4h", lambda t: (t.strftime('%Y-%m-%d'), floor(t.hour / 4) * 4)), + ("12h", lambda t: (t.strftime('%Y-%m-%d'), floor(t.hour / 12) * 12)), + ("daily", lambda t: t.strftime('%Y-%m-%d')), + ("halfweekly", lambda t: (t.strftime('%G-%V'), floor(int(t.strftime('%u')) / 4) * 4)), + ("weekly", lambda t: t.strftime('%G-%V')), + ("monthly", lambda t: t.strftime('%Y-%m')), + ("yearly", lambda t: t.strftime('%Y')), +]) + +PROP_DO_AUTO_SNAPSHOT = 'li.yggdrasil:auto-snapshot' +PROP_IS_AUTO_SNAPSHOT = 'li.yggdrasil:is-auto-snapshot' + +@dataclass(eq=True, order=True, frozen=True) +class Snap: + name: str + creation: datetime + +@dataclass(eq=True, order=True, frozen=True) +class KeptBecause: + rule: str + ix: int + base: str + period: str + + +@cache +def _now(): + return datetime.now(timezone.utc) + +def _snap_name(item, time=_now()): + suffix = re.sub(r'\+00:00$', r'Z', time.isoformat(timespec='seconds')) + return f'{item}@{suffix}' + +def _log_cmd(*args): + fmt_args = ' '.join(map(shlex.quote, args)) + logger.debug('Running command: %s', fmt_args) + +def _get_items(): + items = {} + + args = ['zfs', 'get', '-H', '-p', '-o', 'name,value', '-t', 'filesystem,volume', PROP_DO_AUTO_SNAPSHOT] + _log_cmd(*args) + with subprocess.Popen(args, stdout=subprocess.PIPE) as proc: + text_stdout = io.TextIOWrapper(proc.stdout) + reader = csv.DictReader(text_stdout, fieldnames=['name', 'value'], delimiter='\t', quoting=csv.QUOTE_NONE) + Row = namedtuple('Row', reader.fieldnames) + for row in [Row(**data) for data in reader]: + if not row.value or row.value == '-': + continue + + items[row.name] = bool(strtobool(row.value)) + + return items + +def _get_snaps(only_auto=True): + snapshots = defaultdict(list) + args = ['zfs', 'list', '-H', '-p', '-t', 'snapshot', '-o', f'name,{PROP_IS_AUTO_SNAPSHOT},creation'] + _log_cmd(*args) + with subprocess.Popen(args, stdout=subprocess.PIPE) as proc: + text_stdout = io.TextIOWrapper(proc.stdout) + reader = csv.DictReader(text_stdout, fieldnames=['name', 'is_auto_snapshot', 'timestamp'], delimiter='\t', quoting=csv.QUOTE_NONE) + Row = namedtuple('Row', reader.fieldnames) + for row in [Row(**data) for data in reader]: + if only_auto and not bool(strtobool(row.is_auto_snapshot)): + continue + + base_name, _, _ = row.name.rpartition('@') + creation = datetime.fromtimestamp(int(row.timestamp), timezone.utc) + snapshots[base_name].append(Snap(name=row.name, creation=creation)) + + return snapshots + +def prune(config, dry_run, keep_newest, do_exec, exec_newest): + do_exec = do_exec and 'EXEC' in config + prune_timezone = config.gettimezone('KEEP', 'timezone', fallback=tzutc()) + logger.debug('prune timezone: %s', prune_timezone) + + items = _get_snaps() + + kept_count = defaultdict(lambda: defaultdict(lambda: 0)) + kept_because = OrderedDict() + def keep_because(base, snap, rule, period=None): + nonlocal kept_count, kept_because + kept_count[rule][base] += 1 + if snap not in kept_because: + kept_because[snap] = deque() + kept_because[snap].append(KeptBecause(rule=rule, ix=kept_count[rule][base], base=base, period=period)) + + exec_candidates = set() + if do_exec: + exec_timezone = config.gettimezone('EXEC', 'timezone', fallback=prune_timezone) + logger.debug('exec timezone: %s', exec_timezone) + + for rule, pattern in TIME_PATTERNS.items(): + desired_count = config.getint('EXEC', rule, fallback=0) + + for base, snaps in items.items(): + periods = OrderedDict() + + for snap in sorted(snaps, key=lambda snap: snap.creation, reverse=exec_newest): + period = pattern(snap.creation.astimezone(exec_timezone)) + if period not in periods: + periods[period] = deque() + periods[period].append(snap) + + to_exec = desired_count + ordered_periods = periods.items() + for period, period_snaps in ordered_periods: + if to_exec == 0: + break + + for snap in period_snaps: + exec_candidates.add(snap) + logger.debug('‘%s’ is exec candidate', snap.name) + to_exec -= 1 + break + + if to_exec > 0: + logger.debug('Missing %d to fulfill exec %s=%d for ‘%s’', to_exec, rule, desired_count, base) + + check_cmd = config.get('EXEC', 'check', fallback=None) + if check_cmd: + logger.debug('exec_candidates=%s', exec_candidates) + already_execed = set() + for snap in exec_candidates: + args = [] + args += shlex.split(check_cmd) + args += [snap.name] + _log_cmd(*args) + check_res = subprocess.run(args) + if check_res.returncode == 0: + already_execed.add(snap) + logger.debug('‘%s’ already execed', snap.name) + elif check_res.returncode == 124: + already_execed.add(snap) + logger.warn('‘%s’ ignored', snap.name) + pass + elif check_res.returncode == 125: + already_execed.add(snap) + logger.info('‘%s’ ignored but specified for keeping, doing so...', snap.name) + base_name, _, _ = snap.name.rpartition('@') + keep_because(base_name, snap.name, 'exec-ignored') + elif check_res.returncode == 126: + logger.debug('‘%s’ to exec', snap.name) + else: + check_res.check_returncode() + exec_candidates -= already_execed + + exec_cmd = config.get('EXEC', 'cmd', fallback=None) + exec_count = config.getint('EXEC', 'count', fallback=1) + if exec_cmd: + execed = set() + for snap in sorted(exec_candidates, key=lambda snap: snap.creation): + if exec_count > 0 and len(execed) >= exec_count: + logger.debug('exec_count of %d reached', exec_count) + break + + args = [] + args += shlex.split(exec_cmd) + args += [snap.name] + _log_cmd(*args) + p = subprocess.run(args) + if p.returncode == 125: + logger.warn('got dry-run returncode for ‘%s’, keeping...', snap.name) + base_name, _, _ = snap.name.rpartition('@') + keep_because(base_name, snap.name, 'exec-dryrun') + pass + else: + p.check_returncode() + execed.add(snap) + + exec_candidates -= execed + + for candidate in exec_candidates: + base_name, _, _ = candidate.name.rpartition('@') + keep_because(base_name, candidate.name, 'exec-candidate') + + within = config.gettimedelta('KEEP', 'within') + if within > timedelta(seconds=0): + for base, snaps in items.items(): + time_ref = max(snaps, key=lambda snap: snap.creation, default=None) + if not time_ref: + logger.warn('Nothing to keep for ‘%s’', base) + continue + + logger.info('Using ‘%s’ as time reference for ‘%s’', time_ref.name, base) + within_cutoff = time_ref.creation - within + + for snap in snaps: + if snap.creation >= within_cutoff: + keep_because(base, snap.name, 'within') + else: + logger.warn('Skipping rule ‘within’ since retention period is zero') + + for rule, pattern in TIME_PATTERNS.items(): + desired_count = config.getint('KEEP', rule, fallback=0) + + for base, snaps in items.items(): + periods = OrderedDict() + + for snap in sorted(snaps, key=lambda snap: snap.creation, reverse=keep_newest): + period = pattern(snap.creation.astimezone(prune_timezone)) + if period not in periods: + periods[period] = deque() + periods[period].append(snap) + + to_keep = desired_count + ordered_periods = periods.items() if keep_newest else reversed(periods.items()) + for period, period_snaps in ordered_periods: + if to_keep == 0: + break + + for snap in period_snaps: + keep_because(base, snap.name, rule, period=period) + to_keep -= 1 + break + + if to_keep > 0: + logger.debug('Missing %d to fulfill prune %s=%d for ‘%s’', to_keep, rule, desired_count, base) + + for snap, reasons in kept_because.items(): + logger.info('Keeping ‘%s’ because: %s', snap, ', '.join(map(str, reasons))) + all_snaps = {snap.name for _, snaps in items.items() for snap in snaps} + to_destroy = all_snaps - {*kept_because} + if not to_destroy: + logger.info('Nothing to prune') + + for snap in sorted(to_destroy): + args = ['zfs', 'destroy'] + if dry_run: + args += ['-n'] + args += [snap] + _log_cmd(*args) + subprocess.run(args, check=True) + if dry_run: + logger.info('Would have pruned ‘%s’', snap) + else: + logger.info('Pruned ‘%s’', snap) + +def rename(snapshots, destroy=False, set_is_auto=False): + args = ['zfs', 'get', '-H', '-p', '-o', 'name,value', 'creation', *snapshots] + _log_cmd(*args) + renamed_to = set() + with subprocess.Popen(args, stdout=subprocess.PIPE) as proc: + text_stdout = io.TextIOWrapper(proc.stdout) + reader = csv.DictReader(text_stdout, fieldnames=['name', 'timestamp'], delimiter='\t', quoting=csv.QUOTE_NONE) + Row = namedtuple('Row', reader.fieldnames) + for row in [Row(**data) for data in reader]: + creation = datetime.fromtimestamp(int(row.timestamp), timezone.utc) + base_name, _, _ = row.name.rpartition('@') + new_name = _snap_name(base_name, time=creation) + if new_name == row.name: + logger.debug('Not renaming ‘%s’ since name is already correct', row.name) + continue + + if new_name in renamed_to: + if destroy: + logger.warning('Destroying ‘%s’ since ‘%s’ was already renamed to', row.name, new_name) + args = ['zfs', 'destroy', row.name] + _log_cmd(*args) + subprocess.run(args, check=True) + else: + logger.info('Skipping ‘%s’ since ‘%s’ was already renamed to', row.name, new_name) + + continue + + logger.info('Renaming ‘%s’ to ‘%s’', row.name, new_name) + args = ['zfs', 'rename', row.name, new_name] + _log_cmd(*args) + subprocess.run(args, check=True) + renamed_to.add(new_name) + + if set_is_auto: + logger.info('Setting is-auto-snapshot on ‘%s’', new_name) + args = ['zfs', 'set', f'{PROP_IS_AUTO_SNAPSHOT}=true', new_name] + _log_cmd(*args) + subprocess.run(args, check=True) + +def autosnap(): + items = _get_items() + + all_snap_names = set() + async def do_snapshot(*snap_items, recursive=False): + nonlocal items, all_snap_names + snap_names = {_snap_name(item) for item in snap_items if items[item]} + if recursive: + for snap_item in snap_items: + all_snap_names |= {_snap_name(item) for item in items if item.startswith(snap_item)} + else: + all_snap_names |= snap_names + + args = ['zfs', 'snapshot', '-o', f'{PROP_IS_AUTO_SNAPSHOT}=true'] + if recursive: + args += ['-r'] + args += snap_names + + _log_cmd(*args) + subprocess.run(args, check=True) + + pool_items = defaultdict(set) + for item in items: + pool, _, _ = item.partition('/') + pool_items[pool].add(item) + + tasks = [] + for snap_items in pool_items.values(): + tasks.append(do_snapshot(*snap_items)) + if not tasks: + logger.warning('No snapshots to create') + else: + async def run_tasks(): + await asyncio.gather(*tasks) + asyncio.run(run_tasks()) + for snap in all_snap_names: + logger.info('Created ‘%s’', snap) + if all_snap_names: + rename(snapshots=all_snap_names) + +def main(): + global logger + logger = logging.getLogger(__name__) + console_handler = logging.StreamHandler() + console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) + if sys.stderr.isatty(): + console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) + logger.addHandler(console_handler) + + # log uncaught exceptions + def log_exceptions(type, value, tb): + global logger + + logger.error(value) + sys.__excepthook__(type, value, tb) # calls default excepthook + + sys.excepthook = log_exceptions + + parser = argparse.ArgumentParser(prog='zfssnap') + parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1) + parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1) + subparsers = parser.add_subparsers() + parser.set_defaults(cmd=autosnap) + rename_parser = subparsers.add_parser('rename') + rename_parser.add_argument('snapshots', nargs='+') + rename_parser.add_argument('--destroy', action='store_true', default=False) + rename_parser.add_argument('--set-is-auto', action='store_true', default=False) + rename_parser.set_defaults(cmd=rename) + prune_parser = subparsers.add_parser('prune') + prune_parser.add_argument('--config', '-c', dest='config_files', nargs='*', default=list()) + prune_parser.add_argument('--dry-run', '-n', action='store_true', default=False) + prune_parser.add_argument('--keep-newest', action='store_true', default=False) + prune_parser.add_argument('--exec-newest', action='store_true', default=False) + prune_parser.add_argument('--no-exec', dest='do_exec', action='store_false', default=True) + prune_parser.set_defaults(cmd=prune) + args = parser.parse_args() + + + LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] + DEFAULT_LOG_LEVEL = logging.ERROR + log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) + + for adjustment in args.log_level or (): + log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0)) + logger.setLevel(LOG_LEVELS[log_level]) + + cmdArgs = {} + for copy in {'snapshots', 'dry_run', 'destroy', 'keep_newest', 'exec_newest', 'set_is_auto', 'do_exec'}: + if copy in vars(args): + cmdArgs[copy] = vars(args)[copy] + if 'config_files' in vars(args): + def convert_timedelta(secs_str): + secs=pytimeparse.parse(secs_str) + if secs is None: + raise ValueError('Could not parse timedelta expression ‘%s’', secs_str) + return timedelta(seconds=secs) + config = configparser.ConfigParser(converters={ + 'timedelta': convert_timedelta, + 'timezone': gettz + }) + search_files = args.config_files if args.config_files else [*BaseDirectory.load_config_paths('zfssnap.ini')] + read_files = config.read(search_files) + + def format_config_files(files): + if not files: + return 'no files' + return ', '.join(map(lambda file: f'‘{file}’', files)) + + if not read_files: + raise Exception('Found no config files. Tried: %s', format_config_files(search_files)) + + logger.debug('Read following config files: %s', format_config_files(read_files)) + + cmdArgs['config'] = config + + args.cmd(**cmdArgs) + +if __name__ == '__main__': + sys.exit(main()) -- cgit v1.2.3