From cfc871cce6aefaa0ff64619780a807cba761c6b2 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Mon, 30 Jan 2023 12:20:23 +0100 Subject: ... --- tools/.keep | 0 tools/ca/ca/__main__.py | 667 ------------------------ tools/ca/default.nix | 25 - tools/ca/setup.py | 10 - tools/sops-inventory/default.nix | 19 + tools/sops-inventory/setup.py | 11 + tools/sops-inventory/sops_inventory/__init__.py | 0 tools/sops-inventory/sops_inventory/__main__.py | 85 +++ tools/tai64dec/default.nix | 18 - tools/tai64dec/setup.py | 10 - tools/tai64dec/tai64dec/__main__.py | 46 -- 11 files changed, 115 insertions(+), 776 deletions(-) create mode 100644 tools/.keep delete mode 100644 tools/ca/ca/__main__.py delete mode 100644 tools/ca/default.nix delete mode 100644 tools/ca/setup.py create mode 100644 tools/sops-inventory/default.nix create mode 100644 tools/sops-inventory/setup.py create mode 100644 tools/sops-inventory/sops_inventory/__init__.py create mode 100644 tools/sops-inventory/sops_inventory/__main__.py delete mode 100644 tools/tai64dec/default.nix delete mode 100644 tools/tai64dec/setup.py delete mode 100644 tools/tai64dec/tai64dec/__main__.py (limited to 'tools') diff --git a/tools/.keep b/tools/.keep new file mode 100644 index 00000000..e69de29b diff --git a/tools/ca/ca/__main__.py b/tools/ca/ca/__main__.py deleted file mode 100644 index bfaee63a..00000000 --- a/tools/ca/ca/__main__.py +++ /dev/null @@ -1,667 +0,0 @@ -import sys, os - -import logging -import argparse - -from inspect import signature - -from enum import Enum, auto -from contextlib import contextmanager - -from cryptography import __version__ as cryptography_version -from cryptography.hazmat.backends import openssl -from cryptography import x509 -from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID, ExtensionOID -from cryptography.x509.extensions import ExtensionNotFound -from cryptography.hazmat.primitives import serialization, hashes -from cryptography.hazmat.primitives.serialization import PrivateFormat, pkcs12 -from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey -from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey -from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey -from cryptography.hazmat.primitives.asymmetric import rsa -from pathlib import Path -from atomicwrites import atomic_write -from fqdn import FQDN -from datetime import datetime, timedelta, timezone -from math import ceil, ldexp -import re -from getpass import getpass -from itertools import count -from tempfile import TemporaryFile, mkstemp -import subprocess -import json -from leapseconddata import LeapSecondData -from collections.abc import Iterable -import ipaddress - - -class KeyType(Enum): - ED448 = 'ed448' - ED25519 = 'ed25519' - RSA4096 = 'rsa4096' - RSA2048 = 'rsa2048' - - def generate(self): - match self: - case KeyType.ED448: - return Ed448PrivateKey.generate() - case KeyType.ED25519: - return Ed25519PrivateKey.generate() - case KeyType.RSA4096: - return rsa.generate_private_key( - public_exponent = 65537, - key_size = 4096, - ) - case KeyType.RSA2048: - return rsa.generate_private_key( - public_exponent = 65537, - key_size = 2048, - ) - - def aligned(self, key): - match self: - case KeyType.ED448: - return isinstance(key, Ed448PrivateKey) - case KeyType.ED25519: - return isinstance(key, Ed25519PrivateKey) - case KeyType.RSA4096: - return isinstance(key, RSAPrivateKey) and key.key_size == 4096 - case KeyType.RSA2048: - return isinstance(key, RSAPrivateKey) and key.key_size == 2048 - - def __str__(self): - return self.value - - @classmethod - def from_string(cls, s): - try: - return cls(s) - except KeyError: - raise ValueError() - -class SupportedKeyUsage(Enum): - SERVER_AUTH = 'server' - CLIENT_AUTH = 'client' - - @property - def oid(self): - match self: - case SupportedKeyUsage.SERVER_AUTH: - return ExtendedKeyUsageOID.SERVER_AUTH - case SupportedKeyUsage.CLIENT_AUTH: - return ExtendedKeyUsageOID.CLIENT_AUTH - - def __str__(self): - return self.value - - @classmethod - def from_string(cls, s): - try: - return cls(s) - except KeyError: - raise ValueError() - -class ValidFQDN(FQDN): - def __init__(self, *args, **kwds): - super().__init__(*args, **kwds) - - if not self.is_valid: - raise ValueError(f'‘{self}’ is not valid') - -def duration(inp_str): - delta = timedelta() - - item_re = re.compile(r'\W*(?P\d+)\W*(?P(?i:d|h|m(?!s)|s|ms|µs))') - - match = item_re.match(inp_str) - while match: - val = int(match.group('value')) - unit = match.group('unit').lower() - - if unit == 'd': - delta += timedelta(days=val) - elif unit == 'h': - delta += timedelta(hours=val) - elif unit == 'm': - delta += timedelta(minutes=val) - elif unit == 's': - delta += timedelta(seconds=val) - elif unit == 'ms': - delta += timedelta(milliseconds=val) - elif unit == 'µs' or unit == 'us': - delta += timedelta(microseconds=val) - else: - raise ValueError(f'Unknown time unit ‘{unit:s}’') - - inp_str = inp_str[match.end():] - match = item_re.match(inp_str) - else: - if re.match('\w', inp_str): - raise ValueError(f'Parsing of duration resulted in leftovers: ‘{inp_str:s}’') - - return delta - -@contextmanager -def umask(desired_umask): - """ A little helper to safely set and restore umask(2). """ - try: - prev_umask = os.umask(0) - os.umask(prev_umask | desired_umask) - yield - finally: - os.umask(prev_umask) - -class BooleanAction(argparse.Action): - def __init__(self, option_strings, dest, nargs=None, **kwargs): - super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - setattr(namespace, self.dest, False if option_string.startswith('--no') else True) - -class ExtendAction(argparse.Action): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.reset_dest = False - def __call__(self, parser, namespace, values, option_string=None): - if not self.reset_dest: - setattr(namespace, self.dest, []) - self.reset_dest = True - if isinstance(values, Iterable): - getattr(namespace, self.dest).extend(values) - else: - getattr(namespace, self.dest).append(values) - - -def load_key(keyfile, prompt='CA private key password: '): - key = None - with open(keyfile, 'rb') as f: - is_sops = False - try: - sops_json = json.load(f) - is_sops = 'sops' in sops_json - except json.JSONDecodeError: - pass - - f.seek(0) - - if not is_sops: - try: - key = serialization.load_pem_private_key(f.read(), password=None) - except TypeError: - pw = getpass(prompt=prompt) - key = serialization.load_pem_private_key(f.read(), password=bytes(pw, sys.stdin.encoding)) - else: - cmd = ['sops', '-d', f'/dev/fd/{f.fileno()}'] - with subprocess.Popen(cmd, stdout=subprocess.PIPE, pass_fds=(f.fileno(),)) as proc: - key = serialization.load_pem_private_key(proc.stdout.read(), password=None) - ret = proc.wait() - if ret != 0: - raise subprocess.CalledProcessErrror(ret, cmd) - - return key - -def mv_bak(path): - global logger - - bak_path = path.parent / f'{path.name}.bak' - for n in count(2): - if not bak_path.exists(): - break - bak_path = path.parent / f'{path.name}.bak{n}' - - try: - path.rename(bak_path) - except FileNotFoundError: - pass - else: - logger.warn('Renamed ‘%s’ to ‘%s’...', path, bak_path) - -def tai64nint(dt): - global leapsecond_data - - have_data = False - try: - have_data = bool(leapsecond_data) - except NameError: - pass - - if not have_data: - leapsecond_data = LeapSecondData.from_file(Path(os.getenv('LEAPSECONDS_FILE'))) - - tai_dt = leapsecond_data.to_tai(dt) - seconds = int(tai_dt.timestamp()) - nanoseconds = int((tai_dt.timestamp() - seconds) / 1e-9) - seconds += int(ldexp(1, 62)) - return seconds << 32 | nanoseconds - -def write_genkey(key_type, sops, keyfile): - if keyfile.exists(): - raise ValueError(f'Keyfile exists: {keyfile}') - - key = None - - def genkey(fh): - nonlocal key, key_type - - logger.debug('Generating new privkey...') - key = key_type.generate() - priv_bytes = key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) - fh.write(priv_bytes) - - if not sops: - with umask(0o0177), atomic_write(keyfile, overwrite=False, mode='wb') as fh: - logger.info('Writing new privkey to ‘%s’...', keyfile) - genkey(fh) - logger.debug('Adjusting permissions for ‘%s’...', keyfile) - os.chmod(keyfile, 0o0400) - else: - with TemporaryFile(mode='wb') as tf: - genkey(tf) - tf.seek(0) - - with umask(0o0177), atomic_write(keyfile, overwrite=False, mode='wb') as fh: - logger.info('Encrypting new privkey to ‘%s’...', keyfile) - subprocess.run(['sops', '-e', f'/dev/fd/{tf.fileno()}'], stdout=fh, pass_fds=(tf.fileno(),), check=True) - logger.debug('Adjusting permissions for ‘%s’...', keyfile) - os.chmod(keyfile, 0o0400) - - return key - -def to_dn(alternative_names): - def go(alternative_name): - dn = None - try: - dn = x509.Name.from_rfc4514_string(alternative_name) - except ValueError: - pass - - if dn: - logger.info('‘%s’ interpreted as directory name: %s', alternative_name, dn) - return x509.DirectoryName(dn) - - addr = None - try: - addr = ipaddress.IPv4Network(alternative_name) - except (ipaddress.AddressValueError, ipaddress.NetmaskValueError, ValueError): - pass - try: - addr = ipaddress.IPv4Address(alternative_name) - except ipaddress.AddressValueError: - pass - try: - addr = ipaddress.IPv6Network(alternative_name) - except (ipaddress.AddressValueError, ipaddress.NetmaskValueError, ValueError): - pass - try: - addr = ipaddress.IPv6Address(alternative_name) - except ipaddress.AddressValueError: - pass - - if addr: - logger.info('‘%s’ interpreted as ip address/subnet: %s', alternative_name, addr) - return x509.IPAddress(addr) - - return x509.DNSName(alternative_name) - - return map(go, alternative_names) - -def initca(ca_cert, ca_key, key_type, subject, clock_skew, validity, sops): - global logger - - key = None - try: - key = load_key(ca_key) - logger.info('Successfully loaded privkey from ‘%s’', ca_key) - - if not key_type.aligned(key): - logger.warn('Private key ‘%s’ does not align with requested type %s', ca_key, key_type) - - mv_bak(ca_key) - mv_bak(ca_cert) - - raise FileNotFoundError(f'Key does not align with requested type: {ca_key}') - except FileNotFoundError: - key = write_genkey(key_type, sops, ca_key) - - cert = None - try: - with open(ca_cert, 'rb') as fh: - cert = x509.load_pem_x509_certificate(fh.read()) - logger.info('Successfully loaded certificate from ‘%s’', ca_cert) - except FileNotFoundError: - logger.debug('Generating new certificate...') - - now = datetime.utcnow() - name = None - try: - name = x509.Name.from_rfc4514_string(subject) - logger.info('‘%s’ interpreted as directory name: %s', subject, name) - except ValueError: - name = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, subject) - ]) - - cert = x509.CertificateBuilder().subject_name( - name - ).public_key( - key.public_key() - ).serial_number( - x509.random_serial_number() - ).not_valid_before( - now - clock_skew - ).not_valid_after( - now + validity - ).issuer_name( - name - ).add_extension( - x509.AuthorityKeyIdentifier.from_issuer_public_key(key.public_key()), - False - ).add_extension( - x509.SubjectKeyIdentifier.from_public_key(key.public_key()), - False - ).add_extension( - x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=False, data_encipherment=False, key_agreement=False, key_cert_sign=True, crl_sign=True, encipher_only=False, decipher_only=False), - True - ).add_extension( - x509.BasicConstraints(ca=True, path_length=None), - True - ).sign(key, None if isinstance(key, Ed25519PrivateKey) or isinstance(key, Ed448PrivateKey) else hashes.SHA512()) - - with umask(0o0133), atomic_write(ca_cert, overwrite=False, mode='wb') as cf: - logger.info('Writing new certificate to ‘%s’...', ca_cert) - cf.write(cert.public_bytes(serialization.Encoding.PEM)) - logger.debug('Adjusting permissions for ‘%s’...', ca_cert) - os.chmod(ca_cert, 0o0444) - -def signcsr(ca_cert, ca_key, clock_skew, validity, subject, alternative_name, key_usage, ignore_alternative_names, csr, output): - if not key_usage: - raise InvalidParamsError('No extended key usages specified') - - csr_bytes = None - try: - csr_bytes = csr.read() - except AttributeError: - csr_bytes = csr - - csr = x509.load_pem_x509_csr(csr_bytes) - name = None - if not subject: - name = csr.subject - else: - try: - name = x509.Name.from_rfc4514_string(subject) - logger.info('‘%s’ interpreted as directory name: %s', subject, name) - except ValueError: - name = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, subject) - ]) - - if not ignore_alternative_names: - try: - ext = csr.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) - csr_alt_names = set(ext.value) - logger.warn('Using alternative names from csr: %s', csr_alt_names) - alternative_name = set(to_dn(alternative_name)) | csr_alt_names - except ExtensionNotFound: - pass - else: - alternative_name = to_dn(alternative_name) - - ca_key = load_key(ca_key) - with open(ca_cert, 'rb') as fh: - ca_cert = x509.load_pem_x509_certificate(fh.read()) - - now = datetime.now(tz=timezone.utc) - cert = x509.CertificateBuilder().subject_name( - name - ).public_key( - csr.public_key() - ).serial_number( - (tai64nint(now) << 24) | (x509.random_serial_number() & int(ldexp(1, 24) - 1)) - ).not_valid_before( - now - clock_skew - ).not_valid_after( - now + validity - ).issuer_name( - ca_cert.subject - ).add_extension( - x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_cert.public_key()), - False - ).add_extension( - x509.SubjectKeyIdentifier.from_public_key(csr.public_key()), - False - ).add_extension( - x509.KeyUsage(digital_signature=True, content_commitment=True, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False), - True - ).add_extension( - x509.BasicConstraints(ca=False, path_length=None), - True - ).add_extension( - x509.ExtendedKeyUsage(list(map(lambda ku: ku.oid, key_usage))), - False - ) - - if alternative_name: - cert = cert.add_extension( - x509.SubjectAlternativeName(alternative_name), - False - ) - - cert = cert.sign(ca_key, None if isinstance(ca_key, Ed25519PrivateKey) or isinstance(ca_key, Ed448PrivateKey) else hashes.SHA256()) - - output = output.with_suffix('.crt') - - mv_bak(output) - with umask(0o0133), atomic_write(output, overwrite=False, mode='wb') as cf: - logger.info('Writing new certificate to ‘%s’...', output) - cf.write(cert.public_bytes(serialization.Encoding.PEM)) - logger.debug('Adjusting permissions for ‘%s’...', output) - os.chmod(output, 0o0444) - -def new_client(ca_cert, ca_key, key_type, clock_skew, validity, subject, alternative_name, key_usage, sops, output): - key_file = output.with_suffix('.key') - cert_file = output.with_suffix('.crt') - - key = None - try: - key = load_key(key_file) - logger.info('Successfully loaded privkey from ‘%s’', key_file) - - if not key_type.aligned(key): - logger.warn('Private key ‘%s’ does not align with requested type %s', key_file, key_type) - - mv_bak(key_file) - mv_bak(cert_file) - - raise FileNotFoundError(f'Key does not align with requested type: {key_file}') - except FileNotFoundError: - key = write_genkey(key_type, sops, key_file) - - name = None - try: - name = x509.Name.from_rfc4514_string(subject) - logger.info('‘%s’ interpreted as directory name: %s', subject, name) - except ValueError: - name = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, subject) - ]) - - csr = x509.CertificateSigningRequestBuilder().subject_name(name) - - if alternative_name: - csr = csr.add_extension( - x509.SubjectAlternativeName( - to_dn(alternative_name) - ), - False - ) - - return signcsr( - ca_cert=ca_cert, - ca_key=ca_key, - clock_skew=clock_skew, - validity=validity, - subject=None, - alternative_name=[], - key_usage=key_usage, - ignore_alternative_names=False, - output=cert_file, - csr=csr.sign( - key, - None if isinstance(key, Ed25519PrivateKey) or isinstance(key, Ed448PrivateKey) else hashes.SHA256(), - ).public_bytes(serialization.Encoding.PEM) - ) - -def to_pkcs12(random_password, random_password_length, weak_encryption, filename, temporary_output, output): - key_file = filename.with_suffix('.key') - cert_file = filename.with_suffix('.crt') - - output_handle = None - if not output: - if not temporary_output: - output = filename.with_suffix('.p12') - else: - output_handle, output = mkstemp(suffix='.p12', prefix=filename.stem + '.') - - key = load_key(key_file) - logger.info('Successfully loaded privkey from ‘%s’', key_file) - cert = None - with open(cert_file, mode='rb') as fh: - cert = x509.load_pem_x509_certificate(fh.read()) - logger.info('Successfully loaded certificate from ‘%s’', cert_file) - - with umask(0o0177), atomic_write(output, overwrite=False, mode='wb') if not output_handle else os.fdopen(output_handle, mode='wb') as fh: - logger.info('Writing to ‘%s’...', output) - common_name_attrs = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) - if len(common_name_attrs) != 1: - raise InvalidParamsError('Invalid name structure in cert') - subject = common_name_attrs[0].value.lower() - - pw = None - if not random_password: - pw2 = None - while not pw2 or pw2 != pw: - pw = getpass(prompt='Password: ') - if not pw: - pw = None - break - else: - pw2 = getpass(prompt='Repeat password: ') - else: - from xkcdpass import xkcd_password as xp - ws = xp.generate_wordlist(wordfile=xp.locate_wordfile()) - pw = xp.generate_xkcdpassword(ws, numwords=random_password_length) - print(f'Password: {pw}', file=sys.stderr) - - encryption = None - if pw: - encryption = PrivateFormat.PKCS12.encryption_builder().kdf_rounds( - 500000 if not weak_encryption else 50000 - ).key_cert_algorithm( - pkcs12.PBES.PBESv2SHA256AndAES256CBC if not weak_encryption else pkcs12.PBES.PBESv1SHA1And3KeyTripleDESCBC - ).hmac_hash( - hashes.SHA256() if not weak_encryption else hashes.SHA1() - ).build(bytes(pw, 'utf-8')) - fh.write(pkcs12.serialize_key_and_certificates( - bytes(subject, 'utf-8'), - key, - cert, - None, - encryption, - )) - logger.debug('Adjusting permissions for ‘%s’...', output) - os.chmod(output, 0o0400) - - if temporary_output: - print(f'Temporary output file: {output}', file=sys.stderr) - - -def main(): - global logger - logger = logging.getLogger(__name__) - console_handler = logging.StreamHandler() - console_handler.setFormatter( logging.Formatter('[%(levelname)s](%(name)s): %(message)s') ) - if sys.stderr.isatty(): - console_handler.setFormatter( logging.Formatter('%(asctime)s [%(levelname)s](%(name)s): %(message)s') ) - logger.addHandler(console_handler) - - # log uncaught exceptions - def log_exceptions(type, value, tb): - global logger - - logger.error(value) - sys.__excepthook__(type, value, tb) # calls default excepthook - - sys.excepthook = log_exceptions - - - parser = argparse.ArgumentParser(prog='ca', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('--verbosity', dest='log_level', action='append', type=int, help='Numeric verbosity') - parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1, help='Increase verbosity') - parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1, help='Decrease verbosity') - subparsers = parser.add_subparsers(help='Subcommands', required=True) - - subparser = subparsers.add_parser('init', aliases=['initca', 'init-ca', 'ca'], formatter_class=argparse.ArgumentDefaultsHelpFormatter, description="Generate a new selfsigned CA certificate and associated private key\n\nPrivate key is only generated if it does not yet exist") - subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt'), help='Path to file containing CA certificate') - subparser.add_argument('--ca-key', type=Path, default=Path('ca.key'), help='Path to file containing CA private key') - subparser.add_argument('--key-type', type=KeyType.from_string, choices=list(KeyType), default=KeyType.ED448.value, help='Type of private key to generate') - subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5), help='How far to shift begin of validity into the past') - subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10)), help='How far to shift end of validity into the future') - subparser.add_argument('--sops', '--no-sops', action=BooleanAction, default=True, help='Encrypt private key using SOPS') - subparser.add_argument('--subject', metavar='DN', type=str, required=True, help='Subject name') - subparser.set_defaults(cmd=initca) - - subparser = subparsers.add_parser('sign', aliases=['signcsr', 'sign-csr'], formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Sign an existing CSR') - subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt'), help='Path to file containing CA certificate') - subparser.add_argument('--ca-key', type=Path, default=Path('ca.key'), help='Path to file containing CA private key') - subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5), help='How far to shift begin of validity into the past') - subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10)), help='How far to shift end of validity into the future') - subparser.add_argument('--subject', metavar='DN', type=str, required=False, help='Override subject name') - subparser.add_argument('--ignore-alternative-names', '--no-ignore-alternative-names', action=BooleanAction, default=True, help='Ignore subject alternative names provided in CSR') - subparser.add_argument('--key-usage', metavar='KEY_USAGE', type=SupportedKeyUsage, action=ExtendAction, default=[SupportedKeyUsage.CLIENT_AUTH], help='Allowed key usages') - subparser.add_argument('--alternative-name', metavar='CN', type=str, action='append', help='Subject alternative names') - subparser.add_argument('--output', type=Path, required=True, help='Output path') - subparser.add_argument('csr', metavar='FILE', type=argparse.FileType(mode='rb'), help='Path to file containing CSR') - subparser.set_defaults(cmd=signcsr) - - subparser = subparsers.add_parser('new-client', aliases=['new', 'new-client', 'client'], formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Generate a new CSR and sign it immediately') - subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt'), help='Path to file containing CA certificate') - subparser.add_argument('--ca-key', type=Path, default=Path('ca.key'), help='Path to file containing CA private key') - subparser.add_argument('--key-type', type=KeyType.from_string, choices=list(KeyType), default=KeyType.ED25519.value, help='Type of private key to generate') - subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5), help='How far to shift begin of validity into the past') - subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10)), help='How far to shift end of validity into the future') - subparser.add_argument('--sops', '--no-sops', action=BooleanAction, default=True, help='Encrypt private key using SOPS') - subparser.add_argument('--subject', metavar='DN', type=str, required=True, help='Subject name') - subparser.add_argument('--key-usage', metavar='KEY_USAGE', type=SupportedKeyUsage, action=ExtendAction, default=[SupportedKeyUsage.CLIENT_AUTH], help='Allowed key usages') - subparser.add_argument('--alternative-name', metavar='CN', type=str, action='append', help='Subject alternative names') - subparser.add_argument('--output', type=Path, required=True, help='Output path') - subparser.set_defaults(cmd=new_client) - - subparser = subparsers.add_parser('pkcs12', aliases=['p12', 'pfx'], formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Convert existing certificate and private key to PKCS#12 format') - subparser.add_argument('--random-password', '--no-random-password', action=BooleanAction, default=True, help='Encrypt PKCS#12 file with random passphrase -- otherwise prompt for one') - subparser.add_argument('--random-password-length', type=int, default=12, help='Number of words in random passphrase') - subparser.add_argument('--weak-encryption', '--no-weak-encryption', action=BooleanAction, default=False, help='Use weak, but more compatible, encryption') - subparser.add_argument('--temporary-output', '--no-temporary-output', action=BooleanAction, default=True, help='If output path is not given, generate output file in temporary directory') - subparser.add_argument('--output', type=Path, help='Output path') - subparser.add_argument('filename', metavar='BASENAME', type=Path, help='Input path') - subparser.set_defaults(cmd=to_pkcs12) - - args = parser.parse_args() - - - LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] - DEFAULT_LOG_LEVEL = logging.INFO - log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL) - - for adjustment in args.log_level or (): - log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0)) - logger.setLevel(LOG_LEVELS[log_level]) - - - logger.debug('Using cryptography %s (%s)', cryptography_version, openssl.backend.openssl_version_text()) - - - args.cmd(**{ k: v for k, v in vars(args).items() if k in signature(args.cmd).parameters.keys() }) - -if __name__ == '__main__': - sys.exit(main()) diff --git a/tools/ca/default.nix b/tools/ca/default.nix deleted file mode 100644 index c5fe0cea..00000000 --- a/tools/ca/default.nix +++ /dev/null @@ -1,25 +0,0 @@ -{ system, self, mach-nix, leapseconds, ... }: -let - pkgs = self.legacyPackages.${system}; -in mach-nix.lib.${system}.buildPythonPackage { - pname = "ca"; - src = pkgs.lib.sourceByRegex ./. ["^setup\.py$" "^ca(/[^/]+.*)?$"]; - version = "0.0.0"; - ignoreDataOutdated = true; - - requirements = '' - cryptography >=38.0.0 - fqdn - atomicwrites - leapseconddata - xkcdpass - ''; - - _.cryptography.buildInputs = with pkgs; [ openssl ]; - - postInstall = '' - wrapProgram $out/bin/ca \ - --set-default LEAPSECONDS_FILE ${leapseconds} \ - --prefix PATH : ${pkgs.lib.makeBinPath (with pkgs; [sops])} - ''; -} diff --git a/tools/ca/setup.py b/tools/ca/setup.py deleted file mode 100644 index 3342a7a6..00000000 --- a/tools/ca/setup.py +++ /dev/null @@ -1,10 +0,0 @@ -from setuptools import setup - -setup(name='ca', - packages=['ca'], - entry_points={ - 'console_scripts': [ - 'ca=ca.__main__:main' - ], - }, -) diff --git a/tools/sops-inventory/default.nix b/tools/sops-inventory/default.nix new file mode 100644 index 00000000..94c455e5 --- /dev/null +++ b/tools/sops-inventory/default.nix @@ -0,0 +1,19 @@ +{ system, self, mach-nix, ... }: +let + pkgs = self.legacyPackages.${system}; +in mach-nix.lib.${system}.buildPythonPackage { + pname = "sops-inventory"; + version = "0.0.0"; + + src = pkgs.lib.sourceByRegex ./. ["^setup\.py$" "^sops_inventory(/[^/]+.*)?$"]; + + ignoreDataOutdated = true; + requirements = '' + pyyaml + ''; + + postInstall = '' + wrapProgram $out/bin/sops-inventory \ + --set-default SOPS_INVENTORY_BASE ${self} + ''; +} diff --git a/tools/sops-inventory/setup.py b/tools/sops-inventory/setup.py new file mode 100644 index 00000000..3ea2a5d1 --- /dev/null +++ b/tools/sops-inventory/setup.py @@ -0,0 +1,11 @@ +from setuptools import setup + +setup( + name='sops-inventory', + packages=['sops_inventory'], + entry_points={ + 'console_scripts': [ + 'sops-inventory=sops_inventory.__main__:main' + ], + }, +) diff --git a/tools/sops-inventory/sops_inventory/__init__.py b/tools/sops-inventory/sops_inventory/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tools/sops-inventory/sops_inventory/__main__.py b/tools/sops-inventory/sops_inventory/__main__.py new file mode 100644 index 00000000..68f72b60 --- /dev/null +++ b/tools/sops-inventory/sops_inventory/__main__.py @@ -0,0 +1,85 @@ +import os,sys + +from pathlib import Path +from collections import deque, defaultdict + +import argparse + +from yaml import load, YAMLError +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader + + +SOPS_TYPES = frozenset({'kms', 'gcp_kms', 'azure_kv', 'hc_vault', 'age', 'pgp'}) + + +class BooleanAction(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, False if option_string.startswith('--no') else True) + + +def main(): + default_base = os.getenv('SOPS_INVENTORY_BASE', default=[]) + if default_base: + default_base = Path(default_base) + + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('--list-files', '--no-list-files', action=BooleanAction, default=False, help='Only list sops files') + parser.add_argument('path', metavar='PATH', nargs='?' if default_base else None, type=Path, default=default_base, help='Base directory to take inventory of') + args = parser.parse_args() + + inventory = defaultdict(set) + + queue = deque([args.path]) + while queue: + baseDir = queue.popleft() + for child in baseDir.iterdir(): + if child.is_dir(): + queue.append(child) + else: + try: + with child.open(mode='r') as fh: + yaml = load(fh, Loader=Loader) + if not yaml: + raise ValueError('Could not parse YAML') + if not isinstance(yaml, dict) or not 'sops' in yaml: + raise ValueError('Did not find "sops" key') + sops = yaml['sops'] + + key_info = set() + for k in SOPS_TYPES: + if k in sops: + v = sops[k] + if not v: + continue + + match k: + case 'pgp': + for r in v: + key_info.add(r['fp']) + case 'age': + for r in v: + key_info.add(r['recipient']) + case _: + raise NotImplementedError + inventory[frozenset(key_info)].add(child.relative_to(args.path)) + except (YAMLError, ValueError) as e: + pass + + if not args.list_files: + for keys, files in inventory.items(): + print(','.join(keys) + ':') + for file in files: + print(' - ' + str(file)) + else: + for _, files in inventory.items(): + for file in files: + print(file) + +if __name__ == '__main__': + os.exit(main()) diff --git a/tools/tai64dec/default.nix b/tools/tai64dec/default.nix deleted file mode 100644 index 380c22bf..00000000 --- a/tools/tai64dec/default.nix +++ /dev/null @@ -1,18 +0,0 @@ -{ system, self, mach-nix, leapseconds, ... }: -let - pkgs = self.legacyPackages.${system}; -in mach-nix.lib.${system}.buildPythonPackage { - pname = "tai64dec"; - src = pkgs.lib.sourceByRegex ./. ["^setup\.py$" "^tai64dec(/[^/]+.*)?$"]; - version = "0.0.0"; - ignoreDataOutdated = true; - - requirements = '' - leapseconddata - ''; - - postInstall = '' - wrapProgram $out/bin/tai64dec \ - --set-default LEAPSECONDS_FILE ${leapseconds} - ''; -} diff --git a/tools/tai64dec/setup.py b/tools/tai64dec/setup.py deleted file mode 100644 index d936796b..00000000 --- a/tools/tai64dec/setup.py +++ /dev/null @@ -1,10 +0,0 @@ -from setuptools import setup - -setup(name='tai64dec', - packages=['tai64dec'], - entry_points={ - 'console_scripts': [ - 'tai64dec=tai64dec.__main__:main' - ], - }, -) diff --git a/tools/tai64dec/tai64dec/__main__.py b/tools/tai64dec/tai64dec/__main__.py deleted file mode 100644 index a8854523..00000000 --- a/tools/tai64dec/tai64dec/__main__.py +++ /dev/null @@ -1,46 +0,0 @@ -import sys, os - -import argparse - -from leapseconddata import LeapSecondData -from math import ldexp -from pathlib import Path -from datetime import datetime, timezone -import secrets - - -class BooleanAction(argparse.Action): - def __init__(self, option_strings, dest, nargs=None, **kwargs): - super(BooleanAction, self).__init__(option_strings, dest, nargs=0, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - setattr(namespace, self.dest, False if option_string.startswith('--no') else True) - - -def main(): - parser = argparse.ArgumentParser(prog='tai64dec', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('--random', '--no-random', action=BooleanAction, default=False) - parser.add_argument('--ns', '--no-ns', action=BooleanAction, default=True) - args = parser.parse_args() - - - leapsecond_data = LeapSecondData.from_file(Path(os.getenv('LEAPSECONDS_FILE'))) - - now = datetime.now(tz=timezone.utc) - - tai_dt = leapsecond_data.to_tai(now) - seconds = int(tai_dt.timestamp()) - seconds += int(ldexp(1, 62)) - out = seconds - - if args.ns: - nanoseconds = int((tai_dt.timestamp() - seconds) / 1e-9) - out = out << 32 | nanoseconds - - if args.random: - out = out << 24 | int.from_bytes(secrets.token_bytes(3), byteorder='little', signed=False) - - print('{:d}'.format(out), file=sys.stdout) - -if __name__ == '__main__': - sys.exit(main()) -- cgit v1.2.3