 hosts/vidhar/borg/copy.py                      | 291 ---
 hosts/vidhar/borg/copy/copy_borg/__main__.py   | 556 +++
 hosts/vidhar/borg/copy/setup.py                |  10 +
 hosts/vidhar/borg/default.nix                  |  54 +-
 modules/borgsnap/borgsnap/borgsnap/__main__.py |   6 +-
 5 files changed, 594 insertions(+), 323 deletions(-)
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py
deleted file mode 100755
index c648c4f6..00000000
--- a/hosts/vidhar/borg/copy.py
+++ /dev/null
@@ -1,291 +0,0 @@
1#!@python@/bin/python
2
3import json
4import os
5import subprocess
6import re
7import sys
8from sys import stderr
9from humanize import naturalsize
10
11from tempfile import TemporaryDirectory
12
13from datetime import (datetime, timedelta)
14from dateutil.tz import (tzlocal, tzutc)
15import dateutil.parser
16import argparse
17
18from tqdm import tqdm
19
20from xdg import xdg_runtime_dir
21import pathlib
22
23import unshare
24from pyprctl import cap_permitted, cap_inheritable, cap_effective, cap_ambient, Cap
25from pwd import getpwnam
26
27import signal
28from time import sleep
29
30from halo import Halo
31
32from collections import deque
33
34
35parser = argparse.ArgumentParser()
36parser.add_argument('source', metavar='REPO_OR_ARCHIVE')
37parser.add_argument('target', metavar='REPO_OR_ARCHIVE')
38args = parser.parse_args()
39
40halo_args = {
41 'stream': stderr,
42 'enabled': stderr.isatty(),
43 'spinner': 'arc'
44}
45
46borg_pwd = getpwnam('borg')
47
48def as_borg(caps=set(), cwd=None):
49 if caps:
50 cap_permitted.add(*caps)
51 cap_inheritable.add(*caps)
52 cap_effective.add(*caps)
53 cap_ambient.add(*caps)
54
55 os.setgid(borg_pwd.pw_gid)
56 os.setuid(borg_pwd.pw_uid)
57
58 if cwd is not None:
59 os.chdir(cwd)
60
61def read_repo(path):
62 with Halo(text=f'Listing {path}', **halo_args) as sp:
63 res = None
64 with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc:
65 res = json.load(proc.stdout)['archives']
66 if sp.enabled:
67 sp.succeed(f'{len(res)} archives in {path}')
68 else:
69 print(f'{len(res)} archives in {path}', file=stderr)
70 return res
71
72class ToSync:
73 to_sync = deque()
74
75 def __iter__(self):
76 return self
77
78 def __next__(self):
79 if self.to_sync:
80 return self.to_sync.popleft()
81
82 while True:
83 try:
84 src = read_repo(args.source)
85 dst = read_repo(args.target)
86 except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
87 print(err, file=stderr)
88 continue
89
90 self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')])
91
92 if self.to_sync:
93 return self.to_sync.popleft()
94
95 raise StopIteration
96
97def copy_archive(src_repo_path, dst_repo_path, entry):
98 cache_suffix = None
99 with Halo(text=f'Determine archive parameters', **halo_args) as sp:
100 match = re.compile(r'^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name'])
101 if match:
102 repo_id = None
103 with subprocess.Popen(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc:
104 repo_id = json.load(proc.stdout)['repository']['id']
105 if repo_id:
106 cache_suffix = f'{repo_id}_{match.group(1)}'
107 if sp.enabled:
108 sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})')
109 else:
110 print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr)
111 with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
112 child = os.fork()
113 if child == 0:
114 # print('unshare/chroot', file=stderr)
115 unshare.unshare(unshare.CLONE_NEWNS)
116 subprocess.run(['mount', '--make-rprivate', '/'], check=True)
117 chroot = pathlib.Path(tmpdir) / 'chroot'
118 upper = pathlib.Path(tmpdir) / 'upper'
119 work = pathlib.Path(tmpdir) / 'work'
120 for path in [chroot,upper,work]:
121 path.mkdir()
122 subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
123 bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
124 if os.environ.get('BORG_BASE_DIR'):
125 bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/'))
126 if not ":" in src_repo_path:
127 bindMounts.append(pathlib.Path(src_repo_path).relative_to('/'))
128 if 'SSH_AUTH_SOCK' in os.environ:
129 bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
130 for bindMount in bindMounts:
131 (chroot / bindMount).mkdir(parents=True,exist_ok=True)
132 # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr)
133 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
134 os.chroot(chroot)
135 os.chdir('/')
136 try:
137 os.unlink('/etc/fuse.conf')
138 except FileNotFoundError:
139 pass
140 pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True)
141 with open('/etc/fuse.conf', 'w') as fuse_conf:
142 fuse_conf.write('user_allow_other\nmount_max = 1000\n')
143 dir = pathlib.Path('/borg')
144 dir.mkdir(parents=True,exist_ok=True,mode=0o0750)
145 os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
146
147 total_size = None
148 total_files = None
149 if stderr.isatty():
150 with Halo(text=f'Determine size', **halo_args) as sp:
151 with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc:
152 stats = json.load(proc.stdout)['archives'][0]['stats']
153 total_size = stats['original_size']
154 total_files = stats['nfiles']
155 if sp.enabled:
156 sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
157 else:
158 print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr)
159 # print(f'Mounting to {dir}', file=stderr)
160 with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
161 with Halo(text='Waiting for mount', **halo_args) as sp:
162 wait_start = datetime.now()
163 while True:
164 if os.path.ismount(dir):
165 break
166 elif datetime.now() - wait_start > timedelta(minutes=15):
167 raise TimeoutError('timed out waiting for borg mount')
168 sleep(0.1)
169 if sp.enabled:
170 sp.succeed('Mounted')
171 else:
172 print('Mounted', file=stderr)
173 while True:
174 with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
175 seen = 0
176 env = os.environ.copy()
177 create_args = ['borg',
178 'create',
179 '--lock-wait=600',
180 '--one-file-system',
181 '--compression=auto,zstd,10',
182 '--chunker-params=10,23,16,4095',
183 '--files-cache=ctime,size',
184 '--show-rc',
185 '--upload-buffer=100',
186 '--upload-ratelimit=20480',
187 '--log-json',
188 '--progress',
189 '--list',
190 '--filter=AMEi-x?',
191 '--stats'
192 ]
193 archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
194 create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
195 if cache_suffix:
196 env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
197 else:
198 create_args += ['--files-cache=disabled']
199 create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
200
201 with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}, cwd=dir)) as proc:
202 last_list = None
203 last_list_time = None
204 for line in proc.stderr:
205 try:
206 json_line = json.loads(line)
207 except json.decoder.JSONDecodeError:
208 tqdm.write(line)
209 continue
210
211 t = ''
212 if 'time' in json_line and stderr.isatty():
213 ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
214 t = f'{ts.isoformat(timespec="minutes")} '
215 if json_line['type'] == 'archive_progress':
216 if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1:
217 if 'path' in json_line and json_line['path']:
218 progress.set_description(f'… {json_line["path"]}', refresh=False)
219 else:
220 progress.set_description(None, refresh=False)
221 elif last_list is not None:
222 progress.set_description(last_list, refresh=False)
223 nfiles=json_line["nfiles"]
224 if total_files is not None:
225 nfiles=f'{json_line["nfiles"]}/{total_files}'
226 progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False)
227 progress.update(json_line["original_size"] - seen)
228 seen = json_line["original_size"]
229 elif json_line['type'] == 'file_status':
230 # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
231 last_list = f'{json_line["status"]} {json_line["path"]}'
232 last_list_time = datetime.now()
233 progress.set_description(last_list, refresh=False)
234 if not stderr.isatty():
235 print(last_list, file=stderr)
236 elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
237 if 'message' in json_line:
238 tqdm.write(t + json_line['message'])
239 elif 'msgid' in json_line:
240 tqdm.write(t + json_line['msgid'])
241 else:
242 tqdm.write(t + line)
243 progress.set_description(None)
244 if proc.wait() != 0:
245 dst = None
246 try:
247 dst = read_repo(args.target)
248 except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
249 print(err, file=stderr)
250 continue
251 else:
252 if any(map(lambda other: entry['name'] == other['name'], dst)):
253 break
254
255 continue
256 mount_proc.terminate()
257 os._exit(0)
258 else:
259 while True:
260 waitpid, waitret = os.wait()
261 if waitret != 0:
262 sys.exit(waitret)
263 if waitpid == child:
264 break
265
266def sigterm(signum, frame):
267 raise SystemExit(128 + signum)
268
269def main():
270 signal.signal(signal.SIGTERM, sigterm)
271
272 if "::" in args.source:
273 (src_repo_path, _, src_archive) = args.source.partition("::")
274 entry = None
275 for candidate_entry in read_repo(src_repo_path):
276 if candidate_entry['name'] != src_archive:
277 continue
278 entry = candidate_entry
279 break
280
281 if entry is None:
282 print("Did not find archive", file=stderr)
283 sys.exit(1)
284
285 copy_archive(src_repo_path, args.target, entry)
286 else:
287 for entry in ToSync():
288 copy_archive(args.source, args.target, entry)
289
290if __name__ == "__main__":
291 sys.exit(main())
diff --git a/hosts/vidhar/borg/copy/copy_borg/__main__.py b/hosts/vidhar/borg/copy/copy_borg/__main__.py
new file mode 100755
index 00000000..5b374d99
--- /dev/null
+++ b/hosts/vidhar/borg/copy/copy_borg/__main__.py
@@ -0,0 +1,556 @@
1#!@python@/bin/python
2
3import json
4import os
5import subprocess
6import re
7import sys
8import io
9from sys import stderr
10from humanize import naturalsize
11
12from tempfile import TemporaryDirectory
13
14from datetime import (datetime, timedelta)
15from dateutil.tz import (tzlocal, tzutc)
16import dateutil.parser
17import argparse
18
19from tqdm import tqdm
20
21from xdg import xdg_runtime_dir
22import pathlib
23
24import unshare
25from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps
26from pwd import getpwnam
27
28import logging
29
30import signal
31import time
32import math
33
34from halo import Halo
35
36from collections import deque
37
38import select
39import fcntl
40
41from multiprocessing import Process, Manager
42from contextlib import closing
43
44
45halo_args = {
46 'stream': stderr,
47 'enabled': stderr.isatty(),
48 'spinner': 'arc'
49}
50
51borg_pwd = getpwnam('borg')
52
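# Privilege drop used as preexec_fn for every borg subprocess: set_keepcaps()
# preserves the permitted capability set across setuid() to the 'borg' user;
# any requested caps are then re-added to the permitted/inheritable sets and
# raised into the ambient set so they survive the exec of borg itself.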
53def as_borg(caps=set()):
54 global logger
55
56 try:
57 if caps:
58 c_state = CapState.get_current()
59 c_state.permitted.add(*caps)
60 c_state.set_current()
61
62 # logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)
63
64 set_keepcaps(True)
65
66 os.setgid(borg_pwd.pw_gid)
67 os.setuid(borg_pwd.pw_uid)
68
69 if caps:
70 # logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted)
71
72 c_state = CapState.get_current()
73 c_state.permitted = caps.copy()
74 c_state.inheritable.add(*caps)
75 c_state.set_current()
76
77 # logger.debug("cap_permitted=%s", CapState.get_current().permitted)
78 # logger.debug("cap_inheritable=%s", CapState.get_current().inheritable)
79
80 for cap in caps:
81 cap_ambient_raise(cap)
82 # logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap))
83 except Exception:
84 logger.exception('failed to drop privileges / raise capabilities')
85 raise
86
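# Runs a borg command whose stdout is a single JSON document. Both pipes are
# switched to non-blocking mode and multiplexed with poll(): stderr is split
# into lines and fed to the logger as it arrives, while stdout is collected in
# a buffer and parsed once the process has exited with status 0.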
87def borg_json(*args, **kwargs):
88 global logger
89
90 with subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs) as proc:
91 stdout_buffer = io.BytesIO()
92
93 proc_logger = logger.getChild('borg')
94 stdout_logger = proc_logger.getChild('stdout')
95 stderr_logger = proc_logger.getChild('stderr')
96
97 fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
98 fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
99
100 poll = select.poll()
101 poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
102 poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
103 pollc = 2
104 events = poll.poll()
105 stderr_linebuf = bytearray()
106
107 while pollc > 0 and len(events) > 0:
108 for rfd, event in events:
109 if event & select.POLLIN:
110 if rfd == proc.stdout.fileno():
111 try:
112 buf = os.read(proc.stdout.fileno(), 8192)
113 # stdout_logger.debug(buf)
114 stdout_buffer.write(buf)
115 except BlockingIOError:
116 pass
117 if rfd == proc.stderr.fileno():
118 try:
119 stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192))
120 except BlockingIOError:
121 pass
122
123 while stderr_linebuf:
124 line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n')
125 if not sep:
126 stderr_linebuf = line
127 break
128
129 stderr_logger.info(line.decode())
130 if event == select.POLLHUP:
131 poll.unregister(rfd)
132 pollc -= 1
133
134 if pollc > 0:
135 events = poll.poll()
136
137 for handler in proc_logger.handlers:
138 handler.flush()
139
140 ret = proc.wait()
141 if ret != 0:
142 raise Exception(f'borg subprocess exited with returncode {ret}')
143
144 stdout_buffer.seek(0)
145 return json.load(stdout_buffer)
146
147def read_repo(path):
148 global logger
149
150 with Halo(text=f'Listing {path}', **halo_args) as sp:
151 if not sp.enabled:
152 logger.debug('Listing %s...', path)
153 res = borg_json(['borg', 'list', '--info', '--lock-wait=600', '--json', path], preexec_fn=lambda: as_borg())['archives']
154 if sp.enabled:
155 sp.succeed(f'{len(res)} archives in {path}')
156 else:
157 logger.info('%d archives in ‘%s’', len(res), path)
158 return res
159
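# Iterator over the archives still missing from the target: whenever its queue
# runs dry it re-lists both repositories and enqueues every source archive
# whose name is absent from the target (checkpoint archives excluded); it
# stops only once a fresh listing yields nothing left to copy.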
160class ToSync:
161 to_sync = deque()
162
163 def __init__(self, source, target):
164 self.source = source
165 self.target = target
166
167 def __iter__(self):
168 return self
169
170 def __next__(self):
171 global logger
172
173 if self.to_sync:
174 return self.to_sync.popleft()
175
176 while True:
177 try:
178 src = read_repo(self.source)
179 dst = read_repo(self.target)
180 except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
181 logger.error(err)
182 continue
183
184 self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')])
185
186 if self.to_sync:
187 return self.to_sync.popleft()
188
189 raise StopIteration
190
191def copy_archive(src_repo_path, dst_repo_path, entry):
192 global logger
193
194 def do_copy(tmpdir_q):
195 global logger
196
197 nonlocal src_repo_path, dst_repo_path, entry
198
199 tmpdir = tmpdir_q.get()
200
201 cache_suffix = None
202 with Halo(text=f'Determine archive parameters', **halo_args) as sp:
203 if not sp.enabled:
204 logger.debug('Determining archive parameters...')
205 match = re.compile(r'^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name'])
206 if match:
207 repo_id = borg_json(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], preexec_fn=lambda: as_borg())['repository']['id']
208
209 if repo_id:
210 cache_suffix = f'{repo_id}_{match.group(1)}'
211 if sp.enabled:
212 sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})')
213 else:
214 logger.info('Will process ‘%s’ (%s, cache_suffix=%s)', entry['name'], dateutil.parser.isoparse(entry['start']), cache_suffix)
215
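# Sandbox setup: enter a private mount namespace and build an overlayfs copy
# of / inside tmpdir, so /etc/fuse.conf can be rewritten and /borg created
# without touching the real root; the bind mounts below re-expose the paths
# borg needs (nix store, secrets, wrappers, the source repo and the SSH agent
# socket) inside the chroot.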
216 logger.debug('Setting up environment...')
217 unshare.unshare(unshare.CLONE_NEWNS)
218 subprocess.run(['mount', '--make-rprivate', '/'], check=True)
219 chroot = pathlib.Path(tmpdir) / 'chroot'
220 upper = pathlib.Path(tmpdir) / 'upper'
221 work = pathlib.Path(tmpdir) / 'work'
222 for path in [chroot,upper,work]:
223 path.mkdir()
224 subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True)
225 bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')]
226 if os.environ.get('BORG_BASE_DIR'):
227 bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/'))
228 if not ":" in src_repo_path:
229 bindMounts.append(pathlib.Path(src_repo_path).relative_to('/'))
230 if 'SSH_AUTH_SOCK' in os.environ:
231 bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/'))
232 for bindMount in bindMounts:
233 (chroot / bindMount).mkdir(parents=True,exist_ok=True)
234 subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True)
235 os.chroot(chroot)
236 os.chdir('/')
237 try:
238 os.unlink('/etc/fuse.conf')
239 except FileNotFoundError:
240 pass
241 pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True)
242 with open('/etc/fuse.conf', 'w') as fuse_conf:
243 fuse_conf.write('user_allow_other\nmount_max = 1000\n')
244 dir = pathlib.Path('/borg')
245 dir.mkdir(parents=True,exist_ok=True,mode=0o0750)
246 os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid)
247
248 total_size = None
249 total_files = None
250 if stderr.isatty():
251 with Halo(text=f'Determine size', **halo_args) as sp:
252 stats = borg_json(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], preexec_fn=lambda: as_borg())['archives'][0]['stats']
253 total_size = stats['original_size']
254 total_files = stats['nfiles']
255 if sp.enabled:
256 sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}')
257 else:
258 logger.info('%d files, %s', total_files, naturalsize(total_size, binary=True))
259 with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc:
260 with Halo(text='Waiting for mount', **halo_args) as sp:
261 if not sp.enabled:
262 logger.debug('Waiting for mount...')
263 wait_start = datetime.now()
264 while True:
265 if os.path.ismount(dir):
266 break
267 elif datetime.now() - wait_start > timedelta(minutes=15):
268 raise TimeoutError('timed out waiting for borg mount')
269 time.sleep(0.1)
270 if sp.enabled:
271 sp.succeed('Mounted')
272 else:
273 logger.info('Mounted %s', f'{src_repo_path}::{entry["name"]}')
274
275 while True:
276 with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress:
277 seen = 0
278 env = os.environ.copy()
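# Re-create the archive in the destination repository from the FUSE mount,
# carrying over the original timestamp. The files cache is keyed per source
# repository and archive series via BORG_FILES_CACHE_SUFFIX, or disabled
# entirely when the archive name did not match the expected pattern.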
279 create_args = ['borg',
280 'create',
281 '--lock-wait=600',
282 '--one-file-system',
283 '--compression=auto,zstd,10',
284 '--chunker-params=10,23,16,4095',
285 '--files-cache=ctime,size',
286 '--show-rc',
287 '--upload-buffer=100',
288 '--upload-ratelimit=20480',
289 '--log-json',
290 '--progress',
291 '--list',
292 '--filter=AMEi-x?',
293 '--stats'
294 ]
295 archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc())
296 create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}']
297 if cache_suffix:
298 env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix
299 else:
300 create_args += ['--files-cache=disabled']
301 create_args += [f'{dst_repo_path}::{entry["name"]}', '.']
302
303 with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir) as proc:
304 last_list = None
305 last_list_time = time.monotonic_ns()
306 logger.info('Creating...')
307
308 proc_logger = logger.getChild('borg')
309 stdout_logger = proc_logger.getChild('stdout')
310 stderr_logger = proc_logger.getChild('stderr')
311
312 fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
313 fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
314
315 poll = select.poll()
316 poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
317 poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
318 pollc = 2
319 events = poll.poll()
320 stdout_linebuf = bytearray()
321 stderr_linebuf = bytearray()
322
323 while pollc > 0 and len(events) > 0:
324 # logger.debug('%d events', len(events))
325 for rfd, event in events:
326 # logger.debug('event %s', event)
327 if event & select.POLLIN:
328 if rfd == proc.stdout.fileno():
329 try:
330 # logger.debug('reading stdout...')
331 stdout_linebuf.extend(os.read(proc.stdout.fileno(), 8192))
332 # logger.debug('read stdout, len(stdout_linebuf)=%d', len(stdout_linebuf))
333 except BlockingIOError:
334 pass
335
336 while stdout_linebuf:
337 # logger.debug('stdout line...')
338 line, sep, stdout_linebuf = stdout_linebuf.partition(b'\n')
339 if not sep:
340 stdout_linebuf = line
341 break
342
343 stdout_logger.info(line.decode())
344 # logger.debug('handled stdout lines, %d leftover', len(stdout_linebuf))
345 if rfd == proc.stderr.fileno():
346 try:
347 # logger.debug('reading stderr...')
348 stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192))
349 # logger.debug('read stderr, len(stderr_linebuf)=%d', len(stderr_linebuf))
350 except BlockingIOError:
351 pass
352
353 while stderr_linebuf:
354 # logger.debug('stderr line...')
355 line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n')
356 if not sep:
357 stderr_linebuf = line
358 break
359
360 try:
361 json_line = json.loads(line)
362 except json.decoder.JSONDecodeError:
363 if progress.disable:
364 stderr_logger.error(line.decode())
365 else:
366 tqdm.write(line.decode())
367 continue
368
369 # logger.debug('stderr line decoded: %s', json_line['type'] if 'type' in json_line else None)
370
371 t = ''
372 if 'time' in json_line and not progress.disable:
373 ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal())
374 t = f'{ts.isoformat(timespec="minutes")} '
375 if json_line['type'] == 'archive_progress' and not progress.disable:
376 now = time.monotonic_ns()
377 if last_list_time is None or now - last_list_time >= 3e9:
378 last_list_time = now
379 if 'path' in json_line and json_line['path']:
380 progress.set_description(f'… {json_line["path"]}', refresh=False)
381 else:
382 progress.set_description(None, refresh=False)
383 elif last_list is not None:
384 progress.set_description(last_list, refresh=False)
385 nfiles=json_line["nfiles"]
386 if total_files is not None:
387 nfiles=f'{json_line["nfiles"]}/{total_files}'
388 progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False)
389 progress.update(json_line["original_size"] - seen)
390 seen = json_line["original_size"]
391 elif json_line['type'] == 'archive_progress':
392 now = time.monotonic_ns()
393 if last_list_time is None or now - last_list_time >= 3e9:
394 last_list_time = now
395 if 'path' in json_line and json_line['path']:
396 stderr_logger.debug('… %s (%s)', json_line["path"], naturalsize(json_line["original_size"]))
397 else:
398 stderr_logger.debug('… (%s)', naturalsize(json_line["original_size"]))
399 elif json_line['type'] == 'file_status':
400 # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}')
401 last_list = f'{json_line["status"]} {json_line["path"]}'
402 last_list_time = time.monotonic_ns()
403 progress.set_description(last_list, refresh=False)
404 if progress.disable:
405 stderr_logger.info(last_list)
406 elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line):
407 if 'message' in json_line:
408 if progress.disable:
409 stderr_logger.info(t + json_line['message'])
410 else:
411 tqdm.write(t + json_line['message'])
412 elif 'msgid' in json_line:
413 if progress.disable:
414 stderr_logger.info(t + json_line['msgid'])
415 else:
416 tqdm.write(t + json_line['msgid'])
417 else:
418 if progress.disable:
419 stderr_logger.info(t + line.decode())
420 else:
421 tqdm.write(t + line.decode())
422 # logger.debug('handled stderr lines, %d leftover', len(stderr_linebuf))
423 if event == select.POLLHUP:
424 poll.unregister(rfd)
425 pollc -= 1
426
427 if pollc > 0:
428 # logger.debug('polling %d fds...', pollc)
429 events = poll.poll()
430 # logger.debug('done polling')
431
432 # logger.debug('borg create closed stdout/stderr')
433 if stdout_linebuf:
434 logger.error('unterminated line leftover in stdout: %s', stdout_linebuf)
435 if stderr_linebuf:
436 logger.error('unterminated line leftover in stderr: %s', stderr_linebuf)
437 progress.set_description(None)
438 ret = proc.wait()
439 # logger.debug('borg create terminated; ret=%d', ret)
440 if ret != 0:
441 dst = None
442 try:
443 dst = read_repo(dst_repo_path)
444 except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err:
445 logger.error(err)
446 continue
447 else:
448 if any(map(lambda other: entry['name'] == other['name'], dst)):
449 logger.info('destination exists, terminating')
450 break
451
452 logger.warning('destination does not exist, retrying')
453 continue
454 else:
455 # logger.debug('terminating')
456 break
457 mount_proc.terminate()
458
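# The unshare/chroot work runs in a child process so the parent never enters
# the throw-away mount namespace; the parent creates the temporary directory
# and hands it to the child through a managed queue, keeping cleanup of the
# directory on the parent's side of the namespace boundary.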
459 with Manager() as manager:
460 tmpdir_q = manager.Queue(1)
461
462 with closing(Process(target=do_copy, args=(tmpdir_q,), name='do_copy')) as p:
463 p.start()
464
465 with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir:
466 tmpdir_q.put(tmpdir)
467 p.join()
468 return p.exitcode
469
470def sigterm(signum, frame):
471 raise SystemExit(128 + signum)
472
473def main():
474 signal.signal(signal.SIGTERM, sigterm)
475
476 global logger
477 logger = logging.getLogger(__name__)
478 console_handler = logging.StreamHandler()
479 console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') )
480 if sys.stderr.isatty():
481 console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') )
482
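# Token-bucket rate limiting for console logging: the burst budget refills at
# one record per inv_rate nanoseconds (i.e. 100 records/s here); once the
# budget is spent the filter sleeps until a token is available, throttling
# the producer instead of dropping records.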
483 burst_max = 1000
484 burst = burst_max
485 last_use = None
486 inv_rate = 1e7
487 def consume_filter(record):
488 nonlocal burst, burst_max, inv_rate, last_use
489
490 delay = None
491 while True:
492 now = time.monotonic_ns()
493 burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max
494 last_use = now
495
496 if burst > 0:
497 burst -= 1
498 if delay:
499 delay = now - delay
500
501 return True
502
503 if delay is None:
504 delay = now
505 time.sleep(inv_rate / 1e9)
506 console_handler.addFilter(consume_filter)
507
508 logging.getLogger().addHandler(console_handler)
509
510 # log uncaught exceptions
511 def log_exceptions(type, value, tb):
512 global logger
513
514 logger.error(value)
515 sys.__excepthook__(type, value, tb) # calls default excepthook
516
517 sys.excepthook = log_exceptions
518
519 parser = argparse.ArgumentParser(prog='copy')
520 parser.add_argument('--verbosity', dest='log_level', action='append', type=int)
521 parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1)
522 parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1)
523 parser.add_argument('source', metavar='REPO_OR_ARCHIVE')
524 parser.add_argument('target', metavar='REPO_OR_ARCHIVE')
525 args = parser.parse_args()
526
527
528 LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
529 DEFAULT_LOG_LEVEL = logging.ERROR
530 log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)
531
532 for adjustment in args.log_level or ():
533 log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0))
534 logger.setLevel(LOG_LEVELS[log_level])
535
536
537 if "::" in args.source:
538 (src_repo_path, _, src_archive) = args.source.partition("::")
539 entry = None
540 for candidate_entry in read_repo(src_repo_path):
541 if candidate_entry['name'] != src_archive:
542 continue
543 entry = candidate_entry
544 break
545
546 if entry is None:
547 logger.critical("Did not find archive ‘%s’", src_archive)
548 sys.exit(1)
549
550 copy_archive(src_repo_path, args.target, entry)
551 else:
552 for entry in ToSync(args.source, args.target):
553 copy_archive(args.source, args.target, entry)
554
555if __name__ == "__main__":
556 sys.exit(main())
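
# Example invocations (paths are illustrative; the systemd unit in
# hosts/vidhar/borg/default.nix below calls this as
# 'copy_borg --verbosity 3 <repo> yggdrasil.borgbase:repo'):
#
#   copy_borg -v /srv/borg/repo yggdrasil.borgbase:repo
#   copy_borg /srv/borg/repo::some-archive-2022-01-01T00:00:00 other:repo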
diff --git a/hosts/vidhar/borg/copy/setup.py b/hosts/vidhar/borg/copy/setup.py
new file mode 100644
index 00000000..f77d9560
--- /dev/null
+++ b/hosts/vidhar/borg/copy/setup.py
@@ -0,0 +1,10 @@
1from setuptools import setup
2
3setup(name='copy_borg',
4 packages=['copy_borg'],
5 entry_points={
6 'console_scripts': [
7 'copy_borg=copy_borg.__main__:main',
8 ],
9 }
10)
diff --git a/hosts/vidhar/borg/default.nix b/hosts/vidhar/borg/default.nix
index 8d0b46ef..7672de18 100644
--- a/hosts/vidhar/borg/default.nix
+++ b/hosts/vidhar/borg/default.nix
@@ -26,7 +26,7 @@ let
 in nameValuePair serviceName {
   serviceConfig = {
     Type = "oneshot";
-    ExecStart = "${copyBorg}/bin/copy ${escapeShellArg repo} yggdrasil.borgbase:repo";
+    ExecStart = "${copyBorg}/bin/copy_borg --verbosity 3 ${escapeShellArg repo} yggdrasil.borgbase:repo";
     TimeoutStartSec = "8h";
     # User = "borg";
     # Group = "borg";
@@ -43,40 +43,38 @@ let
43 "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" 43 "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes"
44 "BORG_HOSTNAME_IS_UNIQUE=yes" 44 "BORG_HOSTNAME_IS_UNIQUE=yes"
45 ]; 45 ];
46
47 LogRateLimitIntervalSec = 0;
46 }; 48 };
47 }; 49 };
48 50
49 copyBorg = pkgs.stdenv.mkDerivation (let 51 copyBorg = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec {
50 packageOverrides = pkgs.callPackage ./pyprctl-packages.nix {}; 52 pname = "copy-borg";
51 inpPython = pkgs.python39.override { inherit packageOverrides; }; 53 src = ./copy;
52 in rec { 54 version = "0.0.0";
53 name = "copy"; 55 ignoreDataOutdated = true;
54 src = ./copy.py; 56
55 57 requirements = ''
56 phases = ["buildPhase" "checkPhase" "installPhase"]; 58 humanize
57 59 tqdm
58 buildInputs = with pkgs; [makeWrapper]; 60 python-dateutil
59 61 xdg
60 python = inpPython.withPackages (ps: with ps; [humanize tqdm python-dateutil xdg python-unshare pyprctl halo]); 62 python-unshare
61 63 pyprctl
62 buildPhase = '' 64 halo
63 substitute $src copy \
64 --subst-var-by python ${escapeShellArg python}
65 ''; 65 '';
66 66 postInstall = ''
67 doCheck = true; 67 wrapProgram $out/bin/copy_borg \
68 checkPhase = '' 68 --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir}
69 ${python}/bin/python -m py_compile copy
70 ''; 69 '';
71 70
72 installPhase = '' 71 providers.python-unshare = "nixpkgs";
73 install -m 0755 -D -t $out/bin \ 72 overridesPre = [
74 copy 73 (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); })
74 ];
75 75
76 wrapProgram $out/bin/copy \ 76 # _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ];
77 --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} 77 };
78 '';
79 });
80in { 78in {
81 config = { 79 config = {
82 services.borgsnap = { 80 services.borgsnap = {
diff --git a/modules/borgsnap/borgsnap/borgsnap/__main__.py b/modules/borgsnap/borgsnap/borgsnap/__main__.py
index 80bf511d..91144780 100644
--- a/modules/borgsnap/borgsnap/borgsnap/__main__.py
+++ b/modules/borgsnap/borgsnap/borgsnap/__main__.py
@@ -274,12 +274,10 @@ def create(*, snapshot, target, archive_prefix, dry_run):
         for rfd, event in events:
             if event & select.POLLIN:
                 if rfd == proc.stdout.fileno():
-                    line = proc.stdout.readline()
-                    if len(line) > 0:
+                    if line := proc.stdout.readline():
                         stdout_logger.info(line[:-1])
                 if rfd == proc.stderr.fileno():
-                    line = proc.stderr.readline()
-                    if len(line) > 0:
+                    if line := proc.stderr.readline():
                         stderr_logger.info(line[:-1])
                 if event & select.POLLHUP:
                     poll.unregister(rfd)