diff --git a/.gitignore b/.gitignore index 916db7b..984474b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .vscode/ +certs/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/Connection.py b/Connection.py new file mode 100644 index 0000000..9e24fdd --- /dev/null +++ b/Connection.py @@ -0,0 +1,19 @@ +# Connection +# +# Base class for various sources of stream data +# + + +class Connection: + def __init__(self): + self.readbuffer = '' + # self.open() + + def open(self): + print('YOU MUST OVERRIDE THIS METHOD') + + def isOpen(self): + print('YOU MUST OVERRIDE THIS METHOD') + + def listen(self): + print('YOU MUST OVERRIDE THIS METHOD') diff --git a/FileReader.py b/FileReader.py index 90401b9..0c381b4 100644 --- a/FileReader.py +++ b/FileReader.py @@ -4,13 +4,16 @@ # no size limiting, no chunking, very simple # -class FileReader: - def __init__(self, file): - self.filename = file - self.readbuffer = '' +from Connection import Connection + + +class FileReader(Connection): + def __init__(self, filename): + super().__init__() + self.filename = filename self.file = None - self.open() + # self.open() def isOpen(self): return (self.file is not None) diff --git a/Observer.py b/Observer.py index e5f3651..66554e9 100644 --- a/Observer.py +++ b/Observer.py @@ -12,6 +12,7 @@ + class Observer: def update(observable, arg): '''Called when observed object is modified, from list @@ -46,4 +47,53 @@ def notifyObservers(self, arg = None): pass +# an observable chunk of raw data from the serial port, or a file, or ? +class ObservableString(Observable): + def __init__(self): + super().__init__() + self.clear() + + def clear(self): + self.chunk = b'' + + # call to add to the end of the chunk, notifies observers + def append(self, increment): + if len(increment) > 0: + self.chunk = self.chunk + increment + + self.notifyObservers(self.chunk) + self.clear() + +# an Observaable wrapped array +class ObservableArray(Observable): + def __init__(self): + super().__init__() + self.clear() + + def clear(self): + self.elements = [] + + def append(self, newElements): + if len(newElements) > 0: + self.elements.extend(newElements) + + self.notifyObservers(self.elements) + self.clear() + +# an Observable wrapped dict +class ObservableDict(Observable): + def __init__(self): + super().__init__() + self.clear() + + def clear(self): + self.dict = {} + + def append(self, newDict): + if len(newDict) > 0: + self.dict.update(newDict) + + def getDict(self): + return self.dict + \ No newline at end of file diff --git a/PentairProtocol.py b/PentairProtocol.py index 9ed7486..5943bf2 100644 --- a/PentairProtocol.py +++ b/PentairProtocol.py @@ -321,25 +321,25 @@ def parseFrame(self, f): return parsed - def parseEvents(self, events): - frames = [] - while events: - if self.validFrame(e): - frames.append(self.parseFrame(e)) + # def parseEvents(self, events): + # frames = [] + # while events: + # if self.validFrame(e): + # frames.append(self.parseFrame(e)) - try: - if frame['payloadLength'] != len(frame['payload']): - raise Exception + # try: + # if frame['payloadLength'] != len(frame['payload']): + # raise Exception - payload = self.payloads[frame['type']][frame['command']](frame['payload']) - # self.state.update(payload.getStatus()) # just overwrite ... and get the latest - payload.dump() + # payload = self.payloads[frame['type']][frame['command']](frame['payload']) + # # self.state.update(payload.getStatus()) # just overwrite ... and get the latest + # payload.dump() - except Exception as err: - print(err) - # print(",".join(list(map((lambda x: f'{x:02X}' if not isinstance(x, Iterable) else ' '.join(f'{b:02X}' for b in x) if len(x) > 0 else ''), list(frame.values()))))) - pass + # except Exception as err: + # print(err) + # # print(",".join(list(map((lambda x: f'{x:02X}' if not isinstance(x, Iterable) else ' '.join(f'{b:02X}' for b in x) if len(x) > 0 else ''), list(frame.values()))))) + # pass - events = events[1:] + # events = events[1:] - return frames + # return frames diff --git a/PentairStream.py b/PentairStream.py new file mode 100644 index 0000000..a5fb5c6 --- /dev/null +++ b/PentairStream.py @@ -0,0 +1,68 @@ +# PentairStream +# +# The processing stream from input connection to output state +# + +from Observer import * +from PentairProtocol import PentairProtocol + +# takes a stream on #update and writes it to the messages object, using messages#append +class MessageParser(Observer): + def __init__(self, separator, messages): + super().__init__() + self.separator = separator + self.messages = messages + + def update(self, stream): + self.parseStream(stream) + + def parseStream(self, stream): + if len(stream) > 0: + self.messages.append(stream.split(self.separator)) + + +# takes messsages and parses to frames +class FrameParser(Observer): + def __init__(self, frames): + super().__init__() + self.protocol = PentairProtocol() + + self.frames = frames + + def update(self, messages): + self.frames.append(list(map(self.protocol.parseFrame, messages))) + +class StateAggregator(Observer): + def __init__(self, state): + super().__init__() + self.state = state + + def update(self, parsedFrames): + for p in parsedFrames: + if 'state' in p: + self.state.append(p['state']) + +class PentairStream: + def __init__(self, connection): + self.connection = connection + + self.streamData = ObservableString() + self.messages = ObservableArray() + self.frames = ObservableArray() + self.state = ObservableDict() + + # messageParser will chop the stream into separate messages + self.messageParser = MessageParser(PentairProtocol.RECORD_SEPARATOR, self.messages) + # connect messageParser as an oberver of streamData + self.streamData.addObserver(self.messageParser) + + self.frameParser = FrameParser(self.frames) + self.messages.addObserver(self.frameParser) + + self.state = ObservableDict() + self.stateAggregator = StateAggregator(self.state) + self.frames.addObserver(self.stateAggregator) + + def getState(self): + self.streamData.append(self.connection.listen()) + return self.state.getDict() \ No newline at end of file diff --git a/README.md b/README.md index c888b15..8584e01 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,15 @@ _TODO_ To facilitate monitoring and control of residential pool equipment using the Pentair EasyTouch controller +#### Language, Requirements, Installation + +* Python 3.7 or later + +Use `pip3 install` for +* pyserial +* AWSIoTPythonSDK + + ### Background / Context I have an EasyTouch controller, a Variable Speed Pump, a Booster pump for the sweep, a Jet pump for the spa, two controlled valves (to switch between pool and spa), and a gas Heater. The controller is mounted outside near the equipment, but I mostly interract with the equipment using the EasyTouch wireless remote. Equipment was installed around 2010, so there may be more modern versions. I don't have Solar heating, water features, or poly-chromic lighting--so the project won't have info about those systems, but should help someone discover the protocol for those. diff --git a/SerialReader.py b/SerialReader.py index e90466a..a0339cf 100644 --- a/SerialReader.py +++ b/SerialReader.py @@ -2,13 +2,13 @@ # For details on pentair protocol # import serial +from Connection import Connection -class SerialReader: +class SerialReader(Connection): def __init__(self, device): + super().__init__() self.device = device - # self.separator = separator - # configure the serial connections (the parameters differs on the device you are connecting to) self.ser = serial.Serial( port=self.device, baudrate=9600, @@ -17,8 +17,6 @@ def __init__(self, device): bytesize=serial.EIGHTBITS, timeout=0.1, write_timeout=0.1 ) - self.readbuffer = '' - def open(self): self.ser.open() @@ -34,16 +32,7 @@ def listen(self): n = self.ser.inWaiting() if n > 0: try: - self.readbuffer = self.ser.read(n) #.decode('ascii') - # e = self.readbuffer.split(self.separator) - - # if self.readbuffer[-1] == self.RECORD_SEPARATOR: - # events = e - # self.readbuffer = '' - # else: - # emit the records that are complete - # events = e[:-1] - # self.readbuffer = e[-1] # retain the partial ones + self.readbuffer = self.ser.read(n) except Exception as e: print("Exception " + e + " while reading from Serial port") diff --git a/pentair-thing.py b/pentair-thing.py new file mode 100644 index 0000000..0c64c17 --- /dev/null +++ b/pentair-thing.py @@ -0,0 +1,237 @@ +#!/usr/bin/python3 +# /* +# * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# * +# * Licensed under the Apache License, Version 2.0 (the "License"). +# * You may not use this file except in compliance with the License. +# * A copy of the License is located at +# * +# * http://aws.amazon.com/apache2.0 +# * +# * or in the "license" file accompanying this file. This file is distributed +# * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# * express or implied. See the License for the specific language governing +# * permissions and limitations under the License. +# */ + + +import os +import sys +import time +import uuid +import json +import logging +import argparse +from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider +from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore +from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient +from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException + +from FileReader import FileReader +from PentairStream import PentairStream +from SerialReader import SerialReader + + +# AllowedActions = ['both', 'publish', 'subscribe'] + +# General message notification callback +def customOnMessage(message): + print('Received message on topic %s: %s\n' % (message.topic, message.payload)) + +MAX_DISCOVERY_RETRIES = 10 +GROUP_CA_PATH = "./groupCA/" + +# Read in command-line parameters +parser = argparse.ArgumentParser() +parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint") +parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") +parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") +parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-n", "--thingName", action="store", dest="thingName", default="Bot", help="Targeted thing name") +# parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic") +# parser.add_argument("-m", "--mode", action="store", dest="mode", default="both", +# help="Operation modes: %s"%str(AllowedActions)) +# parser.add_argument("-M", "--message", action="store", dest="message", default="Hello World!", +# help="Message to publish") + +# +# Input Sources +# +# file input args +parser.add_argument("-i", "--inputfile", action="store", required=False, dest="inFile", default="", help="input file with raw protocol stream") + +# serial port args +parser.add_argument("-p", "--port", action="store", required=False, dest="port", default="/dev/ttyS0", help="Serial Port Device") +parser.add_argument("-t", "--timeout", action="store", required=True, dest="timeout", default="0.5", help="Timeout to wait for events") + + + +args = parser.parse_args() +host = args.host +rootCAPath = args.rootCAPath +certificatePath = args.certificatePath +privateKeyPath = args.privateKeyPath +clientId = args.thingName +thingName = args.thingName +# topic = args.topic +topic = "sdk/test/Python" + +# if args.mode not in AllowedActions: +# parser.error("Unknown --mode option %s. Must be one of %s" % (args.mode, str(AllowedActions))) +# exit(2) + +if not args.certificatePath or not args.privateKeyPath: + parser.error("Missing credentials for authentication, you must specify --cert and --key args.") + exit(2) + +if not os.path.isfile(rootCAPath): + parser.error("Root CA path does not exist {}".format(rootCAPath)) + exit(3) + +if not os.path.isfile(certificatePath): + parser.error("No certificate found at {}".format(certificatePath)) + exit(3) + +if not os.path.isfile(privateKeyPath): + parser.error("No private key found at {}".format(privateKeyPath)) + exit(3) + +if len(args.inFile) > 0 and os.path.isfile(args.inFile): + print(f'using {args.inFile} as source') + connection = FileReader(args.inFile) +elif len(args.port) > 0 and os.path.isfile(args.port): + # port = args.port + print(f'using {args.port} as source') + connection = SerialReader(args.port) +else: + parser.error("Must supply file or port") + exit(3) +# connection will read from either sourcse + +timeout = float(args.timeout) + + +# Configure logging +logger = logging.getLogger("AWSIoTPythonSDK.core") +logger.setLevel(logging.DEBUG) +streamHandler = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +streamHandler.setFormatter(formatter) +logger.addHandler(streamHandler) + +# Progressive back off core +backOffCore = ProgressiveBackOffCore() + +# Discover GGCs +discoveryInfoProvider = DiscoveryInfoProvider() +discoveryInfoProvider.configureEndpoint(host) +discoveryInfoProvider.configureCredentials(rootCAPath, certificatePath, privateKeyPath) +discoveryInfoProvider.configureTimeout(10) # 10 sec + +retryCount = MAX_DISCOVERY_RETRIES +discovered = False +groupCA = None +coreInfo = None +while retryCount != 0: + try: + discoveryInfo = discoveryInfoProvider.discover(thingName) + caList = discoveryInfo.getAllCas() + coreList = discoveryInfo.getAllCores() + + # We only pick the first ca and core info + groupId, ca = caList[0] + coreInfo = coreList[0] + print("Discovered GGC: %s from Group: %s" % (coreInfo.coreThingArn, groupId)) + + print("Now we persist the connectivity/identity information...") + groupCA = GROUP_CA_PATH + groupId + "_CA_" + str(uuid.uuid4()) + ".crt" + if not os.path.exists(GROUP_CA_PATH): + os.makedirs(GROUP_CA_PATH) + groupCAFile = open(groupCA, "w") + groupCAFile.write(ca) + groupCAFile.close() + + discovered = True + print("Now proceed to the connecting flow...") + break + except DiscoveryInvalidRequestException as e: + print("Invalid discovery request detected!") + print("Type: %s" % str(type(e))) + print("Error message: %s" % e.message) + print("Stopping...") + break + except BaseException as e: + print("Error in discovery!") + print("Type: %s" % str(type(e))) + print("Error message: %s" % e.message) + retryCount -= 1 + print("\n%d/%d retries left\n" % (retryCount, MAX_DISCOVERY_RETRIES)) + print("Backing off...\n") + backOffCore.backOff() + +if not discovered: + print("Discovery failed after %d retries. Exiting...\n" % (MAX_DISCOVERY_RETRIES)) + sys.exit(-1) + +# Iterate through all connection options for the core and use the first successful one +myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) +myAWSIoTMQTTClient.configureCredentials(groupCA, privateKeyPath, certificatePath) +myAWSIoTMQTTClient.onMessage = customOnMessage + +connected = False +for connectivityInfo in coreInfo.connectivityInfoList: + currentHost = connectivityInfo.host + currentPort = connectivityInfo.port + print("Trying to connect to core at %s:%d" % (currentHost, currentPort)) + myAWSIoTMQTTClient.configureEndpoint(currentHost, currentPort) + try: + myAWSIoTMQTTClient.connect() + connected = True + break + except BaseException as e: + print("Error in connect!") + print("Type: %s" % str(type(e))) + # print("Error message: %s" % e.message) + +if not connected: + print("Cannot connect to core %s. Exiting..." % coreInfo.coreThingArn) + sys.exit(-2) + +# Successfully connected to the core +# if args.mode == 'both' or args.mode == 'subscribe': +myAWSIoTMQTTClient.subscribe(topic, 0, None) +time.sleep(2) + +##### +# loopCount = 0 +# while True: +# if args.mode == 'both' or args.mode == 'publish': +# message = {} +# message['message'] = args.message +# message['sequence'] = loopCount +# messageJson = json.dumps(message) +# myAWSIoTMQTTClient.publish(topic, messageJson, 0) +# if args.mode == 'publish': +# print('Published topic %s: %s\n' % (topic, messageJson)) +# loopCount += 1 +# time.sleep(1) +#### + +stream = PentairStream(connection) + +def do_something(): + if not connection.isOpen(): + connection.open() + + message = stream.getState() + messageJson = json.dumps(message) + myAWSIoTMQTTClient.publish(topic, messageJson, 0) + + +def run(): + while True: + time.sleep(0.9*timeout) # crude approach to timing adjustment + do_something() + +if __name__ == "__main__": + run() \ No newline at end of file