From ee051ac911b4fa0cdff494c03b47e547e72cdef8 Mon Sep 17 00:00:00 2001 From: Jan-Piet Mens Date: Sun, 1 Oct 2023 11:44:58 +0200 Subject: [PATCH] use current aiomqtt conventions This appears to address #3 in as much as it works and the warning I reported there is silenced. Note that this patch contains the PR I submitted in #4. closes #3 --- plugins/event_source/mqtt.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/plugins/event_source/mqtt.py b/plugins/event_source/mqtt.py index 12c50af..0f71f0e 100644 --- a/plugins/event_source/mqtt.py +++ b/plugins/event_source/mqtt.py @@ -56,29 +56,28 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]) -> None: cert_reqs=validate_certs if validate_certs is not None else True, ) - mqtt_consumer = aiomqtt.Client( + async with aiomqtt.Client( hostname=host, port=port, username=username, password=password, tls_params=tls_params if ca_certs else None, - ) - - await mqtt_consumer.connect() - - try: - async with mqtt_consumer.messages() as messages: - await mqtt_consumer.subscribe(topic) - async for message in messages: - try: - data = json.loads(message.payload.decode()) - await queue.put(data) - except json.decoder.JSONDecodeError: - logger.exception("Decoding exception for incoming message") - finally: - logger.info("Disconneccting from broker") - mqtt_consumer.disconnect() - + ) as mqtt_consumer: + + try: + async with mqtt_consumer.messages() as messages: + await mqtt_consumer.subscribe(topic) + async for message in messages: + try: + try: + data = json.loads(message.payload.decode()) + except json.decoder.JSONDecodeError: + data = dict(payload=message.payload.decode()) + await queue.put(data) + except json.decoder.JSONDecodeError: + logger.exception("Decoding exception for incoming message") + finally: + logger.info("Disconneccting from broker") if __name__ == "__main__": """MockQueue if running directly."""