From c471ec95f8ccf0363cfae09c4479bf8e0907d065 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Sun, 27 Feb 2022 16:48:41 +0100 Subject: inwx-cdnskey --- overlays/inwx-cdnskey/inwx-cdnskey.py | 95 +++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 overlays/inwx-cdnskey/inwx-cdnskey.py (limited to 'overlays/inwx-cdnskey/inwx-cdnskey.py') diff --git a/overlays/inwx-cdnskey/inwx-cdnskey.py b/overlays/inwx-cdnskey/inwx-cdnskey.py new file mode 100644 index 00000000..0add24a9 --- /dev/null +++ b/overlays/inwx-cdnskey/inwx-cdnskey.py @@ -0,0 +1,95 @@ +#!@python@/bin/python + +from INWX.Domrobot import ApiClient, ApiType +from xdg import BaseDirectory +import configparser +import sys +import dns.message +import dns.rdataclass +import dns.rdatatype +import dns.query +import dns.zone +import dns.rdtypes.ANY.DNSKEY +import argparse +import ipaddress +import re +import base64 + + +class ApiConnection: + username = '' + password = '' + shared_secret = None + api_client = None + + def __init__(self, **kwargs): + config = configparser.ConfigParser() + config.read(BaseDirectory.load_config_paths('inwx-cdnskey.ini')) + self.username = config.get('INWX', 'username') + self.password = config.get('INWX', 'password') + self.shared_secret = config.get('INWX', 'shared_secret', fallback=None) + self.api_client = ApiClient(api_url=ApiClient.API_LIVE_URL, api_type=ApiType.JSON_RPC, **kwargs) + + def __enter__(self): + self.api_client.login(self.username, self.password, shared_secret = self.shared_secret) + return self + + def __exit__(self, type, value, traceback): + self.logout() + + def login(self, *args, **kwargs): + return ApiConnection.check_api_result(self.api_client.login(*args, **kwargs), msg='API login error') + + def logout(self, *args, **kwargs): + return ApiConnection.check_api_result(self.api_client.logout(*args, **kwargs), msg='API logout error', success_code=1500) + + def call_api(self, *args, **kwargs): + return ApiConnection.check_api_result(self.api_client.call_api(*args, **kwargs), msg='API error') + + @staticmethod + def check_api_result(result, msg='API error', success_code=1000): + if result['code'] != success_code: + raise ApiConnection.format_exception(msg, result) + return result + + @staticmethod + def format_exception(msg, result): + return Exception(f"{msg}. Code: {result['code']} Message: {result['msg']}") + + + +def main(): + parser = argparse.ArgumentParser(prog = "inwx-cdnskey") + parser.add_argument('--resolver', metavar='ADDRESS', type=ipaddress.ip_address, default='127.0.0.1') + parser.add_argument('domains', metavar='DOMAIN', nargs='+') + args = parser.parse_args() + + with ApiConnection(debug_mode=False) as api_conn: + for domain in args.domains: + active_keys = set() + deleted_keys = set() + api_data = api_conn.call_api('dnssec.listkeys', {'domainName': domain})['resData'] + for dat in api_data: + if dat.get('publicKey') is None: + continue + + dnskey_rdat = dns.rdtypes.ANY.DNSKEY.DNSKEY(dns.rdataclass.IN, dns.rdatatype.DNSKEY, int(dat['flagId']), 3, int(dat['algorithmId']), base64.b64decode(dat['publicKey'])) + if dat['status'] == 'DELETED': + deleted_keys.add(dnskey_rdat) + else: + active_keys.add(dnskey_rdat) + + dns_keys = set() + qname = dns.name.from_text(domain) + q = dns.message.make_query(qname, dns.rdatatype.CDNSKEY) + r, _ = dns.query.udp_with_fallback(q, str(args.resolver)) + for rdat in r.find_rrset(dns.message.ANSWER, qname, dns.rdataclass.IN, dns.rdatatype.CDNSKEY): + dnskey_rdat = dns.rdtypes.ANY.DNSKEY.DNSKEY(dns.rdataclass.IN, dns.rdatatype.DNSKEY, rdat.flags, rdat.protocol, rdat.algorithm, rdat.key) + dns_keys.add(dnskey_rdat) + + print('delete: ', active_keys - dns_keys) + print('add: ', dns_keys - active_keys) + + +if __name__ == '__main__': + sys.exit(main()) -- cgit v1.2.3