summaryrefslogtreecommitdiff
path: root/tools/sops-inventory/sops_inventory/__main__.py
blob: 3f694917f41719c1f487258cfcf719fd63a05d7f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import os,sys

from pathlib import Path
from collections import deque, defaultdict

import argparse

import subprocess

from operator import attrgetter, itemgetter

from itertools import chain

from yaml import load, YAMLError
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader

SOPS_TYPES = frozenset({'kms', 'gcp_kms', 'azure_kv', 'hc_vault', 'age', 'pgp'})


def readnull(fh):
    buffer = b''

    while True:
        chunk = fh.read(4096)
        buffer += chunk
        if not buffer:
            break

        while True:
            lines = buffer.split(b'\0', maxsplit=1)
            match lines:
              case [l, r]:
                buffer = r
                yield l
              case _:
                if not chunk:
                    yield buffer
                break

class BooleanAction(argparse.Action):
    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, False if option_string.startswith('--no') else True)


def sops_files(path):
    with subprocess.Popen(['git', '-C', path, 'ls-files', '-z'], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE) as proc:
        files = sorted(map(lambda child: path / child.decode('utf-8').strip(), readnull(proc.stdout)), key=attrgetter('parts'))
        for child in files:
            try:
                with child.open(mode='r') as fh:
                    yaml = load(fh, Loader=Loader)
                    if not yaml:
                        raise ValueError('Could not parse YAML')
                    if not isinstance(yaml, dict) or not 'sops' in yaml:
                        raise ValueError('Did not find "sops" key')
                    sops = yaml['sops']

                    key_info = set()
                    for k in SOPS_TYPES:
                        if k in sops:
                            v = sops[k]
                            if not v:
                                continue

                            match k:
                                case 'pgp':
                                  for r in v:
                                      key_info.add(r['fp'])
                                case 'age':
                                  for r in v:
                                      key_info.add(r['recipient'])
                                case _:
                                  raise NotImplementedError
                    yield (frozenset(key_info), child.relative_to(path))
            except (YAMLError, ValueError) as e:
                pass

        proc.wait(timeout=1)
        if proc.returncode != 0:
            raise RuntimeError(f'git ls-files returned with {proc.returncode}')

def main():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--list-files', '--no-list-files', action=BooleanAction, default=False, help='Only list sops files')
    parser.add_argument('path', metavar='PATH', nargs='?', type=Path, default=Path('.'), help='Base directory to take inventory of')
    args = parser.parse_args()

    if not args.list_files:
        inventory = defaultdict(list)
        for key_info, path in sops_files(args.path):
            inventory[key_info].append(path)

        for keys, files in sorted(inventory.items(), key=lambda kv: sorted(kv[0])):
            print(', '.join(sorted(keys)) + ':')
            for file in files:
                print('  - ' + str(file))
    else:
        for _, file in sorted(sops_files(args.path), key=lambda kv: kv[1].parts):
            print(file)

if __name__ == '__main__':
    os.exit(main())