diff options
author | Gregor Kleen <gkleen@yggdrasil.li> | 2022-11-02 16:31:02 +0100 |
---|---|---|
committer | Gregor Kleen <gkleen@yggdrasil.li> | 2022-11-02 16:31:02 +0100 |
commit | 6bafcb244bd6cd031ac9b65fce8a2a939698ecaa (patch) | |
tree | ad5346d15e226f227e9a9b111c5894782885b79a | |
parent | 7f5cf6c9bbc830ab830275cb71cc1e6229d79afd (diff) | |
download | nixos-6bafcb244bd6cd031ac9b65fce8a2a939698ecaa.tar nixos-6bafcb244bd6cd031ac9b65fce8a2a939698ecaa.tar.gz nixos-6bafcb244bd6cd031ac9b65fce8a2a939698ecaa.tar.bz2 nixos-6bafcb244bd6cd031ac9b65fce8a2a939698ecaa.tar.xz nixos-6bafcb244bd6cd031ac9b65fce8a2a939698ecaa.zip |
...
-rwxr-xr-x | hosts/vidhar/borg/copy.py | 291 | ||||
-rwxr-xr-x | hosts/vidhar/borg/copy/copy_borg/__main__.py | 556 | ||||
-rw-r--r-- | hosts/vidhar/borg/copy/setup.py | 10 | ||||
-rw-r--r-- | hosts/vidhar/borg/default.nix | 54 | ||||
-rw-r--r-- | modules/borgsnap/borgsnap/borgsnap/__main__.py | 6 |
5 files changed, 594 insertions, 323 deletions
diff --git a/hosts/vidhar/borg/copy.py b/hosts/vidhar/borg/copy.py deleted file mode 100755 index c648c4f6..00000000 --- a/hosts/vidhar/borg/copy.py +++ /dev/null | |||
@@ -1,291 +0,0 @@ | |||
1 | #!@python@/bin/python | ||
2 | |||
3 | import json | ||
4 | import os | ||
5 | import subprocess | ||
6 | import re | ||
7 | import sys | ||
8 | from sys import stderr | ||
9 | from humanize import naturalsize | ||
10 | |||
11 | from tempfile import TemporaryDirectory | ||
12 | |||
13 | from datetime import (datetime, timedelta) | ||
14 | from dateutil.tz import (tzlocal, tzutc) | ||
15 | import dateutil.parser | ||
16 | import argparse | ||
17 | |||
18 | from tqdm import tqdm | ||
19 | |||
20 | from xdg import xdg_runtime_dir | ||
21 | import pathlib | ||
22 | |||
23 | import unshare | ||
24 | from pyprctl import cap_permitted, cap_inheritable, cap_effective, cap_ambient, Cap | ||
25 | from pwd import getpwnam | ||
26 | |||
27 | import signal | ||
28 | from time import sleep | ||
29 | |||
30 | from halo import Halo | ||
31 | |||
32 | from collections import deque | ||
33 | |||
34 | |||
35 | parser = argparse.ArgumentParser() | ||
36 | parser.add_argument('source', metavar='REPO_OR_ARCHIVE') | ||
37 | parser.add_argument('target', metavar='REPO_OR_ARCHIVE') | ||
38 | args = parser.parse_args() | ||
39 | |||
40 | halo_args = { | ||
41 | 'stream': stderr, | ||
42 | 'enabled': stderr.isatty(), | ||
43 | 'spinner': 'arc' | ||
44 | } | ||
45 | |||
46 | borg_pwd = getpwnam('borg') | ||
47 | |||
48 | def as_borg(caps=set(), cwd=None): | ||
49 | if caps: | ||
50 | cap_permitted.add(*caps) | ||
51 | cap_inheritable.add(*caps) | ||
52 | cap_effective.add(*caps) | ||
53 | cap_ambient.add(*caps) | ||
54 | |||
55 | os.setgid(borg_pwd.pw_gid) | ||
56 | os.setuid(borg_pwd.pw_uid) | ||
57 | |||
58 | if cwd is not None: | ||
59 | os.chdir(cwd) | ||
60 | |||
61 | def read_repo(path): | ||
62 | with Halo(text=f'Listing {path}', **halo_args) as sp: | ||
63 | res = None | ||
64 | with subprocess.Popen(['borg', 'list', '--info', '--lock-wait=600', '--json', path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: | ||
65 | res = json.load(proc.stdout)['archives'] | ||
66 | if sp.enabled: | ||
67 | sp.succeed(f'{len(res)} archives in {path}') | ||
68 | else: | ||
69 | print(f'{len(res)} archives in {path}', file=stderr) | ||
70 | return res | ||
71 | |||
72 | class ToSync: | ||
73 | to_sync = deque() | ||
74 | |||
75 | def __iter__(self): | ||
76 | return self | ||
77 | |||
78 | def __next__(self): | ||
79 | if self.to_sync: | ||
80 | return self.to_sync.popleft() | ||
81 | |||
82 | while True: | ||
83 | try: | ||
84 | src = read_repo(args.source) | ||
85 | dst = read_repo(args.target) | ||
86 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
87 | print(err, file=stderr) | ||
88 | continue | ||
89 | |||
90 | self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')]) | ||
91 | |||
92 | if self.to_sync: | ||
93 | return self.to_sync.popleft() | ||
94 | |||
95 | raise StopIteration | ||
96 | |||
97 | def copy_archive(src_repo_path, dst_repo_path, entry): | ||
98 | cache_suffix = None | ||
99 | with Halo(text=f'Determine archive parameters', **halo_args) as sp: | ||
100 | match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name']) | ||
101 | if match: | ||
102 | repo_id = None | ||
103 | with subprocess.Popen(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], stdout=subprocess.PIPE, preexec_fn=lambda: as_borg()) as proc: | ||
104 | repo_id = json.load(proc.stdout)['repository']['id'] | ||
105 | if repo_id: | ||
106 | cache_suffix = f'{repo_id}_{match.group(1)}' | ||
107 | if sp.enabled: | ||
108 | sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})') | ||
109 | else: | ||
110 | print(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})', file=stderr) | ||
111 | with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: | ||
112 | child = os.fork() | ||
113 | if child == 0: | ||
114 | # print('unshare/chroot', file=stderr) | ||
115 | unshare.unshare(unshare.CLONE_NEWNS) | ||
116 | subprocess.run(['mount', '--make-rprivate', '/'], check=True) | ||
117 | chroot = pathlib.Path(tmpdir) / 'chroot' | ||
118 | upper = pathlib.Path(tmpdir) / 'upper' | ||
119 | work = pathlib.Path(tmpdir) / 'work' | ||
120 | for path in [chroot,upper,work]: | ||
121 | path.mkdir() | ||
122 | subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) | ||
123 | bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] | ||
124 | if os.environ.get('BORG_BASE_DIR'): | ||
125 | bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) | ||
126 | if not ":" in src_repo_path: | ||
127 | bindMounts.append(pathlib.Path(src_repo_path).relative_to('/')) | ||
128 | if 'SSH_AUTH_SOCK' in os.environ: | ||
129 | bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) | ||
130 | for bindMount in bindMounts: | ||
131 | (chroot / bindMount).mkdir(parents=True,exist_ok=True) | ||
132 | # print(*['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], file=stderr) | ||
133 | subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) | ||
134 | os.chroot(chroot) | ||
135 | os.chdir('/') | ||
136 | try: | ||
137 | os.unlink('/etc/fuse.conf') | ||
138 | except FileNotFoundError: | ||
139 | pass | ||
140 | pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True) | ||
141 | with open('/etc/fuse.conf', 'w') as fuse_conf: | ||
142 | fuse_conf.write('user_allow_other\nmount_max = 1000\n') | ||
143 | dir = pathlib.Path('/borg') | ||
144 | dir.mkdir(parents=True,exist_ok=True,mode=0o0750) | ||
145 | os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) | ||
146 | |||
147 | total_size = None | ||
148 | total_files = None | ||
149 | if stderr.isatty(): | ||
150 | with Halo(text=f'Determine size', **halo_args) as sp: | ||
151 | with subprocess.Popen(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], stdout=subprocess.PIPE, text=True, preexec_fn=lambda: as_borg()) as proc: | ||
152 | stats = json.load(proc.stdout)['archives'][0]['stats'] | ||
153 | total_size = stats['original_size'] | ||
154 | total_files = stats['nfiles'] | ||
155 | if sp.enabled: | ||
156 | sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') | ||
157 | else: | ||
158 | print(f'{total_files} files, {naturalsize(total_size, binary=True)}', file=stderr) | ||
159 | # print(f'Mounting to {dir}', file=stderr) | ||
160 | with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: | ||
161 | with Halo(text='Waiting for mount', **halo_args) as sp: | ||
162 | wait_start = datetime.now() | ||
163 | while True: | ||
164 | if os.path.ismount(dir): | ||
165 | break | ||
166 | elif datetime.now() - wait_start > timedelta(minutes=15): | ||
167 | ret.check_returncode() | ||
168 | sleep(0.1) | ||
169 | if sp.enabled: | ||
170 | sp.succeed('Mounted') | ||
171 | else: | ||
172 | print('Mounted', file=stderr) | ||
173 | while True: | ||
174 | with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: | ||
175 | seen = 0 | ||
176 | env = os.environ.copy() | ||
177 | create_args = ['borg', | ||
178 | 'create', | ||
179 | '--lock-wait=600', | ||
180 | '--one-file-system', | ||
181 | '--compression=auto,zstd,10', | ||
182 | '--chunker-params=10,23,16,4095', | ||
183 | '--files-cache=ctime,size', | ||
184 | '--show-rc', | ||
185 | '--upload-buffer=100', | ||
186 | '--upload-ratelimit=20480', | ||
187 | '--log-json', | ||
188 | '--progress', | ||
189 | '--list', | ||
190 | '--filter=AMEi-x?', | ||
191 | '--stats' | ||
192 | ] | ||
193 | archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) | ||
194 | create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] | ||
195 | if cache_suffix: | ||
196 | env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix | ||
197 | else: | ||
198 | create_args += ['--files-cache=disabled'] | ||
199 | create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] | ||
200 | |||
201 | with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}, cwd=dir)) as proc: | ||
202 | last_list = None | ||
203 | last_list_time = None | ||
204 | for line in proc.stderr: | ||
205 | try: | ||
206 | json_line = json.loads(line) | ||
207 | except json.decoder.JSONDecodeError: | ||
208 | tqdm.write(line) | ||
209 | continue | ||
210 | |||
211 | t = '' | ||
212 | if 'time' in json_line and stderr.isatty(): | ||
213 | ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) | ||
214 | t = f'{ts.isoformat(timespec="minutes")} ' | ||
215 | if json_line['type'] == 'archive_progress': | ||
216 | if last_list_time is None or ((datetime.now() - last_list_time) // timedelta(seconds=3)) % 2 == 1: | ||
217 | if 'path' in json_line and json_line['path']: | ||
218 | progress.set_description(f'ā¦ {json_line["path"]}', refresh=False) | ||
219 | else: | ||
220 | progress.set_description(None, refresh=False) | ||
221 | elif last_list is not None: | ||
222 | progress.set_description(last_list, refresh=False) | ||
223 | nfiles=json_line["nfiles"] | ||
224 | if total_files is not None: | ||
225 | nfiles=f'{json_line["nfiles"]}/{total_files}' | ||
226 | progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) | ||
227 | progress.update(json_line["original_size"] - seen) | ||
228 | seen = json_line["original_size"] | ||
229 | elif json_line['type'] == 'file_status': | ||
230 | # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') | ||
231 | last_list = f'{json_line["status"]} {json_line["path"]}' | ||
232 | last_list_time = datetime.now() | ||
233 | progress.set_description(last_list, refresh=False) | ||
234 | if not stderr.isatty(): | ||
235 | print(last_list, file=stderr) | ||
236 | elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): | ||
237 | if 'message' in json_line: | ||
238 | tqdm.write(t + json_line['message']) | ||
239 | elif 'msgid' in json_line: | ||
240 | tqdm.write(t + json_line['msgid']) | ||
241 | else: | ||
242 | tqdm.write(t + line) | ||
243 | progress.set_description(None) | ||
244 | if proc.wait() != 0: | ||
245 | dst = None | ||
246 | try: | ||
247 | dst = read_repo(args.target) | ||
248 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
249 | print(err, file=stderr) | ||
250 | continue | ||
251 | else: | ||
252 | if any(map(lambda other: entry['name'] == other['name'], dst)): | ||
253 | break | ||
254 | |||
255 | continue | ||
256 | mount_proc.terminate() | ||
257 | os._exit(0) | ||
258 | else: | ||
259 | while True: | ||
260 | waitpid, waitret = os.wait() | ||
261 | if waitret != 0: | ||
262 | sys.exit(waitret) | ||
263 | if waitpid == child: | ||
264 | break | ||
265 | |||
266 | def sigterm(signum, frame): | ||
267 | raise SystemExit(128 + signum) | ||
268 | |||
269 | def main(): | ||
270 | signal.signal(signal.SIGTERM, sigterm) | ||
271 | |||
272 | if "::" in args.source: | ||
273 | (src_repo_path, _, src_archive) = args.source.partition("::") | ||
274 | entry = None | ||
275 | for candidate_entry in read_repo(src_repo_path): | ||
276 | if entry['name'] != src_archive: | ||
277 | continue | ||
278 | entry = candidate_entry | ||
279 | break | ||
280 | |||
281 | if entry is None: | ||
282 | print("Did not find archive", file=stderr) | ||
283 | os.exit(1) | ||
284 | |||
285 | copy_archive(src_repo_path, args.target, entry) | ||
286 | else: | ||
287 | for entry in ToSync(): | ||
288 | copy_archive(args.source, args.target, entry) | ||
289 | |||
290 | if __name__ == "__main__": | ||
291 | sys.exit(main()) | ||
diff --git a/hosts/vidhar/borg/copy/copy_borg/__main__.py b/hosts/vidhar/borg/copy/copy_borg/__main__.py new file mode 100755 index 00000000..5b374d99 --- /dev/null +++ b/hosts/vidhar/borg/copy/copy_borg/__main__.py | |||
@@ -0,0 +1,556 @@ | |||
1 | #!@python@/bin/python | ||
2 | |||
3 | import json | ||
4 | import os | ||
5 | import subprocess | ||
6 | import re | ||
7 | import sys | ||
8 | import io | ||
9 | from sys import stderr | ||
10 | from humanize import naturalsize | ||
11 | |||
12 | from tempfile import TemporaryDirectory | ||
13 | |||
14 | from datetime import (datetime, timedelta) | ||
15 | from dateutil.tz import (tzlocal, tzutc) | ||
16 | import dateutil.parser | ||
17 | import argparse | ||
18 | |||
19 | from tqdm import tqdm | ||
20 | |||
21 | from xdg import xdg_runtime_dir | ||
22 | import pathlib | ||
23 | |||
24 | import unshare | ||
25 | from pyprctl import CapState, Cap, cap_ambient_raise, cap_ambient_is_set, set_keepcaps | ||
26 | from pwd import getpwnam | ||
27 | |||
28 | import logging | ||
29 | |||
30 | import signal | ||
31 | import time | ||
32 | import math | ||
33 | |||
34 | from halo import Halo | ||
35 | |||
36 | from collections import deque | ||
37 | |||
38 | import select | ||
39 | import fcntl | ||
40 | |||
41 | from multiprocessing import Process, Manager | ||
42 | from contextlib import closing | ||
43 | |||
44 | |||
45 | halo_args = { | ||
46 | 'stream': stderr, | ||
47 | 'enabled': stderr.isatty(), | ||
48 | 'spinner': 'arc' | ||
49 | } | ||
50 | |||
51 | borg_pwd = getpwnam('borg') | ||
52 | |||
53 | def as_borg(caps=set()): | ||
54 | global logger | ||
55 | |||
56 | try: | ||
57 | if caps: | ||
58 | c_state = CapState.get_current() | ||
59 | c_state.permitted.add(*caps) | ||
60 | c_state.set_current() | ||
61 | |||
62 | # logger.debug("before setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) | ||
63 | |||
64 | set_keepcaps(True) | ||
65 | |||
66 | os.setgid(borg_pwd.pw_gid) | ||
67 | os.setuid(borg_pwd.pw_uid) | ||
68 | |||
69 | if caps: | ||
70 | # logger.debug("after setgid/setuid: cap_permitted=%s", CapState.get_current().permitted) | ||
71 | |||
72 | c_state = CapState.get_current() | ||
73 | c_state.permitted = caps.copy() | ||
74 | c_state.inheritable.add(*caps) | ||
75 | c_state.set_current() | ||
76 | |||
77 | # logger.debug("cap_permitted=%s", CapState.get_current().permitted) | ||
78 | # logger.debug("cap_inheritable=%s", CapState.get_current().inheritable) | ||
79 | |||
80 | for cap in caps: | ||
81 | cap_ambient_raise(cap) | ||
82 | # logger.debug("cap_ambient[%s]=%s", cap, cap_ambient_is_set(cap)) | ||
83 | except Exception: | ||
84 | logger.error(format_exc()) | ||
85 | raise | ||
86 | |||
87 | def borg_json(*args, **kwargs): | ||
88 | global logger | ||
89 | |||
90 | with subprocess.Popen(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs) as proc: | ||
91 | stdout_buffer = io.BytesIO() | ||
92 | |||
93 | proc_logger = logger.getChild('borg') | ||
94 | stdout_logger = proc_logger.getChild('stdout') | ||
95 | stderr_logger = proc_logger.getChild('stderr') | ||
96 | |||
97 | fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
98 | fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
99 | |||
100 | poll = select.poll() | ||
101 | poll.register(proc.stdout, select.POLLIN | select.POLLHUP) | ||
102 | poll.register(proc.stderr, select.POLLIN | select.POLLHUP) | ||
103 | pollc = 2 | ||
104 | events = poll.poll() | ||
105 | stderr_linebuf = bytearray() | ||
106 | |||
107 | while pollc > 0 and len(events) > 0: | ||
108 | for rfd, event in events: | ||
109 | if event & select.POLLIN: | ||
110 | if rfd == proc.stdout.fileno(): | ||
111 | try: | ||
112 | buf = os.read(proc.stdout.fileno(), 8192) | ||
113 | # stdout_logger.debug(buf) | ||
114 | stdout_buffer.write(buf) | ||
115 | except BlockingIOError: | ||
116 | pass | ||
117 | if rfd == proc.stderr.fileno(): | ||
118 | try: | ||
119 | stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192)) | ||
120 | except BlockingIOError: | ||
121 | pass | ||
122 | |||
123 | while stderr_linebuf: | ||
124 | line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n') | ||
125 | if not sep: | ||
126 | stderr_linebuf = line | ||
127 | break | ||
128 | |||
129 | stderr_logger.info(line.decode()) | ||
130 | if event == select.POLLHUP: | ||
131 | poll.unregister(rfd) | ||
132 | pollc -= 1 | ||
133 | |||
134 | if pollc > 0: | ||
135 | events = poll.poll() | ||
136 | |||
137 | for handler in proc_logger.handlers: | ||
138 | handler.flush() | ||
139 | |||
140 | ret = proc.wait() | ||
141 | if ret != 0: | ||
142 | raise Exception(f'borg subprocess exited with returncode {ret}') | ||
143 | |||
144 | stdout_buffer.seek(0) | ||
145 | return json.load(stdout_buffer) | ||
146 | |||
147 | def read_repo(path): | ||
148 | global logger | ||
149 | |||
150 | with Halo(text=f'Listing {path}', **halo_args) as sp: | ||
151 | if not sp.enabled: | ||
152 | logger.debug('Listing %s...', path) | ||
153 | res = borg_json(['borg', 'list', '--info', '--lock-wait=600', '--json', path], preexec_fn=lambda: as_borg())['archives'] | ||
154 | if sp.enabled: | ||
155 | sp.succeed(f'{len(res)} archives in {path}') | ||
156 | else: | ||
157 | logger.info('%d archives in ā%sā', len(res), path) | ||
158 | return res | ||
159 | |||
160 | class ToSync: | ||
161 | to_sync = deque() | ||
162 | |||
163 | def __init__(self, source, target): | ||
164 | self.source = source | ||
165 | self.target = target | ||
166 | |||
167 | def __iter__(self): | ||
168 | return self | ||
169 | |||
170 | def __next__(self): | ||
171 | global logger | ||
172 | |||
173 | if self.to_sync: | ||
174 | return self.to_sync.popleft() | ||
175 | |||
176 | while True: | ||
177 | try: | ||
178 | src = read_repo(self.source) | ||
179 | dst = read_repo(self.target) | ||
180 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
181 | logger.error(err) | ||
182 | continue | ||
183 | |||
184 | self.to_sync.extend([entry for entry in src if entry['name'] not in {dst_entry['name'] for dst_entry in dst} and not entry['name'].endswith('.checkpoint')]) | ||
185 | |||
186 | if self.to_sync: | ||
187 | return self.to_sync.popleft() | ||
188 | |||
189 | raise StopIteration | ||
190 | |||
191 | def copy_archive(src_repo_path, dst_repo_path, entry): | ||
192 | global logger | ||
193 | |||
194 | def do_copy(tmpdir_q): | ||
195 | global logger | ||
196 | |||
197 | nonlocal src_repo_path, dst_repo_path, entry | ||
198 | |||
199 | tmpdir = tmpdir_q.get() | ||
200 | |||
201 | cache_suffix = None | ||
202 | with Halo(text=f'Determine archive parameters', **halo_args) as sp: | ||
203 | if not sp.enabled: | ||
204 | logger.debug('Determining archive parameters...') | ||
205 | match = re.compile('^(.*)-[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.(checkpoint|recreate)(\.[0-9]+)?)?').fullmatch(entry['name']) | ||
206 | if match: | ||
207 | repo_id = borg_json(['borg', 'info', '--info', '--lock-wait=600', '--json', src_repo_path], preexec_fn=lambda: as_borg())['repository']['id'] | ||
208 | |||
209 | if repo_id: | ||
210 | cache_suffix = f'{repo_id}_{match.group(1)}' | ||
211 | if sp.enabled: | ||
212 | sp.succeed(f'Will process {entry["name"]} ({dateutil.parser.isoparse(entry["start"])}, cache_suffix={cache_suffix})') | ||
213 | else: | ||
214 | logger.info('Will process ā%sā (%s, cache_suffix=%s)', entry['name'], dateutil.parser.isoparse(entry['start']), cache_suffix) | ||
215 | |||
216 | logger.debug('Setting up environment...') | ||
217 | unshare.unshare(unshare.CLONE_NEWNS) | ||
218 | subprocess.run(['mount', '--make-rprivate', '/'], check=True) | ||
219 | chroot = pathlib.Path(tmpdir) / 'chroot' | ||
220 | upper = pathlib.Path(tmpdir) / 'upper' | ||
221 | work = pathlib.Path(tmpdir) / 'work' | ||
222 | for path in [chroot,upper,work]: | ||
223 | path.mkdir() | ||
224 | subprocess.run(['mount', '-t', 'overlay', 'overlay', '-o', f'lowerdir=/,upperdir={upper},workdir={work}', chroot], check=True) | ||
225 | bindMounts = ['nix', 'run', 'run/secrets.d', 'run/wrappers', 'proc', 'dev', 'sys', pathlib.Path(os.path.expanduser('~')).relative_to('/')] | ||
226 | if os.environ.get('BORG_BASE_DIR'): | ||
227 | bindMounts.append(pathlib.Path(os.environ['BORG_BASE_DIR']).relative_to('/')) | ||
228 | if not ":" in src_repo_path: | ||
229 | bindMounts.append(pathlib.Path(src_repo_path).relative_to('/')) | ||
230 | if 'SSH_AUTH_SOCK' in os.environ: | ||
231 | bindMounts.append(pathlib.Path(os.environ['SSH_AUTH_SOCK']).parent.relative_to('/')) | ||
232 | for bindMount in bindMounts: | ||
233 | (chroot / bindMount).mkdir(parents=True,exist_ok=True) | ||
234 | subprocess.run(['mount', '--bind', pathlib.Path('/') / bindMount, chroot / bindMount], check=True) | ||
235 | os.chroot(chroot) | ||
236 | os.chdir('/') | ||
237 | try: | ||
238 | os.unlink('/etc/fuse.conf') | ||
239 | except FileNotFoundError: | ||
240 | pass | ||
241 | pathlib.Path('/etc/fuse.conf').parent.mkdir(parents=True,exist_ok=True) | ||
242 | with open('/etc/fuse.conf', 'w') as fuse_conf: | ||
243 | fuse_conf.write('user_allow_other\nmount_max = 1000\n') | ||
244 | dir = pathlib.Path('/borg') | ||
245 | dir.mkdir(parents=True,exist_ok=True,mode=0o0750) | ||
246 | os.chown(dir, borg_pwd.pw_uid, borg_pwd.pw_gid) | ||
247 | |||
248 | total_size = None | ||
249 | total_files = None | ||
250 | if stderr.isatty(): | ||
251 | with Halo(text=f'Determine size', **halo_args) as sp: | ||
252 | stats = borg_json(['borg', 'info', '--info', '--json', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}'], preexec_fn=lambda: as_borg())['archives'][0]['stats'] | ||
253 | total_size = stats['original_size'] | ||
254 | total_files = stats['nfiles'] | ||
255 | if sp.enabled: | ||
256 | sp.succeed(f'{total_files} files, {naturalsize(total_size, binary=True)}') | ||
257 | else: | ||
258 | logger.info('%d files, %s', total_files, naturalsize(total_size, binary=True)) | ||
259 | with subprocess.Popen(['borg', 'mount', '-o', 'allow_other,ignore_permissions', '--foreground', '--progress', '--lock-wait=600', f'{src_repo_path}::{entry["name"]}', dir], preexec_fn=lambda: as_borg()) as mount_proc: | ||
260 | with Halo(text='Waiting for mount', **halo_args) as sp: | ||
261 | if not sp.enabled: | ||
262 | logger.debug('Waiting for mount...') | ||
263 | wait_start = datetime.now() | ||
264 | while True: | ||
265 | if os.path.ismount(dir): | ||
266 | break | ||
267 | elif datetime.now() - wait_start > timedelta(minutes=15): | ||
268 | ret.check_returncode() | ||
269 | time.sleep(0.1) | ||
270 | if sp.enabled: | ||
271 | sp.succeed('Mounted') | ||
272 | else: | ||
273 | logger.info('Mounted %s', f'{src_repo_path}::{entry["name"]}') | ||
274 | |||
275 | while True: | ||
276 | with tqdm(total=total_size, unit_scale=True, unit_divisor=1024, unit='B', smoothing=0.01, disable=None, dynamic_ncols=True, maxinterval=0.5, miniters=1) as progress: | ||
277 | seen = 0 | ||
278 | env = os.environ.copy() | ||
279 | create_args = ['borg', | ||
280 | 'create', | ||
281 | '--lock-wait=600', | ||
282 | '--one-file-system', | ||
283 | '--compression=auto,zstd,10', | ||
284 | '--chunker-params=10,23,16,4095', | ||
285 | '--files-cache=ctime,size', | ||
286 | '--show-rc', | ||
287 | '--upload-buffer=100', | ||
288 | '--upload-ratelimit=20480', | ||
289 | '--log-json', | ||
290 | '--progress', | ||
291 | '--list', | ||
292 | '--filter=AMEi-x?', | ||
293 | '--stats' | ||
294 | ] | ||
295 | archive_time = datetime.strptime(entry["time"], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tzlocal()).astimezone(tzutc()) | ||
296 | create_args += [f'--timestamp={archive_time.strftime("%Y-%m-%dT%H:%M:%S")}'] | ||
297 | if cache_suffix: | ||
298 | env['BORG_FILES_CACHE_SUFFIX'] = cache_suffix | ||
299 | else: | ||
300 | create_args += ['--files-cache=disabled'] | ||
301 | create_args += [f'{dst_repo_path}::{entry["name"]}', '.'] | ||
302 | |||
303 | with subprocess.Popen(create_args, stdin=subprocess.DEVNULL, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env, preexec_fn=lambda: as_borg(caps={Cap.DAC_READ_SEARCH}), cwd=dir) as proc: | ||
304 | last_list = None | ||
305 | last_list_time = time.monotonic_ns() | ||
306 | logger.info('Creating...') | ||
307 | |||
308 | proc_logger = logger.getChild('borg') | ||
309 | stdout_logger = proc_logger.getChild('stdout') | ||
310 | stderr_logger = proc_logger.getChild('stderr') | ||
311 | |||
312 | fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
313 | fcntl.fcntl(proc.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
314 | |||
315 | poll = select.poll() | ||
316 | poll.register(proc.stdout, select.POLLIN | select.POLLHUP) | ||
317 | poll.register(proc.stderr, select.POLLIN | select.POLLHUP) | ||
318 | pollc = 2 | ||
319 | events = poll.poll() | ||
320 | stdout_linebuf = bytearray() | ||
321 | stderr_linebuf = bytearray() | ||
322 | |||
323 | while pollc > 0 and len(events) > 0: | ||
324 | # logger.debug('%d events', len(events)) | ||
325 | for rfd, event in events: | ||
326 | # logger.debug('event %s', event) | ||
327 | if event & select.POLLIN: | ||
328 | if rfd == proc.stdout.fileno(): | ||
329 | try: | ||
330 | # logger.debug('reading stdout...') | ||
331 | stdout_linebuf.extend(os.read(proc.stdout.fileno(), 8192)) | ||
332 | # logger.debug('read stdout, len(stdout_linebuf)=%d', len(stdout_linebuf)) | ||
333 | except BlockingIOError: | ||
334 | pass | ||
335 | |||
336 | while stdout_linebuf: | ||
337 | # logger.debug('stdout line...') | ||
338 | line, sep, stdout_linebuf = stdout_linebuf.partition(b'\n') | ||
339 | if not sep: | ||
340 | stdout_linebuf = line | ||
341 | break | ||
342 | |||
343 | stdout_logger.info(line.decode()) | ||
344 | # logger.debug('handled stdout lines, %d leftover', len(stdout_linebuf)) | ||
345 | if rfd == proc.stderr.fileno(): | ||
346 | try: | ||
347 | # logger.debug('reading stderr...') | ||
348 | stderr_linebuf.extend(os.read(proc.stderr.fileno(), 8192)) | ||
349 | # logger.debug('read stderr, len(stderr_linebuf)=%d', len(stderr_linebuf)) | ||
350 | except BlockingIOError: | ||
351 | pass | ||
352 | |||
353 | while stderr_linebuf: | ||
354 | # logger.debug('stderr line...') | ||
355 | line, sep, stderr_linebuf = stderr_linebuf.partition(b'\n') | ||
356 | if not sep: | ||
357 | stderr_linebuf = line | ||
358 | break | ||
359 | |||
360 | try: | ||
361 | json_line = json.loads(line) | ||
362 | except json.decoder.JSONDecodeError: | ||
363 | if progress.disable: | ||
364 | stderr_logger.error(line.decode()) | ||
365 | else: | ||
366 | tqdm.write(line.decode()) | ||
367 | continue | ||
368 | |||
369 | # logger.debug('stderr line decoded: %s', json_line['type'] if 'type' in json_line else None) | ||
370 | |||
371 | t = '' | ||
372 | if 'time' in json_line and not progress.disable: | ||
373 | ts = datetime.fromtimestamp(json_line['time']).replace(tzinfo=tzlocal()) | ||
374 | t = f'{ts.isoformat(timespec="minutes")} ' | ||
375 | if json_line['type'] == 'archive_progress' and not progress.disable: | ||
376 | now = time.monotonic_ns() | ||
377 | if last_list_time is None or now - last_list_time >= 3e9: | ||
378 | last_list_time = now | ||
379 | if 'path' in json_line and json_line['path']: | ||
380 | progress.set_description(f'ā¦ {json_line["path"]}', refresh=False) | ||
381 | else: | ||
382 | progress.set_description(None, refresh=False) | ||
383 | elif last_list is not None: | ||
384 | progress.set_description(last_list, refresh=False) | ||
385 | nfiles=json_line["nfiles"] | ||
386 | if total_files is not None: | ||
387 | nfiles=f'{json_line["nfiles"]}/{total_files}' | ||
388 | progress.set_postfix(compressed=naturalsize(json_line['compressed_size'], binary=True), deduplicated=naturalsize(json_line['deduplicated_size'], binary=True), nfiles=nfiles, refresh=False) | ||
389 | progress.update(json_line["original_size"] - seen) | ||
390 | seen = json_line["original_size"] | ||
391 | elif json_line['type'] == 'archive_progress': | ||
392 | now = time.monotonic_ns() | ||
393 | if last_list_time is None or now - last_list_time >= 3e9: | ||
394 | last_list_time = now | ||
395 | if 'path' in json_line and json_line['path']: | ||
396 | stderr_logger.debug('ā¦ %s (%s)', json_line["path"], naturalsize(json_line["original_size"])) | ||
397 | else: | ||
398 | stderr_logger.debug('ā¦ (%s)', naturalsize(json_line["original_size"])) | ||
399 | elif json_line['type'] == 'file_status': | ||
400 | # tqdm.write(t + f'{json_line["status"]} {json_line["path"]}') | ||
401 | last_list = f'{json_line["status"]} {json_line["path"]}' | ||
402 | last_list_time = time.monotonic_ns() | ||
403 | progress.set_description(last_list, refresh=False) | ||
404 | if progress.disable: | ||
405 | stderr_logger.info(last_list) | ||
406 | elif (json_line['type'] == 'log_message' or json_line['type'] == 'progress_message' or json_line['type'] == 'progress_percent') and ('message' in json_line or 'msgid' in json_line): | ||
407 | if 'message' in json_line: | ||
408 | if progress.disable: | ||
409 | stderr_logger.info(t + json_line['message']) | ||
410 | else: | ||
411 | tqdm.write(t + json_line['message']) | ||
412 | elif 'msgid' in json_line: | ||
413 | if progress.disable: | ||
414 | stderr_logger.info(t + json_line['msgid']) | ||
415 | else: | ||
416 | tqdm.write(t + json_line['msgid']) | ||
417 | else: | ||
418 | if progress.disable: | ||
419 | stderr_logger.info(t + line.decode()) | ||
420 | else: | ||
421 | tqdm.write(t + line.decode()) | ||
422 | # logger.debug('handled stderr lines, %d leftover', len(stderr_linebuf)) | ||
423 | if event == select.POLLHUP: | ||
424 | poll.unregister(rfd) | ||
425 | pollc -= 1 | ||
426 | |||
427 | if pollc > 0: | ||
428 | # logger.debug('polling %d fds...', pollc) | ||
429 | events = poll.poll() | ||
430 | # logger.debug('done polling') | ||
431 | |||
432 | # logger.debug('borg create closed stdout/stderr') | ||
433 | if stdout_linebuf: | ||
434 | logger.error('unterminated line leftover in stdout: %s', stdout_linebuf) | ||
435 | if stderr_linebuf: | ||
436 | logger.error('unterminated line leftover in stdout: %s', stderr_linebuf) | ||
437 | progress.set_description(None) | ||
438 | ret = proc.wait() | ||
439 | # logger.debug('borg create terminated; ret=%d', ret) | ||
440 | if ret != 0: | ||
441 | dst = None | ||
442 | try: | ||
443 | dst = read_repo(dst_repo_path) | ||
444 | except (subprocess.CalledProcessError, json.decoder.JSONDecodeError) as err: | ||
445 | logger.error(err) | ||
446 | continue | ||
447 | else: | ||
448 | if any(map(lambda other: entry['name'] == other['name'], dst)): | ||
449 | logger.info('destination exists, terminating') | ||
450 | break | ||
451 | |||
452 | logger.warn('destination does not exist, retrying') | ||
453 | continue | ||
454 | else: | ||
455 | # logger.debug('terminating') | ||
456 | break | ||
457 | mount_proc.terminate() | ||
458 | |||
459 | with Manager() as manager: | ||
460 | tmpdir_q = manager.Queue(1) | ||
461 | |||
462 | with closing(Process(target=do_copy, args=(tmpdir_q,), name='do_copy')) as p: | ||
463 | p.start() | ||
464 | |||
465 | with TemporaryDirectory(prefix=f'borg-mount_{entry["name"]}_', dir=os.environ.get('RUNTIME_DIRECTORY')) as tmpdir: | ||
466 | tmpdir_q.put(tmpdir) | ||
467 | p.join() | ||
468 | return p.exitcode | ||
469 | |||
470 | def sigterm(signum, frame): | ||
471 | raise SystemExit(128 + signum) | ||
472 | |||
473 | def main(): | ||
474 | signal.signal(signal.SIGTERM, sigterm) | ||
475 | |||
476 | global logger | ||
477 | logger = logging.getLogger(__name__) | ||
478 | console_handler = logging.StreamHandler() | ||
479 | console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) | ||
480 | if sys.stderr.isatty(): | ||
481 | console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) | ||
482 | |||
483 | burst_max = 1000 | ||
484 | burst = burst_max | ||
485 | last_use = None | ||
486 | inv_rate = 1e7 | ||
487 | def consume_filter(record): | ||
488 | nonlocal burst, burst_max, inv_rate, last_use | ||
489 | |||
490 | delay = None | ||
491 | while True: | ||
492 | now = time.monotonic_ns() | ||
493 | burst = min(burst_max, burst + math.floor((now - last_use) / inv_rate)) if last_use else burst_max | ||
494 | last_use = now | ||
495 | |||
496 | if burst > 0: | ||
497 | burst -= 1 | ||
498 | if delay: | ||
499 | delay = now - delay | ||
500 | |||
501 | return True | ||
502 | |||
503 | if delay is None: | ||
504 | delay = now | ||
505 | time.sleep(inv_rate / 1e9) | ||
506 | console_handler.addFilter(consume_filter) | ||
507 | |||
508 | logging.getLogger().addHandler(console_handler) | ||
509 | |||
510 | # log uncaught exceptions | ||
511 | def log_exceptions(type, value, tb): | ||
512 | global logger | ||
513 | |||
514 | logger.error(value) | ||
515 | sys.__excepthook__(type, value, tb) # calls default excepthook | ||
516 | |||
517 | sys.excepthook = log_exceptions | ||
518 | |||
519 | parser = argparse.ArgumentParser(prog='copy') | ||
520 | parser.add_argument('--verbosity', dest='log_level', action='append', type=int) | ||
521 | parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1) | ||
522 | parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1) | ||
523 | parser.add_argument('source', metavar='REPO_OR_ARCHIVE') | ||
524 | parser.add_argument('target', metavar='REPO_OR_ARCHIVE') | ||
525 | args = parser.parse_args() | ||
526 | |||
527 | |||
528 | LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] | ||
529 | DEFAULT_LOG_LEVEL = logging.ERROR | ||
530 | log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) | ||
531 | |||
532 | for adjustment in args.log_level or (): | ||
533 | log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0)) | ||
534 | logger.setLevel(LOG_LEVELS[log_level]) | ||
535 | |||
536 | |||
537 | if "::" in args.source: | ||
538 | (src_repo_path, _, src_archive) = args.source.partition("::") | ||
539 | entry = None | ||
540 | for candidate_entry in read_repo(src_repo_path): | ||
541 | if entry['name'] != src_archive: | ||
542 | continue | ||
543 | entry = candidate_entry | ||
544 | break | ||
545 | |||
546 | if entry is None: | ||
547 | logger.critical("Did not find archive ā%sā", src_archive) | ||
548 | os.exit(1) | ||
549 | |||
550 | copy_archive(src_repo_path, args.target, entry) | ||
551 | else: | ||
552 | for entry in ToSync(args.source, args.target): | ||
553 | copy_archive(args.source, args.target, entry) | ||
554 | |||
555 | if __name__ == "__main__": | ||
556 | sys.exit(main()) | ||
diff --git a/hosts/vidhar/borg/copy/setup.py b/hosts/vidhar/borg/copy/setup.py new file mode 100644 index 00000000..f77d9560 --- /dev/null +++ b/hosts/vidhar/borg/copy/setup.py | |||
@@ -0,0 +1,10 @@ | |||
1 | from setuptools import setup | ||
2 | |||
3 | setup(name='copy_borg', | ||
4 | packages=['copy_borg'], | ||
5 | entry_points={ | ||
6 | 'console_scripts': [ | ||
7 | 'copy_borg=copy_borg.__main__:main', | ||
8 | ], | ||
9 | } | ||
10 | ) | ||
diff --git a/hosts/vidhar/borg/default.nix b/hosts/vidhar/borg/default.nix index 8d0b46ef..7672de18 100644 --- a/hosts/vidhar/borg/default.nix +++ b/hosts/vidhar/borg/default.nix | |||
@@ -26,7 +26,7 @@ let | |||
26 | in nameValuePair serviceName { | 26 | in nameValuePair serviceName { |
27 | serviceConfig = { | 27 | serviceConfig = { |
28 | Type = "oneshot"; | 28 | Type = "oneshot"; |
29 | ExecStart = "${copyBorg}/bin/copy ${escapeShellArg repo} yggdrasil.borgbase:repo"; | 29 | ExecStart = "${copyBorg}/bin/copy_borg --verbosity 3 ${escapeShellArg repo} yggdrasil.borgbase:repo"; |
30 | TimeoutStartSec = "8h"; | 30 | TimeoutStartSec = "8h"; |
31 | # User = "borg"; | 31 | # User = "borg"; |
32 | # Group = "borg"; | 32 | # Group = "borg"; |
@@ -43,40 +43,38 @@ let | |||
43 | "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" | 43 | "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK=yes" |
44 | "BORG_HOSTNAME_IS_UNIQUE=yes" | 44 | "BORG_HOSTNAME_IS_UNIQUE=yes" |
45 | ]; | 45 | ]; |
46 | |||
47 | LogRateLimitIntervalSec = 0; | ||
46 | }; | 48 | }; |
47 | }; | 49 | }; |
48 | 50 | ||
49 | copyBorg = pkgs.stdenv.mkDerivation (let | 51 | copyBorg = flakeInputs.mach-nix.lib.${config.nixpkgs.system}.buildPythonPackage rec { |
50 | packageOverrides = pkgs.callPackage ./pyprctl-packages.nix {}; | 52 | pname = "copy-borg"; |
51 | inpPython = pkgs.python39.override { inherit packageOverrides; }; | 53 | src = ./copy; |
52 | in rec { | 54 | version = "0.0.0"; |
53 | name = "copy"; | 55 | ignoreDataOutdated = true; |
54 | src = ./copy.py; | 56 | |
55 | 57 | requirements = '' | |
56 | phases = ["buildPhase" "checkPhase" "installPhase"]; | 58 | humanize |
57 | 59 | tqdm | |
58 | buildInputs = with pkgs; [makeWrapper]; | 60 | python-dateutil |
59 | 61 | xdg | |
60 | python = inpPython.withPackages (ps: with ps; [humanize tqdm python-dateutil xdg python-unshare pyprctl halo]); | 62 | python-unshare |
61 | 63 | pyprctl | |
62 | buildPhase = '' | 64 | halo |
63 | substitute $src copy \ | ||
64 | --subst-var-by python ${escapeShellArg python} | ||
65 | ''; | 65 | ''; |
66 | 66 | postInstall = '' | |
67 | doCheck = true; | 67 | wrapProgram $out/bin/copy_borg \ |
68 | checkPhase = '' | 68 | --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} |
69 | ${python}/bin/python -m py_compile copy | ||
70 | ''; | 69 | ''; |
71 | 70 | ||
72 | installPhase = '' | 71 | providers.python-unshare = "nixpkgs"; |
73 | install -m 0755 -D -t $out/bin \ | 72 | overridesPre = [ |
74 | copy | 73 | (self: super: { python-unshare = super.python-unshare.overrideAttrs (oldAttrs: { name = "python-unshare-0.2.1"; version = "0.2.1"; }); }) |
74 | ]; | ||
75 | 75 | ||
76 | wrapProgram $out/bin/copy \ | 76 | # _.tomli.buildInputs.add = with pkgs."python3Packages"; [ flit-core ]; |
77 | --prefix PATH : ${makeBinPath (with pkgs; [util-linux borgbackup])}:${config.security.wrapperDir} | 77 | }; |
78 | ''; | ||
79 | }); | ||
80 | in { | 78 | in { |
81 | config = { | 79 | config = { |
82 | services.borgsnap = { | 80 | services.borgsnap = { |
diff --git a/modules/borgsnap/borgsnap/borgsnap/__main__.py b/modules/borgsnap/borgsnap/borgsnap/__main__.py index 80bf511d..91144780 100644 --- a/modules/borgsnap/borgsnap/borgsnap/__main__.py +++ b/modules/borgsnap/borgsnap/borgsnap/__main__.py | |||
@@ -274,12 +274,10 @@ def create(*, snapshot, target, archive_prefix, dry_run): | |||
274 | for rfd, event in events: | 274 | for rfd, event in events: |
275 | if event & select.POLLIN: | 275 | if event & select.POLLIN: |
276 | if rfd == proc.stdout.fileno(): | 276 | if rfd == proc.stdout.fileno(): |
277 | line = proc.stdout.readline() | 277 | if line := proc.stdout.readline(): |
278 | if len(line) > 0: | ||
279 | stdout_logger.info(line[:-1]) | 278 | stdout_logger.info(line[:-1]) |
280 | if rfd == proc.stderr.fileno(): | 279 | if rfd == proc.stderr.fileno(): |
281 | line = proc.stderr.readline() | 280 | if line := proc.stderr.readline(): |
282 | if len(line) > 0: | ||
283 | stderr_logger.info(line[:-1]) | 281 | stderr_logger.info(line[:-1]) |
284 | if event & select.POLLHUP: | 282 | if event & select.POLLHUP: |
285 | poll.unregister(rfd) | 283 | poll.unregister(rfd) |