From 6b0fd8d0197eccd18a6e079e5d9c58d40002ba44 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 18 Jan 2023 08:33:46 +0100 Subject: ... --- tools/ca/ca/__main__.py | 207 +++++++++++++++++++++++++++++------------------- 1 file changed, 125 insertions(+), 82 deletions(-) (limited to 'tools/ca') diff --git a/tools/ca/ca/__main__.py b/tools/ca/ca/__main__.py index 6615da55..bfaee63a 100644 --- a/tools/ca/ca/__main__.py +++ b/tools/ca/ca/__main__.py @@ -32,6 +32,7 @@ import subprocess import json from leapseconddata import LeapSecondData from collections.abc import Iterable +import ipaddress class KeyType(Enum): @@ -208,8 +209,12 @@ def mv_bak(path): break bak_path = path.parent / f'{path.name}.bak{n}' - logger.warn('Renaming ‘%s’ to ‘%s’...', path, bak_path) - path.rename(bak_path) + try: + path.rename(bak_path) + except FileNotFoundError: + pass + else: + logger.warn('Renamed ‘%s’ to ‘%s’...', path, bak_path) def tai64nint(dt): global leapsecond_data @@ -262,6 +267,44 @@ def write_genkey(key_type, sops, keyfile): return key +def to_dn(alternative_names): + def go(alternative_name): + dn = None + try: + dn = x509.Name.from_rfc4514_string(alternative_name) + except ValueError: + pass + + if dn: + logger.info('‘%s’ interpreted as directory name: %s', alternative_name, dn) + return x509.DirectoryName(dn) + + addr = None + try: + addr = ipaddress.IPv4Network(alternative_name) + except (ipaddress.AddressValueError, ipaddress.NetmaskValueError, ValueError): + pass + try: + addr = ipaddress.IPv4Address(alternative_name) + except ipaddress.AddressValueError: + pass + try: + addr = ipaddress.IPv6Network(alternative_name) + except (ipaddress.AddressValueError, ipaddress.NetmaskValueError, ValueError): + pass + try: + addr = ipaddress.IPv6Address(alternative_name) + except ipaddress.AddressValueError: + pass + + if addr: + logger.info('‘%s’ interpreted as ip address/subnet: %s', alternative_name, addr) + return x509.IPAddress(addr) + + return x509.DNSName(alternative_name) + + return map(go, alternative_names) + def initca(ca_cert, ca_key, key_type, subject, clock_skew, validity, sops): global logger @@ -273,14 +316,8 @@ def initca(ca_cert, ca_key, key_type, subject, clock_skew, validity, sops): if not key_type.aligned(key): logger.warn('Private key ‘%s’ does not align with requested type %s', ca_key, key_type) - try: - mv_bak(ca_key) - except FileNotFoundError: - pass - try: - mv_bak(ca_cert) - except FileNotFoundError: - pass + mv_bak(ca_key) + mv_bak(ca_cert) raise FileNotFoundError(f'Key does not align with requested type: {ca_key}') except FileNotFoundError: @@ -295,9 +332,14 @@ def initca(ca_cert, ca_key, key_type, subject, clock_skew, validity, sops): logger.debug('Generating new certificate...') now = datetime.utcnow() - name = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, subject.relative) - ]) + name = None + try: + name = x509.Name.from_rfc4514_string(subject) + logger.info('‘%s’ interpreted as directory name: %s', subject, name) + except ValueError: + name = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, subject) + ]) cert = x509.CertificateBuilder().subject_name( name @@ -342,24 +384,28 @@ def signcsr(ca_cert, ca_key, clock_skew, validity, subject, alternative_name, ke csr_bytes = csr csr = x509.load_pem_x509_csr(csr_bytes) + name = None if not subject: - common_name_attrs = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME) - if len(common_name_attrs) != 1: - raise InvalidParamsError('Invalid name structure in CSR') - subject = common_name_attrs[0].value.lower() - logger.warn('Using subject common name from csr: %s', subject) - name = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, subject) - ]) + name = csr.subject + else: + try: + name = x509.Name.from_rfc4514_string(subject) + logger.info('‘%s’ interpreted as directory name: %s', subject, name) + except ValueError: + name = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, subject) + ]) if not ignore_alternative_names: try: ext = csr.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) - csr_alt_names = ext.value.get_values_for_type(x509.DNSName) + csr_alt_names = set(ext.value) logger.warn('Using alternative names from csr: %s', csr_alt_names) - alternative_name = list(set(alternative_name) | set(csr_alt_names)) + alternative_name = set(to_dn(alternative_name)) | csr_alt_names except ExtensionNotFound: pass + else: + alternative_name = to_dn(alternative_name) ca_key = load_key(ca_key) with open(ca_cert, 'rb') as fh: @@ -397,9 +443,7 @@ def signcsr(ca_cert, ca_key, clock_skew, validity, subject, alternative_name, ke if alternative_name: cert = cert.add_extension( - x509.SubjectAlternativeName( - list(map(x509.DNSName, alternative_name)) - ), + x509.SubjectAlternativeName(alternative_name), False ) @@ -407,10 +451,7 @@ def signcsr(ca_cert, ca_key, clock_skew, validity, subject, alternative_name, ke output = output.with_suffix('.crt') - try: - mv_bak(output) - except FileNotFoundError: - pass + mv_bak(output) with umask(0o0133), atomic_write(output, overwrite=False, mode='wb') as cf: logger.info('Writing new certificate to ‘%s’...', output) cf.write(cert.public_bytes(serialization.Encoding.PEM)) @@ -429,26 +470,28 @@ def new_client(ca_cert, ca_key, key_type, clock_skew, validity, subject, alterna if not key_type.aligned(key): logger.warn('Private key ‘%s’ does not align with requested type %s', key_file, key_type) - try: - mv_bak(key_file) - except FileNotFoundError: - pass - try: - mv_bak(cert_file) - except FileNotFoundError: - pass + mv_bak(key_file) + mv_bak(cert_file) raise FileNotFoundError(f'Key does not align with requested type: {key_file}') except FileNotFoundError: key = write_genkey(key_type, sops, key_file) - csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, subject) - ])) + name = None + try: + name = x509.Name.from_rfc4514_string(subject) + logger.info('‘%s’ interpreted as directory name: %s', subject, name) + except ValueError: + name = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, subject) + ]) + + csr = x509.CertificateSigningRequestBuilder().subject_name(name) + if alternative_name: csr = csr.add_extension( x509.SubjectAlternativeName( - list(map(x509.DNSName, alternative_name)) + to_dn(alternative_name) ), False ) @@ -553,54 +596,54 @@ def main(): parser = argparse.ArgumentParser(prog='ca', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('--verbosity', dest='log_level', action='append', type=int) - parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1) - parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1) + parser.add_argument('--verbosity', dest='log_level', action='append', type=int, help='Numeric verbosity') + parser.add_argument('--verbose', '-v', dest='log_level', action='append_const', const=1, help='Increase verbosity') + parser.add_argument('--quiet', '-q', dest='log_level', action='append_const', const=-1, help='Decrease verbosity') subparsers = parser.add_subparsers(help='Subcommands', required=True) - subparser = subparsers.add_parser('init', aliases=['initca', 'init-ca', 'ca'], formatter_class=argparse.ArgumentDefaultsHelpFormatter) - subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt')) - subparser.add_argument('--ca-key', type=Path, default=Path('ca.key')) - subparser.add_argument('--key-type', type=KeyType.from_string, choices=list(KeyType), default=KeyType.ED448.value) - subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5)) - subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10))) - subparser.add_argument('--sops', '--no-sops', action=BooleanAction, default=True) - subparser.add_argument('--subject', metavar='FQDN', type=ValidFQDN, required=True) + subparser = subparsers.add_parser('init', aliases=['initca', 'init-ca', 'ca'], formatter_class=argparse.ArgumentDefaultsHelpFormatter, description="Generate a new selfsigned CA certificate and associated private key\n\nPrivate key is only generated if it does not yet exist") + subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt'), help='Path to file containing CA certificate') + subparser.add_argument('--ca-key', type=Path, default=Path('ca.key'), help='Path to file containing CA private key') + subparser.add_argument('--key-type', type=KeyType.from_string, choices=list(KeyType), default=KeyType.ED448.value, help='Type of private key to generate') + subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5), help='How far to shift begin of validity into the past') + subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10)), help='How far to shift end of validity into the future') + subparser.add_argument('--sops', '--no-sops', action=BooleanAction, default=True, help='Encrypt private key using SOPS') + subparser.add_argument('--subject', metavar='DN', type=str, required=True, help='Subject name') subparser.set_defaults(cmd=initca) - subparser = subparsers.add_parser('sign', aliases=['signcsr', 'sign-csr'], formatter_class=argparse.ArgumentDefaultsHelpFormatter) - subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt')) - subparser.add_argument('--ca-key', type=Path, default=Path('ca.key')) - subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5)) - subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10))) - subparser.add_argument('--subject', metavar='CN', type=str, required=False) - subparser.add_argument('--ignore-alternative-names', '--no-ignore-alternative-names', action=BooleanAction, default=True) - subparser.add_argument('--key-usage', metavar='KEY_USAGE', type=SupportedKeyUsage, action=ExtendAction, default=[SupportedKeyUsage.CLIENT_AUTH]) - subparser.add_argument('--alternative-name', metavar='CN', type=str, action='append') - subparser.add_argument('--output', type=Path, required=True) - subparser.add_argument('csr', metavar='FILE', type=argparse.FileType(mode='rb')) + subparser = subparsers.add_parser('sign', aliases=['signcsr', 'sign-csr'], formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Sign an existing CSR') + subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt'), help='Path to file containing CA certificate') + subparser.add_argument('--ca-key', type=Path, default=Path('ca.key'), help='Path to file containing CA private key') + subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5), help='How far to shift begin of validity into the past') + subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10)), help='How far to shift end of validity into the future') + subparser.add_argument('--subject', metavar='DN', type=str, required=False, help='Override subject name') + subparser.add_argument('--ignore-alternative-names', '--no-ignore-alternative-names', action=BooleanAction, default=True, help='Ignore subject alternative names provided in CSR') + subparser.add_argument('--key-usage', metavar='KEY_USAGE', type=SupportedKeyUsage, action=ExtendAction, default=[SupportedKeyUsage.CLIENT_AUTH], help='Allowed key usages') + subparser.add_argument('--alternative-name', metavar='CN', type=str, action='append', help='Subject alternative names') + subparser.add_argument('--output', type=Path, required=True, help='Output path') + subparser.add_argument('csr', metavar='FILE', type=argparse.FileType(mode='rb'), help='Path to file containing CSR') subparser.set_defaults(cmd=signcsr) - subparser = subparsers.add_parser('new-client', aliases=['new', 'new-client', 'client'], formatter_class=argparse.ArgumentDefaultsHelpFormatter) - subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt')) - subparser.add_argument('--ca-key', type=Path, default=Path('ca.key')) - subparser.add_argument('--key-type', type=KeyType.from_string, choices=list(KeyType), default=KeyType.ED25519.value) - subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5)) - subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10))) - subparser.add_argument('--sops', '--no-sops', action=BooleanAction, default=True) - subparser.add_argument('--subject', metavar='CN', type=str, required=True) - subparser.add_argument('--key-usage', metavar='KEY_USAGE', type=SupportedKeyUsage, action=ExtendAction, default=[SupportedKeyUsage.CLIENT_AUTH]) - subparser.add_argument('--alternative-name', metavar='CN', type=str, action='append') - subparser.add_argument('--output', type=Path, required=True) + subparser = subparsers.add_parser('new-client', aliases=['new', 'new-client', 'client'], formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Generate a new CSR and sign it immediately') + subparser.add_argument('--ca-cert', type=Path, default=Path('ca.crt'), help='Path to file containing CA certificate') + subparser.add_argument('--ca-key', type=Path, default=Path('ca.key'), help='Path to file containing CA private key') + subparser.add_argument('--key-type', type=KeyType.from_string, choices=list(KeyType), default=KeyType.ED25519.value, help='Type of private key to generate') + subparser.add_argument('--clock-skew', metavar='DURATION', type=duration, default=timedelta(minutes=5), help='How far to shift begin of validity into the past') + subparser.add_argument('--validity', metavar='DURATION', type=duration, default=timedelta(days=ceil(365.2425*10)), help='How far to shift end of validity into the future') + subparser.add_argument('--sops', '--no-sops', action=BooleanAction, default=True, help='Encrypt private key using SOPS') + subparser.add_argument('--subject', metavar='DN', type=str, required=True, help='Subject name') + subparser.add_argument('--key-usage', metavar='KEY_USAGE', type=SupportedKeyUsage, action=ExtendAction, default=[SupportedKeyUsage.CLIENT_AUTH], help='Allowed key usages') + subparser.add_argument('--alternative-name', metavar='CN', type=str, action='append', help='Subject alternative names') + subparser.add_argument('--output', type=Path, required=True, help='Output path') subparser.set_defaults(cmd=new_client) - subparser = subparsers.add_parser('pkcs12', aliases=['p12', 'pfx'], formatter_class=argparse.ArgumentDefaultsHelpFormatter) - subparser.add_argument('--random-password', '--no-random-password', action=BooleanAction, default=True) - subparser.add_argument('--random-password-length', type=int, default=12) - subparser.add_argument('--weak-encryption', '--no-weak-encryption', action=BooleanAction, default=False) - subparser.add_argument('--temporary-output', '--no-temporary-output', action=BooleanAction, default=True) - subparser.add_argument('--output', type=Path) - subparser.add_argument('filename', metavar='BASENAME', type=Path) + subparser = subparsers.add_parser('pkcs12', aliases=['p12', 'pfx'], formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Convert existing certificate and private key to PKCS#12 format') + subparser.add_argument('--random-password', '--no-random-password', action=BooleanAction, default=True, help='Encrypt PKCS#12 file with random passphrase -- otherwise prompt for one') + subparser.add_argument('--random-password-length', type=int, default=12, help='Number of words in random passphrase') + subparser.add_argument('--weak-encryption', '--no-weak-encryption', action=BooleanAction, default=False, help='Use weak, but more compatible, encryption') + subparser.add_argument('--temporary-output', '--no-temporary-output', action=BooleanAction, default=True, help='If output path is not given, generate output file in temporary directory') + subparser.add_argument('--output', type=Path, help='Output path') + subparser.add_argument('filename', metavar='BASENAME', type=Path, help='Input path') subparser.set_defaults(cmd=to_pkcs12) args = parser.parse_args() -- cgit v1.2.3