1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
import os,sys
from pathlib import Path
from collections import deque, defaultdict
import argparse
import re
import subprocess
from operator import attrgetter, itemgetter
from itertools import chain
from yaml import load, YAMLError
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
SOPS_TYPES = frozenset({'kms', 'gcp_kms', 'azure_kv', 'hc_vault', 'age', 'pgp'})
def readnull(fh):
buffer = b''
while True:
chunk = fh.read(4096)
buffer += chunk
if not buffer:
break
while True:
lines = buffer.split(b'\0', maxsplit=1)
match lines:
case [l, r]:
buffer = r
yield l
case _:
if not chunk:
yield buffer
break
class BooleanAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, False if option_string.startswith('--no') or re.match('^-[A-Z]$', option_string) else True)
def sops_files(path):
with subprocess.Popen(['git', '-C', path, 'ls-files', '-z'], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE) as proc:
files = sorted(map(lambda child: path / child.decode('utf-8').strip(), readnull(proc.stdout)), key=attrgetter('parts'))
for child in files:
try:
with child.open(mode='rb') as fh:
yaml = load(fh, Loader=Loader)
if not yaml:
raise ValueError('Could not parse YAML')
if not isinstance(yaml, dict) or not 'sops' in yaml:
raise ValueError('Did not find "sops" key')
sops = yaml['sops']
key_info = set()
for k in SOPS_TYPES:
if k in sops:
v = sops[k]
if not v:
continue
match k:
case 'pgp':
for r in v:
key_info.add(r['fp'])
case 'age':
for r in v:
key_info.add(r['recipient'])
case _:
raise NotImplementedError
yield (frozenset(key_info), child.relative_to(path))
except (YAMLError, ValueError) as e:
pass
proc.wait(timeout=1)
if proc.returncode != 0:
raise RuntimeError(f'git ls-files returned with {proc.returncode}')
def main():
parser = argparse.ArgumentParser(prog='sops-inventory', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--list-files', '--no-list-files', '-l', '-L', action=BooleanAction, default=False, help='Only list sops files')
parser.add_argument('path', metavar='PATH', nargs='?', type=Path, default=Path('.'), help='Base directory to take inventory of')
args = parser.parse_args()
if not args.list_files:
inventory = defaultdict(list)
for key_info, path in sops_files(args.path):
inventory[key_info].append(path)
for keys, files in sorted(inventory.items(), key=lambda kv: sorted(kv[0])):
print(', '.join(sorted(keys)) + ':')
for file in files:
print(' - ' + str(file))
else:
for _, file in sorted(sops_files(args.path), key=lambda kv: kv[1].parts):
print(file)
if __name__ == '__main__':
os.exit(main())
|