author    Gregor Kleen <gkleen@yggdrasil.li>  2022-02-18 11:57:23 +0100
committer Gregor Kleen <gkleen@yggdrasil.li>  2022-02-18 11:57:23 +0100
commit    42620945a92775ecab4f47280083ec8e0169317d (patch)
tree      ae2b06492d10f722c231df2b26a8434f0b95766f /hosts/vidhar/borg/copy.py
parent    acf3920a56b7b8b298f17e03441039bd97a1c232 (diff)
vidhar: ...
Diffstat (limited to 'hosts/vidhar/borg/copy.py')
-rwxr-xr-x  hosts/vidhar/borg/copy.py  219
1 files changed, 108 insertions, 111 deletions
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py
index 1714e2e8..168fa967 100755
--- a/hosts/vidhar/borg/copy.py
+++ b/hosts/vidhar/borg/copy.py
@@ -45,10 +45,17 @@ halo_args = {
 
 borg_pwd = getpwnam('borg')
 
-def as_borg(caps=set()):
+def as_borg(caps=set(), newns=True):
     if caps:
         for capset in [cap_permitted, cap_inheritable, cap_effective, cap_ambient]:
             capset.add(*caps)
+
+    if newns:
+        unshare.unshare(unshare.CLONE_NEWNS)
+        subprocess.run(['mount', '--make-rprivate', '/'], check=True)
+        if os.environ.get('CREDENTIALS_DIRECTORY'):
+            creds_path = pathlib.Path(os.environ['CREDENTIALS_DIRECTORY']).relative_to('/')
+            subprocess.run(['bindfs', '-r', f'--force-user={borg_pwd.pw_uid}', f'--force-group={borg_pwd.pw_gid}', pathlib.Path('/') / creds_path, chroot / creds_path], check=True)
 
     os.setgid(borg_pwd.pw_gid)
     os.setuid(borg_pwd.pw_uid)
@@ -104,8 +111,9 @@ def copy_archive(src_repo_path, dst_repo_path, entry):
     else:
         print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr)
     with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
-        child = os.fork()
-        if child == 0:
+        dir = pathlib.Path('/borg')
+
+        def preexec_fn():
             # print('unshare/chroot', file=stderr)
             unshare.unshare(unshare.CLONE_NEWNS)
             subprocess.run(['mount', '--make-rprivate', '/'], check=True)
@@ -126,123 +134,112 @@ def copy_archive(src_repo_path, dst_repo_path, entry):
                 (chroot / bindMount).mkdir(parents=True,exist_ok=True)
                 # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr)
                 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
-            if os.environ.get('CREDENTIALS_DIRECTORY'):
-                creds_path = pathlib.Path(os.environ['CREDENTIALS_DIRECTORY']).relative_to('/')
-                subprocess.run(['binfs', '-r', f'--force-user={borg_pwd.pw_uid}', f'--force-group={borg_pwd.pw_gid}', pathlib.Path('/') / creds_path, chroot / creds_path], check=True)
             os.chroot(chroot)
             os.chdir('/')
-            dir = pathlib.Path('/borg')
             dir.mkdir(parents=True,exist_ok=True,mode=750)
             os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
-            with Halo(text=f'Determine size', **halo_args) as sp:
-                total_size = None
-                total_files = None
-                with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc:
-                    stats = json.load(proc.stdout)['archives'][0]['stats']
-                    total_size = stats['original_size']
-                    total_files = stats['nfiles']
-                if sp.enabled:
-                    sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
-                else:
-                    print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr)
-            # print(f'Mounting to {dir}', file=stderr)
-            with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
-                with Halo(text='Waiting for mount', **halo_args) as sp:
-                    wait_start = datetime.now()
-                    while True:
-                        ret = subprocess.run(['mountpoint', '-q', dir])
-                        if ret.returncode == 0:
-                            break
-                        elif datetime.now() - wait_start > timedelta(minutes=10):
-                            ret.check_returncode()
-                        sleep(0.1)
-                    if sp.enabled:
-                        sp.succeed('Mounted')
-                    else:
-                        print('Mounted', file=stderr)
-                while True:
-                    try:
-                        with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
-                            seen = 0
-                            env = os.environ.copy()
-                            create_args = ['borg',
-                                           'create',
-                                           '--lock-wait=600',
-                                           '--one-file-system',
-                                           '--compression=auto,zstd,10',
-                                           '--chunker-params=10,23,16,4095',
-                                           '--files-cache=ctime,size',
-                                           '--show-rc',
-                                           # '--remote-ratelimit=20480',
-                                           '--log-json',
-                                           '--progress',
-                                           '--list',
-                                           '--filter=AMEi-x?',
-                                           '--stats'
-                                           ]
-                            archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
-                            create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
-                            if cache_suffix:
-                                env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
-                            else:
-                                create_args += ['--files-cache=disabled']
-                            create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
-                            with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH})) as proc:
-                                last_list = None
-                                last_list_time = None
-                                for line in proc.stderr:
-                                    try:
-                                        json_line = json.loads(line)
-                                    except json.decoder.JSONDecodeError:
-                                        tqdm.write(line)
-                                        continue
+            as_borg(newns=False, caps={Cap.DAC_READ_SEARCH})
+        with Halo(text=f'Determine size', **halo_args) as sp:
+            total_size = None
+            total_files = None
+            with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc:
+                stats = json.load(proc.stdout)['archives'][0]['stats']
+                total_size = stats['original_size']
+                total_files = stats['nfiles']
+            if sp.enabled:
+                sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
+            else:
+                print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr)
+        # print(f'Mounting to {dir}', file=stderr)
+        with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
+            with Halo(text='Waiting for mount', **halo_args) as sp:
+                wait_start = datetime.now()
+                while True:
+                    ret = subprocess.run(['mountpoint', '-q', dir])
+                    if ret.returncode == 0:
+                        break
+                    elif datetime.now() - wait_start > timedelta(minutes=10):
+                        ret.check_returncode()
+                    sleep(0.1)
+                if sp.enabled:
+                    sp.succeed('Mounted')
+                else:
+                    print('Mounted', file=stderr)
+            while True:
+                try:
+                    with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
+                        seen = 0
+                        env = os.environ.copy()
+                        create_args = ['borg',
+                                       'create',
+                                       '--lock-wait=600',
+                                       '--one-file-system',
+                                       '--compression=auto,zstd,10',
+                                       '--chunker-params=10,23,16,4095',
+                                       '--files-cache=ctime,size',
+                                       '--show-rc',
+                                       # '--remote-ratelimit=20480',
+                                       '--log-json',
+                                       '--progress',
+                                       '--list',
+                                       '--filter=AMEi-x?',
+                                       '--stats'
+                                       ]
+                        archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
+                        create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
+                        if cache_suffix:
+                            env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
+                        else:
+                            create_args += ['--files-cache=disabled']
+                        create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
+                        with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=preexec_fn) as proc:
+                            last_list = None
+                            last_list_time = None
+                            for line in proc.stderr:
+                                try:
+                                    json_line = json.loads(line)
+                                except json.decoder.JSONDecodeError:
+                                    tqdm.write(line)
+                                    continue
 
-                                    t = ''
-                                    if 'time' in json_line and stderr.isatty():
-                                        ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
-                                        t = f'{ts.isoformat(timespec="minutes")} '
-                                    if json_line['type'] == 'archive_progress':
-                                        if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
-                                            if 'path' in json_line and json_line['path']:
-                                                progress.set_description(f'… {json_line["path"]}', refresh=False)
-                                            else:
-                                                progress.set_description(None, refresh=False)
-                                        elif last_list is not None:
-                                            progress.set_description(last_list, refresh=False)
-                                        progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False)
-                                        progress.update(json_line["original_size"] - seen)
-                                        seen = json_line["original_size"]
-                                    elif json_line['type'] == 'file_status':
-                                        # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
-                                        last_list = f'{json_line["status"]} {json_line["path"]}'
-                                        last_list_time = datetime.now()
-                                        progress.set_description(last_list, refresh=False)
-                                        if not stderr.isatty():
-                                            print(last_list, file=stderr)
-                                    elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
-                                        if 'message' in json_line:
-                                            tqdm.write(t + json_line['message'])
-                                        elif 'msgid' in json_line:
-                                            tqdm.write(t + json_line['msgid'])
-                                    else:
-                                        tqdm.write(t + line)
-                                progress.set_description(None)
-                            if proc.wait() != 0:
-                                continue
-                    except subprocess.CalledProcessError as err:
-                        print(err, file=stderr)
-                        continue
-                    else:
-                        break
-                mount_proc.terminate()
-            os._exit(0)
-        else:
-            while True:
-                waitpid, waitret = os.wait()
-                if waitret != 0:
-                    sys.exit(waitret)
-                if waitpid == child:
-                    break
+                                t = ''
+                                if 'time' in json_line and stderr.isatty():
+                                    ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
+                                    t = f'{ts.isoformat(timespec="minutes")} '
+                                if json_line['type'] == 'archive_progress':
+                                    if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
+                                        if 'path' in json_line and json_line['path']:
+                                            progress.set_description(f'… {json_line["path"]}', refresh=False)
+                                        else:
+                                            progress.set_description(None, refresh=False)
+                                    elif last_list is not None:
+                                        progress.set_description(last_list, refresh=False)
+                                    progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False)
+                                    progress.update(json_line["original_size"] - seen)
+                                    seen = json_line["original_size"]
+                                elif json_line['type'] == 'file_status':
+                                    # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
+                                    last_list = f'{json_line["status"]} {json_line["path"]}'
+                                    last_list_time = datetime.now()
+                                    progress.set_description(last_list, refresh=False)
+                                    if not stderr.isatty():
+                                        print(last_list, file=stderr)
+                                elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
+                                    if 'message' in json_line:
+                                        tqdm.write(t + json_line['message'])
+                                    elif 'msgid' in json_line:
+                                        tqdm.write(t + json_line['msgid'])
+                                else:
+                                    tqdm.write(t + line)
+                            progress.set_description(None)
+                        if proc.wait() != 0:
+                            continue
+                except subprocess.CalledProcessError as err:
+                    print(err, file=stderr)
+                    continue
+                else:
+                    break
+            mount_proc.terminate()
 
 def sigterm(signum, frame):
     raise SystemExit(128 + signum)
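
The main refactor above drops the explicit os.fork()/os.wait()/os._exit() dance in favour of subprocess's preexec_fn hook: the namespace, chroot and privilege setup now runs in the forked child of the `borg create` call, after fork() and before exec(), so it never touches the orchestrating process. A minimal sketch of that pattern (standalone; the user name 'nobody' and the helper name drop_privileges are illustrative placeholders, not from the repo):

    # Sketch: privilege drop via preexec_fn instead of an explicit fork.
    # preexec_fn runs in the forked child before exec(), so setgid/setuid
    # here only affect the spawned command.
    import os
    import pwd
    import subprocess

    def drop_privileges(user: str = 'nobody'):
        pw = pwd.getpwnam(user)
        def preexec_fn():
            os.setgid(pw.pw_gid)   # drop the group first, while still privileged
            os.setuid(pw.pw_uid)
        return preexec_fn

    if __name__ == '__main__':
        # 'id' prints the uid/gid the child ended up with; needs root to succeed.
        subprocess.run(['id'], preexec_fn=drop_privileges(), check=True)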
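The 'Waiting for mount' loop is unchanged in substance: it polls util-linux's `mountpoint -q` until the FUSE mount created by `borg mount` appears, and gives up after ten minutes. As a standalone helper it might look roughly like this (hypothetical wait_for_mount, assuming `mountpoint` is on PATH):

    # Sketch: poll until `path` becomes a mountpoint, or raise after a timeout.
    import subprocess
    from datetime import datetime, timedelta
    from time import sleep

    def wait_for_mount(path, timeout=timedelta(minutes=10)):
        wait_start = datetime.now()
        while True:
            ret = subprocess.run(['mountpoint', '-q', str(path)])
            if ret.returncode == 0:
                return                      # path is now a mountpoint
            if datetime.now() - wait_start > timeout:
                ret.check_returncode()      # raises CalledProcessError on timeout
            sleep(0.1)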
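The copy loop drives `borg create` with --log-json and --progress, so each stderr line is either a JSON record (archive_progress, file_status, log_message, ...) or plain text. A simplified sketch of that split, not the script's exact handler:

    # Sketch: separate borg's --log-json records from plain stderr output.
    import json

    def handle_line(line: str) -> None:
        try:
            msg = json.loads(line)
        except json.JSONDecodeError:
            print(line, end='')             # not JSON: pass through verbatim
            return
        if msg.get('type') == 'archive_progress':
            print(f'{msg.get("nfiles")} files, {msg.get("original_size")} bytes so far')
        elif 'message' in msg:              # log_message and similar records
            print(msg['message'])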