#!@python@/bin/python
"""zfssnap — create, rename, and prune ZFS auto-snapshots.

Datasets opt in via the ``li.yggdrasil:auto-snapshot`` property; snapshots
created here are tagged with ``li.yggdrasil:is-auto-snapshot=true`` and named
``dataset@<ISO-8601 UTC timestamp>``.  Subcommands: default (autosnap),
``rename``, and ``prune`` (config-driven retention rules).
"""

import csv
import subprocess
import io
# NOTE(review): distutils is deprecated (PEP 632) and removed in Python 3.12;
# strtobool will need a local replacement when the interpreter is bumped.
from distutils.util import strtobool
from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, tzutc
import pytimeparse
import argparse
import re
import sys
import logging
import shlex
from collections import defaultdict, OrderedDict, deque, namedtuple
import configparser
from xdg import BaseDirectory
from functools import cache
from math import floor
import asyncio
from dataclasses import dataclass


# Retention bucket definitions, ordered finest → coarsest.  Each maps a rule
# name to a function collapsing a datetime onto its period key; snapshots
# sharing a key compete for the same retention slot.
TIME_PATTERNS = OrderedDict([
    ("secondly", lambda t: t.strftime('%Y-%m-%d %H:%M:%S')),
    ("minutely", lambda t: t.strftime('%Y-%m-%d %H:%M')),
    ("5m", lambda t: (t.strftime('%Y-%m-%d %H'), floor(t.minute / 5) * 5)),
    ("15m", lambda t: (t.strftime('%Y-%m-%d %H'), floor(t.minute / 15) * 15)),
    ("hourly", lambda t: t.strftime('%Y-%m-%d %H')),
    ("4h", lambda t: (t.strftime('%Y-%m-%d'), floor(t.hour / 4) * 4)),
    ("12h", lambda t: (t.strftime('%Y-%m-%d'), floor(t.hour / 12) * 12)),
    ("daily", lambda t: t.strftime('%Y-%m-%d')),
    # %G-%V is the ISO year/week; %u is ISO weekday 1..7, so this splits a
    # week into days 1-3 and 4-7.
    ("halfweekly", lambda t: (t.strftime('%G-%V'), floor(int(t.strftime('%u')) / 4) * 4)),
    ("weekly", lambda t: t.strftime('%G-%V')),
    ("monthly", lambda t: t.strftime('%Y-%m')),
    ("yearly", lambda t: t.strftime('%Y')),
])


@dataclass(eq=True, order=True, frozen=True)
class Snap:
    """A single ZFS snapshot: full name plus its creation time (UTC-aware)."""
    name: str
    creation: datetime


@dataclass(eq=True, order=True, frozen=True)
class KeptBecause:
    """Reason record for keeping a snapshot: which rule, the slot index within
    that rule for the dataset, the dataset, and the period key (if any)."""
    rule: str
    ix: int
    base: str
    period: str


@cache
def _now():
    """Return the current UTC time; cached so one run uses one timestamp."""
    return datetime.now(timezone.utc)


def _snap_name(item, time=None):
    """Build the canonical snapshot name ``item@<iso-time>Z``.

    ``time`` defaults to the (run-wide, cached) current time.  The trailing
    ``+00:00`` offset is rewritten to ``Z`` for compactness.
    """
    # Resolve the default at call time rather than at import time
    # (the old `time=_now()` default only worked because _now is cached).
    if time is None:
        time = _now()
    suffix = re.sub(r'\+00:00$', r'Z', time.isoformat(timespec='seconds'))
    return f'{item}@{suffix}'


def _log_cmd(*args):
    """Debug-log an argv as a copy-pasteable shell command."""
    fmt_args = ' '.join(map(shlex.quote, args))
    logger.debug(f'Running command: {fmt_args}')


def _get_items():
    """Return {dataset name: bool} for all datasets that set the
    ``li.yggdrasil:auto-snapshot`` property (any source)."""
    items = {}

    args = ['zfs', 'get', '-H', '-p', '-o', 'name,value', '-t', 'filesystem,volume', '-s', 'local,default,inherited,temporary,received', 'li.yggdrasil:auto-snapshot']
    _log_cmd(*args)
    with subprocess.Popen(args, stdout=subprocess.PIPE) as proc:
        text_stdout = io.TextIOWrapper(proc.stdout)
        reader = csv.DictReader(text_stdout, fieldnames=['name', 'setting'], delimiter='\t', quoting=csv.QUOTE_NONE)
        Row = namedtuple('Row', reader.fieldnames)
        for row in [Row(**data) for data in reader]:
            items[row.name] = bool(strtobool(row.setting))

    return items


