Skip to content

Commit

Permalink
Handle disconnection by Modbus/TCP gateway (#347)
Browse files Browse the repository at this point in the history
* WC-2988: index.js: Make ModbusRTU an EventEmitter.

`node-red-modbus-contrib` monitors for disconnection events by hooking
the `close` event from the port by accessing the `_port` property of the
connection object.

This seems like a bad thing to do, but there are numerous cases where a
user of this library would need to receive such events.  So, better
plan: make the client itself an `EventEmitter` so it can proxy such
events.

* WC-2988: index.js: Expose 'close' event.

On successful connect, bind the `close` event of the parent connection
class to the port's `close` event so that when it fires on the port, it
fires the event (once) on the connection object.

No more events should be fired until the library user calls `open`
again.

* WC-2988: index.js: Prevent memory leak when calling `.open`.

If a port gets closed, and we call `.open`, a new copy of the `data`
handler is created and added to the port's `data` handler list, which
will race the first and cause double-handling.

Thus, we need to remove the old listener first before we add a new one.
HOWEVER, since it's a new copy, we don't have the reference to the old
one to hand to `removeListener`, so a better plan is needed:

1. move the logic out to its own private function.
2. we need a consistent reference, so use the `.bind` function once in
   our constructor, so we can re-use that same reference when calling
   `removeListener`
3. pass our premade reference to `_port.on`

This will save memory and prevent an ugly double-up situation.

* WC-2988: ports: Emit `close` if connection lost on TCP sockets.

If a TCP socket gets closed on us, emit a `close` signal so that Modbus
library users can initiate a re-connect if needed.
  • Loading branch information
sjlongland authored Jun 23, 2020
1 parent 5c06f21 commit 1d593c0
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 120 deletions.
264 changes: 144 additions & 120 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ require("./utils/buffer_bit")();
var crc16 = require("./utils/crc16");
var modbusSerialDebug = require("debug")("modbus-serial");

var util = require("util");
var events = require("events");
var EventEmitter = events.EventEmitter || events;

var PORT_NOT_OPEN_MESSAGE = "Port Not Open";
var PORT_NOT_OPEN_ERRNO = "ECONNREFUSED";

Expand Down Expand Up @@ -243,6 +247,135 @@ function _cancelTimeout(timeoutHandle) {
clearTimeout(timeoutHandle);
}

/**
* Handle incoming data from the Modbus port.
*
* @param {Buffer} data The data received
* @private
*/
function _onReceive(data) {
var modbus = this;
var error;

// set locale helpers variables
var transaction = modbus._transactions[modbus._port._transactionIdRead];

// the _transactionIdRead can be missing, ignore wrong transaction it's
if (!transaction) {
return;
}

/* cancel the timeout */
_cancelTimeout(transaction._timeoutHandle);
transaction._timeoutHandle = undefined;

/* check if the timeout fired */
if (transaction._timeoutFired === true) {
// we have already called back with an error, so don't generate a new callback
return;
}

/* check incoming data
*/

/* check minimal length
*/
if (!transaction.lengthUnknown && data.length < 5) {
error = "Data length error, expected " +
transaction.nextLength + " got " + data.length;
if (transaction.next)
transaction.next(new Error(error));
return;
}

/* check message CRC
* if CRC is bad raise an error
*/
var crcIn = data.readUInt16LE(data.length - 2);
if (crcIn !== crc16(data.slice(0, -2))) {
error = "CRC error";
if (transaction.next)
transaction.next(new Error(error));
return;
}

// if crc is OK, read address and function code
var address = data.readUInt8(0);
var code = data.readUInt8(1);

/* check for modbus exception
*/
if (data.length >= 5 &&
code === (0x80 | transaction.nextCode)) {
var errorCode = data.readUInt8(2);
if (transaction.next) {
error = new Error("Modbus exception " + errorCode + ": " + (modbusErrorMessages[errorCode] || "Unknown error"));
error.modbusCode = errorCode;
transaction.next(error);
}
return;
}

/* check message length
* if we do not expect this data
* raise an error
*/
if (!transaction.lengthUnknown && data.length !== transaction.nextLength) {
error = "Data length error, expected " +
transaction.nextLength + " got " + data.length;
if (transaction.next)
transaction.next(new Error(error));
return;
}

/* check message address and code
* if we do not expect this message
* raise an error
*/
if (address !== transaction.nextAddress || code !== transaction.nextCode) {
error = "Unexpected data error, expected " +
transaction.nextAddress + " got " + address;
if (transaction.next)
transaction.next(new Error(error));
return;
}

/* parse incoming data
*/

switch (code) {
case 1:
case 2:
// Read Coil Status (FC=01)
// Read Input Status (FC=02)
_readFC2(data, transaction.next);
break;
case 3:
case 4:
// Read Input Registers (FC=04)
// Read Holding Registers (FC=03)
_readFC4(data, transaction.next);
break;
case 5:
// Force Single Coil
_readFC5(data, transaction.next);
break;
case 6:
// Preset Single Register
_readFC6(data, transaction.next);
break;
case 15:
case 16:
// Force Multiple Coils
// Preset Multiple Registers
_readFC16(data, transaction.next);
break;
case 43:
// read device identification
_readFC43(data, modbus, transaction.next);
}
}

/**
* Class making ModbusRTU calls fun and easy.
*
Expand All @@ -256,7 +389,12 @@ var ModbusRTU = function(port) {
this._transactions = {};
this._timeout = null; // timeout in msec before unanswered request throws timeout error
this._unitID = 1;

this._onReceive = _onReceive.bind(this);

EventEmitter.call(this);
};
util.inherits(ModbusRTU, EventEmitter);

/**
* Open the serial port and register Modbus parsers
Expand All @@ -280,127 +418,13 @@ ModbusRTU.prototype.open = function(callback) {
modbus._port._transactionIdWrite = 1;

/* On serial port success
* register the modbus parser functions
* (re-)register the modbus parser functions
*/
modbus._port.on("data", function(data) {
// set locale helpers variables
var transaction = modbus._transactions[modbus._port._transactionIdRead];

// the _transactionIdRead can be missing, ignore wrong transaction it's
if (!transaction) {
return;
}

/* cancel the timeout */
_cancelTimeout(transaction._timeoutHandle);
transaction._timeoutHandle = undefined;

/* check if the timeout fired */
if (transaction._timeoutFired === true) {
// we have already called back with an error, so don't generate a new callback
return;
}

/* check incoming data
*/

/* check minimal length
*/
if (!transaction.lengthUnknown && data.length < 5) {
error = "Data length error, expected " +
transaction.nextLength + " got " + data.length;
if (transaction.next)
transaction.next(new Error(error));
return;
}

/* check message CRC
* if CRC is bad raise an error
*/
var crcIn = data.readUInt16LE(data.length - 2);
if (crcIn !== crc16(data.slice(0, -2))) {
error = "CRC error";
if (transaction.next)
transaction.next(new Error(error));
return;
}

// if crc is OK, read address and function code
var address = data.readUInt8(0);
var code = data.readUInt8(1);

/* check for modbus exception
*/
if (data.length >= 5 &&
code === (0x80 | transaction.nextCode)) {
var errorCode = data.readUInt8(2);
if (transaction.next) {
error = new Error("Modbus exception " + errorCode + ": " + (modbusErrorMessages[errorCode] || "Unknown error"));
error.modbusCode = errorCode;
transaction.next(error);
}
return;
}

/* check message length
* if we do not expect this data
* raise an error
*/
if (!transaction.lengthUnknown && data.length !== transaction.nextLength) {
error = "Data length error, expected " +
transaction.nextLength + " got " + data.length;
if (transaction.next)
transaction.next(new Error(error));
return;
}

/* check message address and code
* if we do not expect this message
* raise an error
*/
if (address !== transaction.nextAddress || code !== transaction.nextCode) {
error = "Unexpected data error, expected " +
transaction.nextAddress + " got " + address;
if (transaction.next)
transaction.next(new Error(error));
return;
}

/* parse incoming data
*/

switch (code) {
case 1:
case 2:
// Read Coil Status (FC=01)
// Read Input Status (FC=02)
_readFC2(data, transaction.next);
break;
case 3:
case 4:
// Read Input Registers (FC=04)
// Read Holding Registers (FC=03)
_readFC4(data, transaction.next);
break;
case 5:
// Force Single Coil
_readFC5(data, transaction.next);
break;
case 6:
// Preset Single Register
_readFC6(data, transaction.next);
break;
case 15:
case 16:
// Force Multiple Coils
// Preset Multiple Registers
_readFC16(data, transaction.next);
break;
case 43:
// read device identification
_readFC43(data, modbus, transaction.next);
}
});
modbus._port.removeListener("data", modbus._onReceive);
modbus._port.on("data", modbus._onReceive);

/* Hook the close event so we can relay it to our callers. */
modbus._port.once("close", modbus.emit.bind(modbus, "close"));

/* On serial port open OK call next function with no error */
if (callback)
Expand Down
1 change: 1 addition & 0 deletions ports/tcpport.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var TcpPort = function(ip, options) {
modbus.openFlag = false;
modbusSerialDebug("TCP port: signal close: " + had_error);
handleCallback(had_error);
modbus.emit("close");
});

this._client.on("error", function(had_error) {
Expand Down
1 change: 1 addition & 0 deletions ports/tcprtubufferedport.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ var TcpRTUBufferedPort = function(ip, options) {
this._client.on("close", function(had_error) {
modbus.openFlag = false;
handleCallback(had_error);
modbus.emit("close");
});

this._client.on("error", function(had_error) {
Expand Down
1 change: 1 addition & 0 deletions ports/telnetport.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ var TelnetPort = function(ip, options) {
this._client.on("close", function(had_error) {
self.openFlag = false;
handleCallback(had_error);
self.emit("close");
});

this._client.on("error", function(had_error) {
Expand Down

0 comments on commit 1d593c0

Please sign in to comment.