summaryrefslogtreecommitdiff
path: root/hosts/vidhar/borg/copy.py
diff options
context:
space:
mode:
Diffstat (limited to 'hosts/vidhar/borg/copy.py')
-rwxr-xr-xhosts/vidhar/borg/copy.py214
1 files changed, 113 insertions, 101 deletions
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py
index f685a490..e7d0becc 100755
--- a/hosts/vidhar/borg/copy.py
+++ b/hosts/vidhar/borg/copy.py
@@ -104,9 +104,9 @@ def copy_archive(src_repo_path, dst_repo_path, entry):
104 else: 104 else:
105 print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr) 105 print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr)
106 with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: 106 with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
107 dir = pathlib.Path('/borg') 107 child = os.fork()
108 108 if child == 0:
109 def preexec_fn(): 109 # print('unshare/chroot', file=stderr)
110 unshare.unshare(unshare.CLONE_NEWNS) 110 unshare.unshare(unshare.CLONE_NEWNS)
111 subprocess.run(['mount', '--make-rprivate', '/'], check=True) 111 subprocess.run(['mount', '--make-rprivate', '/'], check=True)
112 chroot = pathlib.Path(tmpdir) / 'chroot' 112 chroot = pathlib.Path(tmpdir) / 'chroot'
@@ -128,111 +128,123 @@ def copy_archive(src_repo_path, dst_repo_path, entry):
128 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) 128 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
129 os.chroot(chroot) 129 os.chroot(chroot)
130 os.chdir('/') 130 os.chdir('/')
131 dir = pathlib.Path('/borg')
131 dir.mkdir(parents=True,exist_ok=True,mode=750) 132 dir.mkdir(parents=True,exist_ok=True,mode=750)
132 os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) 133 os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
133 as_borg(caps={Cap.DAC_READ_SEARCH})
134 134
135 total_size = None 135 total_size = None
136 total_files = None 136 total_files = None
137 with Halo(text=f'Determine size', **halo_args) as sp: 137 if stderr.isatty():
138 with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: 138 with Halo(text=f'Determine size', **halo_args) as sp:
139 stats = json.load(proc.stdout)['archives'][0]['stats'] 139 with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc:
140 total_size = stats['original_size'] 140 stats = json.load(proc.stdout)['archives'][0]['stats']
141 total_files = stats['nfiles'] 141 total_size = stats['original_size']
142 if sp.enabled: 142 total_files = stats['nfiles']
143 sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') 143 if sp.enabled:
144 else: 144 sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
145 print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) 145 else:
146 # print(f'Mounting to {dir}', file=stderr) 146 print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr)
147 with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: 147 # print(f'Mounting to {dir}', file=stderr)
148 with Halo(text='Waiting for mount', **halo_args) as sp: 148 with subprocess.Popen(['borg', 'mount', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
149 wait_start = datetime.now() 149 with Halo(text='Waiting for mount', **halo_args) as sp:
150 wait_start = datetime.now()
151 while True:
152 ret = subprocess.run(['mountpoint', '-q', dir])
153 if ret.returncode == 0:
154 break
155 elif datetime.now() - wait_start > timedelta(minutes=10):
156 ret.check_returncode()
157 sleep(0.1)
158 if sp.enabled:
159 sp.succeed('Mounted')
160 else:
161 print('Mounted', file=stderr)
150 while True: 162 while True:
151 ret = subprocess.run(['mountpoint', '-q', dir]) 163 try:
152 if ret.returncode == 0: 164 with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
153 break 165 seen = 0
154 elif datetime.now() - wait_start > timedelta(minutes=10): 166 env = os.environ.copy()
155 ret.check_returncode() 167 create_args = ['borg',
156 sleep(0.1) 168 'create',
157 if sp.enabled: 169 '--lock-wait=600',
158 sp.succeed('Mounted') 170 '--one-file-system',
159 else: 171 '--compression=auto,zstd,10',
160 print('Mounted', file=stderr) 172 '--chunker-params=10,23,16,4095',
161 while True: 173 '--files-cache=ctime,size',
162 try: 174 '--show-rc',
163 with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: 175 # '--remote-ratelimit=20480',
164 seen = 0 176 '--log-json',
165 env = os.environ.copy() 177 '--progress',
166 create_args = ['borg', 178 '--list',
167 'create', 179 '--filter=AMEi-x?',
168 '--lock-wait=600', 180 '--stats'
169 '--one-file-system', 181 ]
170 '--compression=auto,zstd,10', 182 archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
171 '--chunker-params=10,23,16,4095', 183 create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
172 '--files-cache=ctime,size', 184 if cache_suffix:
173 '--show-rc', 185 env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
174 # '--remote-ratelimit=20480', 186 else:
175 '--log-json', 187 create_args += ['--files-cache=disabled']
176 '--progress', 188 create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
177 '--list', 189 with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH})) as proc:
178 '--filter=AMEi-x?', 190 last_list = None
179 '--stats' 191 last_list_time = None
180 ] 192 for line in proc.stderr:
181 archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) 193 try:
182 create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] 194 json_line = json.loads(line)
183 if cache_suffix: 195 except json.decoder.JSONDecodeError:
184 env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix 196 tqdm.write(line)
185 else: 197 continue
186 create_args += ['--files-cache=disabled']
187 create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
188 with subprocess.Popen(create_args, cwd=dir, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=preexec_fn) as proc:
189 last_list = None
190 last_list_time = None
191 for line in proc.stderr:
192 try:
193 json_line = json.loads(line)
194 except json.decoder.JSONDecodeError:
195 tqdm.write(line)
196 continue
197 198
198 t = '' 199 t = ''
199 if 'time' in json_line and stderr.isatty(): 200 if 'time' in json_line and stderr.isatty():
200 ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) 201 ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
201 t = f'{ts.isoformat(timespec="minutes")} ' 202 t = f'{ts.isoformat(timespec="minutes")} '
202 if json_line['type'] == 'archive_progress': 203 if json_line['type'] == 'archive_progress':
203 if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: 204 if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
204 if 'path' in json_line and json_line['path']: 205 if 'path' in json_line and json_line['path']:
205 progress.set_description(f'… {json_line["path"]}', refresh=False) 206 progress.set_description(f'… {json_line["path"]}', refresh=False)
206 else: 207 else:
207 progress.set_description(None, refresh=False) 208 progress.set_description(None, refresh=False)
208 elif last_list is not None: 209 elif last_list is not None:
210 progress.set_description(last_list, refresh=False)
211 nfiles=json_line["nfiles"]
212 if total_files is not None:
213 nfiles=f'{json_line["nfiles"]}/{total_files}'
214 progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False)
215 progress.update(json_line["original_size"] - seen)
216 seen = json_line["original_size"]
217 elif json_line['type'] == 'file_status':
218 # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
219 last_list = f'{json_line["status"]} {json_line["path"]}'
220 last_list_time = datetime.now()
209 progress.set_description(last_list, refresh=False) 221 progress.set_description(last_list, refresh=False)
210 progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=f'{json_line["nfiles"]}/{total_files}', refresh=False) 222 if not stderr.isatty():
211 progress.update(json_line["original_size"] - seen) 223 print(last_list, file=stderr)
212 seen = json_line["original_size"] 224 elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
213 elif json_line['type'] == 'file_status': 225 if 'message' in json_line:
214 # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') 226 tqdm.write(t + json_line['message'])
215 last_list = f'{json_line["status"]} {json_line["path"]}' 227 elif 'msgid' in json_line:
216 last_list_time = datetime.now() 228 tqdm.write(t + json_line['msgid'])
217 progress.set_description(last_list, refresh=False) 229 else:
218 if not stderr.isatty(): 230 tqdm.write(t + line)
219 print(last_list, file=stderr) 231 progress.set_description(None)
220 elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): 232 if proc.wait() != 0:
221 if 'message' in json_line: 233 continue
222 tqdm.write(t + json_line['message']) 234 except subprocess.CalledProcessError as err:
223 elif 'msgid' in json_line: 235 print(err, file=stderr)
224 tqdm.write(t + json_line['msgid']) 236 continue
225 else: 237 else:
226 tqdm.write(t + line) 238 break
227 progress.set_description(None) 239 mount_proc.terminate()
228 if proc.wait() != 0: 240 os._exit(0)
229 continue 241 else:
230 except subprocess.CalledProcessError as err: 242 while True:
231 print(err, file=stderr) 243 waitpid, waitret = os.wait()
232 continue 244 if waitret != 0:
233 else: 245 sys.exit(waitret)
246 if waitpid == child:
234 break 247 break
235 mount_proc.terminate()
236 248
237def sigterm(signum, frame): 249def sigterm(signum, frame):
238 raise SystemExit(128 + signum) 250 raise SystemExit(128 + signum)