From b0a53d78d69bd5565a3f3a084ef96eeb6ba5c761 Mon Sep 17 00:00:00 2001 From: Andre Klang Date: Sat, 2 Dec 2023 14:45:18 +0100 Subject: [PATCH] Minor refactor in socket-reader, and fixing issue when there are no bytes to read --- src/SimConnectSocket.ts | 89 +++++++++++++++++++++++------------------ 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/src/SimConnectSocket.ts b/src/SimConnectSocket.ts index 6dfce62..a9b7801 100644 --- a/src/SimConnectSocket.ts +++ b/src/SimConnectSocket.ts @@ -102,60 +102,69 @@ class SimConnectSocket extends Duplex { _onReadable() { while (!this._readingPaused) { // Are there bytes remaining on a pending message - if (this._pendingBodyLengthRemaining && this._appendPending()) { - return; + if (this._pendingBodyLengthRemaining) { + this._appendPending(); } - let body: Buffer; - if (this._pendingBody.length) { - // Read message from the pending body - body = this._pendingBody; + // If there's no bytes remaining on a pending message, but there are in the pendingBody + if (!this._pendingBodyLengthRemaining && this._pendingBody.length) { + // Read message from the pending body, and send it + this._pushMessage(this._pendingBody); this._pendingBody = Buffer.from([]); - } else { - // Read message length header - const lenBuf = this._socket.read(HEADER_LENGTH); - if (!lenBuf) return; - const bodyLength = lenBuf.readInt32LE() - HEADER_LENGTH; - - // If expected body is longer than readable buffer - if (bodyLength > this._socket.readableLength) { - this._pendingBodyLengthRemaining = bodyLength; - this._appendPending(); - return; // wait for another _onReadable - } - - // Read message body - body = this._socket.read(bodyLength); - - if (!body) { - // Put header back in read buffer - this._socket.unshift(lenBuf); - return; - } + return; } - const message: SimConnectMessage = { - // Mandatory fields - protocolVersion: body.readInt32LE(0), - packetTypeId: body.readInt32LE(4), - data: new RawBuffer(body.slice(8)), - }; + // Read message length header + const lenBuf = this._socket.read(HEADER_LENGTH); + if (!lenBuf) return; + const bodyLength = lenBuf.readInt32LE() - HEADER_LENGTH; + + // If expected body is longer than readable buffer + if (bodyLength > this._socket.readableLength) { + this._pendingBodyLengthRemaining = bodyLength; + this._appendPending(); + return; // wait for another _onReadable + } - // Add object to read buffer - const pushOk = this.push(message); + // Read message body + const body = this._socket.read(bodyLength); + + if (!body) { + // Put header back in read buffer + this._socket.unshift(lenBuf); + return; + } - // Pause reading if consumer is slow - if (!pushOk) this._readingPaused = true; + this._pushMessage(body); } } _appendPending() { const readLength = Math.min(this._pendingBodyLengthRemaining, this._socket.readableLength); - this._pendingBody = Buffer.concat([this._pendingBody, this._socket.read(readLength)]); + + if (!readLength) return; + + const appendBuf = this._socket.read(readLength); + + if (!appendBuf) return; + + this._pendingBody = Buffer.concat([this._pendingBody, appendBuf]); this._pendingBodyLengthRemaining -= readLength; + } + + _pushMessage(body: Buffer) { + const message: SimConnectMessage = { + // Mandatory fields + protocolVersion: body.readInt32LE(0), + packetTypeId: body.readInt32LE(4), + data: new RawBuffer(body.slice(8)), + }; + + // Add object to read buffer + const pushOk = this.push(message); - // If there are still more to read, wait for another _onReadable - return !!this._pendingBodyLengthRemaining; + // Pause reading if consumer is slow + if (!pushOk) this._readingPaused = true; } _read() {