# mqtt.iot_simulator.py
# Forked from asdaraujo/edge2ai-workshop.
#!/usr/bin/env python
"""a simple sensor data generator that sends to an MQTT broker via paho"""
import sys
import json
import time
import random
import paho.mqtt.client as mqtt
def generate(host, port, username, password, topic, machines, interval_ms, verbose):
    """Generate random sensor readings forever and publish them to an MQTT broker.

    Every message is a JSON object holding a random ``sensor_id`` (1-100), a
    ``sensor_ts`` timestamp in microseconds since the epoch, and twelve
    readings named ``sensor_0`` .. ``sensor_11``.  About 16% of the messages
    are selected for error injection; in those, ``sensor_0`` and ``sensor_1``
    each have a 79% chance of being inflated by an extra 500-1000.

    Args:
        host: MQTT broker host name or address.
        port: MQTT broker port.
        username: Optional user name; credentials are only set when truthy.
        password: Password sent together with ``username``.
        topic: Topic every message is published to.
        machines: Mapping from ``"sensor_<n>"`` (n = 0..11) to a two-item
            ``[min, max]`` sequence of integers bounding that reading.
        interval_ms: Pause between two messages, in milliseconds.
        verbose: When truthy, echo each published message to stdout.

    Raises:
        KeyError: If ``machines`` lacks the range for one of the sensors.

    This function does not return; it loops until the process is interrupted
    or an exception escapes.
    """
    mqttc = mqtt.Client()
    if username:
        mqttc.username_pw_set(username, password)
    mqttc.connect(host, port)

    interval_secs = interval_ms / 1000.0
    # The key names never change, so build them once instead of per message.
    sensor_names = ["sensor_%d" % index for index in range(12)]

    while True:
        data = {
            "sensor_id": random.randint(1, 100),
            # int() rather than the Python 2-only long(): it exists on both
            # major versions and never overflows.
            "sensor_ts": int(time.time() * 1000000),
        }

        # Decide once per message whether it may carry out-of-range values.
        inject_error = random.randint(1, 100) >= 85

        for index, name in enumerate(sensor_names):
            # Index (not .get) so a missing range fails with the sensor name
            # instead of an opaque "cannot unpack NoneType" error.
            min_val, max_val = machines[name]
            data[name] = random.randint(min_val, max_val)
            # Only the first two sensors are ever pushed out of range.
            if inject_error and index < 2 and random.randint(1, 100) < 80:
                data[name] += random.randint(500, 1000)

        payload = json.dumps(data)
        if verbose:
            print("%s: %s" % (topic, payload))
        mqttc.publish(topic, payload)
        time.sleep(interval_secs)
def main(config_path):
    """Load the JSON config at ``config_path``, validate it and start generating.

    The config may contain three objects:

    * ``mqtt``: ``host`` (default ``"localhost"``), ``port`` (default 1883),
      ``username``, ``password`` and ``topic`` (default ``"mqttgen"``).
    * ``misc``: ``interval_ms`` (default 500) and ``verbose`` (default False).
    * ``machines``: required; the per-sensor value ranges handed to
      ``generate``.

    Problems with the config file (unreadable, malformed JSON, missing
    ``machines`` object) and I/O errors raised while talking to the broker
    are reported on stdout, after which the function returns ``None``.

    Args:
        config_path: Path of the JSON configuration file.
    """
    # Keep the try body limited to reading the file so that unrelated I/O
    # errors (e.g. a refused broker connection) are not misreported as a
    # config-file problem.
    try:
        with open(config_path) as handle:
            config = json.load(handle)
    except IOError as error:
        print("Error opening config file '%s'" % config_path, error)
        return
    except ValueError as error:
        # json raises ValueError (JSONDecodeError on Python 3) for bad input.
        print("Error parsing config file '%s'" % config_path, error)
        return

    machines = config.get("machines") if isinstance(config, dict) else None
    if not isinstance(machines, dict):
        print("Config file '%s' has no 'machines' object" % config_path)
        return

    mqtt_config = config.get("mqtt", {})
    misc_config = config.get("misc", {})

    interval_ms = misc_config.get("interval_ms", 500)
    verbose = misc_config.get("verbose", False)

    host = mqtt_config.get("host", "localhost")
    port = mqtt_config.get("port", 1883)
    username = mqtt_config.get("username")
    password = mqtt_config.get("password")
    topic = mqtt_config.get("topic", "mqttgen")

    try:
        generate(host, port, username, password, topic, machines, interval_ms, verbose)
    except IOError as error:
        # Still swallowed as before, but now with an accurate message.
        print("Error communicating with MQTT broker '%s:%s'" % (host, port), error)
if __name__ == '__main__':
    # Exactly one command-line argument is expected: the config file path.
    if len(sys.argv) != 2:
        print("usage %s config.json" % sys.argv[0])
    else:
        main(sys.argv[1])