-
Notifications
You must be signed in to change notification settings - Fork 2
/
mqttduino.ino
136 lines (127 loc) · 5.49 KB
/
mqttduino.ino
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// ---- User configuration: edit these values for your network, broker and wiring ----
#define APssid "HomeWIFI" //wifi network ssid
#define APpsw "wifipassword" //wifi network password
#define MQTTid "MqttTestClient" //id of this mqtt client
#define MQTTip "mqtt.myserver.com" //ip address or hostname of the mqtt broker
#define MQTTport 1883 //port of the mqtt broker
#define MQTTuser "client1" //username of this mqtt client
#define MQTTpsw "password1" //password of this mqtt client
#define MQTTalive 120 //mqtt keep alive interval (seconds), passed to mqttInit
#define MQTTretry 10 //time to wait before reconnect if connection drops (seconds), passed to mqttInit
#define MQTTqos 2 //quality of service sent with every subscribe and publish command
#define esp8266reset A5 //arduino pin connected to esp8266 reset pin (analog pin suggested due to higher impedance)
#define esp8266alive 40 //esp8266 keep alive interval (seconds); checkComm resets the board after twice this time without a notification that refreshes the timer
#define esp8266serial Serial //Serial port to use to communicate to the esp8266 (Serial, Serial1, Serial2, etc)
// True while the MQTT session is up: set on the 'c' notification, cleared on the
// 'r' and 'd' notifications and whenever checkComm resets the esp8266.
boolean connected = false;
// User hook: checkComm() calls this each time the esp8266 reports that the MQTT
// connection is established ('c' notification), so subscriptions placed here are
// issued again after every reconnect.
void onConnected() {
mqttSubscribe("hello"); //subscribe to "hello" topic
}
// User hook: checkComm() calls this on a 'd' (disconnected) notification, and on an
// 'r' (ready) notification if the sketch still believed it was connected.
// Intentionally empty in this example.
void onDisconnected() {
}
// User hook: checkComm() calls this for each 'm' (new message) notification it accepts.
//   topic   - text read from the serial link up to the first '|' (at most buffer_l characters)
//   message - text read up to the following '|' (at most buffer_l characters)
// Intentionally empty in this example.
void onMessage(String topic, String message) {
}
// #### DO NOT TOUCH THIS CODE! ####
// Maximum number of characters read for a topic or for a message payload.
#define buffer_l 50
// Milliseconds waitForSuccess() waits for a reply before giving up.
#define replyTimeout 5000
// Scratch buffer for the topic/message fields; one extra byte keeps it NUL-terminated
// because it is zeroed before each read of at most buffer_l bytes.
char in_buffer[buffer_l + 1];
// Holds the single notification code character that follows the "[(" marker.
char cb[1];
// Set to false when a publish/subscribe command is sent; set to true by a 'p' or 's'
// notification, or by waitForSuccess() on timeout or disconnect.
boolean success;
// True while checkComm() is handling an 'm' notification that arrived when success was
// false; further 'm' notifications seen in that state are not read.
boolean messageQueued = false;
// millis() timestamp of the last esp8266 reset, 'r' or 'a' notification; 0 means "never".
unsigned long lastAliveCheck = 0;
/*
 * Polls the esp8266 serial link once and handles at most one notification.
 *
 * Step 1 (watchdog): if more than esp8266alive * 2000 ms have passed since the last
 * reset / 'r' / 'a' notification, or this is the very first call, the esp8266 reset
 * pin is switched to OUTPUT for 50 ms and back to INPUT (presumably pulsing the reset
 * line low - confirm against the wiring) and `connected` is cleared.
 *
 * Step 2 (dispatch): waits (up to the Stream timeout) for the "[(" marker, reads the
 * one-character code that follows it and acts on it:
 *   'r'      ready           - start keep-alive and join the access point
 *   'a'      keep-alive tick - refresh the watchdog and poll again
 *   'w'      wifi connected  - configure the MQTT client
 *   'c'      mqtt connected  - set `connected`, call onConnected()
 *   'd'      disconnected    - clear `connected`, call onDisconnected()
 *   'm'      new message     - read "topic|message|" and call onMessage()
 *   'p','s'  set `success` (presumably publish / subscribe acknowledgements)
 *
 * Fix over the previous version: the result of readBytes() is now checked. Before,
 * a timeout while reading the code byte left the previous code in cb[0], so a stale
 * notification (for example 'd' or 'c') was dispatched a second time.
 */
