summaryrefslogtreecommitdiff
path: root/tools/sops-inventory/sops_inventory/__main__.py
blob: 2ee1b91d09d58df44df6b5d817df1cd833062b32 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os,sys

from pathlib import Path
from collections import deque, defaultdict

import argparse

import subprocess

from operator import attrgetter, itemgetter

from yaml import load, YAMLError
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader

# Key-type names that main() looks up inside a file's top-level "sops" mapping.
# Only 'pgp' and 'age' entries are actually processed; a non-empty entry of
# any other type listed here raises NotImplementedError.
SOPS_TYPES = frozenset({'kms', 'gcp_kms', 'azure_kv', 'hc_vault', 'age', 'pgp'})


def readnull(fh):
    """Yield the NUL-separated records read from a binary file object.

    ``fh`` only needs a ``read(size)`` method returning ``bytes``; it is
    consumed in 4096-byte chunks until ``read`` returns an empty value.

    Every record terminated by ``b'\\0'`` is yielded without its terminator
    (empty records included).  If the stream ends with data that is not
    NUL-terminated, that trailing record is yielded exactly once.
    """
    pending = b''

    while chunk := fh.read(4096):
        # Everything before the last separator is a complete record; the
        # final piece may still be continued by the next chunk.
        *records, pending = (pending + chunk).split(b'\0')
        yield from records

    # End of stream: emit an unterminated tail once and stop, rather than
    # looping on it forever.
    if pending:
        yield pending

class BooleanAction(argparse.Action):
    """argparse action for paired ``--flag`` / ``--no-flag`` switches.

    Register both spellings on a single argument.  Using an option string
    that starts with ``--no-`` stores ``False`` in the destination; any other
    spelling stores ``True``.  The option never consumes a value.
    """

    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        # ``nargs`` is accepted for signature compatibility but always
        # overridden: the switch takes no arguments.
        super().__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        # Match the full "--no-" prefix so that positive flags whose name
        # merely begins with "no" (e.g. --notify) are not treated as negated.
        setattr(namespace, self.dest, not option_string.startswith('--no-'))


def _sops_recipients(sops):
    """Collect the recipient identifiers listed in a ``sops`` metadata mapping.

    Returns a set holding the ``fp`` value of every ``pgp`` entry and the
    ``recipient`` value of every ``age`` entry.  Key types that are absent
    or empty are ignored.

    Raises:
        NotImplementedError: if any other type in SOPS_TYPES has entries.
    """
    field_by_type = {'pgp': 'fp', 'age': 'recipient'}
    recipients = set()
    for key_type in SOPS_TYPES:
        entries = sops.get(key_type)
        if not entries:
            continue
        if key_type not in field_by_type:
            raise NotImplementedError(f'sops key type {key_type!r} is not supported')
        field = field_by_type[key_type]
        recipients.update(entry[field] for entry in entries)
    return recipients


def main():
    """Print an inventory of the sops-encrypted files tracked by git under PATH.

    Every path reported by ``git ls-files`` is parsed as YAML.  Files whose
    top-level mapping has a ``sops`` mapping are grouped by their set of PGP
    fingerprints / age recipients.  By default each group is printed as a
    header line followed by its files; with ``--list-files`` only the file
    paths (relative to PATH) are printed.

    Paths that cannot be opened, decoded or parsed, or that carry no ``sops``
    mapping, are skipped silently.

    Raises:
        RuntimeError: if ``git ls-files`` exits with a non-zero status.
        NotImplementedError: if a file uses a sops key type other than
            ``pgp`` or ``age``.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--list-files', '--no-list-files', action=BooleanAction, default=False, help='Only list sops files')
    parser.add_argument('path', metavar='PATH', nargs='?', type=Path, default=Path('.'), help='Base directory to take inventory of')
    args = parser.parse_args()

    inventory = defaultdict(list)

    with subprocess.Popen(['git', '-C', args.path, 'ls-files', '-z'], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE) as proc:
        # ``-z`` output is exact: names are NUL-terminated and unquoted, so
        # they must not be stripped (whitespace may be part of the name).
        files = sorted(
            (args.path / name.decode('utf-8') for name in readnull(proc.stdout)),
            key=attrgetter('parts'),
        )
        for child in files:
            try:
                with child.open(mode='r') as fh:
                    # NOTE(security): Loader/CLoader is PyYAML's full loader
                    # and can construct arbitrary Python objects.  Only run
                    # this on repositories whose contents are trusted, or
                    # switch to SafeLoader/CSafeLoader.
                    document = load(fh, Loader=Loader)
            except (OSError, YAMLError, ValueError):
                # Unreadable path (submodule directory, file missing from the
                # work tree, ...), undecodable text or invalid YAML: not a
                # sops file.
                continue

            if not isinstance(document, dict):
                continue
            sops = document.get('sops')
            if not isinstance(sops, dict):
                continue

            inventory[frozenset(_sops_recipients(sops))].append(child.relative_to(args.path))

        proc.wait(timeout=1)
        if proc.returncode != 0:
            raise RuntimeError(f'git ls-files returned with {proc.returncode}')

    if not args.list_files:
        # frozenset comparison is the subset relation (a partial order), so
        # sort on the sorted recipient list to get a well-defined order.
        for keys, files in sorted(inventory.items(), key=lambda item: sorted(item[0])):
            print(', '.join(sorted(keys)) + ':')
            for file in files:
                print('  - ' + str(file))
    else:
        for files in inventory.values():
            for file in files:
                print(file)

if __name__ == '__main__':
    # The os module has no exit() function; sys.exit() is the call that
    # terminates the interpreter with main()'s return value as the status.
    sys.exit(main())