-
Notifications
You must be signed in to change notification settings - Fork 0
/
hibike_simulator.py
133 lines (108 loc) · 4.67 KB
/
hibike_simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import time, random
#weight of each time interval for flipping.
#for example, there is a 5% chance the data will flip within 1 sec
cutoffs = [0.05, 0.13, 0.15, 0.27, 0.5, 0.73, 0.85, 0.95, 0.99, 1.0]
# Sensor type: (low value, high value, noise)
sensor_values = {
'0x0000': (0, 1, 0),
'0x0001': (200, 800, 100),
'0x0002': (None, None, None),
'0x0003': (-256, 256, 10),
'0x0004': (0, 1, 0),
'0x0005': (0, 1, 0),
'0x0006': (None, None, None),
'0x0007': (None, None, None),
'0x0008': (None, None, None)
}
class Hibike:
def __init__(self):
self.UIDs = [
'0x000000FFFFFFFFFFFFFFFF', '0x000100FFFFFFFFFFFFFFFF',
'0x000200FFFFFFFFFFFFFFFF', '0x000300FFFFFFFFFFFFFFFF',
'0x000400FFFFFFFFFFFFFFFF', '0x000500FFFFFFFFFFFFFFFF',
'0x000600FFFFFFFFFFFFFFFF', '0x000700FFFFFFFFFFFFFFFF',
'0x000800FFFFFFFFFFFFFFFF'
]
#format Device Type (16) + Year (8) + ones (64)
self.subscribedTo = {}
self.sensors = []
def getEnumeratedDevices(self):
""" Returns a list of tuples of UIDs and device types. """
enum_devices = []
for UID in self.UIDs:
enum_devices.append((UID, UID[:6])) # (UID - in hex, Device type - in hex)
return enum_devices
def subToDevices(self, deviceList):
""" deviceList - List of tuples of UIDs and delays. Creates a dictionary
storing UID, delay, time created (ms), previous data, whether data is
low and flip times (the next time the data should flip). Flip is when
data changes low to high or high to low.
"""
for UID, delay in deviceList:
if UID in self.UIDs and UID not in self.subscribedTo:
last_time = time.time()*1000
self.subscribedTo[UID] = {'delay': delay, 'time': last_time}
self.subscribedTo[UID]['data'] = self.__getRandomData(UID, True)
self.subscribedTo[UID]['is_low'] = True
self.subscribedTo[UID]['flip_time'] = self.__calculateFlipTime(UID)
elif UID in self.UIDs and UID in self.subscribedTo:
self.subscribedTo[UID]['delay'] = delay
return 0
def getData(self, UID, param):
""" Extracts all data for specific UID. Checks whether to update data,
flip data, or return previous data, and returns correct appropriate
data.
"""
#TODO: param is currently unused
delay = self.subscribedTo[UID]['delay']
last_time = self.subscribedTo[UID]['time']
flip_time = self.subscribedTo[UID]['flip_time']
curr_time = time.time()*1000
should_flip = curr_time - delay >= flip_time
should_update = curr_time - delay >= last_time
if should_flip and should_update:
self.subscribedTo[UID]['time'] = curr_time
self.subscribedTo[UID]['data'] = self.__getRandomData(UID, self.subscribedTo[UID]['is_low'])
self.subscribedTo[UID]['flip_time'] = self.__calculateFlipTime(UID)
self.subscribedTo[UID]['is_low'] = not self.subscribedTo[UID]['is_low']
elif should_update:
self.subscribedTo[UID]['time'] = curr_time
self.subscribedTo[UID]['data'] = self.__getRandomData(UID, not self.subscribedTo[UID]['is_low'])
return self.subscribedTo[UID]['data'] #returns sensor data
def __calculateFlipTime(self, UID):
""" Determines time of flip. """
rand, last_time = random.random(), self.subscribedTo[UID]['time']
noise = random.random()
for i in range(len(cutoffs)):
if rand < cutoffs[i]:
return (time.time() + noise + i)*1000
def __getRandomData(self, UID, is_low):
""" Finds device_type of UID and returns corresponding flipped data."""
device_type = UID[:6]
low, high, noise = sensor_values[device_type]
if low is None or high is None or noise is None:
return 0 # TODO: what to do with these device types?
if is_low:
return high + (random.random() - .5) * noise
else:
return low + (random.random() - .5) * noise
############
## MOTORS ##
############
def readValue(self, UID, param):
#param values: 0 - delay, 1 - last_time, 2 - data
try:
return self.subscribedTo[UID][param]
except:
return 1
def writeValue(self, UID, param, value):
if param >= len(self.subscribedTo[UID]):
return 1
self.subscribedTo[UID][param] = value
return 0
#############
## TESTING ##
#############
if __name__ == "__main__":
hi = Hibike()
hi.subToDevices([('0x000100FFFFFFFFFFFFFFFF', 1)])