void checkComm() {
  // Watchdog: no sign of life for two keep-alive intervals (or first call) -> reset the board.
  if (millis() - lastAliveCheck > esp8266alive * 2000UL || lastAliveCheck == 0) {
    pinMode(esp8266reset, OUTPUT);
    delay(50);
    pinMode(esp8266reset, INPUT);
    lastAliveCheck = millis();
    connected = false;
  }

  // Nothing to do unless a notification marker shows up before the Stream timeout.
  if (!esp8266serial.find("[(")) {
    return;
  }

  // readBytes() returns the number of bytes actually stored; on timeout it is 0 and
  // cb[0] would still hold the code of the previous notification, so bail out.
  if (esp8266serial.readBytes(cb, 1) != 1) {
    return;
  }

  if (cb[0] == 'r') {
    //ready: the esp8266 has (re)started, so any previous MQTT session is gone
    if (connected) {
      connected = false;
      onDisconnected();
    }
    lastAliveCheck = millis();
    esp8266serial.println("startAlive(" + String(esp8266alive) + ")");
    esp8266serial.flush();
    esp8266serial.println("connectAP(\"" + String(APssid) + "\", \"" + String(APpsw) + "\")");
    esp8266serial.flush();
  } else if (cb[0] == 'a') {
    //keep-alive: refresh the watchdog timestamp, then immediately poll for the next notification
    lastAliveCheck = millis();
    checkComm();
  } else if (cb[0] == 'w') {
    //wifi connected
    esp8266serial.println("mqttInit(\"" + String(MQTTid) + "\", \"" + String(MQTTip) + "\", " + MQTTport + ", \"" + String(MQTTuser)
                          + "\", \"" + String(MQTTpsw) + "\", " + MQTTalive + ", " + MQTTretry + ")");
    esp8266serial.flush();
  } else if (cb[0] == 'c') {
    //mqtt connected
    connected = true;
    onConnected();
  } else if (cb[0] == 'd') {
    //disconnected
    connected = false;
    onDisconnected();
  } else if (cb[0] == 'm') {
    //new message
    // A message is already being held while a reply is pending: leave this one unread.
    if (messageQueued)
      return;
    // A publish/subscribe reply is still outstanding: mark this message as queued so
    // nested checkComm() calls made by waitForSuccess() below do not start another one.
    if (!success)
      messageQueued = true;
    // Each field is read into a zeroed buffer of buffer_l + 1 bytes, so it stays
    // NUL-terminated; at most buffer_l characters are read per field.
    memset(in_buffer, 0, sizeof(in_buffer));
    esp8266serial.readBytesUntil('|', in_buffer, buffer_l);
    String topic = String(in_buffer);
    memset(in_buffer, 0, sizeof(in_buffer));
    esp8266serial.readBytesUntil('|', in_buffer, buffer_l);
    String message = String(in_buffer);
    // Let any pending command finish before handing the message to user code.
    waitForSuccess();
    onMessage(topic, message);
    messageQueued = false;
  } else if (cb[0] == 'p' || cb[0] == 's') {
    success = true;
  }
}
/*
 * Blocks until the pending command is acknowledged (`success` becomes true),
 * servicing the serial link via checkComm() in the meantime.
 * Gives up - and marks the command as finished by setting `success` - when the
 * connection is lost or more than replyTimeout milliseconds have elapsed.
 */
void waitForSuccess() {
  const unsigned long begunAt = millis();
  for (;;) {
    if (success) {
      return;
    }
    // Stop waiting on disconnect or timeout; flag success so callers do not wait again.
    if (!connected || millis() - begunAt > replyTimeout) {
      success = true;
      return;
    }
    checkComm();
  }
}
/*
 * Sends an mqttPublish command to the esp8266 and waits for its reply.
 *   topic   - topic to publish to
 *   message - payload text
 *   retain  - retain flag, sent as a number
 * The quality of service is always MQTTqos. Does nothing while not connected.
 * topic and message are inserted into the command text as-is.
 */
void mqttPublish(String topic, String message, byte retain) {
  if (connected) {
    success = false;  // cleared here, set again by the reply or by waitForSuccess()
    const String command =
        String("mqttPublish(\"") + topic + "\", \"" + message + "\", " + MQTTqos + ", " + retain + ")";
    esp8266serial.println(command);
    esp8266serial.flush();
    waitForSuccess();
  }
}
/*
 * Sends an mqttSubscribe command for `topic` (quality of service MQTTqos) to the
 * esp8266 and waits for its reply. Does nothing while not connected.
 * topic is inserted into the command text as-is.
 */
void mqttSubscribe(String topic) {
  if (connected) {
    success = false;  // cleared here, set again by the reply or by waitForSuccess()
    const String command = String("mqttSubscribe(\"") + topic + "\", " + MQTTqos + ")";
    esp8266serial.println(command);
    esp8266serial.flush();
    waitForSuccess();
  }
}
// #### END OF UNTOUCHABLE CODE ####
void setup() {
esp8266serial.begin(9600); //
esp8266serial.setTimeout(500) //start serial
while(!connected) //
checkComm(); //wait for first connection
}
void loop() {
do //
checkComm(); //
while(!connected); //check for incoming messages
delay(10000);
mqttPublish("uptime", String(millis()), 0); //publish new message to "uptime" topic, with no retain
mqttPublish("sensor", String(analogRead(A0)), 1); //publish new message to "sensor" topic, with retain
}