forked from IoTCrawler/SemanticEnrichment
-
Notifications
You must be signed in to change notification settings - Fork 0
/
datasource_manager.py
124 lines (99 loc) · 4.64 KB
/
datasource_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import logging
import json
from ngsi_ld import broker_interface
from ngsi_ld import ngsi_parser
from ngsi_ld.ngsi_parser import NGSI_Type
from other.metadata_matcher_filebased import MetadataMatcher
# Module-level logger; everything logged from this module goes to the
# 'semanticenrichment' logger.
logger = logging.getLogger('semanticenrichment')
class DatasourceManager:
    """In-memory registry of NGSI-LD entities and broker subscriptions.

    Every entity kind (IoTStream, Sensor, StreamObservation,
    ObservableProperty) lives in its own dict keyed by the entity id, and
    subscriptions are kept in a dict keyed by subscription id.  Calls that
    talk to the broker are delegated to ``broker_interface``, which receives
    these dicts and may modify them.
    """

    def __init__(self):
        """Create empty stores and the metadata matcher."""
        self.subscriptions = {}
        self.streams = {}
        self.sensors = {}
        self.observations = {}
        self.observableproperties = {}
        self.matcher = MetadataMatcher()

    def update(self, ngsi_type, ngsi_id, ngsi_data):
        """Store an incoming entity in the dict that matches its type.

        Args:
            ngsi_type: ``NGSI_Type`` member describing the entity kind.
            ngsi_id: id of the entity; used as the dict key.
            ngsi_data: the entity payload to store.

        Entities of any other type are ignored.
        """
        if ngsi_type is NGSI_Type.IoTStream:
            self._update_stream(ngsi_id, ngsi_data)
        elif ngsi_type is NGSI_Type.StreamObservation:
            self.observations[ngsi_id] = ngsi_data
        elif ngsi_type is NGSI_Type.Sensor:
            self.sensors[ngsi_id] = ngsi_data
        elif ngsi_type is NGSI_Type.ObservableProperty:
            self.observableproperties[ngsi_id] = ngsi_data

    def _update_stream(self, stream_id, stream):
        """Handle a new or updated IoTStream and store it.

        A stream arrives either because it is new or because it was updated.
        For an already known stream only changed relations matter (new
        metadata is simply overwritten); for a new stream the related sensor
        has to be requested and subscribed to.
        """
        sensor_id = ngsi_parser.get_stream_generatedBy(stream)

        if sensor_id:
            if stream_id in self.streams:  # existing stream
                old_sensor_id = ngsi_parser.get_stream_generatedBy(
                    self.streams[stream_id])
                if sensor_id != old_sensor_id:
                    # The stream now points to another sensor: forget the
                    # previously cached one.
                    self.sensors.pop(old_sensor_id, None)
            # Request the sensor and subscribe to its observable properties
            # and stream observations (per the original author's note this
            # happens in a separate thread to avoid blocking).
            broker_interface.handleNewSensor(
                sensor_id, self.sensors, self.observableproperties,
                self.subscriptions)
        else:
            logger.error("SensorId for IotStream missing:%s",
                         json.dumps(stream))

        # Finally just update the stream; metrics will request new metadata
        # from the store automatically.
        self.streams[stream_id] = stream

    def get_sensor(self, sensor_id):
        """Return the stored sensor for ``sensor_id`` or ``None``."""
        return self.sensors.get(sensor_id)

    def get_observation(self, observation_id):
        """Return the stored observation for ``observation_id`` or ``None``."""
        return self.observations.get(observation_id)

    def get_observableproperty(self, observableproperty_id):
        """Return the stored observable property or ``None`` if unknown."""
        return self.observableproperties.get(observableproperty_id)

    def link_qoi(self, stream_id, qoi_id):
        """Attach the quality entity ``qoi_id`` to the stored stream.

        Does nothing when ``stream_id`` is unknown.
        """
        try:
            stream = self.streams[stream_id]
            ngsi_parser.update_stream_hasQuality(stream, qoi_id)
        except KeyError:
            # Best effort: an unknown stream (or a KeyError raised while
            # updating it) is deliberately ignored.
            pass

    def initialise_subscriptions(self):
        """Let the broker interface set up the initial subscriptions."""
        broker_interface.initialise_subscriptions(self.subscriptions)

    def get_active_subscriptions(self):
        """Hand the subscription dict to the broker interface.

        Returns ``None``; use ``get_subscriptions()`` to read the dict.
        """
        broker_interface.get_active_subscriptions(self.subscriptions)

    def add_subscription(self, subscription):
        """Register ``subscription`` via the broker interface."""
        broker_interface.add_subscription(subscription, self.subscriptions)

    def del_subscription(self, subid):
        """Remove the subscription ``subid`` locally and at the broker.

        Raises:
            KeyError: if ``subid`` is not a known subscription id.
        """
        subscription = self.subscriptions.pop(subid)
        broker_interface.del_subscription(subscription)

    def del_all_subscriptions(self):
        """Delete every known subscription."""
        # Iterate over a snapshot of the ids: del_subscription() removes
        # entries from the dict while we loop.
        for subid in list(self.subscriptions):
            self.del_subscription(subid)

    def get_subscriptions(self):
        """Return the subscription dict itself (not a copy)."""
        return self.subscriptions

    def get_stream(self, stream_id):
        """Return the stored stream for ``stream_id`` or ``None``."""
        return self.streams.get(stream_id)

    def getStoredMetadata(self, sensor, field):
        """Look up ``field`` in the stored metadata for a sensor.

        The sensor's observable property is resolved, its label is matched
        against the metadata store, and ``field`` is read from the
        ``'metadata'`` section of the match.

        Args:
            sensor: sensor entity as stored in this manager.
            field: key to read from the matched metadata.

        Returns:
            The metadata value, or ``None`` if any step of the lookup
            yields nothing.
        """
        # TODO: move the observable property lookup to initialisation?
        observable_property_id = ngsi_parser.get_sensor_observes(sensor)
        if not observable_property_id:
            return None

        observable_property = self.get_observableproperty(
            observable_property_id)
        if not observable_property:
            return None

        observed_type = ngsi_parser.get_obsproperty_label(observable_property)
        if not observed_type:
            return None

        metadata = self.matcher.match(observed_type)
        if not metadata:
            return None

        try:
            return metadata['metadata'][field]
        except KeyError:
            return None