#!@python@/bin/python from INWX.Domrobot import ApiClient, ApiType from xdg import BaseDirectory import configparser import sys import dns.message import dns.rdataclass import dns.rdatatype import dns.query import dns.zone import dns.rdtypes.ANY.DNSKEY import argparse import ipaddress import re import base64 class ApiConnection: username = '' password = '' shared_secret = None api_client = None def __init__(self, **kwargs): config = configparser.ConfigParser() config.read(BaseDirectory.load_config_paths('inwx-cdnskey.ini')) self.username = config.get('INWX', 'username') self.password = config.get('INWX', 'password') self.shared_secret = config.get('INWX', 'shared_secret', fallback=None) self.api_client = ApiClient(api_url=ApiClient.API_LIVE_URL, api_type=ApiType.JSON_RPC, **kwargs) def __enter__(self): self.api_client.login(self.username, self.password, shared_secret = self.shared_secret) return self def __exit__(self, type, value, traceback): self.logout() def login(self, *args, **kwargs): return ApiConnection.check_api_result(self.api_client.login(*args, **kwargs), msg='API login error') def logout(self, *args, **kwargs): return ApiConnection.check_api_result(self.api_client.logout(*args, **kwargs), msg='API logout error', success_code=1500) def call_api(self, *args, **kwargs): return ApiConnection.check_api_result(self.api_client.call_api(*args, **kwargs), msg='API error') @staticmethod def check_api_result(result, msg='API error', success_code=1000): if result['code'] != success_code: raise ApiConnection.format_exception(msg, result) return result @staticmethod def format_exception(msg, result): return Exception(f"{msg}. Code: {result['code']} Message: {result['msg']}") def main(): parser = argparse.ArgumentParser(prog = "inwx-cdnskey") parser.add_argument('--resolver', metavar='ADDRESS', type=ipaddress.ip_address, default='127.0.0.1') parser.add_argument('domains', metavar='DOMAIN', nargs='+') args = parser.parse_args() with ApiConnection(debug_mode=False) as api_conn: for domain in args.domains: active_keys = set() deleted_keys = set() api_data = api_conn.call_api('dnssec.listkeys', {'domainName': domain})['resData'] for dat in api_data: if dat.get('publicKey') is None: continue dnskey_rdat = dns.rdtypes.ANY.DNSKEY.DNSKEY(dns.rdataclass.IN, dns.rdatatype.DNSKEY, int(dat['flagId']), 3, int(dat['algorithmId']), base64.b64decode(dat['publicKey'])) if dat['status'] == 'DELETED': deleted_keys.add(dnskey_rdat) else: active_keys.add(dnskey_rdat) dns_keys = set() qname = dns.name.from_text(domain) q = dns.message.make_query(qname, dns.rdatatype.CDNSKEY) r, _ = dns.query.udp_with_fallback(q, str(args.resolver)) for rdat in r.find_rrset(dns.message.ANSWER, qname, dns.rdataclass.IN, dns.rdatatype.CDNSKEY): dnskey_rdat = dns.rdtypes.ANY.DNSKEY.DNSKEY(dns.rdataclass.IN, dns.rdatatype.DNSKEY, rdat.flags, rdat.protocol, rdat.algorithm, rdat.key) dns_keys.add(dnskey_rdat) print('delete: ', active_keys - dns_keys) print('add: ', dns_keys - active_keys) if __name__ == '__main__': sys.exit(main())