Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow MQTT topic to have wildcards (# or +) #5398

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions server/monitor-types/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class MqttMonitorType extends MonitorType {
* @inheritdoc
*/
async check(monitor, heartbeat, server) {
const receivedMessage = await this.mqttAsync(monitor.hostname, monitor.mqttTopic, {
const [ messageTopic, receivedMessage ] = await this.mqttAsync(monitor.hostname, monitor.mqttTopic, {
port: monitor.port,
username: monitor.mqttUsername,
password: monitor.mqttPassword,
Expand All @@ -24,7 +24,7 @@ class MqttMonitorType extends MonitorType {

if (monitor.mqttCheckType === "keyword") {
if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) {
heartbeat.msg = `Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`;
heartbeat.msg = `Topic: ${messageTopic}; Message: ${receivedMessage}`;
heartbeat.status = UP;
} else {
throw Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`);
Expand Down Expand Up @@ -101,11 +101,9 @@ class MqttMonitorType extends MonitorType {
});

client.on("message", (messageTopic, message) => {
if (messageTopic === topic) {
client.end();
clearTimeout(timeoutID);
resolve(message.toString("utf8"));
}
client.end();
CommanderStorm marked this conversation as resolved.
Show resolved Hide resolved
clearTimeout(timeoutID);
resolve([ messageTopic, message.toString("utf8") ]);
});

});
Expand Down
57 changes: 51 additions & 6 deletions test/backend-test/test-mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ const { UP, PENDING } = require("../../src/util");
* Runs an MQTT test with the
* @param {string} mqttSuccessMessage the message that the monitor expects
* @param {null|"keyword"|"json-query"} mqttCheckType the type of check we perform
* @param {string} receivedMessage what message is recieved from the mqtt channel
* @param {string} receivedMessage what message is received from the mqtt channel
* @param {string} monitorTopic which MQTT topic is monitored (wildcards are allowed)
* @param {string} publishTopic to which MQTT topic the message is sent
* @returns {Promise<Heartbeat>} the heartbeat produced by the check
*/
async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage) {
async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage, monitorTopic = "test", publishTopic = "test") {
const hiveMQContainer = await new HiveMQContainer().start();
const connectionString = hiveMQContainer.getConnectionString();
const mqttMonitorType = new MqttMonitorType();
const monitor = {
jsonPath: "firstProp", // always return firstProp for the json-query monitor
hostname: connectionString.split(":", 2).join(":"),
mqttTopic: "test",
mqttTopic: monitorTopic,
port: connectionString.split(":")[2],
mqttUsername: null,
mqttPassword: null,
Expand All @@ -35,9 +37,9 @@ async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage) {

const testMqttClient = mqtt.connect(hiveMQContainer.getConnectionString());
testMqttClient.on("connect", () => {
testMqttClient.subscribe("test", (error) => {
testMqttClient.subscribe(monitorTopic, (error) => {
if (!error) {
testMqttClient.publish("test", receivedMessage);
testMqttClient.publish(publishTopic, receivedMessage);
}
});
});
Expand All @@ -52,7 +54,7 @@ async function testMqtt(mqttSuccessMessage, mqttCheckType, receivedMessage) {
}

describe("MqttMonitorType", {
concurrency: true,
concurrency: 4,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the background behind this change? How was 4 chosen? 🤔

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I almost doubled the number of tests in this describe. For each test, a HiveMQ docker container is started. With concurrency: true, they are all started at the same time.

My powerful laptop and servers didn't like too much starting 13 containers at exactly the same time. The test suite needed over 2 minutes, and sometimes didn't even return at all. With concurrency: 4, the suite consistently required ~22s, which cannot get much better with a 20s timeout. There are 3 timeout tests, so concurrency: 3 would probably be fine as well. Anything more than 4 and I'd be afraid to DDOS other computers when testing.

skip: !!process.env.CI && (process.platform !== "linux" || process.arch !== "x64")
}, () => {
test("valid keywords (type=default)", async () => {
Expand All @@ -61,11 +63,51 @@ describe("MqttMonitorType", {
assert.strictEqual(heartbeat.msg, "Topic: test; Message: -> KEYWORD <-");
});

test("valid nested topic", async () => {
const heartbeat = await testMqtt("KEYWORD", null, "-> KEYWORD <-", "a/b/c", "a/b/c");
assert.strictEqual(heartbeat.status, UP);
assert.strictEqual(heartbeat.msg, "Topic: a/b/c; Message: -> KEYWORD <-");
});

test("valid wildcard topic (with #)", async () => {
const heartbeat = await testMqtt("KEYWORD", null, "-> KEYWORD <-", "a/#", "a/b/c");
assert.strictEqual(heartbeat.status, UP);
assert.strictEqual(heartbeat.msg, "Topic: a/b/c; Message: -> KEYWORD <-");
});

test("valid wildcard topic (with +)", async () => {
const heartbeat = await testMqtt("KEYWORD", null, "-> KEYWORD <-", "a/+/c", "a/b/c");
assert.strictEqual(heartbeat.status, UP);
assert.strictEqual(heartbeat.msg, "Topic: a/b/c; Message: -> KEYWORD <-");
});

test("invalid topic", async () => {
await assert.rejects(
testMqtt("keyword will not be checked anyway", null, "message", "x/y/z", "a/b/c"),
new Error("Timeout, Message not received"),
);
});

test("invalid wildcard topic (with #)", async () => {
await assert.rejects(
testMqtt("", null, "# should be last character", "#/c", "a/b/c"),
new Error("Timeout, Message not received"),
);
});

test("invalid wildcard topic (with +)", async () => {
await assert.rejects(
testMqtt("", null, "message", "x/+/z", "a/b/c"),
new Error("Timeout, Message not received"),
);
});

test("valid keywords (type=keyword)", async () => {
const heartbeat = await testMqtt("KEYWORD", "keyword", "-> KEYWORD <-");
assert.strictEqual(heartbeat.status, UP);
assert.strictEqual(heartbeat.msg, "Topic: test; Message: -> KEYWORD <-");
});

test("invalid keywords (type=default)", async () => {
await assert.rejects(
testMqtt("NOT_PRESENT", null, "-> KEYWORD <-"),
Expand All @@ -79,19 +121,22 @@ describe("MqttMonitorType", {
new Error("Message Mismatch - Topic: test; Message: -> KEYWORD <-"),
);
});

test("valid json-query", async () => {
// works because the monitors' jsonPath is hard-coded to "firstProp"
const heartbeat = await testMqtt("present", "json-query", "{\"firstProp\":\"present\"}");
assert.strictEqual(heartbeat.status, UP);
assert.strictEqual(heartbeat.msg, "Message received, expected value is found");
});

test("invalid (because query fails) json-query", async () => {
// works because the monitors' jsonPath is hard-coded to "firstProp"
await assert.rejects(
testMqtt("[not_relevant]", "json-query", "{}"),
new Error("Message received but value is not equal to expected value, value was: [undefined]"),
);
});

test("invalid (because successMessage fails) json-query", async () => {
// works because the monitors' jsonPath is hard-coded to "firstProp"
await assert.rejects(
Expand Down
Loading