-
Notifications
You must be signed in to change notification settings - Fork 0
/
Communication.py
94 lines (82 loc) · 2.73 KB
/
Communication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from serial import Serial, portNotOpenError
import serial.tools.list_ports as lp
from PyQt5.QtCore import QRunnable, pyqtSlot
import os
class SerialCommunicator(QRunnable):
    """Worker that exchanges text with a serial device from a polling loop.

    ``run()`` repeatedly reads a line from the port, hands every non-empty
    line to the registered callbacks, and flushes queued outgoing items to
    the device as a single ``<...>`` frame.

    ``port`` is *not* set by ``__init__``: ``reset_serial()`` and
    ``connection_status()`` read ``self.port``, so the caller has to assign
    it before using them. The ``serial`` attribute only exists after a
    successful ``reset_serial()``.
    """

    # Seconds to wait before polling again while there is nothing to read
    # from (no serial object, closed port, or a read error). Without this
    # the worker loop would spin at full speed while disconnected.
    _IDLE_SLEEP = 0.05

    def __init__(self, baudrate=115200, timeout=0.03, **kwargs):
        """Store connection settings; no port is opened here.

        Args:
            baudrate: Baud rate passed to ``Serial`` by ``reset_serial()``.
            timeout: Read timeout passed to ``Serial`` by ``reset_serial()``.
            **kwargs: Accepted for call compatibility and ignored.
        """
        super().__init__()
        self.baudrate = baudrate
        self.callbackFuns = []
        # Outgoing items are kept in a set: duplicates queued between two
        # flushes collapse into one and the send order is not defined.
        self.buffer = set()
        self.timeout = timeout

    def reset_serial(self):
        """Drop the current serial object (if any) and open ``self.port`` anew.

        Failures are printed rather than raised. If opening fails (including
        ``self.port`` never having been assigned) no ``serial`` attribute is
        left behind.
        """
        if hasattr(self, 'serial'):
            try:
                self.serial.close()
            except Exception as e:
                # Closing is best effort (the device may already be gone),
                # but say so instead of hiding the error completely.
                print('[Serial1]', e)
            delattr(self, 'serial')
        try:
            self.serial = Serial(self.port, self.baudrate, timeout=self.timeout)
            print(self.port)
        except Exception as e:
            print('[Serial2] What we have here is the failure to communicate ;', e)

    def readline(self):
        """Read one line from the port and return it as text.

        Returns:
            ``None`` when the port is not open or the read failed (the error
            is printed); otherwise a ``str`` built from the received bytes,
            which is empty when nothing was received.
        """
        try:
            if not self.serial.is_open:
                return None
            data = self.serial.readline()
        except Exception as e:
            # Also covers a missing ``serial`` attribute and pyserial's
            # "port not open" error, which are ordinary Exception subclasses.
            print('[Serial3]', e)
            return None
        # The text is taken from the *repr* of the bytes object: the slice
        # removes the leading "b'" and the last four repr characters (the
        # closing quote plus the three characters before it).
        # NOTE(review): the result therefore depends on the terminator the
        # device sends, and non-printable bytes show up as escape sequences.
        # Kept as is because the registered callbacks may rely on this exact
        # format - confirm before replacing it with bytes.decode().
        return str(data)[2:-4]

    def set_timeout(self, timeout):
        """Change the read timeout of the currently open serial object.

        ``self.timeout`` is left untouched, so the next ``reset_serial()``
        goes back to the value given at construction.
        """
        self.serial.timeout = timeout

    def writeline(self, data):
        """Print *data*, encode it as UTF-8 and write it to the port."""
        print('sending', data)
        self.serial.write(data.encode('utf-8'))

    def setOnReadCallback(self, fun):
        """Register *fun* to be called with every non-empty line read."""
        self.callbackFuns.append(fun)

    def add_to_outbuffer(self, *args):
        """Queue the string form of each argument for the next outgoing frame."""
        self.buffer.update(str(a) for a in args)

    def connection_status(self):
        """Report the connection state as an integer code.

        Returns:
            0 if no serial object exists, 1 if the port is open and
            ``self.port`` exists as a filesystem path, -2 if the port is open
            but that path does not exist, -1 if the serial object is closed.
        """
        if not hasattr(self, 'serial'):
            return 0
        if not self.serial.is_open:
            return -1
        # NOTE(review): presumably this detects an unplugged device on
        # systems where ports are device files; where port names are not
        # filesystem paths this would always yield -2 - confirm.
        if os.path.exists(self.port):
            return 1
        return -2

    @pyqtSlot()
    def run(self):
        """Poll the port until ``self.main_loop`` is set to a false value.

        Each pass reads one line, forwards it to the callbacks when it is
        non-empty, and sends everything queued in ``self.buffer`` as one
        ``<...>`` frame. Callbacks are invoked directly from this loop.
        Exceptions raised by a callback or by ``writeline()`` are not caught
        here and end the loop.
        """
        import time  # local import: only needed for the idle back-off

        self.main_loop = True
        while self.main_loop:
            self.run_condition = True
            while self.run_condition and hasattr(self, 'serial'):
                new = self.readline()
                if new:
                    for f in self.callbackFuns:
                        f(new)
                if self.buffer:
                    # Swap the queue out before serialising it, so items
                    # added while the frame is being built go into the next
                    # frame instead of being discarded or changing the set
                    # in the middle of the iteration.
                    pending, self.buffer = self.buffer, set()
                    self.writeline('<' + ''.join(pending) + '>')
                if new is None:
                    # Closed port or read error: readline() returned at
                    # once, so back off instead of spinning.
                    time.sleep(self._IDLE_SLEEP)
            if self.main_loop and not hasattr(self, 'serial'):
                # Nothing to poll until reset_serial() succeeds.
                time.sleep(self._IDLE_SLEEP)
class DeviceSearcher:
    """Look up serial ports through pyserial's port listing."""

    def find_device_port(self, name='Arduino M0'):
        """Return the device of the first port whose description contains *name*.

        Symlinked port entries are excluded from the search. Returns ``None``
        when no description matches.
        """
        matching_devices = (
            info.device
            for info in lp.comports(include_links=False)
            if name in info.description
        )
        return next(matching_devices, None)

    def list_ports(self):
        """Return a list with the device of every port found (links excluded)."""
        return [info.device for info in lp.comports(include_links=False)]