Skip to content

Commit

Permalink
keep stats in shadow
Browse files Browse the repository at this point in the history
  • Loading branch information
scottrfrancis committed Aug 5, 2020
1 parent 4e2b2bb commit deafa4f
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 21 deletions.
8 changes: 3 additions & 5 deletions GreengrassAwareConnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@

from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
# from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException
# from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient

from AWSIoTPythonSDK.MQTTLib import *

Expand All @@ -33,6 +31,7 @@ def shadowDelete_callback(payload, responseStatus, token):
class GreengrassAwareConnection:
MAX_DISCOVERY_RETRIES = 10
GROUP_CA_PATH = "./groupCA/"
OFFLINE_QUEUE_DEPTH = 100

def __init__(self, host, rootCA, cert, key, thingName):
self.logger = logging.getLogger("GreengrassAwareConnection")
Expand Down Expand Up @@ -162,8 +161,7 @@ def connectShadow(self):
self.shadowClient.configureConnectDisconnectTimeout(10) # 10 sec
self.shadowClient.configureMQTTOperationTimeout(5) # 5 sec

self.shadowClient._AWSIoTMQTTClient.configureOfflinePublishQueueing(20, DROP_OLDEST)

self.shadowClient._AWSIoTMQTTClient.configureOfflinePublishQueueing(self.OFFLINE_QUEUE_DEPTH, DROP_OLDEST)

self.shadowClient.connect()

Expand All @@ -180,7 +178,7 @@ def updateShadow(self, update):
state = {'state': {
'reported': update
}}
self.deviceShadowHandler.shadowUpdate(json.dumps(state), shadowUpdate_callback, 5)
self.deviceShadowHandler.shadowUpdate(json.dumps(state), shadowUpdate_callback, 10)



Expand Down
20 changes: 20 additions & 0 deletions Observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ def notifyObservers(self, arg = None):
pass


def mkObs(cls):
class Wrapper(cls):
def __init__(self, x=None):
print(f"init wrapper with {x}")
super().__init__(x)
# self.wrap = cls(x)
# print(f'wrap is {self.wrap}')
self.dirty = False

def __setitem__(self, key, item):
print(f'set item: {key}:{item}')
# self.wrap.__setitem__(key, item)
super().__setitem__(key, item)
self.dirty = True
print(f'{self.dirty}')

return Wrapper



# an observable chunk of raw data from the serial port, or a file, or ?
class ObservableString(Observable):
def __init__(self):
Expand Down
49 changes: 33 additions & 16 deletions pentair-control.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def update(self, objects):
try:
iotConnection = GreengrassAwareConnection(host, rootCA, cert, key, thingName)

# iotConnection.deleteShadow()
iotConnection.deleteShadow()
except Exception as e:
logger.error(f'{str(type(e))} Error')

Expand All @@ -182,7 +182,7 @@ def update(self, objects):
frames.addObserver(publisher)


def do_something(last_update = {}):
def do_something():
if not connection.isOpen():
connection.open()

Expand All @@ -192,30 +192,47 @@ def do_something(last_update = {}):
for k in ['hour', 'min', 'dow', 'day', 'month', 'year', 'adjust', 'dst']:
if k in accState:
accState.pop(k)

if accState != last_update:
stateMessage = json.dumps(accState)
logger.info(stateMessage)
iotConnection.updateShadow(accState)
last_update = accState
# iotConnection.publishMessageOnTopic(stateMessage, thingName + '/t')

# segregate immutable properties to a telemetry update
telemetry = {}
# for k in ['airTemp', 'solarTemp', 'spaTemp', 'tempUnits', 'timeout', 'waterTemp', 'pumpRPM', 'pumpWatts']:
# if k in accState:
# telemetry[k] = accState.pop(k)

if len(accState) > 0:
try:
stateMessage = json.dumps(accState)
logger.info(stateMessage)
iotConnection.updateShadow(accState)
except Exception as e:
logger.warn("Exception updating Shadow " + e)

if len(telemetry) > 0:
try:
telemetryMessage = json.dumps(telemetry)
logger.info(telemetryMessage)
iotConnection.publishMessageOnTopic(telemetryMessage, thingName + '/t')
except Exception as e:
logger.warn("Exception sending telemetry " + e)

stats = json.dumps(protocol.getStats())
logger.info(stats + "\n")
iotConnection.publishMessageOnTopic(stats, thingName + '/s')

if len(stats) > 0:
try:
logger.info(stats + "\n")
iotConnection.publishMessageOnTopic(stats, thingName + '/s')
except Exception as e:
logger.warn("Exception sending stats " + e)

protocol.resetStats()

return last_update

def run():
if not connection.isOpen():
connection.open()

last_update = {}
while True:
time.sleep(0.9*timeout) # crude approach to timing adjustment
last_update = do_something(last_update)
time.sleep(timeout) # crude approach to timing adjustment
do_something()

if __name__ == "__main__":

Expand Down

0 comments on commit deafa4f

Please sign in to comment.