-
Notifications
You must be signed in to change notification settings - Fork 0
/
watchlist.py
95 lines (76 loc) · 2.27 KB
/
watchlist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Watch changes in DNX IXFR log"""
import argparse
import gzip
import json
import logging
from collections import defaultdict
def main():
"""Main function"""
parser = argparse.ArgumentParser(description="Report changes in IXFR log")
parser.add_argument(
"--log",
dest="log",
metavar="filename",
help="IXFR log file",
required=True,
)
parser.add_argument(
"--watch",
dest="watchlist",
metavar="filename",
help="Watch list file",
default="watchlist.json",
required=False,
)
parser.add_argument(
"--report",
dest="report",
metavar="filename",
help="Report result file",
default="report.json",
required=False,
)
parser.add_argument(
"--debug", dest="debug", action="store_true", help="Enable debugging"
)
args = parser.parse_args()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
with open(args.watchlist, "rt") as input_file:
watchlist = json.loads(input_file.read())
domains_per_recipient = defaultdict(list)
for domain, recipients in watchlist.items():
for r in recipients:
domains_per_recipient[r].append(domain)
data_del = defaultdict(set)
data_add = defaultdict(set)
modified_names = set()
if args.log.endswith(".gz"):
input_file = gzip.open(args.log, "rt")
else:
input_file = open(args.log, "rt")
for r in input_file.readlines():
change = json.loads(r)
name = change["name"]
if name not in watchlist:
continue
modified_names.add(name)
v = change["text"]
if change["deleted"]:
data_del[name].add(v)
data_add[name].discard(v)
else:
data_add[name].add(v)
data_del[name].discard(v)
input_file.close()
for recipient, domains in domains_per_recipient.items():
print(f"Report for {recipient}:")
for d in domains:
for rr in sorted(data_del[d]):
print(f" Deleted: {rr}")
for rr in sorted(data_add[d]):
print(f" Added: {rr}")
if __name__ == "__main__":
main()