author     Gregor Kleen <gkleen@yggdrasil.li>  2023-04-10 20:56:16 +0200
committer  Gregor Kleen <gkleen@yggdrasil.li>  2023-04-10 20:56:16 +0200
commit     c0e3d0e72d9d636728a5171511e3ce1003203567 (patch)
tree       c75fbcfe4802c48fd5162792f38475d74d2cad57 /modules/borgcopy/copy_borg
parent     24207674f36e900fd2aa51787cb70756413962c2 (diff)
bump
Diffstat (limited to 'modules/borgcopy/copy_borg')
-rwxr-xr-x  modules/borgcopy/copy_borg/__main__.py  555
1 file changed, 555 insertions(+), 0 deletions(-)
diff --git a/modules/borgcopy/copy_borg/__main__.py b/modules/borgcopy/copy_borg/__main__.py
new file mode 100755
index 00000000..09f7557a
--- /dev/null
+++ b/modules/borgcopy/copy_borg/__main__.py
@@ -0,0 +1,555 @@
#!@python@/bin/python

import json
import os
import subprocess
import re
import sys
import io
from sys import stderr
from traceback import format_exc
from humanize import naturalsize

from tempfile import TemporaryDirectory

from datetime import (datetime, timedelta)
from dateutil.tz import (tzlocal, tzutc)
import dateutil.parser
import argparse

from tqdm import tqdm

from xdg import xdg_runtime_dir
import pathlib

import unshare
from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps
from pwd import getpwnam

import logging

import signal
import time
import math

from halo import Halo

from collections import deque

import select
import fcntl

from multiprocessing import Process, Manager
from contextlib import closing


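# Copy borg archives from a source repository to a target repository:
# list both sides, skip archives that already exist in the target, and
# recreate each missing archive by fuse-mounting it and running
# ‘borg create’ against the mounted tree.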
halo_args = {
    'stream': stderr,
    'enabled': stderr.isatty(),
    'spinner': 'arc'
}

borg_pwd = getpwnam('borg')

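# Drop privileges to the ‘borg’ user; used as a preexec_fn for borg
# subprocesses. Optionally keeps the given capabilities permitted,
# inheritable and ambient across the setgid/setuid, so that e.g.
# CAP_DAC_READ_SEARCH survives into the child process.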
def as_borg(caps=set()):
    global logger

    try:
        if caps:
            c_state = CapState.get_current()
            c_state.permitted.update(caps)
            c_state.set_current()

        # logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)

        set_keepcaps(True)

        os.setgid(borg_pwd.pw_gid)
        os.setuid(borg_pwd.pw_uid)

        if caps:
            # logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)

            c_state = CapState.get_current()
            c_state.permitted = caps.copy()
            c_state.inheritable.update(caps)
            c_state.set_current()

            # logger.debug("cap_permitted=%s", CapState.get_current().permitted)
            # logger.debug("cap_inheritable=%s", CapState.get_current().inheritable)

            for cap in caps:
                cap_ambient_raise(cap)
                # logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap))
    except Exception:
        logger.error(format_exc())
        raise

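# Run a borg command, forward its stderr lines to the logging hierarchy
# in real time (non-blocking reads via poll), collect its stdout, and
# parse it as JSON once the process has exited cleanly.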
def borg_json(*args, **kwargs):
    global logger

    with subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs) as proc:
        stdout_buffer = io.BytesIO()

        proc_logger = logger.getChild('borg')
        stdout_logger = proc_logger.getChild('stdout')
        stderr_logger = proc_logger.getChild('stderr')

        fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
        fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)

        poll = select.poll()
        poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
        poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
        pollc = 2
        events = poll.poll()
        stderr_linebuf = bytearray()

        while pollc > 0 and len(events) > 0:
            for rfd, event in events:
                if event & select.POLLIN:
                    if rfd == proc.stdout.fileno():
                        try:
                            buf = os.read(proc.stdout.fileno(), 8192)
                            # stdout_logger.debug(buf)
                            stdout_buffer.write(buf)
                        except BlockingIOError:
                            pass
                    if rfd == proc.stderr.fileno():
                        try:
                            stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192))
                        except BlockingIOError:
                            pass

                        while stderr_linebuf:
                            line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n')
                            if not sep:
                                stderr_linebuf = line
                                break

                            stderr_logger.info(line.decode())
                if event == select.POLLHUP:
                    poll.unregister(rfd)
                    pollc -= 1

            if pollc > 0:
                events = poll.poll()

        for handler in proc_logger.handlers:
            handler.flush()

        ret = proc.wait()
        if ret != 0:
            raise Exception(f'borg subprocess exited with returncode {ret}')

        stdout_buffer.seek(0)
        return json.load(stdout_buffer)

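# List the archives of a repository via ‘borg list --json’; shows a
# spinner on interactive terminals and logs plainly otherwise.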
def read_repo(path):
    global logger

    with Halo(text=f'Listing {path}', **halo_args) as sp:
        if not sp.enabled:
            logger.debug('Listing %s...', path)
        res = borg_json(['borg', 'list', '--info', '--lock-wait=600', '--json', path], preexec_fn=lambda: as_borg())['archives']
        if sp.enabled:
            sp.succeed(f'{len(res)} archives in {path}')
        else:
            logger.info('%d archives in ‘%s’', len(res), path)
    return res

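# Iterator over the archives that still need copying: everything present
# in the source repository but missing from the target, excluding
# checkpoint archives. Re-lists both repositories whenever its queue runs
# dry and stops once the two sides are in sync.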
class ToSync:
    def __init__(self, source, target):
        self.source = source
        self.target = target
        self.to_sync = deque()

    def __iter__(self):
        return self

    def __next__(self):
        global logger

        if self.to_sync:
            return self.to_sync.popleft()

        while True:
            try:
                src = read_repo(self.source)
                dst = read_repo(self.target)
            except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
                logger.error(err)
                continue

            self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')])

            if self.to_sync:
                return self.to_sync.popleft()

            raise StopIteration

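# Copy a single archive. The heavy lifting happens in do_copy, which runs
# in its own process: it enters a fresh mount namespace, builds an overlay
# chroot (so /etc/fuse.conf and /borg can be written without touching the
# host), fuse-mounts the source archive at /borg, and replays it into the
# target repository with ‘borg create’, forwarding borg’s --log-json
# stream to a tqdm progress bar.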
def copy_archive(src_repo_path, dst_repo_path, entry):
    global logger

    def do_copy(tmpdir_q):
        global logger

        nonlocal src_repo_path, dst_repo_path, entry

        tmpdir = tmpdir_q.get()

        cache_suffix = None
        with Halo(text='Determining archive parameters', **halo_args) as sp:
            if not sp.enabled:
                logger.debug('Determining archive parameters...')
            match = re.compile(r'^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name'])
            if match:
                repo_id = borg_json(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], preexec_fn=lambda: as_borg())['repository']['id']

                if repo_id:
                    cache_suffix = f'{repo_id}_{match.group(1)}'
            if sp.enabled:
                sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})')
            else:
                logger.info('Will process ‘%s’ (%s, cache_suffix=%s)', entry['name'], dateutil.parser.isoparse(entry['start']), cache_suffix)

        logger.debug('Setting up environment...')
        unshare.unshare(unshare.CLONE_NEWNS)
        subprocess.run(['mount', '--make-rprivate', '/'], check=True)
        chroot = pathlib.Path(tmpdir) / 'chroot'
        upper = pathlib.Path(tmpdir) / 'upper'
        work = pathlib.Path(tmpdir) / 'work'
        for path in [chroot, upper, work]:
            path.mkdir()
        subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
        bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
        if os.environ.get('BORG_BASE_DIR'):
            bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/'))
        if ':' not in src_repo_path:
            bindMounts.append(pathlib.Path(src_repo_path).relative_to('/'))
        if 'SSH_AUTH_SOCK' in os.environ:
            bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
        for bindMount in bindMounts:
            (chroot / bindMount).mkdir(parents=True, exist_ok=True)
            subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
        os.chroot(chroot)
        os.chdir('/')
        try:
            os.unlink('/etc/fuse.conf')
        except FileNotFoundError:
            pass
        pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True, exist_ok=True)
        with open('/etc/fuse.conf', 'w') as fuse_conf:
            fuse_conf.write('user_allow_other\nmount_max = 1000\n')
        dir = pathlib.Path('/borg')
        dir.mkdir(parents=True, exist_ok=True, mode=0o0750)
        os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)

        total_size = None
        total_files = None
        if stderr.isatty():
            with Halo(text='Determining size', **halo_args) as sp:
                stats = borg_json(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], preexec_fn=lambda: as_borg())['archives'][0]['stats']
                total_size = stats['original_size']
                total_files = stats['nfiles']
                if sp.enabled:
                    sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
                else:
                    logger.info('%d files, %s', total_files, naturalsize(total_size, binary=True))
        with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
            with Halo(text='Waiting for mount', **halo_args) as sp:
                if not sp.enabled:
                    logger.debug('Waiting for mount...')
                wait_start = datetime.now()
                while True:
                    if os.path.ismount(dir):
                        break
                    elif datetime.now() - wait_start > timedelta(minutes=15):
                        # give up if the fuse mount has not appeared in time
                        raise TimeoutError('timed out waiting for borg mount')
                    time.sleep(0.1)
                if sp.enabled:
                    sp.succeed('Mounted')
                else:
                    logger.info('Mounted %s', f'{src_repo_path}::{entry["name"]}')

            while True:
                with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
                    seen = 0
                    env = os.environ.copy()
                    create_args = ['borg',
                                   'create',
                                   '--lock-wait=600',
                                   '--one-file-system',
                                   '--compression=auto,zstd,10',
                                   '--chunker-params=10,23,16,4095',
                                   '--files-cache=ctime,size',
                                   '--show-rc',
                                   '--upload-buffer=100',
                                   '--log-json',
                                   '--progress',
                                   '--list',
                                   '--filter=AMEi-x?',
                                   '--stats'
                                   ]
                    archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
                    create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
                    if cache_suffix:
                        env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
                    else:
                        create_args += ['--files-cache=disabled']
                    create_args += [f'{dst_repo_path}::{entry["name"]}', '.']

                    with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir) as proc:
                        last_list = None
                        last_list_time = time.monotonic_ns()
                        logger.info('Creating...')

                        proc_logger = logger.getChild('borg')
                        stdout_logger = proc_logger.getChild('stdout')
                        stderr_logger = proc_logger.getChild('stderr')

                        fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
                        fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)

                        poll = select.poll()
                        poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
                        poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
                        pollc = 2
                        events = poll.poll()
                        stdout_linebuf = bytearray()
                        stderr_linebuf = bytearray()

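                        # Event loop: borg's plain stdout lines go to the
                        # logger verbatim; stderr carries --log-json, one JSON
                        # object per line, which is decoded to drive the tqdm
                        # progress bar (archive_progress), remember the last
                        # listed file (file_status) and surface log messages.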
                        while pollc > 0 and len(events) > 0:
                            # logger.debug('%d events', len(events))
                            for rfd, event in events:
                                # logger.debug('event %s', event)
                                if event & select.POLLIN:
                                    if rfd == proc.stdout.fileno():
                                        try:
                                            # logger.debug('reading stdout...')
                                            stdout_linebuf.extend(os.read(proc.stdout.fileno(), 8192))
                                            # logger.debug('read stdout, len(stdout_linebuf)=%d', len(stdout_linebuf))
                                        except BlockingIOError:
                                            pass

                                        while stdout_linebuf:
                                            # logger.debug('stdout line...')
                                            line, sep, stdout_linebuf = stdout_linebuf.partition(b'\n')
                                            if not sep:
                                                stdout_linebuf = line
                                                break

                                            stdout_logger.info(line.decode())
                                        # logger.debug('handled stdout lines, %d leftover', len(stdout_linebuf))
                                    if rfd == proc.stderr.fileno():
                                        try:
                                            # logger.debug('reading stderr...')
                                            stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192))
                                            # logger.debug('read stderr, len(stderr_linebuf)=%d', len(stderr_linebuf))
                                        except BlockingIOError:
                                            pass

                                        while stderr_linebuf:
                                            # logger.debug('stderr line...')
                                            line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n')
                                            if not sep:
                                                stderr_linebuf = line
                                                break

                                            try:
                                                json_line = json.loads(line)
                                            except json.decoder.JSONDecodeError:
                                                if progress.disable:
                                                    stderr_logger.error(line.decode())
                                                else:
                                                    tqdm.write(line.decode())
                                                continue

                                            # logger.debug('stderr line decoded: %s', json_line['type'] if 'type' in json_line else None)

                                            t = ''
                                            if 'time' in json_line and not progress.disable:
                                                ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
                                                t = f'{ts.isoformat(timespec="minutes")} '
                                            if json_line['type'] == 'archive_progress' and not progress.disable:
                                                now = time.monotonic_ns()
                                                if last_list_time is None or now - last_list_time >= 3e9:
                                                    last_list_time = now
                                                    if 'path' in json_line and json_line['path']:
                                                        progress.set_description(f'… {json_line["path"]}', refresh=False)
                                                    else:
                                                        progress.set_description(None, refresh=False)
                                                elif last_list is not None:
                                                    progress.set_description(last_list, refresh=False)
                                                nfiles = json_line["nfiles"]
                                                if total_files is not None:
                                                    nfiles = f'{json_line["nfiles"]}/{total_files}'
                                                progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False)
                                                progress.update(json_line["original_size"] - seen)
                                                seen = json_line["original_size"]
                                            elif json_line['type'] == 'archive_progress':
                                                now = time.monotonic_ns()
                                                if last_list_time is None or now - last_list_time >= 3e9:
                                                    last_list_time = now
                                                    if 'path' in json_line and json_line['path']:
                                                        stderr_logger.debug('… %s (%s)', json_line["path"], naturalsize(json_line["original_size"]))
                                                    else:
                                                        stderr_logger.debug('… (%s)', naturalsize(json_line["original_size"]))
                                            elif json_line['type'] == 'file_status':
                                                # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
                                                last_list = f'{json_line["status"]} {json_line["path"]}'
                                                last_list_time = time.monotonic_ns()
                                                progress.set_description(last_list, refresh=False)
                                                if progress.disable:
                                                    stderr_logger.info(last_list)
                                            elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
                                                if 'message' in json_line:
                                                    if progress.disable:
                                                        stderr_logger.info(t + json_line['message'])
                                                    else:
                                                        tqdm.write(t + json_line['message'])
                                                elif 'msgid' in json_line:
                                                    if progress.disable:
                                                        stderr_logger.info(t + json_line['msgid'])
                                                    else:
                                                        tqdm.write(t + json_line['msgid'])
                                            else:
                                                if progress.disable:
                                                    stderr_logger.info(t + line.decode())
                                                else:
                                                    tqdm.write(t + line.decode())
                                        # logger.debug('handled stderr lines, %d leftover', len(stderr_linebuf))
                                if event == select.POLLHUP:
                                    poll.unregister(rfd)
                                    pollc -= 1

                            if pollc > 0:
                                # logger.debug('polling %d fds...', pollc)
                                events = poll.poll()
                            # logger.debug('done polling')

                        # logger.debug('borg create closed stdout/stderr')
                        if stdout_linebuf:
                            logger.error('unterminated line leftover in stdout: %s', stdout_linebuf)
                        if stderr_linebuf:
                            logger.error('unterminated line leftover in stderr: %s', stderr_linebuf)
                        progress.set_description(None)
                        ret = proc.wait()
                        # logger.debug('borg create terminated; ret=%d', ret)
                        if ret != 0:
                            dst = None
                            try:
                                dst = read_repo(dst_repo_path)
                            except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
                                logger.error(err)
                                continue
                            else:
                                if any(map(lambda other: entry['name'] == other['name'], dst)):
                                    logger.info('destination exists, terminating')
                                    break

                                logger.warning('destination does not exist, retrying')
                                continue
                        else:
                            # logger.debug('terminating')
                            break
            mount_proc.terminate()

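    # do_copy runs in a separate process because it calls unshare(),
    # chroot() and setuid(); none of that may leak back into the parent.
    # The temporary directory is handed over through a queue so that it is
    # created by the parent (which still sees the real filesystem) and
    # cleaned up after the child exits.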
    with Manager() as manager:
        tmpdir_q = manager.Queue(1)

        with closing(Process(target=do_copy, args=(tmpdir_q,), name='do_copy')) as p:
            p.start()

            with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
                tmpdir_q.put(tmpdir)
                p.join()
                return p.exitcode

def sigterm(signum, frame):
    raise SystemExit(128 + signum)

def main():
    signal.signal(signal.SIGTERM, sigterm)

    global logger
    logger = logging.getLogger(__name__)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') )
    if sys.stderr.isatty():
        console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') )

    burst_max = 1000
    burst = burst_max
    last_use = None
    inv_rate = 1e7
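    # Token-bucket rate limiting for console log output: a bucket of
    # burst_max tokens refills at one token per inv_rate nanoseconds
    # (1e7 ns ≈ 100 records/s); when the bucket is empty the filter
    # sleeps instead of dropping records.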
    def consume_filter(record):
        nonlocal burst, burst_max, inv_rate, last_use

        delay = None
        while True:
            now = time.monotonic_ns()
            burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max
            last_use = now

            if burst > 0:
                burst -= 1
                if delay:
                    delay = now - delay

                return True

            if delay is None:
                delay = now
            time.sleep(inv_rate / 1e9)
    console_handler.addFilter(consume_filter)

    logging.getLogger().addHandler(console_handler)

    # log uncaught exceptions
    def log_exceptions(type, value, tb):
        global logger

        logger.error(value)
        sys.__excepthook__(type, value, tb)  # calls default excepthook

    sys.excepthook = log_exceptions

    parser = argparse.ArgumentParser(prog='copy')
    parser.add_argument('--verbosity', dest='log_level', action='append', type=int)
    parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1)
    parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1)
    parser.add_argument('source', metavar='REPO_OR_ARCHIVE')
    parser.add_argument('target', metavar='REPO_OR_ARCHIVE')
    args = parser.parse_args()


    LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
    DEFAULT_LOG_LEVEL = logging.ERROR
    log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)

    for adjustment in args.log_level or ():
        log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0))
    logger.setLevel(LOG_LEVELS[log_level])


    if "::" in args.source:
        (src_repo_path, _, src_archive) = args.source.partition("::")
        entry = None
        for candidate_entry in read_repo(src_repo_path):
            if candidate_entry['name'] != src_archive:
                continue
            entry = candidate_entry
            break

        if entry is None:
            logger.critical("Did not find archive ‘%s’", src_archive)
            sys.exit(1)

        copy_archive(src_repo_path, args.target, entry)
    else:
        for entry in ToSync(args.source, args.target):
            copy_archive(args.source, args.target, entry)

if __name__ == "__main__":
    sys.exit(main())