"""Take inventory of the sops-encrypted YAML files tracked in a git checkout.

The tracked files are obtained from ``git ls-files``; every file that parses
as a YAML mapping with a top-level ``sops`` mapping is reported, either as a
plain file list or grouped by the set of recipients found in it.
"""
import argparse
import os
import subprocess
import sys
from collections import deque, defaultdict
from itertools import chain
from operator import attrgetter, itemgetter
from pathlib import Path

from yaml import load, YAMLError
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader

SOPS_TYPES = frozenset({'kms', 'gcp_kms', 'azure_kv', 'hc_vault', 'age', 'pgp'})

# For each supported key type: the field of a key entry that identifies the
# recipient.  Key types listed in SOPS_TYPES but missing here are unsupported.
_RECIPIENT_FIELDS = {'pgp': 'fp', 'age': 'recipient'}


def readnull(fh):
    """Yield the NUL-separated records of the binary stream *fh*.

    The stream is read in 4096-byte chunks.  A final record that is not
    terminated by a NUL byte is yielded exactly once; an empty tail (the
    stream ended right after a NUL, or was empty) yields nothing.

    Args:
        fh: Binary file object whose ``read(n)`` returns ``b''`` at EOF.

    Yields:
        bytes: Each record, without its NUL terminator.
    """
    pending = b''
    while chunk := fh.read(4096):
        # Everything before the last NUL is complete; whatever follows it may
        # continue in the next chunk.
        *records, pending = (pending + chunk).split(b'\0')
        yield from records
    if pending:
        # Unterminated final record: emit it once and stop.
        yield pending


class BooleanAction(argparse.Action):
    """Argparse action for a flag registered as both ``--x`` and ``--no-x``.

    The destination is set to False when the option string used on the command
    line starts with ``--no`` and to True otherwise.
    """

    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        # ``nargs`` is accepted for signature compatibility only; the flag
        # never consumes an argument.
        super().__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, not option_string.startswith('--no'))


def _sops_recipients(file):
    """Return the recipients of the sops file *file*, or None if it is not one.

    A file counts as a sops file when it can be read, parses as YAML and is a
    mapping whose ``sops`` entry is itself a mapping.

    Raises:
        NotImplementedError: The file has non-empty key entries of a type in
            SOPS_TYPES other than ``pgp`` and ``age``.
    """
    try:
        with file.open(mode='rb') as fh:
            # NOTE(review): ``Loader`` is PyYAML's full, non-safe loader and it
            # is applied to every tracked file.  Consider CSafeLoader /
            # SafeLoader if the repository content is not fully trusted.
            doc = load(fh, Loader=Loader)
    except (OSError, YAMLError):
        # Unreadable entries (e.g. directories, missing files) and files that
        # are not valid YAML are simply not sops files.
        return None

    if not isinstance(doc, dict):
        return None
    sops = doc.get('sops')
    if not isinstance(sops, dict):
        return None

    recipients = set()
    for key_type in SOPS_TYPES:
        entries = sops.get(key_type)
        if not entries:
            continue
        field = _RECIPIENT_FIELDS.get(key_type)
        if field is None:
            raise NotImplementedError(
                f'{file}: unsupported sops key type {key_type!r}')
        recipients.update(entry[field] for entry in entries)
    return frozenset(recipients)


def sops_files(path):
    """Yield the sops files tracked by git below *path*.

    Args:
        path: ``pathlib.Path`` of the directory passed to ``git -C``.

    Yields:
        tuple: ``(recipients, file)`` where ``recipients`` is a frozenset of
        PGP fingerprints / age recipients and ``file`` is the file's path
        relative to *path*.  Files are visited in order of their path parts.

    Raises:
        RuntimeError: ``git ls-files`` exited with a non-zero status.
        NotImplementedError: A file uses an unsupported sops key type.
    """
    with subprocess.Popen(
        ['git', '-C', path, 'ls-files', '-z'],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    ) as proc:
        # Names are NUL-delimited and must be used verbatim: stripping them
        # would break paths with leading or trailing whitespace.
        files = sorted(
            (path / name.decode('utf-8') for name in readnull(proc.stdout)),
            key=attrgetter('parts'),
        )
        # The output is fully drained at this point, so git is about to exit.
        proc.wait(timeout=1)
        if proc.returncode != 0:
            raise RuntimeError(f'git ls-files returned with {proc.returncode}')

    for child in files:
        key_info = _sops_recipients(child)
        if key_info is not None:
            yield (key_info, child.relative_to(path))


def main():
    """Parse the command line and print the inventory or the file list."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--list-files', '--no-list-files',
                        action=BooleanAction, default=False,
                        help='Only list sops files')
    parser.add_argument('path', metavar='PATH', nargs='?', type=Path,
                        default=Path('.'),
                        help='Base directory to take inventory of')
    args = parser.parse_args()

    if args.list_files:
        for _, file in sorted(sops_files(args.path),
                              key=lambda kv: kv[1].parts):
            print(file)
        return

    # Group the files by the exact set of recipients able to decrypt them.
    inventory = defaultdict(list)
    for key_info, path in sops_files(args.path):
        inventory[key_info].append(path)

    for keys, files in sorted(inventory.items(),
                              key=lambda kv: sorted(kv[0])):
        print(', '.join(sorted(keys)) + ':')
        for file in files:
            print(' - ' + str(file))


if __name__ == '__main__':
    # ``os`` has no ``exit``; ``sys.exit`` is the interpreter exit function.
    sys.exit(main())