diff options
Diffstat (limited to 'hosts/vidhar/borg/copy.py')
-rwxr-xr-x | hosts/vidhar/borg/copy.py | 291 |
1 files changed, 0 insertions, 291 deletions
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py deleted file mode 100755 index c648c4f6..00000000 --- a/hosts/vidhar/borg/copy.py +++ /dev/null | |||
@@ -1,291 +0,0 @@ | |||
1 | #!@python@/bin/python | ||
2 | |||
3 | import json | ||
4 | import os | ||
5 | import subprocess | ||
6 | import re | ||
7 | import sys | ||
8 | from sys import stderr | ||
9 | from humanize import naturalsize | ||
10 | |||
11 | from tempfile import TemporaryDirectory | ||
12 | |||
13 | from datetime import (datetime, timedelta) | ||
14 | from dateutil.tz import (tzlocal, tzutc) | ||
15 | import dateutil.parser | ||
16 | import argparse | ||
17 | |||
18 | from tqdm import tqdm | ||
19 | |||
20 | from xdg import xdg_runtime_dir | ||
21 | import pathlib | ||
22 | |||
23 | import unshare | ||
24 | from pyprctl import cap_permitted, cap_inheritable, cap_effective, cap_ambient, Cap | ||
25 | from pwd import getpwnam | ||
26 | |||
27 | import signal | ||
28 | from time import sleep | ||
29 | |||
30 | from halo import Halo | ||
31 | |||
32 | from collections import deque | ||
33 | |||
34 | |||
35 | parser = argparse.ArgumentParser() | ||
36 | parser.add_argument('source', metavar='REPO_OR_ARCHIVE') | ||
37 | parser.add_argument('target', metavar='REPO_OR_ARCHIVE') | ||
38 | args = parser.parse_args() | ||
39 | |||
40 | halo_args = { | ||
41 | 'stream': stderr, | ||
42 | 'enabled': stderr.isatty(), | ||
43 | 'spinner': 'arc' | ||
44 | } | ||
45 | |||
46 | borg_pwd = getpwnam('borg') | ||
47 | |||
48 | def as_borg(caps=set(), cwd=None): | ||
49 | if caps: | ||
50 | cap_permitted.add(*caps) | ||
51 | cap_inheritable.add(*caps) | ||
52 | cap_effective.add(*caps) | ||
53 | cap_ambient.add(*caps) | ||
54 | |||
55 | os.setgid(borg_pwd.pw_gid) | ||
56 | os.setuid(borg_pwd.pw_uid) | ||
57 | |||
58 | if cwd is not None: | ||
59 | os.chdir(cwd) | ||
60 | |||
61 | def read_repo(path): | ||
62 | with Halo(text=f'Listing {path}', **halo_args) as sp: | ||
63 | res = None | ||
64 | with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: | ||
65 | res = json.load(proc.stdout)['archives'] | ||
66 | if sp.enabled: | ||
67 | sp.succeed(f'{len(res)} archives in {path}') | ||
68 | else: | ||
69 | print(f'{len(res)} archives in {path}', file=stderr) | ||
70 | return res | ||
71 | |||
72 | class ToSync: | ||
73 | to_sync = deque() | ||
74 | |||
75 | def __iter__(self): | ||
76 | return self | ||
77 | |||
78 | def __next__(self): | ||
79 | if self.to_sync: | ||
80 | return self.to_sync.popleft() | ||
81 | |||
82 | while True: | ||
83 | try: | ||
84 | src = read_repo(args.source) | ||
85 | dst = read_repo(args.target) | ||
86 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
87 | print(err, file=stderr) | ||
88 | continue | ||
89 | |||
90 | self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')]) | ||
91 | |||
92 | if self.to_sync: | ||
93 | return self.to_sync.popleft() | ||
94 | |||
95 | raise StopIteration | ||
96 | |||
97 | def copy_archive(src_repo_path, dst_repo_path, entry): | ||
98 | cache_suffix = None | ||
99 | with Halo(text=f'Determine archive parameters', **halo_args) as sp: | ||
100 | match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name']) | ||
101 | if match: | ||
102 | repo_id = None | ||
103 | with subprocess.Popen(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: | ||
104 | repo_id = json.load(proc.stdout)['repository']['id'] | ||
105 | if repo_id: | ||
106 | cache_suffix = f'{repo_id}_{match.group(1)}' | ||
107 | if sp.enabled: | ||
108 | sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})') | ||
109 | else: | ||
110 | print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr) | ||
111 | with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: | ||
112 | child = os.fork() | ||
113 | if child == 0: | ||
114 | # print('unshare/chroot', file=stderr) | ||
115 | unshare.unshare(unshare.CLONE_NEWNS) | ||
116 | subprocess.run(['mount', '--make-rprivate', '/'], check=True) | ||
117 | chroot = pathlib.Path(tmpdir) / 'chroot' | ||
118 | upper = pathlib.Path(tmpdir) / 'upper' | ||
119 | work = pathlib.Path(tmpdir) / 'work' | ||
120 | for path in [chroot,upper,work]: | ||
121 | path.mkdir() | ||
122 | subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) | ||
123 | bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] | ||
124 | if os.environ.get('BORG_BASE_DIR'): | ||
125 | bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) | ||
126 | if not ":" in src_repo_path: | ||
127 | bindMounts.append(pathlib.Path(src_repo_path).relative_to('/')) | ||
128 | if 'SSH_AUTH_SOCK' in os.environ: | ||
129 | bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) | ||
130 | for bindMount in bindMounts: | ||
131 | (chroot / bindMount).mkdir(parents=True,exist_ok=True) | ||
132 | # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr) | ||
133 | subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) | ||
134 | os.chroot(chroot) | ||
135 | os.chdir('/') | ||
136 | try: | ||
137 | os.unlink('/etc/fuse.conf') | ||
138 | except FileNotFoundError: | ||
139 | pass | ||
140 | pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True) | ||
141 | with open('/etc/fuse.conf', 'w') as fuse_conf: | ||
142 | fuse_conf.write('user_allow_other\nmount_max = 1000\n') | ||
143 | dir = pathlib.Path('/borg') | ||
144 | dir.mkdir(parents=True,exist_ok=True,mode=0o0750) | ||
145 | os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) | ||
146 | |||
147 | total_size = None | ||
148 | total_files = None | ||
149 | if stderr.isatty(): | ||
150 | with Halo(text=f'Determine size', **halo_args) as sp: | ||
151 | with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: | ||
152 | stats = json.load(proc.stdout)['archives'][0]['stats'] | ||
153 | total_size = stats['original_size'] | ||
154 | total_files = stats['nfiles'] | ||
155 | if sp.enabled: | ||
156 | sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') | ||
157 | else: | ||
158 | print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) | ||
159 | # print(f'Mounting to {dir}', file=stderr) | ||
160 | with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: | ||
161 | with Halo(text='Waiting for mount', **halo_args) as sp: | ||
162 | wait_start = datetime.now() | ||
163 | while True: | ||
164 | if os.path.ismount(dir): | ||
165 | break | ||
166 | elif datetime.now() - wait_start > timedelta(minutes=15): | ||
167 | ret.check_returncode() | ||
168 | sleep(0.1) | ||
169 | if sp.enabled: | ||
170 | sp.succeed('Mounted') | ||
171 | else: | ||
172 | print('Mounted', file=stderr) | ||
173 | while True: | ||
174 | with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: | ||
175 | seen = 0 | ||
176 | env = os.environ.copy() | ||
177 | create_args = ['borg', | ||
178 | 'create', | ||
179 | '--lock-wait=600', | ||
180 | '--one-file-system', | ||
181 | '--compression=auto,zstd,10', | ||
182 | '--chunker-params=10,23,16,4095', | ||
183 | '--files-cache=ctime,size', | ||
184 | '--show-rc', | ||
185 | '--upload-buffer=100', | ||
186 | '--upload-ratelimit=20480', | ||
187 | '--log-json', | ||
188 | '--progress', | ||
189 | '--list', | ||
190 | '--filter=AMEi-x?', | ||
191 | '--stats' | ||
192 | ] | ||
193 | archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) | ||
194 | create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] | ||
195 | if cache_suffix: | ||
196 | env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix | ||
197 | else: | ||
198 | create_args += ['--files-cache=disabled'] | ||
199 | create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] | ||
200 | |||
201 | with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}, cwd=dir)) as proc: | ||
202 | last_list = None | ||
203 | last_list_time = None | ||
204 | for line in proc.stderr: | ||
205 | try: | ||
206 | json_line = json.loads(line) | ||
207 | except json.decoder.JSONDecodeError: | ||
208 | tqdm.write(line) | ||
209 | continue | ||
210 | |||
211 | t = '' | ||
212 | if 'time' in json_line and stderr.isatty(): | ||
213 | ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) | ||
214 | t = f'{ts.isoformat(timespec="minutes")} ' | ||
215 | if json_line['type'] == 'archive_progress': | ||
216 | if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: | ||
217 | if 'path' in json_line and json_line['path']: | ||
218 | progress.set_description(f'… {json_line["path"]}', refresh=False) | ||
219 | else: | ||
220 | progress.set_description(None, refresh=False) | ||
221 | elif last_list is not None: | ||
222 | progress.set_description(last_list, refresh=False) | ||
223 | nfiles=json_line["nfiles"] | ||
224 | if total_files is not None: | ||
225 | nfiles=f'{json_line["nfiles"]}/{total_files}' | ||
226 | progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) | ||
227 | progress.update(json_line["original_size"] - seen) | ||
228 | seen = json_line["original_size"] | ||
229 | elif json_line['type'] == 'file_status': | ||
230 | # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') | ||
231 | last_list = f'{json_line["status"]} {json_line["path"]}' | ||
232 | last_list_time = datetime.now() | ||
233 | progress.set_description(last_list, refresh=False) | ||
234 | if not stderr.isatty(): | ||
235 | print(last_list, file=stderr) | ||
236 | elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): | ||
237 | if 'message' in json_line: | ||
238 | tqdm.write(t + json_line['message']) | ||
239 | elif 'msgid' in json_line: | ||
240 | tqdm.write(t + json_line['msgid']) | ||
241 | else: | ||
242 | tqdm.write(t + line) | ||
243 | progress.set_description(None) | ||
244 | if proc.wait() != 0: | ||
245 | dst = None | ||
246 | try: | ||
247 | dst = read_repo(args.target) | ||
248 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
249 | print(err, file=stderr) | ||
250 | continue | ||
251 | else: | ||
252 | if any(map(lambda other: entry['name'] == other['name'], dst)): | ||
253 | break | ||
254 | |||
255 | continue | ||
256 | mount_proc.terminate() | ||
257 | os._exit(0) | ||
258 | else: | ||
259 | while True: | ||
260 | waitpid, waitret = os.wait() | ||
261 | if waitret != 0: | ||
262 | sys.exit(waitret) | ||
263 | if waitpid == child: | ||
264 | break | ||
265 | |||
266 | def sigterm(signum, frame): | ||
267 | raise SystemExit(128 + signum) | ||
268 | |||
269 | def main(): | ||
270 | signal.signal(signal.SIGTERM, sigterm) | ||
271 | |||
272 | if "::" in args.source: | ||
273 | (src_repo_path, _, src_archive) = args.source.partition("::") | ||
274 | entry = None | ||
275 | for candidate_entry in read_repo(src_repo_path): | ||
276 | if entry['name'] != src_archive: | ||
277 | continue | ||
278 | entry = candidate_entry | ||
279 | break | ||
280 | |||
281 | if entry is None: | ||
282 | print("Did not find archive", file=stderr) | ||
283 | os.exit(1) | ||
284 | |||
285 | copy_archive(src_repo_path, args.target, entry) | ||
286 | else: | ||
287 | for entry in ToSync(): | ||
288 | copy_archive(args.source, args.target, entry) | ||
289 | |||
290 | if __name__ == "__main__": | ||
291 | sys.exit(main()) | ||