Skip to content

Commit

Permalink
use current aiomqtt conventions
Browse files Browse the repository at this point in the history
This appears to address kubealex#3 in as much as it works and the warning I
reported there is silenced. Note that this patch contains the PR I
submitted in kubealex#4.
	closes kubealex#3
  • Loading branch information
jpmens committed Oct 1, 2023
1 parent f8d48ea commit ee051ac
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions plugins/event_source/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,28 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]) -> None:
cert_reqs=validate_certs if validate_certs is not None else True,
)

mqtt_consumer = aiomqtt.Client(
async with aiomqtt.Client(
hostname=host,
port=port,
username=username,
password=password,
tls_params=tls_params if ca_certs else None,
)

await mqtt_consumer.connect()

try:
async with mqtt_consumer.messages() as messages:
await mqtt_consumer.subscribe(topic)
async for message in messages:
try:
data = json.loads(message.payload.decode())
await queue.put(data)
except json.decoder.JSONDecodeError:
logger.exception("Decoding exception for incoming message")
finally:
logger.info("Disconneccting from broker")
mqtt_consumer.disconnect()

) as mqtt_consumer:

try:
async with mqtt_consumer.messages() as messages:
await mqtt_consumer.subscribe(topic)
async for message in messages:
try:
try:
data = json.loads(message.payload.decode())
except json.decoder.JSONDecodeError:
data = dict(payload=message.payload.decode())
await queue.put(data)
except json.decoder.JSONDecodeError:
logger.exception("Decoding exception for incoming message")
finally:
logger.info("Disconneccting from broker")

if __name__ == "__main__":
"""MockQueue if running directly."""
Expand Down

0 comments on commit ee051ac

Please sign in to comment.