diff --git a/src/SimConnectSocket.ts b/src/SimConnectSocket.ts index 8ce08ce..6dfce62 100644 --- a/src/SimConnectSocket.ts +++ b/src/SimConnectSocket.ts @@ -57,6 +57,10 @@ class SimConnectSocket extends Duplex { _readingPaused; + _pendingBodyLengthRemaining = 0; + + _pendingBody = Buffer.from([]); + constructor() { super({ objectMode: true }); this._readingPaused = false; @@ -97,19 +101,39 @@ class SimConnectSocket extends Duplex { _onReadable() { while (!this._readingPaused) { - // Read message length header - const lenBuf = this._socket.read(HEADER_LENGTH); - if (!lenBuf) return; - const bodyLength = lenBuf.readInt32LE() - HEADER_LENGTH; - - // Read message body - const body: Buffer = this._socket.read(bodyLength); - if (!body) { - // Put header back in read buffer - this._socket.unshift(lenBuf); + // Are there bytes remaining on a pending message + if (this._pendingBodyLengthRemaining && this._appendPending()) { return; } + let body: Buffer; + if (this._pendingBody.length) { + // Read message from the pending body + body = this._pendingBody; + this._pendingBody = Buffer.from([]); + } else { + // Read message length header + const lenBuf = this._socket.read(HEADER_LENGTH); + if (!lenBuf) return; + const bodyLength = lenBuf.readInt32LE() - HEADER_LENGTH; + + // If expected body is longer than readable buffer + if (bodyLength > this._socket.readableLength) { + this._pendingBodyLengthRemaining = bodyLength; + this._appendPending(); + return; // wait for another _onReadable + } + + // Read message body + body = this._socket.read(bodyLength); + + if (!body) { + // Put header back in read buffer + this._socket.unshift(lenBuf); + return; + } + } + const message: SimConnectMessage = { // Mandatory fields protocolVersion: body.readInt32LE(0), @@ -125,6 +149,15 @@ class SimConnectSocket extends Duplex { } } + _appendPending() { + const readLength = Math.min(this._pendingBodyLengthRemaining, this._socket.readableLength); + this._pendingBody = Buffer.concat([this._pendingBody, this._socket.read(readLength)]); + this._pendingBodyLengthRemaining -= readLength; + + // If there are still more to read, wait for another _onReadable + return !!this._pendingBodyLengthRemaining; + } + _read() { this._readingPaused = false; setImmediate(this._onReadable.bind(this));