import os,sys from pathlib import Path from collections import deque, defaultdict import argparse import re import subprocess from operator import attrgetter, itemgetter from itertools import chain from yaml import load, YAMLError try: from yaml import CLoader as Loader except ImportError: from yaml import Loader SOPS_TYPES = frozenset({'kms', 'gcp_kms', 'azure_kv', 'hc_vault', 'age', 'pgp'}) def readnull(fh): buffer = b'' while True: chunk = fh.read(4096) buffer += chunk if not buffer: break while True: lines = buffer.split(b'\0', maxsplit=1) match lines: case [l, r]: buffer = r yield l case _: if not chunk: yield buffer break class BooleanAction(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs) def __call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, False if option_string.startswith('--no') or re.match('^-[A-Z]$', option_string) else True) def sops_files(path): with subprocess.Popen(['git', '-C', path, 'ls-files', '-z'], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE) as proc: files = sorted(map(lambda child: path / child.decode('utf-8').strip(), readnull(proc.stdout)), key=attrgetter('parts')) for child in files: try: with child.open(mode='rb') as fh: yaml = load(fh, Loader=Loader) if not yaml: raise ValueError('Could not parse YAML') if not isinstance(yaml, dict) or not 'sops' in yaml: raise ValueError('Did not find "sops" key') sops = yaml['sops'] key_info = set() for k in SOPS_TYPES: if k in sops: v = sops[k] if not v: continue match k: case 'pgp': for r in v: key_info.add(r['fp']) case 'age': for r in v: key_info.add(r['recipient']) case _: raise NotImplementedError yield (frozenset(key_info), child.relative_to(path)) except (YAMLError, ValueError) as e: pass proc.wait(timeout=1) if proc.returncode != 0: raise RuntimeError(f'git ls-files returned with {proc.returncode}') def main(): parser = argparse.ArgumentParser(prog='sops-inventory', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--list-files', '--no-list-files', '-l', '-L', action=BooleanAction, default=False, help='Only list sops files') parser.add_argument('path', metavar='PATH', nargs='?', type=Path, default=Path('.'), help='Base directory to take inventory of') args = parser.parse_args() if not args.list_files: inventory = defaultdict(list) for key_info, path in sops_files(args.path): inventory[key_info].append(path) for keys, files in sorted(inventory.items(), key=lambda kv: sorted(kv[0])): print(', '.join(sorted(keys)) + ':') for file in files: print(' - ' + str(file)) else: for _, file in sorted(sops_files(args.path), key=lambda kv: kv[1].parts): print(file) if __name__ == '__main__': os.exit(main())