-
Notifications
You must be signed in to change notification settings - Fork 1
/
observatory_configuration.py
116 lines (95 loc) · 3.29 KB
/
observatory_configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Class that governs the collection of various configurations for scan data
and lets the user specify the type of environment and settings for more
fine-grained insights."""
import aws
import random
import uuid
from boto3.dynamodb.conditions import Key
from hashids import Hashids
class ScanConfig(object):
    """Manage a user's scan configurations stored in DynamoDB.

    Each configuration is written as one item with the attributes ``uuid``,
    ``user_id``, ``api_key``, ``config`` and ``disabled``.

    NOTE(review): ``self.dynamo`` is whatever ``aws.connect_dynamo`` returns;
    the calls below assume it behaves like a boto3 DynamoDB ``Table``
    resource -- confirm against the ``aws`` module.
    """

    # Placeholder stored when no notes are supplied. The spelling is kept
    # exactly as it has always been written, since stored rows and consumers
    # may match on it.
    _DEFAULT_NOTES = "No additional infomation."

    def __init__(self, user):
        """Connect to the configurations table on behalf of ``user``.

        Args:
            user: Object exposing a ``user_id`` attribute. It is recorded as
                the owner of created configurations and used to filter
                per-user lookups.
        """
        self.dynamo = aws.connect_dynamo(
            table_name='observatory_configurations'
        )
        self.user = user

    def create_configuration(self, configuration_information=None):
        """Store a new, enabled configuration owned by the current user.

        A fresh uuid and api key are generated for the item.

        Args:
            configuration_information: Optional dict of settings. When it is
                None, or its ``notes`` entry is missing or empty, a
                placeholder note is stored instead.

        Returns:
            The response of the underlying ``put_item`` call.
        """
        # Copy so the caller's dict is never mutated, and so the declared
        # default of None is actually usable instead of raising TypeError.
        config = dict(configuration_information or {})
        if not config.get('notes'):
            config['notes'] = self._DEFAULT_NOTES

        return self.dynamo.put_item(
            Item={
                'uuid': uuid.uuid4().hex,
                'user_id': self.user.user_id,
                'api_key': self.__generate_api_key(),
                'config': config,
                'disabled': False,
            }
        )

    def __destroy(self, config_id, user_id):
        """Delete the item keyed by ``config_id`` and ``user_id``.

        Returns:
            The response of the underlying ``delete_item`` call.
        """
        return self.dynamo.delete_item(
            Key={
                'uuid': config_id,
                'user_id': user_id,
            }
        )

    def update_configuration(self, config):
        """Overwrite a configuration in place.

        The item is written with a single ``put_item``, which replaces an
        existing item that has the same primary key. Deleting first is
        unnecessary and would lose the configuration if the write failed.

        Args:
            config: Dict with the keys ``uuid``, ``user_id``, ``api_key`` and
                ``config``.

        Returns:
            The response of the underlying ``put_item`` call.

        Raises:
            KeyError: If ``config`` lacks one of the required keys.
        """
        return self.dynamo.put_item(
            Item={
                'uuid': config['uuid'],
                'user_id': config['user_id'],
                'api_key': config['api_key'],
                'config': config['config'],
                # An update always stores the configuration as enabled.
                'disabled': False,
            }
        )

    def rotate_api_key(self, config_id):
        """Replace the api key of the given configuration with a new one.

        Args:
            config_id: The ``uuid`` of the configuration to update.

        Returns:
            The response of the underlying ``update_item`` call.
        """
        return self.dynamo.update_item(
            # Same key shape as __destroy uses for delete_item.
            # NOTE(review): presumes the table key is (uuid, user_id) and the
            # configuration belongs to the current user -- confirm.
            Key={
                'uuid': config_id,
                'user_id': self.user.user_id,
            },
            UpdateExpression='SET api_key = :api_key',
            ExpressionAttributeValues={
                ':api_key': self.__generate_api_key(),
            },
        )

    def find_config_by_id(self, config_id):
        """Locate a configuration by its uuid.

        Args:
            config_id: The ``uuid`` of the configuration.

        Returns:
            The first stored item whose ``uuid`` equals ``config_id``.

        Raises:
            IndexError: If no item matches.
        """
        configs = self._scan_all(Key('uuid').eq(config_id))
        if not configs:
            raise IndexError(
                'no configuration found with uuid %r' % (config_id,)
            )
        return configs[0]

    def find_configs_for_user(self):
        """Return all stored configurations owned by the current user."""
        return self._scan_all(Key('user_id').eq(self.user.user_id))

    def api_key(self, config_id):
        """Return the api key of the configuration with the given uuid.

        Raises:
            IndexError: If no configuration has that uuid.
        """
        # find_config_by_id already returns a single item, not a list.
        return self.find_config_by_id(config_id)['api_key']

    def _scan_all(self, filter_expression):
        """Return every item matching ``filter_expression`` across all pages.

        A scan response may be partial; when it carries a
        ``LastEvaluatedKey`` the scan is resumed from that key until the
        table has been fully read.
        """
        scan_kwargs = {
            'FilterExpression': filter_expression,
            'Select': 'ALL_ATTRIBUTES',
        }
        items = []
        while True:
            response = self.dynamo.scan(**scan_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                return items
            scan_kwargs['ExclusiveStartKey'] = last_key

    def __generate_api_key(self):
        """Return a randomly generated api key encoded as a hashid string."""
        # SystemRandom draws from the operating system's entropy source; the
        # module-level generator is predictable and must not be used for
        # secrets. 64 random bits replace the former range of 10**8 values.
        random_num = random.SystemRandom().getrandbits(64)
        return Hashids(min_length=16).encode(random_num)