-
Notifications
You must be signed in to change notification settings - Fork 8
/
example.py
61 lines (47 loc) · 1.89 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python3
import time
import uuid
import paho.mqtt.client as mqtt
import trio
from trio_paho_mqtt import AsyncClient
# How many messages the demo publishes; the reader stops once it has seen this many.
n_messages = 3

# A fresh random identifier per run. It doubles as the topic name, presumably so
# that separate runs sharing one broker do not see each other's messages.
client_id = f'trio-paho-mqtt/{uuid.uuid4()}'
topic = client_id

print(f"Using client_id / topic: {client_id}")
print(f"Sending {n_messages} messages.")
async def test_read(client, nursery):
    """Consume incoming messages and stop the whole nursery once all have arrived.

    Iterates over ``client.messages()``, printing a short preview of each payload.
    After ``n_messages`` messages have been seen, the nursery's cancel scope is
    cancelled, which ends this task along with every other task in the nursery.
    """
    received = 0
    async for message in client.messages():
        # Only show the first few bytes; the payloads in this demo are large.
        preview = message.payload[:18]
        print(f"< Received msg: {preview}")
        received += 1
        if received != n_messages:
            continue
        # Every expected message has arrived: cancel the nursery to shut down.
        nursery.cancel_scope.cancel()
async def test_write(client):
    """Publish one large message and report how long the publish call took.

    The payload is the current timestamp's text repeated 40000 times (typically
    around 720k bytes), so actually transmitting it takes a while. ``publish`` is
    called without being awaited, and the elapsed time printed afterwards shows
    how quickly that call itself returned.
    """
    now = time.time()
    print(f"> Publishing: {now} * 40000")
    stamp = str(now).encode('utf8')
    payload = stamp * 40000
    client.publish(topic, payload, qos=1)
    elapsed = time.time() - now
    print(f"publish took {elapsed} seconds")
async def main():
    """Run the demo: one reader task and ``n_messages`` writer tasks in a nursery.

    A regular paho client is wrapped in ``AsyncClient``, connected to a public
    test broker and subscribed to ``topic``. The reader task cancels the nursery
    once it has received every message, which is what makes this function return.
    """
    async with trio.open_nursery() as nursery:
        # Get a regular MQTT client
        sync_client = mqtt.Client()
        # Wrap it to create an asynchronous version
        client = AsyncClient(sync_client, nursery)

        # Connect to the broker, and subscribe to the topic.
        # The former public sandbox host `mqtt.eclipse.org` has been retired; the
        # Eclipse test broker is now reachable at `mqtt.eclipseprojects.io`.
        client.connect('mqtt.eclipseprojects.io', 1883, 60)
        client.subscribe(topic)

        # Start the reader
        nursery.start_soon(test_read, client, nursery)

        # Start a bunch of writers. Wait 5 seconds between them to demonstrate the
        # asynchronous nature of reading and writing:
        for _ in range(n_messages):
            nursery.start_soon(test_write, client)
            await trio.sleep(5)
# Run the demo only when this file is executed as a script, so that importing the
# module does not connect to the broker and block until the demo finishes.
if __name__ == "__main__":
    print("Starting")
    # Blocks until main() returns, i.e. until the reader task cancels the nursery.
    trio.run(main)
    print("Finished")