summaryrefslogtreecommitdiff
path: root/overlays/inwx-cdnskey/inwx-cdnskey.py
blob: 0add24a9cc859fccc2e8a5c44d95ebeb56a7a0a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!@python@/bin/python

from INWX.Domrobot import ApiClient, ApiType
from xdg import BaseDirectory
import configparser
import sys
import dns.message
import dns.rdataclass
import dns.rdatatype
import dns.query
import dns.zone
import dns.rdtypes.ANY.DNSKEY
import argparse
import ipaddress
import re
import base64


class ApiConnection:
    """Context-managed session with the INWX Domrobot JSON-RPC API.

    Credentials are read from ``inwx-cdnskey.ini`` files found on the XDG
    configuration search path (section ``INWX``; options ``username``,
    ``password`` and the optional ``shared_secret``).  Entering the context
    logs in, leaving it logs out.  Results of ``login``, ``logout`` and
    ``call_api`` are compared against an expected result code and turned
    into an exception on mismatch.
    """

    # Class-level defaults; __init__ overwrites all of them per instance.
    username = ''
    password = ''
    shared_secret = None
    api_client = None

    def __init__(self, **kwargs):
        """Load credentials and create the underlying API client.

        Args:
            **kwargs: Forwarded unchanged to ``ApiClient`` (for example
                ``debug_mode``).

        Raises:
            configparser.NoSectionError, configparser.NoOptionError: If the
                ``INWX`` section or a mandatory option is missing.
        """
        config = configparser.ConfigParser()
        # load_config_paths() yields the most important location first,
        # whereas ConfigParser.read() lets later files override earlier
        # ones.  Read in reverse so the user's file wins over system-wide
        # ones.
        config_paths = list(BaseDirectory.load_config_paths('inwx-cdnskey.ini'))
        config.read(config_paths[::-1])
        self.username = config.get('INWX', 'username')
        self.password = config.get('INWX', 'password')
        self.shared_secret = config.get('INWX', 'shared_secret', fallback=None)
        self.api_client = ApiClient(
            api_url=ApiClient.API_LIVE_URL,
            api_type=ApiType.JSON_RPC,
            **kwargs,
        )

    def __enter__(self):
        """Log in with the configured credentials and return ``self``.

        Raises:
            Exception: If the API does not report a successful login.
        """
        # Go through self.login() so a rejected login is detected here
        # instead of surfacing later as an unrelated API error.
        self.login(self.username, self.password, shared_secret=self.shared_secret)
        return self

    def __exit__(self, type, value, traceback):
        """Log out when the ``with`` block ends; never suppresses exceptions."""
        self.logout()

    def login(self, *args, **kwargs):
        """Call ``ApiClient.login`` and return its result if the code is 1000.

        Raises:
            Exception: If the result code differs from 1000.
        """
        return ApiConnection.check_api_result(
            self.api_client.login(*args, **kwargs), msg='API login error')

    def logout(self, *args, **kwargs):
        """Call ``ApiClient.logout`` and return its result if the code is 1500.

        Raises:
            Exception: If the result code differs from 1500.
        """
        return ApiConnection.check_api_result(
            self.api_client.logout(*args, **kwargs),
            msg='API logout error',
            success_code=1500,
        )

    def call_api(self, *args, **kwargs):
        """Call ``ApiClient.call_api`` and return its result if the code is 1000.

        Raises:
            Exception: If the result code differs from 1000.
        """
        return ApiConnection.check_api_result(
            self.api_client.call_api(*args, **kwargs), msg='API error')

    @staticmethod
    def check_api_result(result, msg='API error', success_code=1000):
        """Return *result* unchanged when its ``'code'`` equals *success_code*.

        Args:
            result: API response mapping with at least ``'code'`` and
                ``'msg'`` entries.
            msg: Prefix for the exception text on failure.
            success_code: Result code that counts as success.

        Raises:
            Exception: Built by ``format_exception`` when the code differs.
        """
        if result['code'] != success_code:
            raise ApiConnection.format_exception(msg, result)
        return result

    @staticmethod
    def format_exception(msg, result):
        """Build (but do not raise) an ``Exception`` describing a failed call."""
        return Exception(f"{msg}. Code: {result['code']}  Message: {result['msg']}")
        


def _registry_keys(api_conn, domain):
    """Return the non-deleted DNSKEY rdata INWX lists for *domain* as a set."""
    keys = set()
    api_data = api_conn.call_api('dnssec.listkeys', {'domainName': domain})['resData']
    for entry in api_data:
        # Entries without key material, or already marked deleted, do not
        # count as active.
        if entry.get('publicKey') is None or entry['status'] == 'DELETED':
            continue
        keys.add(dns.rdtypes.ANY.DNSKEY.DNSKEY(
            dns.rdataclass.IN,
            dns.rdatatype.DNSKEY,
            int(entry['flagId']),
            3,  # protocol field is hard-coded; it is not read from the listing
            int(entry['algorithmId']),
            base64.b64decode(entry['publicKey']),
        ))
    return keys


def _published_keys(resolver, domain):
    """Return the CDNSKEY records *resolver* answers for *domain*, as DNSKEY rdata."""
    qname = dns.name.from_text(domain)
    query = dns.message.make_query(qname, dns.rdatatype.CDNSKEY)
    response, _ = dns.query.udp_with_fallback(query, str(resolver))
    # get_rrset() returns None instead of raising KeyError when the answer
    # section has no CDNSKEY RRset (e.g. the zone publishes none).
    rrset = response.get_rrset(
        dns.message.ANSWER, qname, dns.rdataclass.IN, dns.rdatatype.CDNSKEY)
    if rrset is None:
        return set()
    # Re-wrap as DNSKEY rdata so the records compare equal to the ones built
    # from the registry listing.
    return {
        dns.rdtypes.ANY.DNSKEY.DNSKEY(
            dns.rdataclass.IN, dns.rdatatype.DNSKEY,
            rdat.flags, rdat.protocol, rdat.algorithm, rdat.key)
        for rdat in rrset
    }


def main():
    """Compare the keys registered at INWX with the CDNSKEY set in DNS.

    For every domain on the command line, prints the keys that are active
    at INWX but absent from the CDNSKEY RRset ("delete") and the keys that
    are published as CDNSKEY but not active at INWX ("add").  Nothing is
    changed at the registry; the function only reports.

    Command line:
        --resolver ADDRESS  IP address of the DNS server to query
                            (default 127.0.0.1).
        DOMAIN ...          One or more domain names to inspect.
    """
    parser = argparse.ArgumentParser(prog="inwx-cdnskey")
    parser.add_argument('--resolver', metavar='ADDRESS', type=ipaddress.ip_address, default='127.0.0.1')
    parser.add_argument('domains', metavar='DOMAIN', nargs='+')
    args = parser.parse_args()

    with ApiConnection(debug_mode=False) as api_conn:
        for domain in args.domains:
            active_keys = _registry_keys(api_conn, domain)
            dns_keys = _published_keys(args.resolver, domain)

            print('delete: ', active_keys - dns_keys)
            print('add: ', dns_keys - active_keys)


# Run the command-line tool only when executed as a script (not on import);
# whatever main() returns is handed to sys.exit() as the exit status.
if __name__ == '__main__':
    sys.exit(main())