def _get_snaps(only_auto=True):
    """Return {dataset name: [Snap, ...]} for all snapshots.

    With ``only_auto`` (default) snapshots whose
    ``li.yggdrasil:is-auto-snapshot`` property is not truthy are skipped.
    """
    snapshots = defaultdict(list)

    args = ['zfs', 'list', '-H', '-p', '-t', 'snapshot', '-o', 'name,li.yggdrasil:is-auto-snapshot,creation']
    _log_cmd(*args)
    with subprocess.Popen(args, stdout=subprocess.PIPE) as proc:
        text_stdout = io.TextIOWrapper(proc.stdout)
        reader = csv.DictReader(text_stdout, fieldnames=['name', 'is_auto_snapshot', 'timestamp'], delimiter='\t', quoting=csv.QUOTE_NONE)
        Row = namedtuple('Row', reader.fieldnames)
        for row in [Row(**data) for data in reader]:
            if only_auto and not bool(strtobool(row.is_auto_snapshot)):
                continue

            base_name, _, _ = row.name.rpartition('@')
            # `zfs list -p` prints creation as epoch seconds.
            creation = datetime.fromtimestamp(int(row.timestamp), timezone.utc)
            snapshots[base_name].append(Snap(name=row.name, creation=creation))

    return snapshots


def prune(config, dry_run, keep_newest, do_exec):
    """Apply retention rules from ``config`` and destroy unkept snapshots.

    Rules, evaluated in order:
      * [EXEC]: select up to ``count`` snapshots (oldest first) that do not
        yet pass ``check`` and run ``cmd`` on them; remaining candidates are
        protected from pruning.
      * [KEEP] ``within``: keep everything within a timedelta of the newest
        snapshot per dataset.
      * [KEEP] per-period counts (see TIME_PATTERNS): keep one snapshot per
        period, up to the configured count per rule.

    ``dry_run`` passes ``-n`` to ``zfs destroy``; ``keep_newest`` flips the
    per-period selection from oldest-first to newest-first.
    """
    do_exec = do_exec and 'EXEC' in config
    prune_timezone = config.gettimezone('KEEP', 'timezone', fallback=tzutc())
    logger.debug(f'prune timezone: {prune_timezone}')

    items = _get_snaps()

    exec_candidates = set()
    if do_exec:
        exec_timezone = config.gettimezone('EXEC', 'timezone', fallback=prune_timezone)
        logger.debug(f'exec timezone: {exec_timezone}')

        for rule, pattern in TIME_PATTERNS.items():
            desired_count = config.getint('EXEC', rule, fallback=0)

            for base, snaps in items.items():
                periods = OrderedDict()

                for snap in sorted(snaps, key=lambda snap: snap.creation):
                    period = pattern(snap.creation.astimezone(exec_timezone))
                    if period not in periods:
                        periods[period] = deque()
                    periods[period].append(snap)

                to_exec = desired_count
                ordered_periods = periods.items()
                for period, period_snaps in ordered_periods:
                    if to_exec == 0:
                        break
                    # Only the first snapshot of each period is a candidate.
                    for snap in period_snaps:
                        exec_candidates.add(snap)
                        logger.debug(f'{snap.name} is exec candidate')
                        to_exec -= 1
                        break

                if to_exec > 0:
                    logger.debug(f'Missing {to_exec} to fulfill exec {rule}={desired_count} for ‘{base}’')

        check_cmd = config.get('EXEC', 'check', fallback=None)
        if check_cmd:
            # Drop candidates the check command reports as already handled
            # (exit status 0 means "already execed").
            already_execed = set()
            for snap in exec_candidates:
                args = []
                args += shlex.split(check_cmd)
                args += [snap.name]
                _log_cmd(*args)
                check_res = subprocess.run(args)
                if check_res.returncode == 0:
                    already_execed.add(snap)
                    logger.debug(f'{snap.name} already execed')
            exec_candidates -= already_execed

        exec_cmd = config.get('EXEC', 'cmd', fallback=None)
        exec_count = config.getint('EXEC', 'count', fallback=1)
        if exec_cmd:
            execed = set()
            for snap in sorted(exec_candidates, key=lambda snap: snap.creation):
                if len(execed) >= exec_count:
                    logger.debug(f'exc_count of {exec_count} reached')
                    break

                args = []
                args += shlex.split(exec_cmd)
                args += [snap.name]
                _log_cmd(*args)
                subprocess.run(args).check_returncode()
                execed.add(snap)
            # Successfully execed snapshots no longer need protection.
            exec_candidates -= execed

    kept_count = defaultdict(lambda: defaultdict(lambda: 0))
    kept_because = OrderedDict()
    def keep_because(base, snap, rule, period=None):
        # Record why `snap` (a name) is kept; a snapshot may be kept for
        # several rules, each with its own slot index per dataset.
        nonlocal kept_count, kept_because
        kept_count[rule][base] += 1
        if snap not in kept_because:
            kept_because[snap] = deque()
        kept_because[snap].append(KeptBecause(rule=rule, ix=kept_count[rule][base], base=base, period=period))

    for candidate in exec_candidates:
        base_name, _, _ = candidate.name.rpartition('@')
        keep_because(base_name, candidate.name, 'exec-candidate')

    within = config.gettimedelta('KEEP', 'within')
    if within > timedelta(seconds=0):
        for base, snaps in items.items():
            # Reference point is the newest snapshot of the dataset, not
            # "now", so a stalled snapshotter does not cause mass pruning.
            time_ref = max(snaps, key=lambda snap: snap.creation, default=None)
            if not time_ref:
                logger.warning(f'Nothing to keep for ‘{base}’')
                continue

            logger.info(f'Using ‘{time_ref.name}’ as time reference for ‘{base}’')
            within_cutoff = time_ref.creation - within

            for snap in snaps:
                if snap.creation >= within_cutoff:
                    keep_because(base, snap.name, 'within')
    else:
        logger.warning('Skipping rule ‘within’ since retention period is zero')

    for rule, pattern in TIME_PATTERNS.items():
        desired_count = config.getint('KEEP', rule, fallback=0)

        for base, snaps in items.items():
            periods = OrderedDict()

            for snap in sorted(snaps, key=lambda snap: snap.creation, reverse=keep_newest):
                period = pattern(snap.creation.astimezone(prune_timezone))
                if period not in periods:
                    periods[period] = deque()
                periods[period].append(snap)

            to_keep = desired_count
            # Walk periods from the most recent side so the newest periods
            # fill the retention slots first.
            ordered_periods = periods.items() if keep_newest else reversed(periods.items())
            for period, period_snaps in ordered_periods:
                if to_keep == 0:
                    break
                for snap in period_snaps:
                    keep_because(base, snap.name, rule, period=period)
                    to_keep -= 1
                    break

            if to_keep > 0:
                logger.debug(f'Missing {to_keep} to fulfill prune {rule}={desired_count} for ‘{base}’')

    for snap, reasons in kept_because.items():
        reasons_str = ', '.join(map(str, reasons))
        logger.info(f'Keeping ‘{snap}’ because: {reasons_str}')
    all_snaps = {snap.name for _, snaps in items.items() for snap in snaps}
    to_destroy = all_snaps - {*kept_because}
    if not to_destroy:
        logger.info('Nothing to prune')

    for snap in sorted(to_destroy):
        args = ['zfs', 'destroy']
        if dry_run:
            args += ['-n']
        args += [snap]
        _log_cmd(*args)
        subprocess.run(args, check=True)
        if dry_run:
            logger.info(f'Would have pruned ‘{snap}’')
        else:
            logger.info(f'Pruned ‘{snap}’')


