1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
import os,sys
from pathlib import Path
from collections import deque, defaultdict
import argparse
import subprocess
from operator import attrgetter, itemgetter
from yaml import load, YAMLError
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
# Keys that are looked up inside a document's top-level `sops` mapping when
# collecting key information. Only non-empty 'pgp' and 'age' entries are
# handled by main(); a non-empty value for any other type listed here raises
# NotImplementedError there.
SOPS_TYPES = frozenset({'kms', 'gcp_kms', 'azure_kv', 'hc_vault', 'age', 'pgp'})
def readnull(fh):
    """Yield the NUL-separated records read from a binary stream.

    Args:
        fh: Binary file-like object; only ``fh.read(4096)`` is called on it.

    Yields:
        Each record as ``bytes`` with the ``b'\\0'`` separator removed. Empty
        records between consecutive separators are yielded as ``b''``. Data
        after the last separator is yielded once at end of stream if it is
        non-empty, so input without a trailing NUL terminates normally.
    """
    pending = b''
    while chunk := fh.read(4096):
        # Everything before the last separator is complete; the final piece
        # may continue in the next chunk, so carry it over.
        *records, pending = (pending + chunk).split(b'\0')
        yield from records
    if pending:
        # Unterminated trailing record: emit it exactly once.
        yield pending
class BooleanAction(argparse.Action):
    """argparse action for paired ``--flag`` / ``--no-flag`` switches.

    Register both spellings as option strings of one argument. The
    destination is set to ``False`` when the option was given in its
    ``--no-`` form and to ``True`` otherwise. The option consumes no values.
    """

    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        # ``nargs`` is accepted for signature compatibility but always forced
        # to 0: the switch never takes a value.
        super().__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        # Match the full ``--no-`` prefix so that positive flags which merely
        # begin with "no" (e.g. ``--notify``) are not treated as negations,
        # and tolerate a missing option string.
        negated = option_string is not None and option_string.startswith('--no-')
        setattr(namespace, self.dest, not negated)
# Field holding the key identifier inside each entry of a sops key list,
# per supported key type.
_KEY_ID_FIELD = {'pgp': 'fp', 'age': 'recipient'}


def _sops_recipients(path):
    """Return the key identifiers found in the sops metadata of *path*.

    Args:
        path: ``pathlib.Path`` of the candidate file.

    Returns:
        A ``frozenset`` of PGP fingerprints and age recipients (possibly
        empty), or ``None`` when the file cannot be read, is not valid YAML,
        has no top-level ``sops`` mapping, or its key entries are malformed.

    Raises:
        NotImplementedError: If a key type from ``SOPS_TYPES`` other than
            ``pgp``/``age`` has a non-empty value.
    """
    try:
        with path.open(mode='r', encoding='utf-8') as fh:
            # NOTE(review): ``Loader`` is PyYAML's full loader, which can
            # construct arbitrary Python objects from tagged input. Tracked
            # files are external data; consider CSafeLoader/SafeLoader.
            document = load(fh, Loader=Loader)
    except (YAMLError, ValueError, OSError):
        # Not YAML, not decodable text (UnicodeDecodeError is a ValueError),
        # or not openable (deleted tracked file, submodule directory, ...):
        # deliberately skipped, this is simply not a sops file.
        return None

    if not isinstance(document, dict):
        return None
    sops = document.get('sops')
    if not isinstance(sops, dict):
        return None

    recipients = set()
    for key_type in SOPS_TYPES:
        entries = sops.get(key_type)
        if not entries:
            continue
        field = _KEY_ID_FIELD.get(key_type)
        if field is None:
            raise NotImplementedError(f'sops key type {key_type!r} is not supported')
        try:
            recipients.update(entry[field] for entry in entries)
        except (KeyError, TypeError):
            print(f'warning: {path}: malformed sops "{key_type}" entries, skipping', file=sys.stderr)
            return None
    return frozenset(recipients)


def main():
    """Print an inventory of sops-encrypted files tracked by git.

    Parses the command line, lists the files tracked under PATH with
    ``git ls-files -z``, and groups every file carrying sops metadata by the
    set of keys it is encrypted for. By default each key set is printed
    followed by its files; with ``--list-files`` only the file paths are
    printed.

    Raises:
        RuntimeError: If ``git ls-files`` exits with a non-zero status.
        NotImplementedError: If a file uses a sops key type other than
            ``pgp`` or ``age``.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--list-files', '--no-list-files', action=BooleanAction, default=False, help='Only list sops files')
    parser.add_argument('path', metavar='PATH', nargs='?', type=Path, default=Path('.'), help='Base directory to take inventory of')
    args = parser.parse_args()

    with subprocess.Popen(
        ['git', '-C', args.path, 'ls-files', '-z'],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    ) as proc:
        # Names are NUL-delimited and verbatim, so they must not be stripped:
        # leading/trailing whitespace is part of the file name.
        tracked = sorted(
            (args.path / name.decode('utf-8') for name in readnull(proc.stdout)),
            key=attrgetter('parts'),
        )
    # Leaving the ``with`` block waits for the process, so the return code is
    # available here without a separate wait that could time out.
    if proc.returncode != 0:
        raise RuntimeError(f'git ls-files returned with {proc.returncode}')

    inventory = defaultdict(list)
    for child in tracked:
        recipients = _sops_recipients(child)
        if recipients is not None:
            inventory[recipients].append(child.relative_to(args.path))

    if args.list_files:
        for files in inventory.values():
            for file in files:
                print(file)
        return

    # frozensets only define a partial (subset) order, so sort the groups by
    # their sorted key list to get a deterministic report.
    for keys, files in sorted(inventory.items(), key=lambda item: sorted(item[0])):
        print(','.join(sorted(keys)) + ':')
        for file in files:
            print(' - ' + str(file))
if __name__ == '__main__':
    # The os module has no exit() function; sys.exit() turns main()'s return
    # value (None on success) into the process exit status.
    sys.exit(main())
|