Diffstat (limited to 'hosts')
-rwxr-xr-x  hosts/vidhar/borg/copy.py  219
1 files changed, 108 insertions, 111 deletions
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py
index 1714e2e8..168fa967 100755
--- a/hosts/vidhar/borg/copy.py
+++ b/hosts/vidhar/borg/copy.py
@@ -45,10 +45,17 @@ halo_args = {
 
 borg_pwd = getpwnam('borg')
 
-def as_borg(caps=set()):
+def as_borg(caps=set(), newns=True):
     if caps:
         for capset in [cap_permitted, cap_inheritable, cap_effective, cap_ambient]:
             capset.add(*caps)
+
+    if newns:
+        unshare.unshare(unshare.CLONE_NEWNS)
+        subprocess.run(['mount', '--make-rprivate', '/'], check=True)
+        if os.environ.get('CREDENTIALS_DIRECTORY'):
+            creds_path = pathlib.Path(os.environ['CREDENTIALS_DIRECTORY']).relative_to('/')
+            subprocess.run(['bindfs', '-r', f'--force-user={borg_pwd.pw_uid}', f'--force-group={borg_pwd.pw_gid}', pathlib.Path('/') / creds_path, chroot / creds_path], check=True)
 
     os.setgid(borg_pwd.pw_gid)
     os.setuid(borg_pwd.pw_uid)
@@ -104,8 +111,9 @@ def copy_archive(src_repo_path, dst_repo_path, entry):
     else:
         print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr)
     with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
-        child = os.fork()
-        if child == 0:
+        dir = pathlib.Path('/borg')
+
+        def preexec_fn():
             # print('unshare/chroot', file=stderr)
             unshare.unshare(unshare.CLONE_NEWNS)
             subprocess.run(['mount', '--make-rprivate', '/'], check=True)
@@ -126,123 +134,112 @@ def copy_archive(src_repo_path, dst_repo_path, entry):
                 (chroot / bindMount).mkdir(parents=True,exist_ok=True)
                 # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr)
                 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
-            if os.environ.get('CREDENTIALS_DIRECTORY'):
-                creds_path = pathlib.Path(os.environ['CREDENTIALS_DIRECTORY']).relative_to('/')
-                subprocess.run(['binfs', '-r', f'--force-user={borg_pwd.pw_uid}', f'--force-group={borg_pwd.pw_gid}', pathlib.Path('/') / creds_path, chroot / creds_path], check=True)
             os.chroot(chroot)
             os.chdir('/')
-            dir = pathlib.Path('/borg')
             dir.mkdir(parents=True,exist_ok=True,mode=750)
             os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
-            with Halo(text=f'Determine size', **halo_args) as sp:
-                total_size = None
-                total_files = None
-                with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc:
-                    stats = json.load(proc.stdout)['archives'][0]['stats']
-                    total_size = stats['original_size']
-                    total_files = stats['nfiles']
+            as_borg(newns=False, caps={Cap.DAC_READ_SEARCH})
+        with Halo(text=f'Determine size', **halo_args) as sp:
+            total_size = None
+            total_files = None
+            with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc:
+                stats = json.load(proc.stdout)['archives'][0]['stats']
+                total_size = stats['original_size']
+                total_files = stats['nfiles']
+            if sp.enabled:
+                sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
+            else:
+                print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr)
+        # print(f'Mounting to {dir}', file=stderr)
+        with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
+            with Halo(text='Waiting for mount', **halo_args) as sp:
+                wait_start = datetime.now()
+                while True:
+                    ret = subprocess.run(['mountpoint', '-q', dir])
+                    if ret.returncode == 0:
+                        break
+                    elif datetime.now() - wait_start > timedelta(minutes=10):
+                        ret.check_returncode()
+                    sleep(0.1)
                 if sp.enabled:
-                    sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
+                    sp.succeed('Mounted')
                 else:
-                    print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr)
-            # print(f'Mounting to {dir}', file=stderr)
-            with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
-                with Halo(text='Waiting for mount', **halo_args) as sp:
-                    wait_start = datetime.now()
-                    while True:
-                        ret = subprocess.run(['mountpoint', '-q', dir])
-                        if ret.returncode == 0:
-                            break
-                        elif datetime.now() - wait_start > timedelta(minutes=10):
-                            ret.check_returncode()
-                        sleep(0.1)
-                    if sp.enabled:
-                        sp.succeed('Mounted')
-                    else:
-                        print('Mounted', file=stderr)
-                while True:
-                    try:
-                        with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
-                            seen = 0
-                            env = os.environ.copy()
-                            create_args = ['borg',
-                                'create',
-                                '--lock-wait=600',
-                                '--one-file-system',
-                                '--compression=auto,zstd,10',
-                                '--chunker-params=10,23,16,4095',
-                                '--files-cache=ctime,size',
-                                '--show-rc',
-                                # '--remote-ratelimit=20480',
-                                '--log-json',
-                                '--progress',
-                                '--list',
-                                '--filter=AMEi-x?',
-                                '--stats'
-                            ]
-                            archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
-                            create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
-                            if cache_suffix:
-                                env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
-                            else:
-                                create_args += ['--files-cache=disabled']
-                            create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
-                            with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH})) as proc:
-                                last_list = None
-                                last_list_time = None
-                                for line in proc.stderr:
-                                    try:
-                                        json_line = json.loads(line)
-                                    except json.decoder.JSONDecodeError:
-                                        tqdm.write(line)
-                                        continue
+                    print('Mounted', file=stderr)
+            while True:
+                try:
+                    with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
+                        seen = 0
+                        env = os.environ.copy()
+                        create_args = ['borg',
+                            'create',
+                            '--lock-wait=600',
+                            '--one-file-system',
+                            '--compression=auto,zstd,10',
+                            '--chunker-params=10,23,16,4095',
+                            '--files-cache=ctime,size',
+                            '--show-rc',
+                            # '--remote-ratelimit=20480',
+                            '--log-json',
+                            '--progress',
+                            '--list',
+                            '--filter=AMEi-x?',
+                            '--stats'
+                        ]
+                        archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
+                        create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
+                        if cache_suffix:
+                            env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
+                        else:
+                            create_args += ['--files-cache=disabled']
+                        create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
+                        with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=preexec_fn) as proc:
+                            last_list = None
+                            last_list_time = None
+                            for line in proc.stderr:
+                                try:
+                                    json_line = json.loads(line)
+                                except json.decoder.JSONDecodeError:
+                                    tqdm.write(line)
+                                    continue
 
-                                    t = ''
-                                    if 'time' in json_line and stderr.isatty():
-                                        ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
-                                        t = f'{ts.isoformat(timespec="minutes")} '
-                                    if json_line['type'] == 'archive_progress':
-                                        if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
-                                            if 'path' in json_line and json_line['path']:
-                                                progress.set_description(f'… {json_line["path"]}', refresh=False)
-                                            else:
-                                                progress.set_description(None, refresh=False)
-                                        elif last_list is not None:
-                                            progress.set_description(last_list, refresh=False)
-                                        progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False)
-                                        progress.update(json_line["original_size"] - seen)
-                                        seen = json_line["original_size"]
-                                    elif json_line['type'] == 'file_status':
-                                        # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
-                                        last_list = f'{json_line["status"]} {json_line["path"]}'
-                                        last_list_time = datetime.now()
+                                t = ''
+                                if 'time' in json_line and stderr.isatty():
+                                    ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
+                                    t = f'{ts.isoformat(timespec="minutes")} '
+                                if json_line['type'] == 'archive_progress':
+                                    if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
+                                        if 'path' in json_line and json_line['path']:
+                                            progress.set_description(f'… {json_line["path"]}', refresh=False)
+                                        else:
+                                            progress.set_description(None, refresh=False)
+                                    elif last_list is not None:
                                         progress.set_description(last_list, refresh=False)
-                                        if not stderr.isatty():
-                                            print(last_list, file=stderr)
-                                    elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
-                                        if 'message' in json_line:
-                                            tqdm.write(t + json_line['message'])
-                                        elif 'msgid' in json_line:
-                                            tqdm.write(t + json_line['msgid'])
-                                    else:
-                                        tqdm.write(t + line)
-                                progress.set_description(None)
-                                if proc.wait() != 0:
-                                    continue
-                    except subprocess.CalledProcessError as err:
-                        print(err, file=stderr)
-                        continue
-                    else:
-                        break
-                mount_proc.terminate()
-            os._exit(0)
-        else:
-            while True:
-                waitpid, waitret = os.wait()
-                if waitret != 0:
-                    sys.exit(waitret)
-                if waitpid == child:
+                                    progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False)
+                                    progress.update(json_line["original_size"] - seen)
+                                    seen = json_line["original_size"]
+                                elif json_line['type'] == 'file_status':
+                                    # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
+                                    last_list = f'{json_line["status"]} {json_line["path"]}'
+                                    last_list_time = datetime.now()
+                                    progress.set_description(last_list, refresh=False)
+                                    if not stderr.isatty():
+                                        print(last_list, file=stderr)
+                                elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
+                                    if 'message' in json_line:
+                                        tqdm.write(t + json_line['message'])
+                                    elif 'msgid' in json_line:
+                                        tqdm.write(t + json_line['msgid'])
+                                else:
+                                    tqdm.write(t + line)
+                            progress.set_description(None)
+                            if proc.wait() != 0:
+                                continue
+                except subprocess.CalledProcessError as err:
+                    print(err, file=stderr)
+                    continue
+                else:
                     break
+            mount_proc.terminate()
 
 def sigterm(signum, frame):
     raise SystemExit(128 + signum)
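
Note on the pattern this commit moves to (a sketch, not part of the commit): subprocess.Popen runs preexec_fn in the forked child after fork() and before exec(), so mount-namespace and uid/gid setup done there only affects the spawned borg process while the parent keeps printing progress. A minimal standalone illustration, reusing the same modules the script already imports (python-unshare, pwd); the borg command line here is only an example and, like the script itself, this needs root:

    import os
    import subprocess
    from pwd import getpwnam

    import unshare  # python-unshare, as used by copy.py

    borg_pwd = getpwnam('borg')

    def child_setup():
        # Runs in the child between fork() and exec(): give it a private
        # mount namespace so any mounts it sets up die with it ...
        unshare.unshare(unshare.CLONE_NEWNS)
        subprocess.run(['mount', '--make-rprivate', '/'], check=True)
        # ... then drop privileges to the borg user, as as_borg() does.
        os.setgid(borg_pwd.pw_gid)
        os.setuid(borg_pwd.pw_uid)

    with subprocess.Popen(['borg', 'list', '--short'], preexec_fn=child_setup) as proc:
        proc.wait()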