1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
import os,sys
from pathlib import Path
from collections import deque, defaultdict
import argparse
from yaml import load, YAMLError
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
SOPS_TYPES = frozenset({'kms', 'gcp_kms', 'azure_kv', 'hc_vault', 'age', 'pgp'})
class BooleanAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, False if option_string.startswith('--no') else True)
def main():
default_base = os.getenv('SOPS_INVENTORY_BASE', default=[])
if default_base:
default_base = Path(default_base)
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--list-files', '--no-list-files', action=BooleanAction, default=False, help='Only list sops files')
parser.add_argument('path', metavar='PATH', nargs='?' if default_base else None, type=Path, default=default_base, help='Base directory to take inventory of')
args = parser.parse_args()
inventory = defaultdict(set)
queue = deque([args.path])
while queue:
baseDir = queue.popleft()
for child in baseDir.iterdir():
if child.is_dir():
queue.append(child)
else:
try:
with child.open(mode='r') as fh:
yaml = load(fh, Loader=Loader)
if not yaml:
raise ValueError('Could not parse YAML')
if not isinstance(yaml, dict) or not 'sops' in yaml:
raise ValueError('Did not find "sops" key')
sops = yaml['sops']
key_info = set()
for k in SOPS_TYPES:
if k in sops:
v = sops[k]
if not v:
continue
match k:
case 'pgp':
for r in v:
key_info.add(r['fp'])
case 'age':
for r in v:
key_info.add(r['recipient'])
case _:
raise NotImplementedError
inventory[frozenset(key_info)].add(child.relative_to(args.path))
except (YAMLError, ValueError) as e:
pass
if not args.list_files:
for keys, files in inventory.items():
print(','.join(keys) + ':')
for file in files:
print(' - ' + str(file))
else:
for _, files in inventory.items():
for file in files:
print(file)
if __name__ == '__main__':
os.exit(main())
|