diff --git a/GreengrassAwareConnection.py b/GreengrassAwareConnection.py index b051c6d..cc3eb72 100644 --- a/GreengrassAwareConnection.py +++ b/GreengrassAwareConnection.py @@ -150,12 +150,8 @@ def isShadowConnected(self): return self.shadowConnected def memberDeltaHandler(self, payload, responseStatus, token): - print("\nReceived a Delta Message") - payloadDict = json.loads(payload) state = payloadDict['state'] - deltaMessage = json.dumps(deltaMessage) - print(deltaMessage + "\n") if self.stateChangeQueue != None: self.stateChangeQueue.append(state) diff --git a/Observer.py b/Observer.py index 568a090..59b2b60 100644 --- a/Observer.py +++ b/Observer.py @@ -100,6 +100,25 @@ def append(self, newElements): self.notifyObservers(self.elements) self.clear() +# ObservableArray is 'flat' in that it will be extended with the new elements. +# sometimes you want to append a deep object to the array... +# +# use ObservableDeepArray for that +class ObservableDeepArray(Observable): + def __init__(self): + super().__init__() + self.clear() + + def clear(self): + self.elements = [] + + def append(self, newItem): + if len(newItem) > 0: + self.elements.append(newItem) + + self.notifyObservers(self.elements) + self.clear() + # an Observable wrapped dict class ObservableDict(Observable): def __init__(self): diff --git a/PentairProtocol.py b/PentairProtocol.py index 563d324..cf18108 100644 --- a/PentairProtocol.py +++ b/PentairProtocol.py @@ -226,8 +226,27 @@ def __init__(self, body): # A Command # class Command: - def __init__(self, dst, cmd, type=0x24, src=0x21): - pass + def __init__(self, dst, cmd, commandType=0x24, src=0x21): + self.type = commandType + self.destination = dst + self.source = src + self.command = cmd + + self.payload = b'' + + def getParsedCommand(self): + parsed = {} + parsed['type'] = self.type + parsed['destination'] = self.destination + parsed['source'] = self.source + parsed['command'] = self.command + + parsed['payloadLength'] = len(self.payload) + parsed['payload'] = self.payload + + # parsed['state'] = self.parsePayloadFromFrame(parsed) + + return parsed # 0x86 - Turn Circuits On and Off # @@ -237,24 +256,29 @@ def __init__(self, dst, cmd, type=0x24, src=0x21): # will be followed with a ACK from the dest # class CircuitChangeCommand(Command): - SPA = 1 - AUX1 = 2 - AUX2 = 3 - AUX3 = 4 - FEATURE1 = 5 - POOL = 6 - FEATURE2 = 7 - FEATURE3 = 8 - FEATURE4 = 9 - HEAT_BOOST = 0x85 + CKT_SELECTORS = { + 'spa': 0x01, + 'aux1': 0x02, + 'aux2': 0x03, + 'aux3': 0x04, + 'feature1': 0x05, + 'pool': 0x06, + 'feature2': 0x07, + 'feature3': 0x08, + 'feature4': 0x09, + 'HEAT_BOOST': 0x85 + } def __init__(self, ckt, onOff, dst=0x10): super().__init__(dst, 0x86) - state = b'\x01' if onOff else b'\x00' + try: + circuit = self.CKT_SELECTORS[ckt].to_bytes(1, 'big') + state = b'\x01' if onOff else b'\x00' - self.payload = ckt.to_bytes(1, byteorder='big') + state - pass + self.payload = circuit + state + except Exception as e: + pass # 0x88 - Change heating parameters -- set points, enable # @@ -296,6 +320,30 @@ def __init__(self): 0x08: TempPayload } } + self.commandPayloads = { + 'spa': CircuitChangeCommand, + 'aux1': CircuitChangeCommand, + 'aux2': CircuitChangeCommand, + 'aux3': CircuitChangeCommand, + 'pool': CircuitChangeCommand, + 'feature1': CircuitChangeCommand, + 'feature2': CircuitChangeCommand, + 'feature3': CircuitChangeCommand, + 'feature4': CircuitChangeCommand + } + # 'CircuitChanges': { + # 'command': CircuitChangeCommand, + # 'selectors': [ "spa", "aux1", "aux2", "aux3", "pool", "feature1", "feature2", "feature3", "feature4" ] + # }, + # 'HeatChanges': { + # 'command': HeatChangeCommand, + # 'selectors': [ "poolSetTemp", "spaSetTemp", "poolHeaterMode", "spaHeaterMode" ] + # } + + # "airTemp", "solarTemp", "runMode","tempUnits", "freezeProtect", "timeout", "heater", "delay" + # "pumpStarted","pumpMode", "pumpState", "pumpWatts", "pumpRPM", "waterTemp", "spaTemp" + + self.resetStats() def getStats(self): @@ -306,6 +354,13 @@ def resetStats(self): 'badFrames': 0, 'unprocessedPayloads': 0 } + # computes checksum for a frame + # if using an incoming frame, strip off the checksum before calling -- e..g f[:-2] + # otherwise, frame should include the START_BYTE and otherwise be stripped from IDLE_BYTES + def checkSum(self, frame): + cs = reduce((lambda x, sum: sum + x), frame) + return cs + # # validFrame # @@ -324,7 +379,8 @@ def resetStats(self): def validFrame(self, f): try: f = f.rstrip(self.IDLE_BYTE) - valid = f[0] == self.START_BYTE and ((f[-2] << 8) + f[-1]) & 0xFFFF == reduce((lambda x, sum: sum + x), f[:-2]) + # valid = f[0] == self.START_BYTE and ((f[-2] << 8) + f[-1]) & 0xFFFF == reduce((lambda x, sum: sum + x), f[:-2]) + valid = f[0] == self.START_BYTE and ((f[-2] << 8) + f[-1]) & 0xFFFF == self.checkSum(f[:-2]) # increment badFrame counter for checksum errors ONLY... not for 'empty' frames self.stats['badFrames'] += not valid @@ -386,3 +442,43 @@ def parseFrame(self, f): return parsed + # commands are dicts with fields separated out -- just as if parsed + def createCommand(self, desiredState): + cmd = {} + print("creating Command for " + json.dumps(desiredState)) + + try: + k = list(desiredState.keys())[0] + command = self.commandPayloads[k](k, desiredState[k]) + + cmd = command.getParsedCommand() + + except Exception as e: + pass + + + return cmd + + def createFrame(self, command): + frame = self.START_BYTE + print("creating frame from " ) #+ json.dumps(command)) + + try: + frame = b''.join(list(map( lambda x: x.to_bytes(1, 'big'),[ + self.START_BYTE, + command['type'], + command['destination'], + command['source'], + command['command'], + command['payloadLength'] + ] ))) + command['payload'] # payload is already serialized + + check = self.checkSum(frame) + + frame = self.RECORD_SEPARATOR + frame + check.to_bytes(2, 'big') + self.RECORD_SEPARATOR + + except Exception as e: + pass + + return frame + diff --git a/README.md b/README.md index 8d7d5a8..81cc50c 100644 --- a/README.md +++ b/README.md @@ -133,9 +133,10 @@ Some common TYPEs and CMDs | 24 | 02 | status -- the motherload of info... time, date, temps, lots of stuff | | 24 | 05 | date -- current controller date, happens every 2s or so | | 24 | 08 | temps -- air, water, preferred, solar, other temperatures | +| 24 | 86 | circuit change -- turn enum'd circuits (pumps, etc) on/off | | 24 | 88 | set heat -- sets thermostat temps and mode for spa and pool | -Seems like device 10 is the only one sending TYPE 24s. These may be the main informational messages. The TYPE 00 messages seem to occur in pairs SRC -> DST then a complementary 'ACK' messages from DST -> SRC. +Device $10 is the main one sending TYPE 24s, but any control device (such as the remote, $20) can probably send them. These may be the main informational messages. The TYPE 00 messages seem to occur in pairs SRC -> DST then a complementary 'ACK' messages from DST -> SRC. ### Interpreting Payloads A Simple, polymorphic class structure is implemented where specific payloads are intpreted based on TYPE and CMD. To add new interpreters: @@ -262,5 +263,3 @@ Not strictly necessary, but I like to see the hex scroll by on both sides. Then cat pool | nc mini.local 3000 ``` - - diff --git a/pentair-control.py b/pentair-control.py index d03402f..24c9eb6 100755 --- a/pentair-control.py +++ b/pentair-control.py @@ -82,6 +82,29 @@ def update(self, objects): except Exception as err: print(err) +class DeltaCommandProcessor(Observer): + def __init__(self, commands, protocol): + super().__init__() + self.commands = commands + self.protocol = protocol + + def update(self, updateList): + for u in updateList: + if len(u) > 0: + # try: + self.commands.append(self.protocol.createCommand(u)) + +class CommandFramer(Observer): + def __init__(self, frames, protocol): + super().__init__() + self.frames = frames + self.protocol = protocol + + def update(self, commands): + for c in commands: + if len(c) > 0: + self.frames.append(self.protocol.createFrame(c)) + # Configure logging logger = logging.getLogger("Pentair-Thing.core") @@ -140,7 +163,6 @@ def update(self, objects): frames = ObservableArray() state = ObservableDict() -deltas = ObservableArray() if len(inFile) > 0: @@ -172,6 +194,32 @@ def update(self, objects): output = CSVOutput() frames.addObserver(output) + +''' +Outut Chain + +delta updates come in to callback in GGAwareConnex..., which will append the `state` dict to the +deltas array. + +DeltaCommandProcessor gets these deltas and calls Protocol to create a command and appends commands +to the commands array. + +Then call the Protocol again to frame the commands. + +A writer Observer of that array will send the frames to the Serial Port and/or file. +''' +deltas = ObservableDeepArray() +commands = ObservableDeepArray() +commandStreams = ObservableArray() + +deltaCommandProcessor = DeltaCommandProcessor(commands, protocol) +deltas.addObserver(deltaCommandProcessor) + +commandFramer = CommandFramer(commandStreams, protocol) +commands.addObserver(commandFramer) + + + try: iotConnection = GreengrassAwareConnection(host, rootCA, cert, key, thingName, deltas)