Skip to content

Commit

Permalink
fix: resubscribe on reconnect to different core node (#222)
Browse files Browse the repository at this point in the history
Signed-off-by: Tristan Bastian <[email protected]>
  • Loading branch information
reey authored Oct 15, 2024
1 parent 21b676c commit 53f4a2e
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 160 deletions.
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeAlarmsNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/alarms/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

node.on('close', function() {
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-alarms", RealtimeAlarmsNode);
}
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeAlarmsNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/alarms/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

const handle = node.client.realtime.addHandshakeListener((msg) => {
if (msg.successful && msg.reestablish) {
this.log(`Resubscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
node.client.realtime.resubscribe(subscription);
}
});

node.on('close', function() {
if (node.client && handle) {
node.client.realtime.removeListener(handle);
}
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-alarms", RealtimeAlarmsNode);
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeDeviceNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/inventory/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

node.on('close', function() {
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-device", RealtimeDeviceNode);
}
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeDeviceNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/inventory/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

const handle = node.client.realtime.addHandshakeListener((msg) => {
if (msg.successful && msg.reestablish) {
this.log(`Resubscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
node.client.realtime.resubscribe(subscription);
}
});

node.on('close', function() {
if (node.client && handle) {
node.client.realtime.removeListener(handle);
}
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-device", RealtimeDeviceNode);
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeEventNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/events/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

node.on('close', function() {
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-events", RealtimeEventNode);
}
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeEventNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/events/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

const handle = node.client.realtime.addHandshakeListener((msg) => {
if (msg.successful && msg.reestablish) {
this.log(`Resubscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
node.client.realtime.resubscribe(subscription);
}
});

node.on('close', function() {
if (node.client && handle) {
node.client.realtime.removeListener(handle);
}
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-events", RealtimeEventNode);
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeMeasurementNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/measurements/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

node.on('close', function() {
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-measurements", RealtimeMeasurementNode);
}
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeMeasurementNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/measurements/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

const handle = node.client.realtime.addHandshakeListener((msg) => {
if (msg.successful && msg.reestablish) {
this.log(`Resubscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
node.client.realtime.resubscribe(subscription);
}
});

node.on('close', function() {
if (node.client && handle) {
node.client.realtime.removeListener(handle);
}
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-measurements", RealtimeMeasurementNode);
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeOperationsNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/operations/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

node.on('close', function() {
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-operations", RealtimeOperationsNode);
}
const c8yClientLib = require('@c8y/client');

module.exports = function(RED) {
function RealtimeOperationsNode(config) {
RED.nodes.createNode(this,config);
var node = this;
const tenant = process.env.C8Y_TENANT;
const baseUrl = process.env.C8Y_BASEURL;
const user = process.env.C8Y_USER;
const password = process.env.C8Y_PASSWORD;
const auth = new c8yClientLib.BasicAuth({tenant, user, password});

node.client = new c8yClientLib.Client(auth, baseUrl);
node.client.core.tenant = tenant;

const topic = '/operations/' + (config.deviceId || '*');
this.log(`Subscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
const subscription = node.client.realtime.subscribe(topic, (evt) => {
const msg = {
payload: evt
};
node.send(msg);
});

const handle = node.client.realtime.addHandshakeListener((msg) => {
if (msg.successful && msg.reestablish) {
this.log(`Resubscribing to: ${topic} on tenant: ${tenant} and url: ${baseUrl}`);
node.client.realtime.resubscribe(subscription);
}
});

node.on('close', function() {
if (node.client && handle) {
node.client.realtime.removeListener(handle);
}
if (node.client && subscription) {
node.client.realtime.unsubscribe(subscription);
}
});
}
RED.nodes.registerType("realtime-operations", RealtimeOperationsNode);
}

0 comments on commit 53f4a2e

Please sign in to comment.