Diffstat (limited to 'modules/borgsnap')
-rw-r--r--  modules/borgsnap/borgsnap/borgsnap/__main__.py  395
-rw-r--r--  modules/borgsnap/borgsnap/setup.py               10
-rw-r--r--  modules/borgsnap/default.nix                    106
3 files changed, 511 insertions, 0 deletions
diff --git a/modules/borgsnap/borgsnap/borgsnap/__main__.py b/modules/borgsnap/borgsnap/borgsnap/__main__.py
new file mode 100644
index 00000000..4b50cf80
--- /dev/null
+++ b/modules/borgsnap/borgsnap/borgsnap/__main__.py
@@ -0,0 +1,395 @@
1import argparse
2import os, sys, signal, io
3from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps
4from pwd import getpwnam
5
6from datetime import datetime, timezone
7from dateutil.parser import isoparse
8
9import unshare
10from tempfile import TemporaryDirectory
11
12import logging
13
14import json
15import subprocess
16import csv
17from collections import namedtuple
18from distutils.util import strtobool
19
20import pathlib
21from pathlib import Path
22
23from atomicwrites import atomic_write
24
25from traceback import format_exc
26
27from multiprocessing import Process, Manager
28from contextlib import closing
29
30from enum import Enum, auto
31
32import select
33import time
34import math
35
36
37PROP_DO_BORGSNAP = 'li.yggdrasil:borgsnap'
38
39
40class DoValue(Enum):
41 BORGSNAP_DO = auto()
42 BORGSNAP_KEEP = auto()
43 BORGSNAP_DONT = auto()
44
45 @classmethod
46 def from_prop(cls, v: str):
47 if v.lower() == 'keep':
48 return cls.BORGSNAP_KEEP
49
50 return cls.BORGSNAP_DO if not v or bool(strtobool(v)) else cls.BORGSNAP_DONT
51
52 @classmethod
53 def merge(cls, v1, v2):
54 match (v1, v2):
55 case (cls.BORGSNAP_DONT, _):
56 return cls.BORGSNAP_DONT
57 case (_, cls.BORGSNAP_DONT):
58 return cls.BORGSNAP_DONT
59 case (cls.BORGSNAP_KEEP, _):
60 return cls.BORGSNAP_KEEP
61 case (_, cls.BORGSNAP_KEEP):
62 return cls.BORGSNAP_KEEP
63            case _:
64 return cls.BORGSNAP_DO
65
66 def returncode(self):
67 match self:
68 case self.__class__.BORGSNAP_DO:
69 return 126
70 case self.__class__.BORGSNAP_KEEP:
71 return 125
72 case self.__class__.BORGSNAP_DONT:
73 return 124
74
75borg_pwd = getpwnam('borg')
76
77def as_borg(caps=set()):
78 global logger
79
80 try:
81 if caps:
82 c_state = CapState.get_current()
83            c_state.permitted.update(caps)
84 c_state.set_current()
85
86 logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)
87
88 set_keepcaps(True)
89
90 os.setgid(borg_pwd.pw_gid)
91 os.setuid(borg_pwd.pw_uid)
92
93 if caps:
94 logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)
95
96 c_state = CapState.get_current()
97 c_state.permitted = caps.copy()
98            c_state.inheritable.update(caps)
99 c_state.set_current()
100
101 logger.debug("cap_permitted=%s", CapState.get_current().permitted)
102 logger.debug("cap_inheritable=%s", CapState.get_current().inheritable)
103
104 for cap in caps:
105 cap_ambient_raise(cap)
106 logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap))
107 except Exception:
108 logger.error(format_exc())
109 raise
110
111
112def _archive_name(snapshot, target, archive_prefix):
113 _, _, ts = snapshot.rpartition('@')
114 creation_time = isoparse(ts).astimezone(timezone.utc)
115 archive_name = _archive_basename(snapshot, archive_prefix)
116 return f'{target}::{archive_name}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'
117
118def _archive_basename(snapshot, archive_prefix):
119 base_name, _, _ = snapshot.rpartition('@')
120 return archive_prefix + base_name.replace('-', '--').replace('/', '-')
121
122def check(*, snapshot, target, archive_prefix, cache_file):
123 global logger
124
125 archives = None
126 if cache_file:
127 logger.debug('Trying cache...')
128 try:
129 with open(cache_file, mode='r', encoding='utf-8') as fp:
130 archives = set(json.load(fp))
131 logger.debug('Loaded archive list from cache')
132 except FileNotFoundError:
133 pass
134
135 if not archives:
136 logger.info('Loading archive list from remote...')
137 with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', target], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc:
138 archives = set([archive['barchive'] for archive in json.load(proc.stdout)['archives']])
139 if cache_file:
140 logger.debug('Saving archive list to cache...')
141 with atomic_write(cache_file, mode='w', encoding='utf-8', overwrite=True) as fp:
142 json.dump(list(archives), fp)
143
144 # logger.debug(f'archives: {archives}')
145 _, _, archive_name = _archive_name(snapshot, target, archive_prefix).partition('::')
146 if archive_name in archives:
147 logger.info('‘%s’ found', archive_name)
148 return 0
149 else:
150 logger.info('‘%s’ not found', archive_name)
151
152 logger.debug('Checking %s for ‘%s’...', PROP_DO_BORGSNAP, snapshot)
153 intent = DoValue.BORGSNAP_DO
154 p = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'name,value', PROP_DO_BORGSNAP, snapshot], stdout=subprocess.PIPE, text=True, check=True)
155 reader = csv.DictReader(io.StringIO(p.stdout), fieldnames=['name', 'value'], delimiter='\t', quoting=csv.QUOTE_NONE)
156 Row = namedtuple('Row', reader.fieldnames)
157 for row in [Row(**data) for data in reader]:
158 if not row.value or row.value == '-':
159 continue
160
161 logger.debug('%s=%s (parsed as %s) for ‘%s’...', PROP_DO_BORGSNAP, row.value, DoValue.from_prop(row.value), row.name)
162 intent = DoValue.merge(intent, DoValue.from_prop(row.value))
163
164 match intent:
165 case DoValue.BORGSNAP_DONT:
166            logger.warning('%s specifies to ignore, returning accordingly...', PROP_DO_BORGSNAP)
167 case DoValue.BORGSNAP_KEEP:
168 logger.info('%s specifies to ignore but keep, returning accordingly...', PROP_DO_BORGSNAP)
169        case _:
170 pass
171
172 return intent.returncode()
173
174def create(*, snapshot, target, archive_prefix, dry_run):
175 global logger
176
177 basename = _archive_basename(snapshot, archive_prefix)
178
179 def do_create(tmpdir_q):
180 global logger
181 nonlocal basename, snapshot, target, archive_prefix, dry_run
182
183 tmpdir = tmpdir_q.get()
184
185 unshare.unshare(unshare.CLONE_NEWNS)
186 subprocess.run(['mount', '--make-rprivate', '/'], check=True)
187 chroot = pathlib.Path(tmpdir) / 'chroot'
188 upper = pathlib.Path(tmpdir) / 'upper'
189 work = pathlib.Path(tmpdir) / 'work'
190 for path in [chroot,upper,work]:
191 path.mkdir()
192 subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
193 bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
194 if borg_base_dir := os.getenv('BORG_BASE_DIR'):
195 bindMounts.append(pathlib.Path(borg_base_dir).relative_to('/'))
196 if ssh_auth_sock := os.getenv('SSH_AUTH_SOCK'):
197 bindMounts.append(pathlib.Path(ssh_auth_sock).parent.relative_to('/'))
198 for bindMount in bindMounts:
199 (chroot / bindMount).mkdir(parents=True,exist_ok=True)
200 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
201
202 os.chroot(chroot)
203 os.chdir('/')
204 dir = pathlib.Path('/borg')
205 dir.mkdir(parents=True,exist_ok=True,mode=0o0750)
206 os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
207
208 base_name, _, _ = snapshot.rpartition('@')
209 type_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'type', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip()
210 match type_val:
211 case 'filesystem':
212 subprocess.run(['mount', '-t', 'zfs', '-o', 'ro', snapshot, dir], check=True)
213 case 'volume':
214 snapdev_val = subprocess.run(['zfs', 'get', '-H', '-p', '-o', 'value', 'snapdev', base_name], stdout=subprocess.PIPE, text=True, check=True).stdout.strip()
215 try:
216 if snapdev_val == 'hidden':
217 subprocess.run(['zfs', 'set', 'snapdev=visible', base_name], check=True)
218 subprocess.run(['mount', '-t', 'auto', '-o', 'ro', Path('/dev/zvol') / snapshot, dir], check=True)
219 finally:
220 if snapdev_val == 'hidden':
221 subprocess.run(['zfs', 'inherit', 'snapdev', base_name], check=True)
222            case _:
223 raise ValueError(f'‘{base_name}’ is of type ‘{type_val}’')
224
225 env = os.environ.copy()
226 create_args = ['borg',
227 'create',
228 '--lock-wait=600',
229 '--one-file-system',
230 '--exclude-caches',
231 '--keep-exclude-tags',
232 '--compression=auto,zstd,10',
233 '--chunker-params=10,23,16,4095',
234 '--files-cache=ctime,size',
235 '--show-rc',
236 '--upload-buffer=100',
237 '--upload-ratelimit=20480',
238 '--progress',
239 '--list',
240 '--filter=AMEi-x?',
241 '--stats' if not dry_run else '--dry-run',
242 ]
243 _, _, ts = snapshot.rpartition('@')
244 creation_time = isoparse(ts).astimezone(timezone.utc)
245 create_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}']
246 env['BORG_FILES_CACHE_SUFFIX'] = basename
247 archive_name = _archive_name(snapshot, target, archive_prefix)
248 target_host, _, target_path = target.rpartition(':')
249 *parents_init, _ = list(Path(target_path).parents)
250 backup_patterns = [*(map(lambda p: Path('.backup') / f'{target_host}:{p}', [Path(target_path), *parents_init])), Path('.backup') / target_host, Path('.backup')]
251 for pattern_file in backup_patterns:
252 if (dir / pattern_file).is_file():
253 logger.debug('Found backup patterns at ‘%s’', dir / pattern_file)
254 create_args += [f'--patterns-from={pattern_file}', archive_name]
255 break
256 elif (dir / pattern_file).exists():
257                logger.warning('‘%s’ exists but is not a file', dir / pattern_file)
258 else:
259 logger.debug('No backup patterns exist, checked %s', list(map(lambda pattern_file: str(dir / pattern_file), backup_patterns)))
260 create_args += [archive_name, '.']
261 logger.debug('%s', {'create_args': create_args, 'cwd': dir, 'env': env})
262
263 with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir, text=True) as proc:
264 proc_logger = logger.getChild('borg')
265 stdout_logger = proc_logger.getChild('stdout')
266 stderr_logger = proc_logger.getChild('stderr')
267
268 poll = select.poll()
269 poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
270 poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
271 pollc = 2
272 events = poll.poll()
273 while pollc > 0 and len(events) > 0:
274 for rfd, event in events:
275 if event & select.POLLIN:
276 if rfd == proc.stdout.fileno():
277 line = proc.stdout.readline()
278 if len(line) > 0:
279 stdout_logger.info(line[:-1])
280 if rfd == proc.stderr.fileno():
281 line = proc.stderr.readline()
282 if len(line) > 0:
283 stderr_logger.info(line[:-1])
284 if event & select.POLLHUP:
285 poll.unregister(rfd)
286 pollc -= 1
287
288 if pollc > 0:
289 events = poll.poll()
290
291 for handler in proc_logger.handlers:
292 handler.flush()
293
294 ret = proc.wait()
295 if ret != 0:
296 raise Exception(f'borg subprocess exited with returncode {ret}')
297
298 with Manager() as manager:
299 tmpdir_q = manager.Queue(1)
300 with closing(Process(target=do_create, args=(tmpdir_q,), name='do_create')) as p:
301 p.start()
302
303 with TemporaryDirectory(prefix=f'borg-mount_{basename}_', dir=os.getenv('RUNTIME_DIRECTORY')) as tmpdir:
304 tmpdir_q.put(tmpdir)
305 p.join()
306 if p.exitcode == 0 and dry_run:
307 return 125
308 return p.exitcode
309
310def sigterm(signum, frame):
311 raise SystemExit(128 + signum)
312
313def main():
314 signal.signal(signal.SIGTERM, sigterm)
315
316 global logger
317 logger = logging.getLogger(__name__)
318 console_handler = logging.StreamHandler()
319 console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') )
320 if sys.stderr.isatty():
321 console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') )
322
323 burst_max = 10000
324 burst = burst_max
325 last_use = None
326 inv_rate = 1e6
327 def consume_filter(record):
328 nonlocal burst, burst_max, inv_rate, last_use
329
330 delay = None
331 while True:
332 now = time.monotonic_ns()
333 burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max
334 last_use = now
335
336 if burst > 0:
337 burst -= 1
338 if delay:
339 delay = now - delay
340
341 return True
342
343 if delay is None:
344 delay = now
345 time.sleep(inv_rate / 1e9)
346 console_handler.addFilter(consume_filter)
347
348 logger.addHandler(console_handler)
349
350 # log uncaught exceptions
351 def log_exceptions(type, value, tb):
352 global logger
353
354 logger.error(value)
355 sys.__excepthook__(type, value, tb) # calls default excepthook
356
357 sys.excepthook = log_exceptions
358
359 parser = argparse.ArgumentParser(prog='borgsnap')
360    parser.add_argument('--verbosity', dest='log_level', action='append', type=int)
361 parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=-1)
362 parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=1)
363 parser.add_argument('--target', metavar='REPO', default='yggdrasil.borgbase:repo')
364    parser.add_argument('--archive-prefix', metavar='PREFIX', default='yggdrasil.vidhar.')
365 subparsers = parser.add_subparsers()
366 subparsers.required = True
367 parser.set_defaults(cmd=None)
368 check_parser = subparsers.add_parser('check')
369 check_parser.add_argument('--cache-file', type=lambda p: Path(p).absolute(), default=None)
370 check_parser.add_argument('snapshot')
371 check_parser.set_defaults(cmd=check)
372 create_parser = subparsers.add_parser('create')
373 create_parser.add_argument('--dry-run', '-n', action='store_true', default=False)
374 create_parser.add_argument('snapshot')
375 create_parser.set_defaults(cmd=create)
376 args = parser.parse_args()
377
378 LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
379 DEFAULT_LOG_LEVEL = logging.ERROR
380 log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)
381
382 for adjustment in args.log_level or ():
383 log_level = min(len(LOG_LEVELS) - 1, max(log_level + adjustment, 0))
384 logger.setLevel(LOG_LEVELS[log_level])
385
386 cmdArgs = {}
387 for copy in {'target', 'archive_prefix', 'snapshot', 'cache_file', 'dry_run'}:
388 if copy in vars(args):
389 cmdArgs[copy] = vars(args)[copy]
390
391 return args.cmd(**cmdArgs)
392
393
394if __name__ == '__main__':
395 sys.exit(main())
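
The default.nix below wires these two subcommands into zfssnap's exec hooks; invoked by hand they would look roughly as follows (the snapshot name is a placeholder, the remaining values are the defaults baked into the argument parser and the module):

  borgsnap -vv --target yggdrasil.borgbase:repo --archive-prefix yggdrasil.vidhar. check --cache-file /run/zfssnap-prune/archives-cache.json pool/data@2022-01-01T00:00:00Z
  borgsnap --target yggdrasil.borgbase:repo --archive-prefix yggdrasil.vidhar. create --dry-run pool/data@2022-01-01T00:00:00Z

check exits 0 when the matching archive already exists in the repository, and otherwise 126, 125 or 124 depending on the li.yggdrasil:borgsnap property (unset/truthy, ‘keep’, falsy respectively), which can be set per dataset with e.g. zfs set li.yggdrasil:borgsnap=keep pool/data; create with --dry-run likewise returns 125 so the snapshot is kept.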
diff --git a/modules/borgsnap/borgsnap/setup.py b/modules/borgsnap/borgsnap/setup.py
new file mode 100644
index 00000000..76356bfc
--- /dev/null
+++ b/modules/borgsnap/borgsnap/setup.py
@@ -0,0 +1,10 @@
1from setuptools import setup
2
3setup(name='borgsnap',
4 packages=['borgsnap'],
5 entry_points={
6 'console_scripts': [
7 'borgsnap=borgsnap.__main__:main',
8 ],
9 }
10)
diff --git a/modules/borgsnap/default.nix b/modules/borgsnap/default.nix
new file mode 100644
index 00000000..f4c0eec4
--- /dev/null
+++ b/modules/borgsnap/default.nix
@@ -0,0 +1,106 @@
1{ config, pkgs, lib, flakeInputs, hostName, ... }:
2
3with lib;
4
5let
6 borgsnap = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec {
7 pname = "borgsnap";
8 src = ./borgsnap;
9 version = "0.0.0";
10 ignoreDataOutdated = true;
11
12 requirements = ''
13 atomicwrites
14 pyprctl
15 python-unshare
16 python-dateutil
17 '';
18 postInstall = ''
19 wrapProgram $out/bin/borgsnap \
20 --prefix PATH : ${makeBinPath (with pkgs; [config.boot.zfs.package util-linux borgbackup])}:${config.security.wrapperDir}
21 '';
22
23 providers.python-unshare = "nixpkgs";
24 overridesPre = [
25 (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); })
26 ];
27
28 _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ];
29 };
30
31 cfg = config.services.borgsnap;
32in {
33 options = {
34 services.borgsnap = {
35 enable = mkEnableOption "borgsnap service";
36
37 target = mkOption {
38 type = types.str;
39 };
40
41 archive-prefix = mkOption {
42 type = types.str;
43 default = "yggdrasil.${hostName}.";
44 };
45
46 extraConfig = mkOption {
47 type = with types; attrsOf str;
48 default = {
49 halfweekly = "8";
50 monthly = "-1";
51 };
52 };
53
54 verbosity = mkOption {
55 type = types.int;
56 default = config.services.zfssnap.verbosity;
57 };
58
59 sshConfig = mkOption {
60 type = with types; nullOr str;
61 default = null;
62 };
63
64 keyfile = mkOption {
65 type = with types; nullOr str;
66 default = null;
67 };
68
69 extraCreateArgs = mkOption {
70 type = with types; listOf str;
71 default = [];
72 };
73 extraCheckArgs = mkOption {
74 type = with types; listOf str;
75 default = [];
76 };
77 };
78 };
79
80 config = mkIf cfg.enable {
81 warnings = mkIf (!config.services.zfssnap.enable) [
82 "borgsnap will do nothing if zfssnap is not enabled"
83 ];
84
85 services.zfssnap.config.exec = {
86 check = "${borgsnap}/bin/borgsnap --verbosity=${toString cfg.verbosity} --target ${escapeShellArg cfg.target} --archive-prefix ${escapeShellArg cfg.archive-prefix} check --cache-file /run/zfssnap-prune/archives-cache.json ${escapeShellArgs cfg.extraCheckArgs}";
87 cmd = "${borgsnap}/bin/borgsnap --verbosity=${toString cfg.verbosity} --target ${escapeShellArg cfg.target} --archive-prefix ${escapeShellArg cfg.archive-prefix} create ${escapeShellArgs cfg.extraCreateArgs}";
88 } // cfg.extraConfig;
89
90 systemd.services."zfssnap-prune" = {
91 serviceConfig = {
92 Environment = [
93 "BORG_BASE_DIR=/var/lib/borg"
94 "BORG_CONFIG_DIR=/var/lib/borg/config"
95 "BORG_CACHE_DIR=/var/lib/borg/cache"
96 "BORG_SECURITY_DIR=/var/lib/borg/security"
97 "BORG_KEYS_DIR=/var/lib/borg/keys"
98 "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes"
99 "BORG_HOSTNAME_IS_UNIQUE=yes"
100        ] ++ optional (cfg.sshConfig != null) "BORG_RSH=\"${pkgs.openssh}/bin/ssh -F ${pkgs.writeText "config" cfg.sshConfig}\""
101          ++ optional (cfg.keyfile != null) "BORG_KEY_FILE=${cfg.keyfile}";
102 RuntimeDirectory = "zfssnap-prune";
103 };
104 };
105 };
106}
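
A minimal sketch of how a host might enable this service, assuming the module is imported into the system configuration; the repository, ssh config and key file values are placeholders, everything else falls back to the defaults declared above:

  services.borgsnap = {
    enable = true;
    target = "backup.example.org:borg-repo";
    sshConfig = ''
      Host backup.example.org
        User borg
        IdentityFile /run/secrets/borgsnap-ssh-key
    '';
    keyfile = "/run/secrets/borgsnap-borg-key";
  };

As the warning above points out, borgsnap only runs as part of zfssnap's prune step, so services.zfssnap has to be enabled and configured separately for anything to happen.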