diff --git a/galaxy.yml b/galaxy.yml index 0b17df1..e990432 100644 --- a/galaxy.yml +++ b/galaxy.yml @@ -4,7 +4,7 @@ namespace: kubealex name: eda -version: 1.0.7 +version: 1.0.8 readme: README.md diff --git a/plugins/event_source/mqtt.py b/plugins/event_source/mqtt.py index 12c50af..34d222e 100644 --- a/plugins/event_source/mqtt.py +++ b/plugins/event_source/mqtt.py @@ -63,21 +63,17 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]) -> None: password=password, tls_params=tls_params if ca_certs else None, ) - - await mqtt_consumer.connect() - - try: - async with mqtt_consumer.messages() as messages: + async with mqtt_consumer: + try: await mqtt_consumer.subscribe(topic) - async for message in messages: + async for message in mqtt_consumer.messages: try: data = json.loads(message.payload.decode()) await queue.put(data) except json.decoder.JSONDecodeError: logger.exception("Decoding exception for incoming message") - finally: - logger.info("Disconneccting from broker") - mqtt_consumer.disconnect() + finally: + logger.info("Disconneccting from broker") if __name__ == "__main__": @@ -96,3 +92,4 @@ async def put(self: "MockQueue", event: dict) -> None: {"topic": "eda", "host": "localhost", "port": "1883"}, ), ) +