summaryrefslogtreecommitdiff
path: root/hosts/vidhar/borg/copy/copy_borg
diff options
context:
space:
mode:
Diffstat (limited to 'hosts/vidhar/borg/copy/copy_borg')
-rwxr-xr-xhosts/vidhar/borg/copy/copy_borg/__main__.py556
1 files changed, 556 insertions, 0 deletions
diff --git a/hosts/vidhar/borg/copy/copy_borg/__main__.py b/hosts/vidhar/borg/copy/copy_borg/__main__.py
new file mode 100755
index 00000000..5b374d99
--- /dev/null
+++ b/hosts/vidhar/borg/copy/copy_borg/__main__.py
@@ -0,0 +1,556 @@
1#!@python@/bin/python
2
3import json
4import os
5import subprocess
6import re
7import sys
8import io
9from sys import stderr
10from humanize import naturalsize
11
12from tempfile import TemporaryDirectory
13
14from datetime import (datetime, timedelta)
15from dateutil.tz import (tzlocal, tzutc)
16import dateutil.parser
17import argparse
18
19from tqdm import tqdm
20
21from xdg import xdg_runtime_dir
22import pathlib
23
24import unshare
25from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps
26from pwd import getpwnam
27
28import logging
29
30import signal
31import time
32import math
33
34from halo import Halo
35
36from collections import deque
37
38import select
39import fcntl
40
41from multiprocessing import Process, Manager
42from contextlib import closing
43
44
45halo_args = {
46 'stream': stderr,
47 'enabled': stderr.isatty(),
48 'spinner': 'arc'
49}
50
51borg_pwd = getpwnam('borg')
52
53def as_borg(caps=set()):
54 global logger
55
56 try:
57 if caps:
58 c_state = CapState.get_current()
59 c_state.permitted.add(*caps)
60 c_state.set_current()
61
62 # logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)
63
64 set_keepcaps(True)
65
66 os.setgid(borg_pwd.pw_gid)
67 os.setuid(borg_pwd.pw_uid)
68
69 if caps:
70 # logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)
71
72 c_state = CapState.get_current()
73 c_state.permitted = caps.copy()
74 c_state.inheritable.add(*caps)
75 c_state.set_current()
76
77 # logger.debug("cap_permitted=%s", CapState.get_current().permitted)
78 # logger.debug("cap_inheritable=%s", CapState.get_current().inheritable)
79
80 for cap in caps:
81 cap_ambient_raise(cap)
82 # logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap))
83 except Exception:
84 logger.error(format_exc())
85 raise
86
87def borg_json(*args, **kwargs):
88 global logger
89
90 with subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs) as proc:
91 stdout_buffer = io.BytesIO()
92
93 proc_logger = logger.getChild('borg')
94 stdout_logger = proc_logger.getChild('stdout')
95 stderr_logger = proc_logger.getChild('stderr')
96
97 fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
98 fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
99
100 poll = select.poll()
101 poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
102 poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
103 pollc = 2
104 events = poll.poll()
105 stderr_linebuf = bytearray()
106
107 while pollc > 0 and len(events) > 0:
108 for rfd, event in events:
109 if event & select.POLLIN:
110 if rfd == proc.stdout.fileno():
111 try:
112 buf = os.read(proc.stdout.fileno(), 8192)
113 # stdout_logger.debug(buf)
114 stdout_buffer.write(buf)
115 except BlockingIOError:
116 pass
117 if rfd == proc.stderr.fileno():
118 try:
119 stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192))
120 except BlockingIOError:
121 pass
122
123 while stderr_linebuf:
124 line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n')
125 if not sep:
126 stderr_linebuf = line
127 break
128
129 stderr_logger.info(line.decode())
130 if event == select.POLLHUP:
131 poll.unregister(rfd)
132 pollc -= 1
133
134 if pollc > 0:
135 events = poll.poll()
136
137 for handler in proc_logger.handlers:
138 handler.flush()
139
140 ret = proc.wait()
141 if ret != 0:
142 raise Exception(f'borg subprocess exited with returncode {ret}')
143
144 stdout_buffer.seek(0)
145 return json.load(stdout_buffer)
146
147def read_repo(path):
148 global logger
149
150 with Halo(text=f'Listing {path}', **halo_args) as sp:
151 if not sp.enabled:
152 logger.debug('Listing %s...', path)
153 res = borg_json(['borg', 'list', '--info', '--lock-wait=600', '--json', path], preexec_fn=lambda: as_borg())['archives']
154 if sp.enabled:
155 sp.succeed(f'{len(res)} archives in {path}')
156 else:
157 logger.info('%d archives in ‘%s’', len(res), path)
158 return res
159
160class ToSync:
161 to_sync = deque()
162
163 def __init__(self, source, target):
164 self.source = source
165 self.target = target
166
167 def __iter__(self):
168 return self
169
170 def __next__(self):
171 global logger
172
173 if self.to_sync:
174 return self.to_sync.popleft()
175
176 while True:
177 try:
178 src = read_repo(self.source)
179 dst = read_repo(self.target)
180 except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
181 logger.error(err)
182 continue
183
184 self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')])
185
186 if self.to_sync:
187 return self.to_sync.popleft()
188
189 raise StopIteration
190
191def copy_archive(src_repo_path, dst_repo_path, entry):
192 global logger
193
194 def do_copy(tmpdir_q):
195 global logger
196
197 nonlocal src_repo_path, dst_repo_path, entry
198
199 tmpdir = tmpdir_q.get()
200
201 cache_suffix = None
202 with Halo(text=f'Determine archive parameters', **halo_args) as sp:
203 if not sp.enabled:
204 logger.debug('Determining archive parameters...')
205 match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name'])
206 if match:
207 repo_id = borg_json(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], preexec_fn=lambda: as_borg())['repository']['id']
208
209 if repo_id:
210 cache_suffix = f'{repo_id}_{match.group(1)}'
211 if sp.enabled:
212 sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})')
213 else:
214 logger.info('Will process ‘%s’ (%s, cache_suffix=%s)', entry['name'], dateutil.parser.isoparse(entry['start']), cache_suffix)
215
216 logger.debug('Setting up environment...')
217 unshare.unshare(unshare.CLONE_NEWNS)
218 subprocess.run(['mount', '--make-rprivate', '/'], check=True)
219 chroot = pathlib.Path(tmpdir) / 'chroot'
220 upper = pathlib.Path(tmpdir) / 'upper'
221 work = pathlib.Path(tmpdir) / 'work'
222 for path in [chroot,upper,work]:
223 path.mkdir()
224 subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
225 bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
226 if os.environ.get('BORG_BASE_DIR'):
227 bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/'))
228 if not ":" in src_repo_path:
229 bindMounts.append(pathlib.Path(src_repo_path).relative_to('/'))
230 if 'SSH_AUTH_SOCK' in os.environ:
231 bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
232 for bindMount in bindMounts:
233 (chroot / bindMount).mkdir(parents=True,exist_ok=True)
234 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
235 os.chroot(chroot)
236 os.chdir('/')
237 try:
238 os.unlink('/etc/fuse.conf')
239 except FileNotFoundError:
240 pass
241 pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True)
242 with open('/etc/fuse.conf', 'w') as fuse_conf:
243 fuse_conf.write('user_allow_other\nmount_max = 1000\n')
244 dir = pathlib.Path('/borg')
245 dir.mkdir(parents=True,exist_ok=True,mode=0o0750)
246 os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
247
248 total_size = None
249 total_files = None
250 if stderr.isatty():
251 with Halo(text=f'Determine size', **halo_args) as sp:
252 stats = borg_json(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], preexec_fn=lambda: as_borg())['archives'][0]['stats']
253 total_size = stats['original_size']
254 total_files = stats['nfiles']
255 if sp.enabled:
256 sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
257 else:
258 logger.info('%d files, %s', total_files, naturalsize(total_size, binary=True))
259 with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
260 with Halo(text='Waiting for mount', **halo_args) as sp:
261 if not sp.enabled:
262 logger.debug('Waiting for mount...')
263 wait_start = datetime.now()
264 while True:
265 if os.path.ismount(dir):
266 break
267 elif datetime.now() - wait_start > timedelta(minutes=15):
268 ret.check_returncode()
269 time.sleep(0.1)
270 if sp.enabled:
271 sp.succeed('Mounted')
272 else:
273 logger.info('Mounted %s', f'{src_repo_path}::{entry["name"]}')
274
275 while True:
276 with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
277 seen = 0
278 env = os.environ.copy()
279 create_args = ['borg',
280 'create',
281 '--lock-wait=600',
282 '--one-file-system',
283 '--compression=auto,zstd,10',
284 '--chunker-params=10,23,16,4095',
285 '--files-cache=ctime,size',
286 '--show-rc',
287 '--upload-buffer=100',
288 '--upload-ratelimit=20480',
289 '--log-json',
290 '--progress',
291 '--list',
292 '--filter=AMEi-x?',
293 '--stats'
294 ]
295 archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
296 create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
297 if cache_suffix:
298 env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
299 else:
300 create_args += ['--files-cache=disabled']
301 create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
302
303 with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir) as proc:
304 last_list = None
305 last_list_time = time.monotonic_ns()
306 logger.info('Creating...')
307
308 proc_logger = logger.getChild('borg')
309 stdout_logger = proc_logger.getChild('stdout')
310 stderr_logger = proc_logger.getChild('stderr')
311
312 fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
313 fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
314
315 poll = select.poll()
316 poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
317 poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
318 pollc = 2
319 events = poll.poll()
320 stdout_linebuf = bytearray()
321 stderr_linebuf = bytearray()
322
323 while pollc > 0 and len(events) > 0:
324 # logger.debug('%d events', len(events))
325 for rfd, event in events:
326 # logger.debug('event %s', event)
327 if event & select.POLLIN:
328 if rfd == proc.stdout.fileno():
329 try:
330 # logger.debug('reading stdout...')
331 stdout_linebuf.extend(os.read(proc.stdout.fileno(), 8192))
332 # logger.debug('read stdout, len(stdout_linebuf)=%d', len(stdout_linebuf))
333 except BlockingIOError:
334 pass
335
336 while stdout_linebuf:
337 # logger.debug('stdout line...')
338 line, sep, stdout_linebuf = stdout_linebuf.partition(b'\n')
339 if not sep:
340 stdout_linebuf = line
341 break
342
343 stdout_logger.info(line.decode())
344 # logger.debug('handled stdout lines, %d leftover', len(stdout_linebuf))
345 if rfd == proc.stderr.fileno():
346 try:
347 # logger.debug('reading stderr...')
348 stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192))
349 # logger.debug('read stderr, len(stderr_linebuf)=%d', len(stderr_linebuf))
350 except BlockingIOError:
351 pass
352
353 while stderr_linebuf:
354 # logger.debug('stderr line...')
355 line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n')
356 if not sep:
357 stderr_linebuf = line
358 break
359
360 try:
361 json_line = json.loads(line)
362 except json.decoder.JSONDecodeError:
363 if progress.disable:
364 stderr_logger.error(line.decode())
365 else:
366 tqdm.write(line.decode())
367 continue
368
369 # logger.debug('stderr line decoded: %s', json_line['type'] if 'type' in json_line else None)
370
371 t = ''
372 if 'time' in json_line and not progress.disable:
373 ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
374 t = f'{ts.isoformat(timespec="minutes")} '
375 if json_line['type'] == 'archive_progress' and not progress.disable:
376 now = time.monotonic_ns()
377 if last_list_time is None or now - last_list_time >= 3e9:
378 last_list_time = now
379 if 'path' in json_line and json_line['path']:
380 progress.set_description(f'… {json_line["path"]}', refresh=False)
381 else:
382 progress.set_description(None, refresh=False)
383 elif last_list is not None:
384 progress.set_description(last_list, refresh=False)
385 nfiles=json_line["nfiles"]
386 if total_files is not None:
387 nfiles=f'{json_line["nfiles"]}/{total_files}'
388 progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False)
389 progress.update(json_line["original_size"] - seen)
390 seen = json_line["original_size"]
391 elif json_line['type'] == 'archive_progress':
392 now = time.monotonic_ns()
393 if last_list_time is None or now - last_list_time >= 3e9:
394 last_list_time = now
395 if 'path' in json_line and json_line['path']:
396 stderr_logger.debug('… %s (%s)', json_line["path"], naturalsize(json_line["original_size"]))
397 else:
398 stderr_logger.debug('… (%s)', naturalsize(json_line["original_size"]))
399 elif json_line['type'] == 'file_status':
400 # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
401 last_list = f'{json_line["status"]} {json_line["path"]}'
402 last_list_time = time.monotonic_ns()
403 progress.set_description(last_list, refresh=False)
404 if progress.disable:
405 stderr_logger.info(last_list)
406 elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
407 if 'message' in json_line:
408 if progress.disable:
409 stderr_logger.info(t + json_line['message'])
410 else:
411 tqdm.write(t + json_line['message'])
412 elif 'msgid' in json_line:
413 if progress.disable:
414 stderr_logger.info(t + json_line['msgid'])
415 else:
416 tqdm.write(t + json_line['msgid'])
417 else:
418 if progress.disable:
419 stderr_logger.info(t + line.decode())
420 else:
421 tqdm.write(t + line.decode())
422 # logger.debug('handled stderr lines, %d leftover', len(stderr_linebuf))
423 if event == select.POLLHUP:
424 poll.unregister(rfd)
425 pollc -= 1
426
427 if pollc > 0:
428 # logger.debug('polling %d fds...', pollc)
429 events = poll.poll()
430 # logger.debug('done polling')
431
432 # logger.debug('borg create closed stdout/stderr')
433 if stdout_linebuf:
434 logger.error('unterminated line leftover in stdout: %s', stdout_linebuf)
435 if stderr_linebuf:
436 logger.error('unterminated line leftover in stdout: %s', stderr_linebuf)
437 progress.set_description(None)
438 ret = proc.wait()
439 # logger.debug('borg create terminated; ret=%d', ret)
440 if ret != 0:
441 dst = None
442 try:
443 dst = read_repo(dst_repo_path)
444 except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
445 logger.error(err)
446 continue
447 else:
448 if any(map(lambda other: entry['name'] == other['name'], dst)):
449 logger.info('destination exists, terminating')
450 break
451
452 logger.warn('destination does not exist, retrying')
453 continue
454 else:
455 # logger.debug('terminating')
456 break
457 mount_proc.terminate()
458
459 with Manager() as manager:
460 tmpdir_q = manager.Queue(1)
461
462 with closing(Process(target=do_copy, args=(tmpdir_q,), name='do_copy')) as p:
463 p.start()
464
465 with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
466 tmpdir_q.put(tmpdir)
467 p.join()
468 return p.exitcode
469
470def sigterm(signum, frame):
471 raise SystemExit(128 + signum)
472
473def main():
474 signal.signal(signal.SIGTERM, sigterm)
475
476 global logger
477 logger = logging.getLogger(__name__)
478 console_handler = logging.StreamHandler()
479 console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') )
480 if sys.stderr.isatty():
481 console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') )
482
483 burst_max = 1000
484 burst = burst_max
485 last_use = None
486 inv_rate = 1e7
487 def consume_filter(record):
488 nonlocal burst, burst_max, inv_rate, last_use
489
490 delay = None
491 while True:
492 now = time.monotonic_ns()
493 burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max
494 last_use = now
495
496 if burst > 0:
497 burst -= 1
498 if delay:
499 delay = now - delay
500
501 return True
502
503 if delay is None:
504 delay = now
505 time.sleep(inv_rate / 1e9)
506 console_handler.addFilter(consume_filter)
507
508 logging.getLogger().addHandler(console_handler)
509
510 # log uncaught exceptions
511 def log_exceptions(type, value, tb):
512 global logger
513
514 logger.error(value)
515 sys.__excepthook__(type, value, tb) # calls default excepthook
516
517 sys.excepthook = log_exceptions
518
519 parser = argparse.ArgumentParser(prog='copy')
520 parser.add_argument('--verbosity', dest='log_level', action='append', type=int)
521 parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1)
522 parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1)
523 parser.add_argument('source', metavar='REPO_OR_ARCHIVE')
524 parser.add_argument('target', metavar='REPO_OR_ARCHIVE')
525 args = parser.parse_args()
526
527
528 LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
529 DEFAULT_LOG_LEVEL = logging.ERROR
530 log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)
531
532 for adjustment in args.log_level or ():
533 log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0))
534 logger.setLevel(LOG_LEVELS[log_level])
535
536
537 if "::" in args.source:
538 (src_repo_path, _, src_archive) = args.source.partition("::")
539 entry = None
540 for candidate_entry in read_repo(src_repo_path):
541 if entry['name'] != src_archive:
542 continue
543 entry = candidate_entry
544 break
545
546 if entry is None:
547 logger.critical("Did not find archive ‘%s’", src_archive)
548 os.exit(1)
549
550 copy_archive(src_repo_path, args.target, entry)
551 else:
552 for entry in ToSync(args.source, args.target):
553 copy_archive(args.source, args.target, entry)
554
555if __name__ == "__main__":
556 sys.exit(main())