-
Notifications
You must be signed in to change notification settings - Fork 199
/
report_statuses.py
157 lines (136 loc) · 6.8 KB
/
report_statuses.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/python3
"""
=== PREREQUISITES ===
Run in Python 3.6+
Install Meraki Python library: pip[3] install --upgrade meraki
=== DESCRIPTION ===
This script reports the org-wide appliance uplink statuses and device statuses, writing each to a CSV file.
=== USAGE ===
python[3] report_statuses.py -k <api_key> -o <org_id>
Use "-o /all" to iterate through all organizations
"""
import argparse
import csv
import meraki
import os
def parse_arguments(parser):
    """Register the script's command-line options on *parser* and parse them.

    Args:
        parser: The ``argparse.ArgumentParser`` to configure. The options are
            added to it in place, so the caller can keep using the same
            parser afterwards (for example to print its help text).

    Returns:
        A ``(key, org)`` tuple holding the values of ``-k/--key`` and
        ``-o/--org``. Either element is ``None`` when that option was not
        supplied on the command line.
    """
    parser.add_argument(
        '-k', '--key',
        help='Dashboard API key. If omitted, will use environment variable MERAKI_DASHBOARD_API_KEY')
    parser.add_argument(
        '-o', '--org',
        help='Organization ID. Use "-o /all" to iterate through all organizations')
    args = parser.parse_args()
    return args.key, args.org
def getApiKey(argument):
    """Resolve the Dashboard API key to use.

    Args:
        argument: The value given for ``-k/--key``, or ``None`` if the option
            was omitted.

    Returns:
        ``str(argument)`` when an argument was given; otherwise the value of
        the ``MERAKI_DASHBOARD_API_KEY`` environment variable, or ``None`` if
        that variable is not set.
    """
    if argument is not None:
        return str(argument)
    return os.environ.get("MERAKI_DASHBOARD_API_KEY")
# Attributes copied from a WAN uplink object into '<interface>_<attribute>' columns.
_WAN_UPLINK_KEYS = (
    'status', 'ip', 'gateway', 'publicIp', 'primaryDns', 'secondaryDns', 'ipAssignedBy',
)

# Attributes copied from the cellular uplink object into 'cellular_<attribute>' columns.
_CELLULAR_UPLINK_KEYS = (
    'status', 'ip', 'provider', 'publicIp', 'model', 'signalStat', 'connectionType', 'apn',
)

# Which attributes to flatten for each uplink interface the report knows about.
_UPLINK_KEYS_BY_INTERFACE = {
    'wan1': _WAN_UPLINK_KEYS,
    'wan2': _WAN_UPLINK_KEYS,
    'cellular': _CELLULAR_UPLINK_KEYS,
}

# Column order of the appliance uplink status CSV.
_APPLIANCE_FIELD_NAMES = [
    'name', 'serial', 'model', 'network', 'networkId', 'lastReportedAt',
    'wan1_status', 'wan1_ip', 'wan1_gateway', 'wan1_publicIp', 'wan1_primaryDns', 'wan1_secondaryDns',
    'wan1_ipAssignedBy', 'wan2_status', 'wan2_ip', 'wan2_gateway', 'wan2_publicIp', 'wan2_primaryDns',
    'wan2_secondaryDns', 'wan2_ipAssignedBy', 'cellular_status', 'cellular_ip', 'cellular_provider', 'highAvailability',
    'cellular_publicIp', 'cellular_model', 'cellular_signalStat', 'cellular_connectionType', 'cellular_apn',
]

# Column order of the device status CSV.
_DEVICE_FIELD_NAMES = [
    'name', 'serial', 'model', 'network', 'networkId', 'mac', 'publicIp', 'status', 'lastReportedAt', 'lanIp', 'gateway', 'ipType',
    'primaryDns', 'secondaryDns', 'productType', 'tags', 'usingCellularFailover', 'wan1Ip', 'wan1Gateway', 'wan1IpType',
    'wan1PrimaryDns', 'wan1SecondaryDns', 'wan2Ip', 'wan2Gateway', 'wan2IpType', 'wan2PrimaryDns', 'wan2SecondaryDns', 'components',
    'configurationUpdatedAt',
]


