forked from erjosito/azcli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mrt2azmon.py
176 lines (164 loc) · 7.08 KB
/
mrt2azmon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/python3
import os, sys, getopt, json, collections, requests
import datetime, hashlib, hmac, base64
import mrtparse
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
# Default values
default_mrt_file = "/tmp/bird-mrtdump_bgp"
consolidated_mrt_file = "/var/log/bird.mrt"
temp_mrt_file = "/tmp/bird-mrtdump_bgp.tmp"
log_type = 'BgpAnalytics'
send_keepalives = True
# Build signature to authenticate message
# See https://docs.microsoft.com/azure/azure-monitor/logs/data-collector-api
def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
x_headers = 'x-ms-date:' + date
string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
decoded_key = base64.b64decode(shared_key)
encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
authorization = "SharedKey {}:{}".format(customer_id,encoded_hash)
return authorization
# Build and send a request to the POST API
# See https://docs.microsoft.com/azure/azure-monitor/logs/data-collector-api
def post_data(customer_id, shared_key, body, log_type):
method = 'POST'
content_type = 'application/json'
resource = '/api/logs'
rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
content_length = len(body)
signature = build_signature(customer_id, shared_key, rfc1123date, content_length, method, content_type, resource)
uri = 'https://' + customer_id + '.ods.opinsights.azure.com' + resource + '?api-version=2016-04-01'
# Headers
headers = {
'content-type': content_type,
'Authorization': signature,
'Log-Type': log_type,
'x-ms-date': rfc1123date
}
# Send POST request
response = requests.post(uri,data=body, headers=headers)
if (response.status_code >= 200 and response.status_code <= 299):
print('INFO: API request to Azure Monitor accepted')
else:
print(f'ERROR: Response code: {response.status_code}, Response body: {response.content}')
# Only 1-level JSON is accepted by Log Analytics
def flatten(d, parent_key=None, items=None):
if items == None:
items = {}
# print(f'Called flatten on dictionary {str(d)}, parent_key is {parent_key}, items is {str(items)}')
for key in d:
# print(f'Processing key {key}, type is {str(type(d[key]))}...')
if parent_key == None:
new_key = key
else:
new_key = parent_key + '_' + key
if type(d[key]) == collections.OrderedDict:
items = flatten (d[key], parent_key=new_key, items=items)
elif type(d[key]) == list:
if len(d[key]) > 0:
if type(d[key][0]) == collections.OrderedDict:
i=0
for element in d[key]:
element_parent_key = new_key + '_' + str(i)
items = flatten(d[key][i], parent_key=element_parent_key, items=items)
i += 1
else:
# If it is a value, such as a list of ASN in the ASpath, concat everything
if key == "value":
separator = ' '
items[new_key] = separator.join(map(str, d[key]))
# Otherwise it might be a code/translation value pair
elif len(d[key]) == 2:
items[new_key + '_code'] = d[key][0]
items[new_key] = d[key][1]
# Otherwise, concatenate too
else:
separator = ' '
items[new_key] = separator.join(map(str, d[key]))
else:
items[new_key] = d[key]
# print (json.dumps(items))
return items
# Main
def main(argv):
# Get arguments
akv_name = None
mrt_file = default_mrt_file
dry_run = False
try:
opts, args = getopt.getopt(argv,"hdv:f:",["help", "dry-run", "vault-name=", "mrt-file="])
except getopt.GetoptError:
print ('Options: -v <azure_key_vault_name> -f <mrt_file_name>')
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print ('Options: -v <azure_key_vault_name> -f <mrt_file_name>')
sys.exit()
if opt in ("-d", "--dry-run"):
print ("INFO: running in dry-run mode")
dry_run = True
elif opt in ("-v", "--vault-name"):
akv_name = arg
elif opt in ("-f", "--mrt-file"):
mrt_file = arg
# Print vault name
if (akv_name == None):
print ('Options: -v <azure_key_vault_name> -f <mrt_file_name>')
sys.exit()
else:
print ('INFO: Getting configuration from Azure Key Vault', akv_name)
# Get secrets
akv_uri = f"https://{akv_name}.vault.azure.net"
credential = DefaultAzureCredential()
client = SecretClient(vault_url=akv_uri, credential=credential)
logws_id = client.get_secret('bgp-logws-id').value
logws_key = client.get_secret('bgp-logws-key').value
# Debug: print configuration
print('INFO: Log Analytics workspace is', logws_id, 'and key is', logws_key)
# Only do something if file is actually not empty
if os.stat(mrt_file).st_size > 0:
# Move mrt_file to temp_mrt_file, and append it to the consolidated_mrt_file
os.system(f'cat {mrt_file} >{temp_mrt_file}')
os.system(f'> {mrt_file}')
os.system(f'cat {temp_mrt_file} >> {consolidated_mrt_file}')
# Analyze temp MRT file and dump JSON into a flattened string variable
body='['
entry_no = 0
keepalive_no = 0
for entry in mrtparse.Reader(temp_mrt_file):
entry_no += 1
# Do not log keepalives
add_entry = True
try:
if entry.data['bgp_message']['type'][1] == 'KEEPALIVE':
add_entry = send_keepalives
keepalive_no += 1
except:
pass
if add_entry:
bgp_entry=flatten(entry.data)
bgp_entry['raw']=str(json.dumps(entry.data)) # Add raw JSON for troubleshooting
if dry_run:
print(bgp_entry)
# First run
if body == '[':
body += json.dumps(bgp_entry)
# After the first run, append a comma and a line break to keep JSON syntax
else:
body += ',\n'
body += json.dumps(bgp_entry)
body += ']'
if dry_run:
# Print the JSON variable
# print('INFO: Dry-run mode. Data to send:')
# print(body)
print(f'INFO: {entry_no} BGP messages analyzed, out of which {keepalive_no} were keepalives')
else:
# Send message to Azure Monitor
post_data(logws_id, logws_key, body, log_type)
else:
print (f'INFO: MRT file {mrt_file} is empty, not sending any logs')
if __name__ == "__main__":
main(sys.argv[1:])