diff --git a/GreengrassAwareConnection.py b/GreengrassAwareConnection.py index b3e6941..3b92376 100644 --- a/GreengrassAwareConnection.py +++ b/GreengrassAwareConnection.py @@ -13,9 +13,7 @@ from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore -# from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException -# from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient from AWSIoTPythonSDK.MQTTLib import * @@ -33,6 +31,7 @@ def shadowDelete_callback(payload, responseStatus, token): class GreengrassAwareConnection: MAX_DISCOVERY_RETRIES = 10 GROUP_CA_PATH = "./groupCA/" + OFFLINE_QUEUE_DEPTH = 100 def __init__(self, host, rootCA, cert, key, thingName): self.logger = logging.getLogger("GreengrassAwareConnection") @@ -162,8 +161,7 @@ def connectShadow(self): self.shadowClient.configureConnectDisconnectTimeout(10) # 10 sec self.shadowClient.configureMQTTOperationTimeout(5) # 5 sec - self.shadowClient._AWSIoTMQTTClient.configureOfflinePublishQueueing(20, DROP_OLDEST) - + self.shadowClient._AWSIoTMQTTClient.configureOfflinePublishQueueing(self.OFFLINE_QUEUE_DEPTH, DROP_OLDEST) self.shadowClient.connect() @@ -180,7 +178,7 @@ def updateShadow(self, update): state = {'state': { 'reported': update }} - self.deviceShadowHandler.shadowUpdate(json.dumps(state), shadowUpdate_callback, 5) + self.deviceShadowHandler.shadowUpdate(json.dumps(state), shadowUpdate_callback, 10) diff --git a/Observer.py b/Observer.py index d7b1dc6..568a090 100644 --- a/Observer.py +++ b/Observer.py @@ -47,6 +47,26 @@ def notifyObservers(self, arg = None): pass +def mkObs(cls): + class Wrapper(cls): + def __init__(self, x=None): + print(f"init wrapper with {x}") + super().__init__(x) + # self.wrap = cls(x) + # print(f'wrap is {self.wrap}') + self.dirty = False + + def __setitem__(self, key, item): + print(f'set item: {key}:{item}') + # self.wrap.__setitem__(key, item) + super().__setitem__(key, item) + self.dirty = True + print(f'{self.dirty}') + + return Wrapper + + + # an observable chunk of raw data from the serial port, or a file, or ? class ObservableString(Observable): def __init__(self): diff --git a/pentair-control.py b/pentair-control.py index fbdee54..ff831d5 100755 --- a/pentair-control.py +++ b/pentair-control.py @@ -173,7 +173,7 @@ def update(self, objects): try: iotConnection = GreengrassAwareConnection(host, rootCA, cert, key, thingName) - # iotConnection.deleteShadow() + iotConnection.deleteShadow() except Exception as e: logger.error(f'{str(type(e))} Error') @@ -182,7 +182,7 @@ def update(self, objects): frames.addObserver(publisher) -def do_something(last_update = {}): +def do_something(): if not connection.isOpen(): connection.open() @@ -192,30 +192,47 @@ def do_something(last_update = {}): for k in ['hour', 'min', 'dow', 'day', 'month', 'year', 'adjust', 'dst']: if k in accState: accState.pop(k) - - if accState != last_update: - stateMessage = json.dumps(accState) - logger.info(stateMessage) - iotConnection.updateShadow(accState) - last_update = accState - # iotConnection.publishMessageOnTopic(stateMessage, thingName + '/t') + + # segregate immutable properties to a telemetry update + telemetry = {} + # for k in ['airTemp', 'solarTemp', 'spaTemp', 'tempUnits', 'timeout', 'waterTemp', 'pumpRPM', 'pumpWatts']: + # if k in accState: + # telemetry[k] = accState.pop(k) + + if len(accState) > 0: + try: + stateMessage = json.dumps(accState) + logger.info(stateMessage) + iotConnection.updateShadow(accState) + except Exception as e: + logger.warn("Exception updating Shadow " + e) + + if len(telemetry) > 0: + try: + telemetryMessage = json.dumps(telemetry) + logger.info(telemetryMessage) + iotConnection.publishMessageOnTopic(telemetryMessage, thingName + '/t') + except Exception as e: + logger.warn("Exception sending telemetry " + e) stats = json.dumps(protocol.getStats()) - logger.info(stats + "\n") - iotConnection.publishMessageOnTopic(stats, thingName + '/s') - + if len(stats) > 0: + try: + logger.info(stats + "\n") + iotConnection.publishMessageOnTopic(stats, thingName + '/s') + except Exception as e: + logger.warn("Exception sending stats " + e) + protocol.resetStats() - return last_update def run(): if not connection.isOpen(): connection.open() - last_update = {} while True: - time.sleep(0.9*timeout) # crude approach to timing adjustment - last_update = do_something(last_update) + time.sleep(timeout) # crude approach to timing adjustment + do_something() if __name__ == "__main__":