Diffstat (limited to 'hosts/vidhar/borg/copy.py')
-rwxr-xr-x  hosts/vidhar/borg/copy.py  216
1 file changed, 216 insertions(+), 0 deletions(-)
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py
new file mode 100755
index 00000000..b99e301a
--- /dev/null
+++ b/hosts/vidhar/borg/copy.py
@@ -0,0 +1,216 @@
#!@python@/bin/python

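# Copy borg archives from one repository to another, re-creating each archive
# in the target via `borg create` run on a FUSE mount of the source archive.
#
# Usage: copy.py SOURCE[::ARCHIVE] TARGET_REPO
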
import json
import os
import subprocess
import re
import sys
from sys import stderr
from humanize import naturalsize

from tempfile import TemporaryDirectory

from datetime import (datetime, timedelta)
from dateutil.tz import (tzlocal, tzutc)
import dateutil.parser
import argparse

from tqdm import tqdm

from xdg import xdg_runtime_dir
import pathlib

import unshare
from time import sleep

from halo import Halo

parser = argparse.ArgumentParser()
parser.add_argument('source', metavar='REPO_OR_ARCHIVE')
parser.add_argument('target', metavar='REPO_OR_ARCHIVE')
args = parser.parse_args()

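# List all archives in the given repository via `borg list --json`.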
def read_repo(path):
    with Halo(text=f'Listing {path}', spinner='arc') as sp:
        res = None
        with subprocess.Popen(['borg', 'list', '--info', '--lock-wait', '120', '--json', path], stdout=subprocess.PIPE) as proc:
            res = json.load(proc.stdout)['archives']
        sp.succeed(f'{len(res)} archives in {path}')
    return res

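# Iterator over archives that exist in the source repo but not yet in the
# target. Both repos are re-listed on every step, so archives created while
# the copy is running are still picked up; *.checkpoint archives are skipped.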
class ToSync:
    def __iter__(self):
        return self

    def __next__(self):
        while True:
            try:
                src = read_repo(args.source)
                dst = read_repo(args.target)
                for entry in src:
                    if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint'):
                        return entry
                raise StopIteration
            except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
                print(err, file=stderr)
                continue

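# Re-create a single archive in the target repo from a FUSE mount of the
# source archive. A stable BORG_FILES_CACHE_SUFFIX, derived from the source
# repo id and the archive name prefix, lets `borg create` reuse its files
# cache across successive archives of the same series.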
def copy_archive(src_repo_path, dst_repo_path, entry):
    cache_suffix = None
    with Halo(text='Determining archive parameters', spinner='arc') as sp:
        match = re.compile(r'^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name'])
        if match:
            repo_id = None
            with subprocess.Popen(['borg', 'info', '--info', '--lock-wait', '120', '--json', src_repo_path], stdout=subprocess.PIPE) as proc:
                repo_id = json.load(proc.stdout)['repository']['id']
            if repo_id:
                cache_suffix = f'{repo_id}_{match.group(1)}'
        sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})')
    with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_') as tmpdir:
        child = os.fork()
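        # Child process: all mounting happens in a private mount namespace,
        # so every mount disappears with the process; the parent only waits.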
        if child == 0:
            # print('unshare/chroot', file=stderr)
            unshare.unshare(unshare.CLONE_NEWNS)
            subprocess.run(['mount', '--make-rprivate', '/'], check=True)
            chroot = pathlib.Path(tmpdir) / 'chroot'
            upper = pathlib.Path(tmpdir) / 'upper'
            work = pathlib.Path(tmpdir) / 'work'
            for path in [chroot, upper, work]:
                path.mkdir()
            subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
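            # Bind everything the child still needs into the overlay chroot:
            # /nix, /run, /proc, /dev, /sys, the home directory (borg config
            # and cache), the source repo if it is a local path, and the
            # ssh-agent socket directory if present.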
            bindMounts = ['nix', 'run', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
            if ':' not in src_repo_path:
                bindMounts.append(pathlib.Path(src_repo_path).relative_to('/'))
            if 'SSH_AUTH_SOCK' in os.environ:
                bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
            for bindMount in bindMounts:
                (chroot / bindMount).mkdir(parents=True, exist_ok=True)
                subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
            os.chroot(chroot)
            os.chdir('/')
            dir = pathlib.Path('/borg')
            dir.mkdir(parents=True, exist_ok=True)
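            # Fetch the original size and file count up front so the progress
            # bar can show totals and a rate.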
            with Halo(text='Determining size', spinner='arc') as sp:
                total_size = None
                total_files = None
                with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait', '120', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True) as proc:
                    stats = json.load(proc.stdout)['archives'][0]['stats']
                    total_size = stats['original_size']
                    total_files = stats['nfiles']
                sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
            # print(f'Mounting to {dir}', file=stderr)
            with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait', '120', f'{src_repo_path}::{entry["name"]}', dir]) as mount_proc:
                with Halo(text='Waiting for mount', spinner='arc') as sp:
                    wait_start = datetime.now()
                    while True:
                        ret = subprocess.run(['mountpoint', '-q', dir])
                        if ret.returncode == 0:
                            break
                        elif datetime.now() - wait_start > timedelta(minutes=10):
                            ret.check_returncode()
                        sleep(0.1)
                    sp.succeed('Mounted')
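                # Retry `borg create` until it succeeds; a restarted run
                # should deduplicate against chunks already in the target.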
                while True:
                    try:
                        with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
                            seen = 0
                            env = os.environ.copy()
                            create_args = ['borg',
                                           'create',
                                           '--lock-wait=120',
                                           '--one-file-system',
                                           '--compression=auto,zstd,10',
                                           '--chunker-params=10,23,16,4095',
                                           '--files-cache=ctime,size',
                                           '--show-rc',
                                           # '--remote-ratelimit=20480',
                                           '--log-json',
                                           '--progress',
                                           '--list',
                                           '--filter=AMEi-x?',
                                           '--stats'
                                           ]
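                            # Carry over the original archive timestamp; the
                            # listed time is parsed as local time and converted
                            # to UTC for --timestamp.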
                            archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
                            create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
                            if cache_suffix:
                                env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
                            else:
                                create_args += ['--files-cache=disabled']
                            create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
                            with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env) as proc:
                                last_list = None
                                last_list_time = None
                                for line in proc.stderr:
                                    try:
                                        json_line = json.loads(line)
                                    except json.decoder.JSONDecodeError:
                                        tqdm.write(line)
                                        continue

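                                    # With --log-json every stderr line is a
                                    # JSON object: archive_progress updates the
                                    # bar, file_status becomes the description,
                                    # anything else goes through tqdm.write.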
                                    t = ''
                                    if 'time' in json_line:
                                        ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
                                        t = f'{ts.isoformat(timespec="minutes")} '
                                    if json_line['type'] == 'archive_progress':
                                        if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
                                            if 'path' in json_line and json_line['path']:
                                                progress.set_description(f'… {json_line["path"]}', refresh=False)
                                            else:
                                                progress.set_description(None, refresh=False)
                                        elif last_list is not None:
                                            progress.set_description(last_list, refresh=False)
                                        progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False)
                                        progress.update(json_line["original_size"] - seen)
                                        seen = json_line["original_size"]
                                    elif json_line['type'] == 'file_status':
                                        # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
                                        last_list = f'{json_line["status"]} {json_line["path"]}'
                                        last_list_time = datetime.now()
                                        progress.set_description(last_list, refresh=False)
                                    elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
                                        if 'message' in json_line:
                                            tqdm.write(t + json_line['message'])
                                        elif 'msgid' in json_line:
                                            tqdm.write(t + json_line['msgid'])
                                    else:
                                        tqdm.write(t + line)
                                progress.set_description(None)
                                if proc.wait() != 0:
                                    continue
                    except subprocess.CalledProcessError as err:
                        print(err, file=stderr)
                        continue
                    else:
                        break
                mount_proc.terminate()
            os._exit(0)
        else:
            while True:
                (waitpid, waitret) = os.wait()
                # os.wait() returns an encoded wait status; decode it before
                # using it as an exit code (needs Python >= 3.9).
                exitcode = os.waitstatus_to_exitcode(waitret)
                if exitcode != 0:
                    sys.exit(exitcode)
                if waitpid == child:
                    break

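# With SOURCE of the form REPO::ARCHIVE, copy just that archive; with a plain
# repo, keep going until the target contains every non-checkpoint archive.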
def main():
    if "::" in args.source:
        (src_repo_path, _, src_archive) = args.source.partition("::")
        entry = None
        for candidate_entry in read_repo(src_repo_path):
            if candidate_entry['name'] != src_archive:
                continue
            entry = candidate_entry
            break

        if entry is None:
            print("Did not find archive", file=stderr)
            sys.exit(1)

        copy_archive(src_repo_path, args.target, entry)
    else:
        for entry in ToSync():
            copy_archive(args.source, args.target, entry)

if __name__ == "__main__":
    sys.exit(main())