This repository has been archived by the owner on Oct 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 116
/
mqtt.py
206 lines (161 loc) · 5.68 KB
/
mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import json
import paho.mqtt.client as mqtt
import logger
LWT_ONLINE = "online"
LWT_OFFLINE = "offline"
_LOGGER = logger.get(__name__)
class MqttClient:
def __init__(self, config):
self._config = config
self._mqttc = mqtt.Client(
client_id=self.client_id,
clean_session=False,
userdata={"global_topic_prefix": self.topic_prefix},
)
if self.username and self.password and not self.client_key:
self.mqttc.username_pw_set(self.username, self.password)
if self.ca_cert and not self.client_key:
cert_reqs = mqtt.ssl.CERT_REQUIRED if self.ca_verify else mqtt.ssl.CERT_NONE
self.mqttc.tls_set(self.ca_cert, cert_reqs=cert_reqs)
self.mqttc.tls_insecure_set(not self.ca_verify)
if self.ca_cert and self.client_key:
cert_reqs = mqtt.ssl.CERT_REQUIRED if self.ca_verify else mqtt.ssl.CERT_NONE
self.mqttc.tls_set(self.ca_cert, self.client_cert, self.client_key, cert_reqs=cert_reqs)
self.mqttc.tls_insecure_set(not self.ca_verify)
if self.availability_topic:
topic = self._format_topic(self.availability_topic)
_LOGGER.debug("Setting LWT to: %s" % topic)
self.mqttc.will_set(topic, payload=LWT_OFFLINE, retain=True)
def publish(self, messages):
if not messages:
return
for m in messages:
if m.use_global_prefix:
topic = self._format_topic(m.topic)
else:
topic = m.topic
self.mqttc.publish(topic, m.payload, retain=m.retain)
@property
def client_id(self):
return (
self._config["client_id"]
if "client_id" in self._config
else "bt-mqtt-gateway"
)
@property
def hostname(self):
return self._config["host"]
@property
def port(self):
return self._config["port"] if "port" in self._config else 1883
@property
def username(self):
return str(self._config["username"]) if "username" in self._config else None
@property
def password(self):
return str(self._config["password"]) if "password" in self._config else None
@property
def ca_cert(self):
return self._config["ca_cert"] if "ca_cert" in self._config else None
@property
def client_cert(self):
return self._config["client_cert"] if "client_cert" in self._config else None
@property
def client_key(self):
return self._config["client_key"] if "client_key" in self._config else None
@property
def ca_verify(self):
if "ca_verify" in self._config:
# Constrain config input to boolean value
if self._config["ca_verify"]:
return True
else:
return False
else:
return True
@property
def topic_prefix(self):
return self._config["topic_prefix"] if "topic_prefix" in self._config else None
@property
def availability_topic(self):
return (
self._config["availability_topic"]
if "availability_topic" in self._config
else None
)
@property
def mqttc(self):
return self._mqttc
# noinspection PyUnusedLocal
def on_connect(self, client, userdata, flags, rc):
if self.availability_topic:
self.publish(
[
MqttMessage(
topic=self.availability_topic, payload=LWT_ONLINE, retain=True
)
]
)
def callbacks_subscription(self, callbacks):
self.mqttc.on_connect = self.on_connect
self.mqttc.connect(self.hostname, port=self.port)
for topic, callback in callbacks:
topic = self._format_topic(topic)
_LOGGER.debug("Subscribing to: %s" % topic)
self.mqttc.message_callback_add(topic, callback)
self.mqttc.subscribe(topic)
self.mqttc.loop_start()
def __del__(self):
if self.availability_topic:
self.publish(
[
MqttMessage(
topic=self.availability_topic, payload=LWT_OFFLINE, retain=True
)
]
)
def _format_topic(self, topic):
return "{}/{}".format(self.topic_prefix, topic) if self.topic_prefix else topic
class MqttMessage:
use_global_prefix = True
def __init__(self, topic=None, payload=None, retain=False):
self._topic = topic
self._payload = payload
self._retain = retain
@property
def topic(self):
return self._topic
@topic.setter
def topic(self, new_topic):
self._topic = new_topic
@property
def payload(self):
if isinstance(self.raw_payload, str):
return self.raw_payload
else:
return json.dumps(self.raw_payload)
@property
def raw_payload(self):
return self._payload
@property
def retain(self):
return self._retain
@retain.setter
def retain(self, new_retain):
self._retain = new_retain
@property
def as_dict(self):
return {"topic": self.topic, "payload": self.payload}
def __repr__(self):
return self.as_dict.__str__()
def __str__(self):
return self.__repr__()
class MqttConfigMessage(MqttMessage):
SENSOR = "sensor"
CLIMATE = "climate"
BINARY_SENSOR = "binary_sensor"
COVER = "cover"
SWITCH = "switch"
use_global_prefix = False
def __init__(self, component, name, payload=None, retain=False):
super().__init__("{}/{}/config".format(component, name), payload, retain)