diff --git a/ports/tcpport.js b/ports/tcpport.js index 5e4133b..25633b0 100644 --- a/ports/tcpport.js +++ b/ports/tcpport.js @@ -48,7 +48,7 @@ class TcpPort extends EventEmitter { /** @type {net.Socket?} - Optional custom socket */ this._externalSocket = null; - if(typeof ip === "object") { + if (typeof ip === "object") { options = ip; ip = undefined; } @@ -62,7 +62,7 @@ class TcpPort extends EventEmitter { } /** @type {net.TcpSocketConnectOpts} - Options for net.connect(). */ - this.connectOptions = { + this.connectOptions = { // Default options ...{ host: ip || options.ip, @@ -72,8 +72,8 @@ class TcpPort extends EventEmitter { ...options }; - if(options.socket) { - if(options.socket instanceof net.Socket) { + if (options.socket) { + if (options.socket instanceof net.Socket) { this._externalSocket = options.socket; this.openFlag = this._externalSocket.readyState === "opening" || this._externalSocket.readyState === "open"; } else { @@ -92,6 +92,7 @@ class TcpPort extends EventEmitter { // init a socket this._client = this._externalSocket || new net.Socket(this.socketOpts); + this._writeCompleted = Promise.resolve(); if (options.timeout) this._client.setTimeout(options.timeout); @@ -131,7 +132,9 @@ class TcpPort extends EventEmitter { this._client.on("connect", function() { self.openFlag = true; + self._writeCompleted = Promise.resolve(); modbusSerialDebug("TCP port: signal connect"); + self._client.setNoDelay(); handleCallback(); }); @@ -174,10 +177,10 @@ class TcpPort extends EventEmitter { * @param {(err?: Error) => void} callback */ open(callback) { - if(this._externalSocket === null) { + if (this._externalSocket === null) { this.callback = callback; this._client.connect(this.connectOptions); - } else if(this.openFlag) { + } else if (this.openFlag) { modbusSerialDebug("TCP port: external socket is opened"); callback(); // go ahead to setup existing socket } else { @@ -214,7 +217,7 @@ class TcpPort extends EventEmitter { * @param {Buffer} data */ write(data) { - if(data.length < MIN_DATA_LENGTH) { + if (data.length < MIN_DATA_LENGTH) { modbusSerialDebug("expected length of data is to small - minimum is " + MIN_DATA_LENGTH); return; } @@ -240,7 +243,26 @@ class TcpPort extends EventEmitter { }); // send buffer to slave - this._client.write(buffer); + const previousWritePromise = this._writeCompleted; + const newWritePromise = new Promise((resolveNewWrite, rejectNewWrite) => { + // Wait for the completion of any write that happened before. + previousWritePromise.finally(() => { + try { + // The previous write succeeded, write the new buffer. + if (this._client.write(buffer)) { + // Mark this write as complete. + resolveNewWrite(); + } else { + // Wait for one `drain` event to mark this write as complete. + this._client.once("drain", resolveNewWrite); + } + } catch (error) { + rejectNewWrite(error); + } + }); + }); + // Overwrite `_writeCompleted` so that the next call to `TcpPort.write` will have to wait on our write to complete. + this._writeCompleted = newWritePromise; // set next transaction id this._transactionIdWrite = (this._transactionIdWrite + 1) % MAX_TRANSACTIONS; diff --git a/test/mocks/netMock.js b/test/mocks/netMock.js index b8cd3a6..fa462c2 100644 --- a/test/mocks/netMock.js +++ b/test/mocks/netMock.js @@ -15,12 +15,17 @@ class Socket extends EventEmitter { } } + setNoDelay() { + return this; + } + end() { this.emit("close", false); } write(data) { this._data = data; + return true; } receive(buffer) { diff --git a/test/ports/tcpport.test.js b/test/ports/tcpport.test.js index 7449bb7..fca4077 100644 --- a/test/ports/tcpport.test.js +++ b/test/ports/tcpport.test.js @@ -122,10 +122,11 @@ describe("Modbus TCP port methods", function() { }); port.open(function() { port.write(Buffer.from("1103006B00037687", "hex")); - - if (port._client._data.equals(Buffer.from("0001000000061103006B0003", "hex"))) { - port._client.receive(Buffer.from("000100000006110366778899", "hex")); - } + port._writeCompleted.then(function() { + if (port._client._data.equals(Buffer.from("0001000000061103006B0003", "hex"))) { + port._client.receive(Buffer.from("000100000006110366778899", "hex")); + } + }); }); }); @@ -136,18 +137,22 @@ describe("Modbus TCP port methods", function() { }); port.open(function() { port.write(Buffer.from("1103006B00037687", "hex")); - - if (port._client._data.equals(Buffer.from("0002000000061103006B0003", "hex"))) { - port._client.receive(Buffer.from("000200000003118304", "hex")); - } + port._writeCompleted.then(function() { + if (port._client._data.equals(Buffer.from("0002000000061103006B0003", "hex"))) { + port._client.receive(Buffer.from("000200000003118304", "hex")); + } + }); }); }); }); describe("#write", function() { - it("should write a valid TCP message to the port", function() { + it("should write a valid TCP message to the port", function(done) { port.write(Buffer.from("1103006B00037687", "hex")); - expect(port._client._data.toString("hex")).to.equal("0003000000061103006b0003"); + port._writeCompleted.then(function() { + expect(port._client._data.toString("hex")).to.equal("0003000000061103006b0003"); + done(); + }); }); });