Skip to content

Commit

Permalink
refactor observables, add stats
Browse files Browse the repository at this point in the history
  • Loading branch information
scottrfrancis committed Jul 26, 2020
1 parent b35a623 commit d54b211
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 93 deletions.
40 changes: 13 additions & 27 deletions PentairProtocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,15 @@ def __init__(self):
0x08: TempPayload }
}

self.state = {}
self.resetStats()

def getStats(self):
return self.stats

def resetStats(self):
self.stats= { 'frameCount': 0,
'badFrames': 0,
'unprocessedPayloads': 0 }

#
# validFrame
Expand All @@ -261,8 +268,9 @@ def validFrame(self, f):
try:
f = f.rstrip(self.IDLE_BYTE)
valid = f[0] == self.START_BYTE and ((f[-2] << 8) + f[-1]) & 0xFFFF == reduce((lambda x, sum: sum + x), f[:-2])
# valid = ((f[-2] << 8) + f[-1]) & 0xFFFF == reduce((lambda x, sum: sum + x), f[:-2], 0xA5)

# increment badFrame counter for checksum errors ONLY... not for 'empty' frames
self.stats['badFrames'] += not valid
except Exception as e:
valid = False

Expand All @@ -284,10 +292,10 @@ def parsePayloadFromFrame(self, frame):
# payload.dump()

except Exception as err:
self.stats['unprocessedPayloads'] += 1
# print(err)
# print(",".join(list(map((lambda x: f'{x:02X}' if not isinstance(x, Iterable) else ' '.join(f'{b:02X}' for b in x) if len(x) > 0 else ''), list(frame.values())))))
pass


return state


Expand All @@ -298,6 +306,7 @@ def parsePayloadFromFrame(self, frame):
# DOES NOT PARSE PAYLOAD or SPECIFIC MESSAGE CODES
#
def parseFrame(self, f):
self.stats['frameCount'] += 1
f = f.rstrip(self.IDLE_BYTE)

if not self.validFrame(f):
Expand All @@ -319,27 +328,4 @@ def parseFrame(self, f):
pass

return parsed


# def parseEvents(self, events):
# frames = []
# while events:
# if self.validFrame(e):
# frames.append(self.parseFrame(e))

# try:
# if frame['payloadLength'] != len(frame['payload']):
# raise Exception

# payload = self.payloads[frame['type']][frame['command']](frame['payload'])
# # self.state.update(payload.getStatus()) # just overwrite ... and get the latest
# payload.dump()

# except Exception as err:
# print(err)
# # print(",".join(list(map((lambda x: f'{x:02X}' if not isinstance(x, Iterable) else ' '.join(f'{b:02X}' for b in x) if len(x) > 0 else ''), list(frame.values())))))
# pass

# events = events[1:]

# return frames
73 changes: 7 additions & 66 deletions pentair-control.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,6 @@
import time


# an observable chunk of raw data from the serial port, or a file, or ?
class ObservableString(Observable):
def __init__(self):
super().__init__()
self.clear()

def clear(self):
self.chunk = b''

# call to add to the end of the chunk, notifies observers
def append(self, increment):
if len(increment) > 0:
self.chunk = self.chunk + increment

self.notifyObservers(self.chunk)
self.clear()

# an Observaable wrapped array
class ObservableArray(Observable):
def __init__(self):
super().__init__()
self.clear()

def clear(self):
self.elements = []

def append(self, newElements):
if len(newElements) > 0:
self.elements.extend(newElements)

self.notifyObservers(self.elements)
self.clear()

# an Observable wrapped dict
class ObservableDict(Observable):
def __init__(self):
super().__init__()
self.clear()

def clear(self):
self.dict = {}

def append(self, newDict):
if len(newDict) > 0:
self.dict.update(newDict)

def getDict(self):
return self.dict


# takes a stream on #update and writes it to the messages object, using messages#append
class MessageParser(Observer):
def __init__(self, separator, messages):
Expand All @@ -80,27 +30,15 @@ def parseStream(self, stream):

# takes messsages and parses to frames
class FrameParser(Observer):
def __init__(self, frames):
def __init__(self, frames, protocol):
super().__init__()
self.protocol = PentairProtocol()
self.protocol = protocol

self.frames = frames

def update(self, messages):
self.frames.append(list(map(self.protocol.parseFrame, messages)))

# for m in messages:
# self.frames.append(self.protocol.parseFrame(m))

class PayloadParser(Observer):
def __init__(self, payloads):
super().__init__()
self.protocol = PentairProtocol()

self.payloads = payloads

def update(self, frames):
self.payloads.append(list(map(self.protocol.parsePayload, frames)))

class stateAggregator(Observer):
def __init__(self, state):
Expand Down Expand Up @@ -191,7 +129,8 @@ def update(self, objects):
# connect messageParser as an oberver of streamData
streamData.addObserver(messageParser)

frameParser = FrameParser(frames)
protocol = PentairProtocol()
frameParser = FrameParser(frames, protocol)
messages.addObserver(frameParser)

state = ObservableDict()
Expand All @@ -210,7 +149,9 @@ def do_something():

# state.clear()
streamData.append(connection.listen())
logger.info(json.dumps(state.getDict()) + "\n")
logger.info(json.dumps(state.getDict()))
logger.info(json.dumps(protocol.getStats()) + "\n")
protocol.resetStats()

def run():
if not connection.isOpen():
Expand Down

0 comments on commit d54b211

Please sign in to comment.