def rename(snapshots, destroy=False, set_is_auto=False):
    """Rename the given snapshots to the canonical name derived from their
    creation time.

    If two snapshots map to the same canonical name, later ones are skipped,
    or destroyed when ``destroy`` is set.  With ``set_is_auto`` the
    is-auto-snapshot property is set on each renamed snapshot.
    """
    args = ['zfs', 'get', '-H', '-p', '-o', 'name,value', 'creation', *snapshots]
    _log_cmd(*args)
    renamed_to = set()
    with subprocess.Popen(args, stdout=subprocess.PIPE) as proc:
        text_stdout = io.TextIOWrapper(proc.stdout)
        reader = csv.DictReader(text_stdout, fieldnames=['name', 'timestamp'], delimiter='\t', quoting=csv.QUOTE_NONE)
        Row = namedtuple('Row', reader.fieldnames)
        for row in [Row(**data) for data in reader]:
            creation = datetime.fromtimestamp(int(row.timestamp), timezone.utc)
            base_name, _, _ = row.name.rpartition('@')
            new_name = _snap_name(base_name, time=creation)
            if new_name == row.name:
                logger.debug(f'Not renaming ‘{row.name}’ since name is already correct')
                continue

            if new_name in renamed_to:
                if destroy:
                    logger.warning(f'Destroying ‘{row.name}’ since ‘{new_name}’ was already renamed to')
                    args = ['zfs', 'destroy', row.name]
                    _log_cmd(*args)
                    subprocess.run(args, check=True)
                else:
                    logger.info(f'Skipping ‘{row.name}’ since ‘{new_name}’ was already renamed to')

                continue

            logger.info(f'Renaming ‘{row.name}’ to ‘{new_name}’')
            args = ['zfs', 'rename', row.name, new_name]
            _log_cmd(*args)
            subprocess.run(args, check=True)
            renamed_to.add(new_name)

            if set_is_auto:
                logger.info(f'Setting is-auto-snapshot on ‘{new_name}’')
                args = ['zfs', 'set', 'li.yggdrasil:is-auto-snapshot=true', new_name]
                _log_cmd(*args)
                subprocess.run(args, check=True)