def _flatten_uplinks(status):
    """Replace ``status['uplinks']`` with flat ``<interface>_<attribute>`` keys, in place.

    Only the interfaces listed in ``_UPLINK_KEYS_BY_INTERFACE`` are flattened.
    An attribute that is absent from an uplink is stored as ``None`` (written
    by ``csv`` as an empty cell) instead of raising ``KeyError``.
    """
    first_by_interface = {}
    for uplink in status.pop('uplinks', None) or []:
        # Keep the first uplink reported for each interface name.
        first_by_interface.setdefault(uplink.get('interface'), uplink)

    for interface, keys in _UPLINK_KEYS_BY_INTERFACE.items():
        uplink = first_by_interface.get(interface)
        if uplink is None:
            continue
        for key in keys:
            status[interface + '_' + key] = uplink.get(key)


def _appliance_rows(appliance_statuses, devices_by_serial, networks_by_id):
    """Yield each appliance status annotated with names and flattened uplinks."""
    for status in appliance_statuses:
        # .get() so an appliance without a matching device/network entry
        # produces an empty cell rather than aborting the whole report.
        status['name'] = devices_by_serial.get(status.get('serial'))
        status['network'] = networks_by_id.get(status.get('networkId'))
        _flatten_uplinks(status)
        yield status


def _device_rows(device_statuses, networks_by_id):
    """Yield each device status annotated with its network name."""
    for status in device_statuses:
        status['network'] = networks_by_id.get(status.get('networkId'))
        yield status


def _write_csv(path, field_names, rows):
    """Write *rows* (dicts) to *path* as a fully quoted CSV with a header row."""
    # newline='' is the mode the csv module documents for files it writes.
    with open(path, mode='w', newline='') as fp:
        # extrasaction='ignore': a row key that is not in field_names is
        # dropped instead of raising ValueError and killing the report.
        csv_writer = csv.DictWriter(fp, field_names, delimiter=',', quotechar='"',
                                    quoting=csv.QUOTE_ALL, extrasaction='ignore')
        csv_writer.writeheader()
        csv_writer.writerows(rows)


def main():
    """Fetch appliance uplink and device statuses and write them to CSV files.

    The API key comes from ``-k`` or the ``MERAKI_DASHBOARD_API_KEY``
    environment variable, and the organization from ``-o``. If either is
    missing, the help text is printed and the process exits with status 2.

    For each organization, ``appliance_statuses.csv`` and
    ``device_statuses.csv`` are written to the current directory. When
    ``-o /all`` is used, every organization returned by the API is processed
    and the file names gain an ``_<org_id>`` suffix. An organization whose API
    calls raise is reported on stdout and skipped.
    """
    # Check if all required parameters have been specified
    parser = argparse.ArgumentParser()
    tmp_api_key, arg_org_id = parse_arguments(parser)
    api_key = getApiKey(tmp_api_key)
    if not (api_key and arg_org_id):
        parser.print_help()
        parser.exit(2)

    # Make API calls to retrieve data
    dashboard = meraki.DashboardAPI(api_key)
    flag_multi_org = arg_org_id == '/all'
    if flag_multi_org:
        org_id_list = [org['id'] for org in dashboard.organizations.getOrganizations()]
    else:
        org_id_list = [arg_org_id]

    for org_id in org_id_list:
        try:
            appliance_statuses = dashboard.appliance.getOrganizationApplianceUplinkStatuses(org_id, total_pages='all')
            device_statuses = dashboard.organizations.getOrganizationDevicesStatuses(org_id, total_pages='all')
            networks = dashboard.organizations.getOrganizationNetworks(org_id, total_pages='all')
        except Exception as e:
            # Deliberately best-effort: one failing organization must not
            # stop the report for the remaining ones.
            print(str(e))
            continue

        devices_by_serial = {d['serial']: d.get('name') for d in device_statuses}
        networks_by_id = {n['id']: n['name'] for n in networks}

        # Per-organization suffix keeps files from overwriting each other in /all mode.
        suffix = ('_' + org_id if flag_multi_org else '') + '.csv'

        # Output appliance statuses file
        _write_csv(
            'appliance_statuses' + suffix,
            _APPLIANCE_FIELD_NAMES,
            _appliance_rows(appliance_statuses, devices_by_serial, networks_by_id),
        )

        # Output device statuses file
        _write_csv(
            'device_statuses' + suffix,
            _DEVICE_FIELD_NAMES,
            _device_rows(device_statuses, networks_by_id),
        )
# Run the report only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()