diff options
Diffstat (limited to 'hosts')
| -rwxr-xr-x | hosts/vidhar/borg/copy.py | 291 | ||||
| -rwxr-xr-x | hosts/vidhar/borg/copy/copy_borg/__main__.py | 556 | ||||
| -rw-r--r-- | hosts/vidhar/borg/copy/setup.py | 10 | ||||
| -rw-r--r-- | hosts/vidhar/borg/default.nix | 54 | 
4 files changed, 592 insertions, 319 deletions
| diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py deleted file mode 100755 index c648c4f6..00000000 --- a/hosts/vidhar/borg/copy.py +++ /dev/null | |||
| @@ -1,291 +0,0 @@ | |||
| 1 | #!@python@/bin/python | ||
| 2 | |||
| 3 | import json | ||
| 4 | import os | ||
| 5 | import subprocess | ||
| 6 | import re | ||
| 7 | import sys | ||
| 8 | from sys import stderr | ||
| 9 | from humanize import naturalsize | ||
| 10 | |||
| 11 | from tempfile import TemporaryDirectory | ||
| 12 | |||
| 13 | from datetime import (datetime, timedelta) | ||
| 14 | from dateutil.tz import (tzlocal, tzutc) | ||
| 15 | import dateutil.parser | ||
| 16 | import argparse | ||
| 17 | |||
| 18 | from tqdm import tqdm | ||
| 19 | |||
| 20 | from xdg import xdg_runtime_dir | ||
| 21 | import pathlib | ||
| 22 | |||
| 23 | import unshare | ||
| 24 | from pyprctl import cap_permitted, cap_inheritable, cap_effective, cap_ambient, Cap | ||
| 25 | from pwd import getpwnam | ||
| 26 | |||
| 27 | import signal | ||
| 28 | from time import sleep | ||
| 29 | |||
| 30 | from halo import Halo | ||
| 31 | |||
| 32 | from collections import deque | ||
| 33 | |||
| 34 | |||
| 35 | parser = argparse.ArgumentParser() | ||
| 36 | parser.add_argument('source', metavar='REPO_OR_ARCHIVE') | ||
| 37 | parser.add_argument('target', metavar='REPO_OR_ARCHIVE') | ||
| 38 | args = parser.parse_args() | ||
| 39 | |||
| 40 | halo_args = { | ||
| 41 | 'stream': stderr, | ||
| 42 | 'enabled': stderr.isatty(), | ||
| 43 | 'spinner': 'arc' | ||
| 44 | } | ||
| 45 | |||
| 46 | borg_pwd = getpwnam('borg') | ||
| 47 | |||
| 48 | def as_borg(caps=set(), cwd=None): | ||
| 49 | if caps: | ||
| 50 | cap_permitted.add(*caps) | ||
| 51 | cap_inheritable.add(*caps) | ||
| 52 | cap_effective.add(*caps) | ||
| 53 | cap_ambient.add(*caps) | ||
| 54 | |||
| 55 | os.setgid(borg_pwd.pw_gid) | ||
| 56 | os.setuid(borg_pwd.pw_uid) | ||
| 57 | |||
| 58 | if cwd is not None: | ||
| 59 | os.chdir(cwd) | ||
| 60 | |||
| 61 | def read_repo(path): | ||
| 62 | with Halo(text=f'Listing {path}', **halo_args) as sp: | ||
| 63 | res = None | ||
| 64 | with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: | ||
| 65 | res = json.load(proc.stdout)['archives'] | ||
| 66 | if sp.enabled: | ||
| 67 | sp.succeed(f'{len(res)} archives in {path}') | ||
| 68 | else: | ||
| 69 | print(f'{len(res)} archives in {path}', file=stderr) | ||
| 70 | return res | ||
| 71 | |||
| 72 | class ToSync: | ||
| 73 | to_sync = deque() | ||
| 74 | |||
| 75 | def __iter__(self): | ||
| 76 | return self | ||
| 77 | |||
| 78 | def __next__(self): | ||
| 79 | if self.to_sync: | ||
| 80 | return self.to_sync.popleft() | ||
| 81 | |||
| 82 | while True: | ||
| 83 | try: | ||
| 84 | src = read_repo(args.source) | ||
| 85 | dst = read_repo(args.target) | ||
| 86 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
| 87 | print(err, file=stderr) | ||
| 88 | continue | ||
| 89 | |||
| 90 | self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')]) | ||
| 91 | |||
| 92 | if self.to_sync: | ||
| 93 | return self.to_sync.popleft() | ||
| 94 | |||
| 95 | raise StopIteration | ||
| 96 | |||
| 97 | def copy_archive(src_repo_path, dst_repo_path, entry): | ||
| 98 | cache_suffix = None | ||
| 99 | with Halo(text=f'Determine archive parameters', **halo_args) as sp: | ||
| 100 | match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name']) | ||
| 101 | if match: | ||
| 102 | repo_id = None | ||
| 103 | with subprocess.Popen(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: | ||
| 104 | repo_id = json.load(proc.stdout)['repository']['id'] | ||
| 105 | if repo_id: | ||
| 106 | cache_suffix = f'{repo_id}_{match.group(1)}' | ||
| 107 | if sp.enabled: | ||
| 108 | sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})') | ||
| 109 | else: | ||
| 110 | print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr) | ||
| 111 | with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: | ||
| 112 | child = os.fork() | ||
| 113 | if child == 0: | ||
| 114 | # print('unshare/chroot', file=stderr) | ||
| 115 | unshare.unshare(unshare.CLONE_NEWNS) | ||
| 116 | subprocess.run(['mount', '--make-rprivate', '/'], check=True) | ||
| 117 | chroot = pathlib.Path(tmpdir) / 'chroot' | ||
| 118 | upper = pathlib.Path(tmpdir) / 'upper' | ||
| 119 | work = pathlib.Path(tmpdir) / 'work' | ||
| 120 | for path in [chroot,upper,work]: | ||
| 121 | path.mkdir() | ||
| 122 | subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) | ||
| 123 | bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] | ||
| 124 | if os.environ.get('BORG_BASE_DIR'): | ||
| 125 | bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) | ||
| 126 | if not ":" in src_repo_path: | ||
| 127 | bindMounts.append(pathlib.Path(src_repo_path).relative_to('/')) | ||
| 128 | if 'SSH_AUTH_SOCK' in os.environ: | ||
| 129 | bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) | ||
| 130 | for bindMount in bindMounts: | ||
| 131 | (chroot / bindMount).mkdir(parents=True,exist_ok=True) | ||
| 132 | # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr) | ||
| 133 | subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) | ||
| 134 | os.chroot(chroot) | ||
| 135 | os.chdir('/') | ||
| 136 | try: | ||
| 137 | os.unlink('/etc/fuse.conf') | ||
| 138 | except FileNotFoundError: | ||
| 139 | pass | ||
| 140 | pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True) | ||
| 141 | with open('/etc/fuse.conf', 'w') as fuse_conf: | ||
| 142 | fuse_conf.write('user_allow_other\nmount_max = 1000\n') | ||
| 143 | dir = pathlib.Path('/borg') | ||
| 144 | dir.mkdir(parents=True,exist_ok=True,mode=0o0750) | ||
| 145 | os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) | ||
| 146 | |||
| 147 | total_size = None | ||
| 148 | total_files = None | ||
| 149 | if stderr.isatty(): | ||
| 150 | with Halo(text=f'Determine size', **halo_args) as sp: | ||
| 151 | with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: | ||
| 152 | stats = json.load(proc.stdout)['archives'][0]['stats'] | ||
| 153 | total_size = stats['original_size'] | ||
| 154 | total_files = stats['nfiles'] | ||
| 155 | if sp.enabled: | ||
| 156 | sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') | ||
| 157 | else: | ||
| 158 | print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) | ||
| 159 | # print(f'Mounting to {dir}', file=stderr) | ||
| 160 | with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: | ||
| 161 | with Halo(text='Waiting for mount', **halo_args) as sp: | ||
| 162 | wait_start = datetime.now() | ||
| 163 | while True: | ||
| 164 | if os.path.ismount(dir): | ||
| 165 | break | ||
| 166 | elif datetime.now() - wait_start > timedelta(minutes=15): | ||
| 167 | ret.check_returncode() | ||
| 168 | sleep(0.1) | ||
| 169 | if sp.enabled: | ||
| 170 | sp.succeed('Mounted') | ||
| 171 | else: | ||
| 172 | print('Mounted', file=stderr) | ||
| 173 | while True: | ||
| 174 | with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: | ||
| 175 | seen = 0 | ||
| 176 | env = os.environ.copy() | ||
| 177 | create_args = ['borg', | ||
| 178 | 'create', | ||
| 179 | '--lock-wait=600', | ||
| 180 | '--one-file-system', | ||
| 181 | '--compression=auto,zstd,10', | ||
| 182 | '--chunker-params=10,23,16,4095', | ||
| 183 | '--files-cache=ctime,size', | ||
| 184 | '--show-rc', | ||
| 185 | '--upload-buffer=100', | ||
| 186 | '--upload-ratelimit=20480', | ||
| 187 | '--log-json', | ||
| 188 | '--progress', | ||
| 189 | '--list', | ||
| 190 | '--filter=AMEi-x?', | ||
| 191 | '--stats' | ||
| 192 | ] | ||
| 193 | archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) | ||
| 194 | create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] | ||
| 195 | if cache_suffix: | ||
| 196 | env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix | ||
| 197 | else: | ||
| 198 | create_args += ['--files-cache=disabled'] | ||
| 199 | create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] | ||
| 200 | |||
| 201 | with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}, cwd=dir)) as proc: | ||
| 202 | last_list = None | ||
| 203 | last_list_time = None | ||
| 204 | for line in proc.stderr: | ||
| 205 | try: | ||
| 206 | json_line = json.loads(line) | ||
| 207 | except json.decoder.JSONDecodeError: | ||
| 208 | tqdm.write(line) | ||
| 209 | continue | ||
| 210 | |||
| 211 | t = '' | ||
| 212 | if 'time' in json_line and stderr.isatty(): | ||
| 213 | ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) | ||
| 214 | t = f'{ts.isoformat(timespec="minutes")} ' | ||
| 215 | if json_line['type'] == 'archive_progress': | ||
| 216 | if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: | ||
| 217 | if 'path' in json_line and json_line['path']: | ||
| 218 | progress.set_description(f'… {json_line["path"]}', refresh=False) | ||
| 219 | else: | ||
| 220 | progress.set_description(None, refresh=False) | ||
| 221 | elif last_list is not None: | ||
| 222 | progress.set_description(last_list, refresh=False) | ||
| 223 | nfiles=json_line["nfiles"] | ||
| 224 | if total_files is not None: | ||
| 225 | nfiles=f'{json_line["nfiles"]}/{total_files}' | ||
| 226 | progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) | ||
| 227 | progress.update(json_line["original_size"] - seen) | ||
| 228 | seen = json_line["original_size"] | ||
| 229 | elif json_line['type'] == 'file_status': | ||
| 230 | # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') | ||
| 231 | last_list = f'{json_line["status"]} {json_line["path"]}' | ||
| 232 | last_list_time = datetime.now() | ||
| 233 | progress.set_description(last_list, refresh=False) | ||
| 234 | if not stderr.isatty(): | ||
| 235 | print(last_list, file=stderr) | ||
| 236 | elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): | ||
| 237 | if 'message' in json_line: | ||
| 238 | tqdm.write(t + json_line['message']) | ||
| 239 | elif 'msgid' in json_line: | ||
| 240 | tqdm.write(t + json_line['msgid']) | ||
| 241 | else: | ||
| 242 | tqdm.write(t + line) | ||
| 243 | progress.set_description(None) | ||
| 244 | if proc.wait() != 0: | ||
| 245 | dst = None | ||
| 246 | try: | ||
| 247 | dst = read_repo(args.target) | ||
| 248 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
| 249 | print(err, file=stderr) | ||
| 250 | continue | ||
| 251 | else: | ||
| 252 | if any(map(lambda other: entry['name'] == other['name'], dst)): | ||
| 253 | break | ||
| 254 | |||
| 255 | continue | ||
| 256 | mount_proc.terminate() | ||
| 257 | os._exit(0) | ||
| 258 | else: | ||
| 259 | while True: | ||
| 260 | waitpid, waitret = os.wait() | ||
| 261 | if waitret != 0: | ||
| 262 | sys.exit(waitret) | ||
| 263 | if waitpid == child: | ||
| 264 | break | ||
| 265 | |||
| 266 | def sigterm(signum, frame): | ||
| 267 | raise SystemExit(128 + signum) | ||
| 268 | |||
| 269 | def main(): | ||
| 270 | signal.signal(signal.SIGTERM, sigterm) | ||
| 271 | |||
| 272 | if "::" in args.source: | ||
| 273 | (src_repo_path, _, src_archive) = args.source.partition("::") | ||
| 274 | entry = None | ||
| 275 | for candidate_entry in read_repo(src_repo_path): | ||
| 276 | if entry['name'] != src_archive: | ||
| 277 | continue | ||
| 278 | entry = candidate_entry | ||
| 279 | break | ||
| 280 | |||
| 281 | if entry is None: | ||
| 282 | print("Did not find archive", file=stderr) | ||
| 283 | os.exit(1) | ||
| 284 | |||
| 285 | copy_archive(src_repo_path, args.target, entry) | ||
| 286 | else: | ||
| 287 | for entry in ToSync(): | ||
| 288 | copy_archive(args.source, args.target, entry) | ||
| 289 | |||
| 290 | if __name__ == "__main__": | ||
| 291 | sys.exit(main()) | ||
| diff --git a/hosts/vidhar/borg/copy/copy_borg/__main__.py b/hosts/vidhar/borg/copy/copy_borg/__main__.py new file mode 100755 index 00000000..5b374d99 --- /dev/null +++ b/hosts/vidhar/borg/copy/copy_borg/__main__.py | |||
| @@ -0,0 +1,556 @@ | |||
| 1 | #!@python@/bin/python | ||
| 2 | |||
| 3 | import json | ||
| 4 | import os | ||
| 5 | import subprocess | ||
| 6 | import re | ||
| 7 | import sys | ||
| 8 | import io | ||
| 9 | from sys import stderr | ||
| 10 | from humanize import naturalsize | ||
| 11 | |||
| 12 | from tempfile import TemporaryDirectory | ||
| 13 | |||
| 14 | from datetime import (datetime, timedelta) | ||
| 15 | from dateutil.tz import (tzlocal, tzutc) | ||
| 16 | import dateutil.parser | ||
| 17 | import argparse | ||
| 18 | |||
| 19 | from tqdm import tqdm | ||
| 20 | |||
| 21 | from xdg import xdg_runtime_dir | ||
| 22 | import pathlib | ||
| 23 | |||
| 24 | import unshare | ||
| 25 | from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps | ||
| 26 | from pwd import getpwnam | ||
| 27 | |||
| 28 | import logging | ||
| 29 | |||
| 30 | import signal | ||
| 31 | import time | ||
| 32 | import math | ||
| 33 | |||
| 34 | from halo import Halo | ||
| 35 | |||
| 36 | from collections import deque | ||
| 37 | |||
| 38 | import select | ||
| 39 | import fcntl | ||
| 40 | |||
| 41 | from multiprocessing import Process, Manager | ||
| 42 | from contextlib import closing | ||
| 43 | |||
| 44 | |||
| 45 | halo_args = { | ||
| 46 | 'stream': stderr, | ||
| 47 | 'enabled': stderr.isatty(), | ||
| 48 | 'spinner': 'arc' | ||
| 49 | } | ||
| 50 | |||
| 51 | borg_pwd = getpwnam('borg') | ||
| 52 | |||
| 53 | def as_borg(caps=set()): | ||
| 54 | global logger | ||
| 55 | |||
| 56 | try: | ||
| 57 | if caps: | ||
| 58 | c_state = CapState.get_current() | ||
| 59 | c_state.permitted.add(*caps) | ||
| 60 | c_state.set_current() | ||
| 61 | |||
| 62 | # logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) | ||
| 63 | |||
| 64 | set_keepcaps(True) | ||
| 65 | |||
| 66 | os.setgid(borg_pwd.pw_gid) | ||
| 67 | os.setuid(borg_pwd.pw_uid) | ||
| 68 | |||
| 69 | if caps: | ||
| 70 | # logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) | ||
| 71 | |||
| 72 | c_state = CapState.get_current() | ||
| 73 | c_state.permitted = caps.copy() | ||
| 74 | c_state.inheritable.add(*caps) | ||
| 75 | c_state.set_current() | ||
| 76 | |||
| 77 | # logger.debug("cap_permitted=%s", CapState.get_current().permitted) | ||
| 78 | # logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) | ||
| 79 | |||
| 80 | for cap in caps: | ||
| 81 | cap_ambient_raise(cap) | ||
| 82 | # logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) | ||
| 83 | except Exception: | ||
| 84 | logger.error(format_exc()) | ||
| 85 | raise | ||
| 86 | |||
| 87 | def borg_json(*args, **kwargs): | ||
| 88 | global logger | ||
| 89 | |||
| 90 | with subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs) as proc: | ||
| 91 | stdout_buffer = io.BytesIO() | ||
| 92 | |||
| 93 | proc_logger = logger.getChild('borg') | ||
| 94 | stdout_logger = proc_logger.getChild('stdout') | ||
| 95 | stderr_logger = proc_logger.getChild('stderr') | ||
| 96 | |||
| 97 | fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
| 98 | fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
| 99 | |||
| 100 | poll = select.poll() | ||
| 101 | poll.register(proc.stdout, select.POLLIN | select.POLLHUP) | ||
| 102 | poll.register(proc.stderr, select.POLLIN | select.POLLHUP) | ||
| 103 | pollc = 2 | ||
| 104 | events = poll.poll() | ||
| 105 | stderr_linebuf = bytearray() | ||
| 106 | |||
| 107 | while pollc > 0 and len(events) > 0: | ||
| 108 | for rfd, event in events: | ||
| 109 | if event & select.POLLIN: | ||
| 110 | if rfd == proc.stdout.fileno(): | ||
| 111 | try: | ||
| 112 | buf = os.read(proc.stdout.fileno(), 8192) | ||
| 113 | # stdout_logger.debug(buf) | ||
| 114 | stdout_buffer.write(buf) | ||
| 115 | except BlockingIOError: | ||
| 116 | pass | ||
| 117 | if rfd == proc.stderr.fileno(): | ||
| 118 | try: | ||
| 119 | stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192)) | ||
| 120 | except BlockingIOError: | ||
| 121 | pass | ||
| 122 | |||
| 123 | while stderr_linebuf: | ||
| 124 | line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n') | ||
| 125 | if not sep: | ||
| 126 | stderr_linebuf = line | ||
| 127 | break | ||
| 128 | |||
| 129 | stderr_logger.info(line.decode()) | ||
| 130 | if event == select.POLLHUP: | ||
| 131 | poll.unregister(rfd) | ||
| 132 | pollc -= 1 | ||
| 133 | |||
| 134 | if pollc > 0: | ||
| 135 | events = poll.poll() | ||
| 136 | |||
| 137 | for handler in proc_logger.handlers: | ||
| 138 | handler.flush() | ||
| 139 | |||
| 140 | ret = proc.wait() | ||
| 141 | if ret != 0: | ||
| 142 | raise Exception(f'borg subprocess exited with returncode {ret}') | ||
| 143 | |||
| 144 | stdout_buffer.seek(0) | ||
| 145 | return json.load(stdout_buffer) | ||
| 146 | |||
| 147 | def read_repo(path): | ||
| 148 | global logger | ||
| 149 | |||
| 150 | with Halo(text=f'Listing {path}', **halo_args) as sp: | ||
| 151 | if not sp.enabled: | ||
| 152 | logger.debug('Listing %s...', path) | ||
| 153 | res = borg_json(['borg', 'list', '--info', '--lock-wait=600', '--json', path], preexec_fn=lambda: as_borg())['archives'] | ||
| 154 | if sp.enabled: | ||
| 155 | sp.succeed(f'{len(res)} archives in {path}') | ||
| 156 | else: | ||
| 157 | logger.info('%d archives in ‘%s’', len(res), path) | ||
| 158 | return res | ||
| 159 | |||
| 160 | class ToSync: | ||
| 161 | to_sync = deque() | ||
| 162 | |||
| 163 | def __init__(self, source, target): | ||
| 164 | self.source = source | ||
| 165 | self.target = target | ||
| 166 | |||
| 167 | def __iter__(self): | ||
| 168 | return self | ||
| 169 | |||
| 170 | def __next__(self): | ||
| 171 | global logger | ||
| 172 | |||
| 173 | if self.to_sync: | ||
| 174 | return self.to_sync.popleft() | ||
| 175 | |||
| 176 | while True: | ||
| 177 | try: | ||
| 178 | src = read_repo(self.source) | ||
| 179 | dst = read_repo(self.target) | ||
| 180 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
| 181 | logger.error(err) | ||
| 182 | continue | ||
| 183 | |||
| 184 | self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')]) | ||
| 185 | |||
| 186 | if self.to_sync: | ||
| 187 | return self.to_sync.popleft() | ||
| 188 | |||
| 189 | raise StopIteration | ||
| 190 | |||
| 191 | def copy_archive(src_repo_path, dst_repo_path, entry): | ||
| 192 | global logger | ||
| 193 | |||
| 194 | def do_copy(tmpdir_q): | ||
| 195 | global logger | ||
| 196 | |||
| 197 | nonlocal src_repo_path, dst_repo_path, entry | ||
| 198 | |||
| 199 | tmpdir = tmpdir_q.get() | ||
| 200 | |||
| 201 | cache_suffix = None | ||
| 202 | with Halo(text=f'Determine archive parameters', **halo_args) as sp: | ||
| 203 | if not sp.enabled: | ||
| 204 | logger.debug('Determining archive parameters...') | ||
| 205 | match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name']) | ||
| 206 | if match: | ||
| 207 | repo_id = borg_json(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], preexec_fn=lambda: as_borg())['repository']['id'] | ||
| 208 | |||
| 209 | if repo_id: | ||
| 210 | cache_suffix = f'{repo_id}_{match.group(1)}' | ||
| 211 | if sp.enabled: | ||
| 212 | sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})') | ||
| 213 | else: | ||
| 214 | logger.info('Will process ‘%s’ (%s, cache_suffix=%s)', entry['name'], dateutil.parser.isoparse(entry['start']), cache_suffix) | ||
| 215 | |||
| 216 | logger.debug('Setting up environment...') | ||
| 217 | unshare.unshare(unshare.CLONE_NEWNS) | ||
| 218 | subprocess.run(['mount', '--make-rprivate', '/'], check=True) | ||
| 219 | chroot = pathlib.Path(tmpdir) / 'chroot' | ||
| 220 | upper = pathlib.Path(tmpdir) / 'upper' | ||
| 221 | work = pathlib.Path(tmpdir) / 'work' | ||
| 222 | for path in [chroot,upper,work]: | ||
| 223 | path.mkdir() | ||
| 224 | subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) | ||
| 225 | bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] | ||
| 226 | if os.environ.get('BORG_BASE_DIR'): | ||
| 227 | bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) | ||
| 228 | if not ":" in src_repo_path: | ||
| 229 | bindMounts.append(pathlib.Path(src_repo_path).relative_to('/')) | ||
| 230 | if 'SSH_AUTH_SOCK' in os.environ: | ||
| 231 | bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) | ||
| 232 | for bindMount in bindMounts: | ||
| 233 | (chroot / bindMount).mkdir(parents=True,exist_ok=True) | ||
| 234 | subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) | ||
| 235 | os.chroot(chroot) | ||
| 236 | os.chdir('/') | ||
| 237 | try: | ||
| 238 | os.unlink('/etc/fuse.conf') | ||
| 239 | except FileNotFoundError: | ||
| 240 | pass | ||
| 241 | pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True) | ||
| 242 | with open('/etc/fuse.conf', 'w') as fuse_conf: | ||
| 243 | fuse_conf.write('user_allow_other\nmount_max = 1000\n') | ||
| 244 | dir = pathlib.Path('/borg') | ||
| 245 | dir.mkdir(parents=True,exist_ok=True,mode=0o0750) | ||
| 246 | os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) | ||
| 247 | |||
| 248 | total_size = None | ||
| 249 | total_files = None | ||
| 250 | if stderr.isatty(): | ||
| 251 | with Halo(text=f'Determine size', **halo_args) as sp: | ||
| 252 | stats = borg_json(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], preexec_fn=lambda: as_borg())['archives'][0]['stats'] | ||
| 253 | total_size = stats['original_size'] | ||
| 254 | total_files = stats['nfiles'] | ||
| 255 | if sp.enabled: | ||
| 256 | sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') | ||
| 257 | else: | ||
| 258 | logger.info('%d files, %s', total_files, naturalsize(total_size, binary=True)) | ||
| 259 | with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: | ||
| 260 | with Halo(text='Waiting for mount', **halo_args) as sp: | ||
| 261 | if not sp.enabled: | ||
| 262 | logger.debug('Waiting for mount...') | ||
| 263 | wait_start = datetime.now() | ||
| 264 | while True: | ||
| 265 | if os.path.ismount(dir): | ||
| 266 | break | ||
| 267 | elif datetime.now() - wait_start > timedelta(minutes=15): | ||
| 268 | ret.check_returncode() | ||
| 269 | time.sleep(0.1) | ||
| 270 | if sp.enabled: | ||
| 271 | sp.succeed('Mounted') | ||
| 272 | else: | ||
| 273 | logger.info('Mounted %s', f'{src_repo_path}::{entry["name"]}') | ||
| 274 | |||
| 275 | while True: | ||
| 276 | with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: | ||
| 277 | seen = 0 | ||
| 278 | env = os.environ.copy() | ||
| 279 | create_args = ['borg', | ||
| 280 | 'create', | ||
| 281 | '--lock-wait=600', | ||
| 282 | '--one-file-system', | ||
| 283 | '--compression=auto,zstd,10', | ||
| 284 | '--chunker-params=10,23,16,4095', | ||
| 285 | '--files-cache=ctime,size', | ||
| 286 | '--show-rc', | ||
| 287 | '--upload-buffer=100', | ||
| 288 | '--upload-ratelimit=20480', | ||
| 289 | '--log-json', | ||
| 290 | '--progress', | ||
| 291 | '--list', | ||
| 292 | '--filter=AMEi-x?', | ||
| 293 | '--stats' | ||
| 294 | ] | ||
| 295 | archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) | ||
| 296 | create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] | ||
| 297 | if cache_suffix: | ||
| 298 | env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix | ||
| 299 | else: | ||
| 300 | create_args += ['--files-cache=disabled'] | ||
| 301 | create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] | ||
| 302 | |||
| 303 | with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir) as proc: | ||
| 304 | last_list = None | ||
| 305 | last_list_time = time.monotonic_ns() | ||
| 306 | logger.info('Creating...') | ||
| 307 | |||
| 308 | proc_logger = logger.getChild('borg') | ||
| 309 | stdout_logger = proc_logger.getChild('stdout') | ||
| 310 | stderr_logger = proc_logger.getChild('stderr') | ||
| 311 | |||
| 312 | fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
| 313 | fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
| 314 | |||
| 315 | poll = select.poll() | ||
| 316 | poll.register(proc.stdout, select.POLLIN | select.POLLHUP) | ||
| 317 | poll.register(proc.stderr, select.POLLIN | select.POLLHUP) | ||
| 318 | pollc = 2 | ||
| 319 | events = poll.poll() | ||
| 320 | stdout_linebuf = bytearray() | ||
| 321 | stderr_linebuf = bytearray() | ||
| 322 | |||
| 323 | while pollc > 0 and len(events) > 0: | ||
| 324 | # logger.debug('%d events', len(events)) | ||
| 325 | for rfd, event in events: | ||
| 326 | # logger.debug('event %s', event) | ||
| 327 | if event & select.POLLIN: | ||
| 328 | if rfd == proc.stdout.fileno(): | ||
| 329 | try: | ||
| 330 | # logger.debug('reading stdout...') | ||
| 331 | stdout_linebuf.extend(os.read(proc.stdout.fileno(), 8192)) | ||
| 332 | # logger.debug('read stdout, len(stdout_linebuf)=%d', len(stdout_linebuf)) | ||
| 333 | except BlockingIOError: | ||
| 334 | pass | ||
| 335 | |||
| 336 | while stdout_linebuf: | ||
| 337 | # logger.debug('stdout line...') | ||
| 338 | line, sep, stdout_linebuf = stdout_linebuf.partition(b'\n') | ||
| 339 | if not sep: | ||
| 340 | stdout_linebuf = line | ||
| 341 | break | ||
| 342 | |||
| 343 | stdout_logger.info(line.decode()) | ||
| 344 | # logger.debug('handled stdout lines, %d leftover', len(stdout_linebuf)) | ||
| 345 | if rfd == proc.stderr.fileno(): | ||
| 346 | try: | ||
| 347 | # logger.debug('reading stderr...') | ||
| 348 | stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192)) | ||
| 349 | # logger.debug('read stderr, len(stderr_linebuf)=%d', len(stderr_linebuf)) | ||
| 350 | except BlockingIOError: | ||
| 351 | pass | ||
| 352 | |||
| 353 | while stderr_linebuf: | ||
| 354 | # logger.debug('stderr line...') | ||
| 355 | line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n') | ||
| 356 | if not sep: | ||
| 357 | stderr_linebuf = line | ||
| 358 | break | ||
| 359 | |||
| 360 | try: | ||
| 361 | json_line = json.loads(line) | ||
| 362 | except json.decoder.JSONDecodeError: | ||
| 363 | if progress.disable: | ||
| 364 | stderr_logger.error(line.decode()) | ||
| 365 | else: | ||
| 366 | tqdm.write(line.decode()) | ||
| 367 | continue | ||
| 368 | |||
| 369 | # logger.debug('stderr line decoded: %s', json_line['type'] if 'type' in json_line else None) | ||
| 370 | |||
| 371 | t = '' | ||
| 372 | if 'time' in json_line and not progress.disable: | ||
| 373 | ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) | ||
| 374 | t = f'{ts.isoformat(timespec="minutes")} ' | ||
| 375 | if json_line['type'] == 'archive_progress' and not progress.disable: | ||
| 376 | now = time.monotonic_ns() | ||
| 377 | if last_list_time is None or now - last_list_time >= 3e9: | ||
| 378 | last_list_time = now | ||
| 379 | if 'path' in json_line and json_line['path']: | ||
| 380 | progress.set_description(f'… {json_line["path"]}', refresh=False) | ||
| 381 | else: | ||
| 382 | progress.set_description(None, refresh=False) | ||
| 383 | elif last_list is not None: | ||
| 384 | progress.set_description(last_list, refresh=False) | ||
| 385 | nfiles=json_line["nfiles"] | ||
| 386 | if total_files is not None: | ||
| 387 | nfiles=f'{json_line["nfiles"]}/{total_files}' | ||
| 388 | progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) | ||
| 389 | progress.update(json_line["original_size"] - seen) | ||
| 390 | seen = json_line["original_size"] | ||
| 391 | elif json_line['type'] == 'archive_progress': | ||
| 392 | now = time.monotonic_ns() | ||
| 393 | if last_list_time is None or now - last_list_time >= 3e9: | ||
| 394 | last_list_time = now | ||
| 395 | if 'path' in json_line and json_line['path']: | ||
| 396 | stderr_logger.debug('… %s (%s)', json_line["path"], naturalsize(json_line["original_size"])) | ||
| 397 | else: | ||
| 398 | stderr_logger.debug('… (%s)', naturalsize(json_line["original_size"])) | ||
| 399 | elif json_line['type'] == 'file_status': | ||
| 400 | # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') | ||
| 401 | last_list = f'{json_line["status"]} {json_line["path"]}' | ||
| 402 | last_list_time = time.monotonic_ns() | ||
| 403 | progress.set_description(last_list, refresh=False) | ||
| 404 | if progress.disable: | ||
| 405 | stderr_logger.info(last_list) | ||
| 406 | elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): | ||
| 407 | if 'message' in json_line: | ||
| 408 | if progress.disable: | ||
| 409 | stderr_logger.info(t + json_line['message']) | ||
| 410 | else: | ||
| 411 | tqdm.write(t + json_line['message']) | ||
| 412 | elif 'msgid' in json_line: | ||
| 413 | if progress.disable: | ||
| 414 | stderr_logger.info(t + json_line['msgid']) | ||
| 415 | else: | ||
| 416 | tqdm.write(t + json_line['msgid']) | ||
| 417 | else: | ||
| 418 | if progress.disable: | ||
| 419 | stderr_logger.info(t + line.decode()) | ||
| 420 | else: | ||
| 421 | tqdm.write(t + line.decode()) | ||
| 422 | # logger.debug('handled stderr lines, %d leftover', len(stderr_linebuf)) | ||
| 423 | if event == select.POLLHUP: | ||
| 424 | poll.unregister(rfd) | ||
| 425 | pollc -= 1 | ||
| 426 | |||
| 427 | if pollc > 0: | ||
| 428 | # logger.debug('polling %d fds...', pollc) | ||
| 429 | events = poll.poll() | ||
| 430 | # logger.debug('done polling') | ||
| 431 | |||
| 432 | # logger.debug('borg create closed stdout/stderr') | ||
| 433 | if stdout_linebuf: | ||
| 434 | logger.error('unterminated line leftover in stdout: %s', stdout_linebuf) | ||
| 435 | if stderr_linebuf: | ||
| 436 | logger.error('unterminated line leftover in stdout: %s', stderr_linebuf) | ||
| 437 | progress.set_description(None) | ||
| 438 | ret = proc.wait() | ||
| 439 | # logger.debug('borg create terminated; ret=%d', ret) | ||
| 440 | if ret != 0: | ||
| 441 | dst = None | ||
| 442 | try: | ||
| 443 | dst = read_repo(dst_repo_path) | ||
| 444 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
| 445 | logger.error(err) | ||
| 446 | continue | ||
| 447 | else: | ||
| 448 | if any(map(lambda other: entry['name'] == other['name'], dst)): | ||
| 449 | logger.info('destination exists, terminating') | ||
| 450 | break | ||
| 451 | |||
| 452 | logger.warn('destination does not exist, retrying') | ||
| 453 | continue | ||
| 454 | else: | ||
| 455 | # logger.debug('terminating') | ||
| 456 | break | ||
| 457 | mount_proc.terminate() | ||
| 458 | |||
| 459 | with Manager() as manager: | ||
| 460 | tmpdir_q = manager.Queue(1) | ||
| 461 | |||
| 462 | with closing(Process(target=do_copy, args=(tmpdir_q,), name='do_copy')) as p: | ||
| 463 | p.start() | ||
| 464 | |||
| 465 | with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: | ||
| 466 | tmpdir_q.put(tmpdir) | ||
| 467 | p.join() | ||
| 468 | return p.exitcode | ||
| 469 | |||
| 470 | def sigterm(signum, frame): | ||
| 471 | raise SystemExit(128 + signum) | ||
| 472 | |||
| 473 | def main(): | ||
| 474 | signal.signal(signal.SIGTERM, sigterm) | ||
| 475 | |||
| 476 | global logger | ||
| 477 | logger = logging.getLogger(__name__) | ||
| 478 | console_handler = logging.StreamHandler() | ||
| 479 | console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) | ||
| 480 | if sys.stderr.isatty(): | ||
| 481 | console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) | ||
| 482 | |||
| 483 | burst_max = 1000 | ||
| 484 | burst = burst_max | ||
| 485 | last_use = None | ||
| 486 | inv_rate = 1e7 | ||
| 487 | def consume_filter(record): | ||
| 488 | nonlocal burst, burst_max, inv_rate, last_use | ||
| 489 | |||
| 490 | delay = None | ||
| 491 | while True: | ||
| 492 | now = time.monotonic_ns() | ||
| 493 | burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max | ||
| 494 | last_use = now | ||
| 495 | |||
| 496 | if burst > 0: | ||
| 497 | burst -= 1 | ||
| 498 | if delay: | ||
| 499 | delay = now - delay | ||
| 500 | |||
| 501 | return True | ||
| 502 | |||
| 503 | if delay is None: | ||
| 504 | delay = now | ||
| 505 | time.sleep(inv_rate / 1e9) | ||
| 506 | console_handler.addFilter(consume_filter) | ||
| 507 | |||
| 508 | logging.getLogger().addHandler(console_handler) | ||
| 509 | |||
| 510 | # log uncaught exceptions | ||
| 511 | def log_exceptions(type, value, tb): | ||
| 512 | global logger | ||
| 513 | |||
| 514 | logger.error(value) | ||
| 515 | sys.__excepthook__(type, value, tb) # calls default excepthook | ||
| 516 | |||
| 517 | sys.excepthook = log_exceptions | ||
| 518 | |||
| 519 | parser = argparse.ArgumentParser(prog='copy') | ||
| 520 | parser.add_argument('--verbosity', dest='log_level', action='append', type=int) | ||
| 521 | parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1) | ||
| 522 | parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1) | ||
| 523 | parser.add_argument('source', metavar='REPO_OR_ARCHIVE') | ||
| 524 | parser.add_argument('target', metavar='REPO_OR_ARCHIVE') | ||
| 525 | args = parser.parse_args() | ||
| 526 | |||
| 527 | |||
| 528 | LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] | ||
| 529 | DEFAULT_LOG_LEVEL = logging.ERROR | ||
| 530 | log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) | ||
| 531 | |||
| 532 | for adjustment in args.log_level or (): | ||
| 533 | log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0)) | ||
| 534 | logger.setLevel(LOG_LEVELS[log_level]) | ||
| 535 | |||
| 536 | |||
| 537 | if "::" in args.source: | ||
| 538 | (src_repo_path, _, src_archive) = args.source.partition("::") | ||
| 539 | entry = None | ||
| 540 | for candidate_entry in read_repo(src_repo_path): | ||
| 541 | if entry['name'] != src_archive: | ||
| 542 | continue | ||
| 543 | entry = candidate_entry | ||
| 544 | break | ||
| 545 | |||
| 546 | if entry is None: | ||
| 547 | logger.critical("Did not find archive ‘%s’", src_archive) | ||
| 548 | os.exit(1) | ||
| 549 | |||
| 550 | copy_archive(src_repo_path, args.target, entry) | ||
| 551 | else: | ||
| 552 | for entry in ToSync(args.source, args.target): | ||
| 553 | copy_archive(args.source, args.target, entry) | ||
| 554 | |||
| 555 | if __name__ == "__main__": | ||
| 556 | sys.exit(main()) | ||
| diff --git a/hosts/vidhar/borg/copy/setup.py b/hosts/vidhar/borg/copy/setup.py new file mode 100644 index 00000000..f77d9560 --- /dev/null +++ b/hosts/vidhar/borg/copy/setup.py | |||
| @@ -0,0 +1,10 @@ | |||
| 1 | from setuptools import setup | ||
| 2 | |||
| 3 | setup(name='copy_borg', | ||
| 4 | packages=['copy_borg'], | ||
| 5 | entry_points={ | ||
| 6 | 'console_scripts': [ | ||
| 7 | 'copy_borg=copy_borg.__main__:main', | ||
| 8 | ], | ||
| 9 | } | ||
| 10 | ) | ||
| diff --git a/hosts/vidhar/borg/default.nix b/hosts/vidhar/borg/default.nix index 8d0b46ef..7672de18 100644 --- a/hosts/vidhar/borg/default.nix +++ b/hosts/vidhar/borg/default.nix | |||
| @@ -26,7 +26,7 @@ let | |||
| 26 | in nameValuePair serviceName { | 26 | in nameValuePair serviceName { | 
| 27 | serviceConfig = { | 27 | serviceConfig = { | 
| 28 | Type = "oneshot"; | 28 | Type = "oneshot"; | 
| 29 | ExecStart = "${copyBorg}/bin/copy ${escapeShellArg repo} yggdrasil.borgbase:repo"; | 29 | ExecStart = "${copyBorg}/bin/copy_borg --verbosity 3 ${escapeShellArg repo} yggdrasil.borgbase:repo"; | 
| 30 | TimeoutStartSec = "8h"; | 30 | TimeoutStartSec = "8h"; | 
| 31 | # User = "borg"; | 31 | # User = "borg"; | 
| 32 | # Group = "borg"; | 32 | # Group = "borg"; | 
| @@ -43,40 +43,38 @@ let | |||
| 43 | "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" | 43 | "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" | 
| 44 | "BORG_HOSTNAME_IS_UNIQUE=yes" | 44 | "BORG_HOSTNAME_IS_UNIQUE=yes" | 
| 45 | ]; | 45 | ]; | 
| 46 | |||
| 47 | LogRateLimitIntervalSec = 0; | ||
| 46 | }; | 48 | }; | 
| 47 | }; | 49 | }; | 
| 48 | 50 | ||
| 49 | copyBorg = pkgs.stdenv.mkDerivation (let | 51 | copyBorg = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec { | 
| 50 | packageOverrides = pkgs.callPackage ./pyprctl-packages.nix {}; | 52 | pname = "copy-borg"; | 
| 51 | inpPython = pkgs.python39.override { inherit packageOverrides; }; | 53 | src = ./copy; | 
| 52 | in rec { | 54 | version = "0.0.0"; | 
| 53 | name = "copy"; | 55 | ignoreDataOutdated = true; | 
| 54 | src = ./copy.py; | 56 | |
| 55 | 57 | requirements = '' | |
| 56 | phases = ["buildPhase" "checkPhase" "installPhase"]; | 58 | humanize | 
| 57 | 59 | tqdm | |
| 58 | buildInputs = with pkgs; [makeWrapper]; | 60 | python-dateutil | 
| 59 | 61 | xdg | |
| 60 | python = inpPython.withPackages (ps: with ps; [humanize tqdm python-dateutil xdg python-unshare pyprctl halo]); | 62 | python-unshare | 
| 61 | 63 | pyprctl | |
| 62 | buildPhase = '' | 64 | halo | 
| 63 | substitute $src copy \ | ||
| 64 | --subst-var-by python ${escapeShellArg python} | ||
| 65 | ''; | 65 | ''; | 
| 66 | 66 | postInstall = '' | |
| 67 | doCheck = true; | 67 | wrapProgram $out/bin/copy_borg \ | 
| 68 | checkPhase = '' | 68 | --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} | 
| 69 | ${python}/bin/python -m py_compile copy | ||
| 70 | ''; | 69 | ''; | 
| 71 | 70 | ||
| 72 | installPhase = '' | 71 | providers.python-unshare = "nixpkgs"; | 
| 73 | install -m 0755 -D -t $out/bin \ | 72 | overridesPre = [ | 
| 74 | copy | 73 | (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); }) | 
| 74 | ]; | ||
| 75 | 75 | ||
| 76 | wrapProgram $out/bin/copy \ | 76 | # _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ]; | 
| 77 | --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} | 77 | }; | 
| 78 | ''; | ||
| 79 | }); | ||
| 80 | in { | 78 | in { | 
| 81 | config = { | 79 | config = { | 
| 82 | services.borgsnap = { | 80 | services.borgsnap = { | 