def autosnap():
    """Snapshot every opted-in dataset (one ``zfs snapshot`` per pool), then
    rename the new snapshots to their canonical creation-time names."""
    items = _get_items()

    all_snap_names = set()
    async def do_snapshot(*snap_items, recursive=False):
        nonlocal items, all_snap_names
        # Only items whose auto-snapshot property is truthy are snapshotted.
        snap_names = {_snap_name(item) for item in snap_items if items[item]}
        if not snap_names:
            # Nothing enabled in this batch; running `zfs snapshot` with no
            # snapshot arguments would fail.
            return
        if recursive:
            for snap_item in snap_items:
                all_snap_names |= {_snap_name(item) for item in items if item.startswith(snap_item)}
        else:
            all_snap_names |= snap_names

        args = ['zfs', 'snapshot', '-o', 'li.yggdrasil:is-auto-snapshot=true']
        if recursive:
            args += ['-r']
        # Sorted for a deterministic command line (snap_names is a set).
        args += sorted(snap_names)

        _log_cmd(*args)
        subprocess.run(args, check=True)

    # Group items by pool: one `zfs snapshot` invocation per pool.
    pool_items = defaultdict(set)
    for item in items:
        pool, _, _ = item.partition('/')
        pool_items[pool].add(item)

    tasks = []
    for snap_items in pool_items.values():
        tasks.append(do_snapshot(*snap_items))
    if not tasks:
        logger.warning('No snapshots to create')
    else:
        async def run_tasks():
            await asyncio.gather(*tasks)
        asyncio.run(run_tasks())

        for snap in all_snap_names:
            logger.info(f'Created ‘{snap}’')

        if all_snap_names:
            rename(snapshots=all_snap_names)


def main():
    """CLI entry point: set up logging, parse arguments, dispatch subcommand."""
    global logger
    logger = logging.getLogger(__name__)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter('[%(levelname)s](%(name)s): %(message)s'))
    if sys.stderr.isatty():
        # Interactive use: include timestamps (a daemon journal adds its own).
        console_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s'))
    logger.addHandler(console_handler)

    # log uncaught exceptions
    def log_exceptions(type, value, tb):
        global logger

        logger.error(value)
        sys.__excepthook__(type, value, tb)  # calls default excepthook

    sys.excepthook = log_exceptions

    parser = argparse.ArgumentParser(prog='zfssnap')
    parser.add_argument('--verbose', '-v', action='count', default=0)

    subparsers = parser.add_subparsers()
    # No subcommand → autosnap.
    parser.set_defaults(cmd=autosnap)

    rename_parser = subparsers.add_parser('rename')
    rename_parser.add_argument('snapshots', nargs='+')
    rename_parser.add_argument('--destroy', action='store_true', default=False)
    rename_parser.add_argument('--set-is-auto', action='store_true', default=False)
    rename_parser.set_defaults(cmd=rename)

    prune_parser = subparsers.add_parser('prune')
    prune_parser.add_argument('--config', '-c', dest='config_files', nargs='*', default=list())
    prune_parser.add_argument('--dry-run', '-n', action='store_true', default=False)
    prune_parser.add_argument('--keep-newest', action='store_true', default=False)
    prune_parser.add_argument('--no-exec', dest='do_exec', action='store_false', default=True)
    prune_parser.set_defaults(cmd=prune)

    args = parser.parse_args()

    if args.verbose <= 0:
        logger.setLevel(logging.WARNING)
    elif args.verbose <= 1:
        logger.setLevel(logging.INFO)
    else:
        logger.setLevel(logging.DEBUG)

    # Forward only the options the selected subcommand actually declares.
    cmdArgs = {}
    for copy in {'snapshots', 'dry_run', 'destroy', 'keep_newest', 'set_is_auto', 'do_exec'}:
        if copy in vars(args):
            cmdArgs[copy] = vars(args)[copy]
    if 'config_files' in vars(args):
        def convert_timedelta(secs_str):
            secs = pytimeparse.parse(secs_str)
            if secs is None:
                raise ValueError(f'Could not parse timedelta expression ‘{secs_str}’')
            return timedelta(seconds=secs)
        config = configparser.ConfigParser(converters={
            'timedelta': convert_timedelta,
            'timezone': gettz
        })
        search_files = args.config_files if args.config_files else [*BaseDirectory.load_config_paths('zfssnap.ini')]
        read_files = config.read(search_files)

        def format_config_files(files):
            if not files:
                return 'no files'
            return ', '.join(map(lambda file: f'‘{file}’', files))

        if not read_files:
            raise Exception(f'Found no config files. Tried: {format_config_files(search_files)}')

        logger.debug(f'Read following config files: {format_config_files(read_files)}')

        cmdArgs['config'] = config

    args.cmd(**cmdArgs)


if __name__ == '__main__':
    sys.exit(main())