1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
#!@python@/bin/python
from INWX.Domrobot import ApiClient, ApiType
from xdg import BaseDirectory
import configparser
import sys
import dns.message
import dns.rdataclass
import dns.rdatatype
import dns.query
import dns.zone
import dns.rdtypes.ANY.DNSKEY
import argparse
import ipaddress
import re
import base64
class ApiConnection:
    """Context-managed session against the INWX live JSON-RPC API.

    Credentials are read from the ``[INWX]`` section of ``inwx-cdnskey.ini``
    (located through ``BaseDirectory.load_config_paths``).  Entering the
    context logs in, leaving it logs out.  Every API result passing through
    this class is validated with :meth:`check_api_result`, so a failing call
    raises instead of returning an error payload.
    """

    # Class-level defaults; __init__ replaces all of them per instance.
    username = ''
    password = ''
    shared_secret = None
    api_client = None

    def __init__(self, **kwargs):
        """Load the credentials and construct the underlying ``ApiClient``.

        Args:
            **kwargs: Forwarded unchanged to ``ApiClient`` (for example
                ``debug_mode``).

        Raises:
            configparser.NoSectionError: The ``[INWX]`` section is missing
                (also the case when no config file was found).
            configparser.NoOptionError: ``username`` or ``password`` is
                missing from the section.
        """
        config = configparser.ConfigParser()
        config.read(BaseDirectory.load_config_paths('inwx-cdnskey.ini'))
        self.username = config.get('INWX', 'username')
        self.password = config.get('INWX', 'password')
        # The shared secret is optional; it falls back to None when absent.
        self.shared_secret = config.get('INWX', 'shared_secret', fallback=None)
        self.api_client = ApiClient(
            api_url=ApiClient.API_LIVE_URL,
            api_type=ApiType.JSON_RPC,
            **kwargs,
        )

    def __enter__(self):
        """Log in with the configured credentials and return this connection.

        Raises:
            Exception: The login result code is not 1000.
        """
        # Go through self.login() rather than the raw client so that a
        # rejected login fails right here instead of surfacing later as an
        # unrelated "API error" on the first real call.
        self.login(self.username, self.password, shared_secret=self.shared_secret)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Log out; returns None so an in-flight exception keeps propagating."""
        self.logout()

    def login(self, *args, **kwargs):
        """Call ``ApiClient.login`` and return its result if the code is 1000.

        Raises:
            Exception: The result code differs from 1000.
        """
        return ApiConnection.check_api_result(
            self.api_client.login(*args, **kwargs),
            msg='API login error',
        )

    def logout(self, *args, **kwargs):
        """Call ``ApiClient.logout`` and return its result if the code is 1500.

        Raises:
            Exception: The result code differs from 1500.
        """
        return ApiConnection.check_api_result(
            self.api_client.logout(*args, **kwargs),
            msg='API logout error',
            success_code=1500,
        )

    def call_api(self, *args, **kwargs):
        """Call ``ApiClient.call_api`` and return its result if the code is 1000.

        Raises:
            Exception: The result code differs from 1000.
        """
        return ApiConnection.check_api_result(
            self.api_client.call_api(*args, **kwargs),
            msg='API error',
        )

    @staticmethod
    def check_api_result(result, msg='API error', success_code=1000):
        """Return ``result`` unchanged when its ``'code'`` equals ``success_code``.

        Args:
            result: Mapping with at least the keys ``'code'`` and ``'msg'``.
            msg: Prefix for the exception text on failure.
            success_code: The code that counts as success.

        Raises:
            Exception: ``result['code']`` differs from ``success_code``.
        """
        if result['code'] != success_code:
            raise ApiConnection.format_exception(msg, result)
        return result

    @staticmethod
    def format_exception(msg, result):
        """Build (without raising) an ``Exception`` describing a failed result."""
        return Exception(f"{msg}. Code: {result['code']} Message: {result['msg']}")
def main():
    """Compare the DNSSEC keys known to INWX with the published CDNSKEY set.

    For every DOMAIN given on the command line the keys returned by the
    ``dnssec.listkeys`` API call are compared with the CDNSKEY records
    obtained from the resolver (``--resolver``, default ``127.0.0.1``).  The
    differences are printed as ``delete:`` (listed at INWX but not published)
    and ``add:`` (published but not listed at INWX).  This function only
    prints the differences.

    Returns:
        0 when every domain was compared, 1 when at least one domain had no
        CDNSKEY RRset in the answer and was skipped.

    Raises:
        Exception: An API call returned an error code (see ``ApiConnection``).
    """
    parser = argparse.ArgumentParser(prog="inwx-cdnskey")
    parser.add_argument('--resolver', metavar='ADDRESS', type=ipaddress.ip_address, default='127.0.0.1')
    parser.add_argument('domains', metavar='DOMAIN', nargs='+')
    args = parser.parse_args()

    # dns.query wants the resolver address as text; convert it once.
    resolver = str(args.resolver)
    skipped = False

    with ApiConnection(debug_mode=False) as api_conn:
        for domain in args.domains:
            api_data = api_conn.call_api('dnssec.listkeys', {'domainName': domain})['resData']

            active_keys = set()
            for dat in api_data:
                # Entries without key material cannot be compared.
                if dat.get('publicKey') is None:
                    continue
                # Keys already marked DELETED do not count as active.
                if dat['status'] == 'DELETED':
                    continue
                # The API entry carries no protocol field here; the original
                # code fixes it to 3, which is kept.
                active_keys.add(dns.rdtypes.ANY.DNSKEY.DNSKEY(
                    dns.rdataclass.IN,
                    dns.rdatatype.DNSKEY,
                    int(dat['flagId']),
                    3,
                    int(dat['algorithmId']),
                    base64.b64decode(dat['publicKey']),
                ))

            qname = dns.name.from_text(domain)
            query = dns.message.make_query(qname, dns.rdatatype.CDNSKEY)
            response, _ = dns.query.udp_with_fallback(query, resolver)
            try:
                cdnskey_rrset = response.find_rrset(
                    dns.message.ANSWER, qname, dns.rdataclass.IN, dns.rdatatype.CDNSKEY)
            except KeyError:
                # find_rrset raises KeyError when the answer has no such
                # RRset.  Without CDNSKEY data there is nothing to compare,
                # so report it and carry on with the remaining domains
                # instead of aborting the whole run with a traceback.
                print(f'{domain}: no CDNSKEY records in the answer, skipping', file=sys.stderr)
                skipped = True
                continue

            # Re-wrap each CDNSKEY as a DNSKEY rdata so both sets hold the
            # same rdata type and can be diffed with set arithmetic.
            dns_keys = {
                dns.rdtypes.ANY.DNSKEY.DNSKEY(
                    dns.rdataclass.IN,
                    dns.rdatatype.DNSKEY,
                    rdat.flags,
                    rdat.protocol,
                    rdat.algorithm,
                    rdat.key,
                )
                for rdat in cdnskey_rrset
            }

            print('delete: ', active_keys - dns_keys)
            print('add: ', dns_keys - active_keys)

    return 1 if skipped else 0
if __name__ == '__main__':
    # Script entry point: run main() and hand its return value to the
    # interpreter as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
|