summaryrefslogtreecommitdiff
path: root/hosts/vidhar/borg/copy.py
diff options
context:
space:
mode:
Diffstat (limited to 'hosts/vidhar/borg/copy.py')
-rwxr-xr-xhosts/vidhar/borg/copy.py291
1 files changed, 0 insertions, 291 deletions
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py
deleted file mode 100755
index c648c4f6..00000000
--- a/hosts/vidhar/borg/copy.py
+++ /dev/null
@@ -1,291 +0,0 @@
1#!@python@/bin/python
2
3import json
4import os
5import subprocess
6import re
7import sys
8from sys import stderr
9from humanize import naturalsize
10
11from tempfile import TemporaryDirectory
12
13from datetime import (datetime, timedelta)
14from dateutil.tz import (tzlocal, tzutc)
15import dateutil.parser
16import argparse
17
18from tqdm import tqdm
19
20from xdg import xdg_runtime_dir
21import pathlib
22
23import unshare
24from pyprctl import cap_permitted, cap_inheritable, cap_effective, cap_ambient, Cap
25from pwd import getpwnam
26
27import signal
28from time import sleep
29
30from halo import Halo
31
32from collections import deque
33
34
35parser = argparse.ArgumentParser()
36parser.add_argument('source', metavar='REPO_OR_ARCHIVE')
37parser.add_argument('target', metavar='REPO_OR_ARCHIVE')
38args = parser.parse_args()
39
40halo_args = {
41 'stream': stderr,
42 'enabled': stderr.isatty(),
43 'spinner': 'arc'
44}
45
46borg_pwd = getpwnam('borg')
47
48def as_borg(caps=set(), cwd=None):
49 if caps:
50 cap_permitted.add(*caps)
51 cap_inheritable.add(*caps)
52 cap_effective.add(*caps)
53 cap_ambient.add(*caps)
54
55 os.setgid(borg_pwd.pw_gid)
56 os.setuid(borg_pwd.pw_uid)
57
58 if cwd is not None:
59 os.chdir(cwd)
60
61def read_repo(path):
62 with Halo(text=f'Listing {path}', **halo_args) as sp:
63 res = None
64 with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc:
65 res = json.load(proc.stdout)['archives']
66 if sp.enabled:
67 sp.succeed(f'{len(res)} archives in {path}')
68 else:
69 print(f'{len(res)} archives in {path}', file=stderr)
70 return res
71
72class ToSync:
73 to_sync = deque()
74
75 def __iter__(self):
76 return self
77
78 def __next__(self):
79 if self.to_sync:
80 return self.to_sync.popleft()
81
82 while True:
83 try:
84 src = read_repo(args.source)
85 dst = read_repo(args.target)
86 except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
87 print(err, file=stderr)
88 continue
89
90 self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')])
91
92 if self.to_sync:
93 return self.to_sync.popleft()
94
95 raise StopIteration
96
97def copy_archive(src_repo_path, dst_repo_path, entry):
98 cache_suffix = None
99 with Halo(text=f'Determine archive parameters', **halo_args) as sp:
100 match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name'])
101 if match:
102 repo_id = None
103 with subprocess.Popen(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc:
104 repo_id = json.load(proc.stdout)['repository']['id']
105 if repo_id:
106 cache_suffix = f'{repo_id}_{match.group(1)}'
107 if sp.enabled:
108 sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})')
109 else:
110 print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr)
111 with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
112 child = os.fork()
113 if child == 0:
114 # print('unshare/chroot', file=stderr)
115 unshare.unshare(unshare.CLONE_NEWNS)
116 subprocess.run(['mount', '--make-rprivate', '/'], check=True)
117 chroot = pathlib.Path(tmpdir) / 'chroot'
118 upper = pathlib.Path(tmpdir) / 'upper'
119 work = pathlib.Path(tmpdir) / 'work'
120 for path in [chroot,upper,work]:
121 path.mkdir()
122 subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
123 bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
124 if os.environ.get('BORG_BASE_DIR'):
125 bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/'))
126 if not ":" in src_repo_path:
127 bindMounts.append(pathlib.Path(src_repo_path).relative_to('/'))
128 if 'SSH_AUTH_SOCK' in os.environ:
129 bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
130 for bindMount in bindMounts:
131 (chroot / bindMount).mkdir(parents=True,exist_ok=True)
132 # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr)
133 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
134 os.chroot(chroot)
135 os.chdir('/')
136 try:
137 os.unlink('/etc/fuse.conf')
138 except FileNotFoundError:
139 pass
140 pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True)
141 with open('/etc/fuse.conf', 'w') as fuse_conf:
142 fuse_conf.write('user_allow_other\nmount_max = 1000\n')
143 dir = pathlib.Path('/borg')
144 dir.mkdir(parents=True,exist_ok=True,mode=0o0750)
145 os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
146
147 total_size = None
148 total_files = None
149 if stderr.isatty():
150 with Halo(text=f'Determine size', **halo_args) as sp:
151 with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc:
152 stats = json.load(proc.stdout)['archives'][0]['stats']
153 total_size = stats['original_size']
154 total_files = stats['nfiles']
155 if sp.enabled:
156 sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
157 else:
158 print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr)
159 # print(f'Mounting to {dir}', file=stderr)
160 with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
161 with Halo(text='Waiting for mount', **halo_args) as sp:
162 wait_start = datetime.now()
163 while True:
164 if os.path.ismount(dir):
165 break
166 elif datetime.now() - wait_start > timedelta(minutes=15):
167 ret.check_returncode()
168 sleep(0.1)
169 if sp.enabled:
170 sp.succeed('Mounted')
171 else:
172 print('Mounted', file=stderr)
173 while True:
174 with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
175 seen = 0
176 env = os.environ.copy()
177 create_args = ['borg',
178 'create',
179 '--lock-wait=600',
180 '--one-file-system',
181 '--compression=auto,zstd,10',
182 '--chunker-params=10,23,16,4095',
183 '--files-cache=ctime,size',
184 '--show-rc',
185 '--upload-buffer=100',
186 '--upload-ratelimit=20480',
187 '--log-json',
188 '--progress',
189 '--list',
190 '--filter=AMEi-x?',
191 '--stats'
192 ]
193 archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
194 create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
195 if cache_suffix:
196 env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
197 else:
198 create_args += ['--files-cache=disabled']
199 create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
200
201 with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}, cwd=dir)) as proc:
202 last_list = None
203 last_list_time = None
204 for line in proc.stderr:
205 try:
206 json_line = json.loads(line)
207 except json.decoder.JSONDecodeError:
208 tqdm.write(line)
209 continue
210
211 t = ''
212 if 'time' in json_line and stderr.isatty():
213 ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
214 t = f'{ts.isoformat(timespec="minutes")} '
215 if json_line['type'] == 'archive_progress':
216 if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
217 if 'path' in json_line and json_line['path']:
218 progress.set_description(f'… {json_line["path"]}', refresh=False)
219 else:
220 progress.set_description(None, refresh=False)
221 elif last_list is not None:
222 progress.set_description(last_list, refresh=False)
223 nfiles=json_line["nfiles"]
224 if total_files is not None:
225 nfiles=f'{json_line["nfiles"]}/{total_files}'
226 progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False)
227 progress.update(json_line["original_size"] - seen)
228 seen = json_line["original_size"]
229 elif json_line['type'] == 'file_status':
230 # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
231 last_list = f'{json_line["status"]} {json_line["path"]}'
232 last_list_time = datetime.now()
233 progress.set_description(last_list, refresh=False)
234 if not stderr.isatty():
235 print(last_list, file=stderr)
236 elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
237 if 'message' in json_line:
238 tqdm.write(t + json_line['message'])
239 elif 'msgid' in json_line:
240 tqdm.write(t + json_line['msgid'])
241 else:
242 tqdm.write(t + line)
243 progress.set_description(None)
244 if proc.wait() != 0:
245 dst = None
246 try:
247 dst = read_repo(args.target)
248 except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
249 print(err, file=stderr)
250 continue
251 else:
252 if any(map(lambda other: entry['name'] == other['name'], dst)):
253 break
254
255 continue
256 mount_proc.terminate()
257 os._exit(0)
258 else:
259 while True:
260 waitpid, waitret = os.wait()
261 if waitret != 0:
262 sys.exit(waitret)
263 if waitpid == child:
264 break
265
266def sigterm(signum, frame):
267 raise SystemExit(128 + signum)
268
269def main():
270 signal.signal(signal.SIGTERM, sigterm)
271
272 if "::" in args.source:
273 (src_repo_path, _, src_archive) = args.source.partition("::")
274 entry = None
275 for candidate_entry in read_repo(src_repo_path):
276 if entry['name'] != src_archive:
277 continue
278 entry = candidate_entry
279 break
280
281 if entry is None:
282 print("Did not find archive", file=stderr)
283 os.exit(1)
284
285 copy_archive(src_repo_path, args.target, entry)
286 else:
287 for entry in ToSync():
288 copy_archive(args.source, args.target, entry)
289
290if __name__ == "__main__":
291 sys.exit(main())