forked from martenjacobs/py-otgw-mqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
opentherm_serial.py
52 lines (44 loc) · 1.48 KB
/
opentherm_serial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from opentherm import OTGWClient
import re
from threading import Lock, Thread
import logging
import serial
# Module-level logger, named after this module
log = logging.getLogger(__name__)
class OTGWSerialClient(OTGWClient):
    r"""
    A serial-based OTGWClient implementation

    The keyword arguments given to the constructor are stored and consulted
    when the port is opened: 'device' is required, 'baudrate' is optional
    and defaults to 9600.
    """

    # Maximum number of bytes requested from the port per read() call
    _READ_SIZE = 128

    def __init__(self, listener, **kwargs):
        r"""
        Store the connection settings; the port is not opened here

        listener is handed unchanged to the OTGWClient constructor.
        """
        super(OTGWSerialClient, self).__init__(listener)
        self._args = kwargs
        # No port exists until open() is called. Defining the attribute here
        # lets close() be called safely on a client that was never opened.
        self._serial = None

    def open(self):
        r"""
        Open the serial connection

        Uses 8 data bits, no parity and one stop bit, with an initial read
        timeout of 0.1 seconds. Raises KeyError if no 'device' keyword
        argument was given to the constructor.
        """
        # TODO: Move other settings to config
        self._serial = serial.Serial(
            self._args['device'],
            baudrate=self._args.get('baudrate', 9600),
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0.1)

    def close(self):
        r"""
        Close the serial connection

        Does nothing if open() has never been called on this client.
        """
        if self._serial is None:
            return
        self._serial.close()

    def write(self, data):
        r"""
        Write data to the serial device

        Trailing CR/LF characters are stripped from data and a single
        "\r\n" terminator is appended. Characters that cannot be encoded
        as ASCII are dropped. The port is flushed after writing.
        """
        line = "{}\r\n".format(data.rstrip('\r\n'))
        self._serial.write(line.encode('ascii', 'ignore'))
        self._serial.flush()

    def read(self, timeout):
        r"""
        Read a block of data from the serial device

        The port's read timeout is updated to timeout if it differs. At
        most _READ_SIZE bytes are requested and returned as text; bytes
        that are not valid ASCII are dropped. If reading or decoding
        raises, the error is logged at debug level (with traceback) and
        the string "invalid" is returned instead.
        """
        # Only touch the port setting when the requested timeout changed
        if self._serial.timeout != timeout:
            self._serial.timeout = timeout
        try:
            return self._serial.read(self._READ_SIZE).decode('ascii', 'ignore')
        except Exception:
            # Best-effort by design: callers receive the "invalid" marker
            # rather than an exception; keep the traceback for diagnosis.
            log.debug("Invalid response from serial read cycle",
                      exc_info=True)
            return "invalid"