Skip to content

Commit

Permalink
feat: mqtt writer
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgiga1993 committed Aug 1, 2024
1 parent f0ddf40 commit 79d90d6
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 1 deletion.
72 changes: 72 additions & 0 deletions pollect/writers/MqttWriter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from __future__ import annotations

import json
from typing import List, Optional

import paho.mqtt.client as mqtt

from pollect.core.ValueSet import ValueSet
from pollect.sources.Source import Source
from pollect.writers.Writer import Writer


class MqttWriter(Writer):
_port: int
_host: str
_client: mqtt.Client

hass_discovery_prefix: str = "homeassistant"

def __init__(self, config):
super().__init__(config)
self._host = self.config.get('host', '127.0.01')
self._port = self.config.get('port', 1883)
self._ha_autodiscovery = self.config.get('hassAutodiscovery', False)
self._discovery_sent = {}

def supports_partial_write(self) -> bool:
return True

def start(self):
self._client = mqtt.Client(client_id='pollect')
self._client.on_connect_fail = self._on_connect_fail
self._client.connect_async(self._host, self._port, keepalive=30)
self._client.loop_start()

def stop(self):
self._client.disconnect()

def _on_connect_fail(self, client, userdata: any):
self.log.error(f'Connection failed with {userdata}')

def write(self, data: List[ValueSet], source_ref: Optional[Source] = None):
if not self._client.is_connected():
self.log.warning('Not connected to mqtt broker')
return
for value_set in data:
for value_obj in value_set.values:
path = 'pollect/' + value_set.name
if value_obj.name is not None:
path += '/' + value_obj.name
for x in range(len(value_set.labels)):
path += '/' + value_set.labels[x] + '/' + value_obj.label_values[x]

path = path.lower()

if self._ha_autodiscovery and path not in self._discovery_sent:
self._send_discovery(value_obj.name, path)

self._client.publish(path, value_obj.value, retain=True)

def _send_discovery(self, name: str, path: str):
discovery_msg = {
'name': name,
'device_class': None,
'state_topic': name
}
object_id = path.replace('/', '_')
self._client.publish(self.hass_discovery_prefix + '/sensor/' + object_id + '/config',
json.dumps(discovery_msg),
retain=True)
self._discovery_sent[path] = True

2 changes: 1 addition & 1 deletion pollect/writers/PrometheusWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def write(self, data: List[ValueSet], source_ref: Optional[Source] = None):
for value_obj in value_set.values:
path = value_set.name
if value_obj.name is not None:
path += '.' + value_obj.name
path += '_' + value_obj.name

path = path.replace('-', '_').replace('.', '_').replace('!', '')

Expand Down

0 comments on commit 79d90d6

Please sign in to comment.