Skip to content

Commit

Permalink
Minor refactor in socket-reader, and fixing issue when there are no b…
Browse files Browse the repository at this point in the history
…ytes to read
  • Loading branch information
Andre Klang committed Dec 2, 2023
1 parent c80baab commit b0a53d7
Showing 1 changed file with 49 additions and 40 deletions.
89 changes: 49 additions & 40 deletions src/SimConnectSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,60 +102,69 @@ class SimConnectSocket extends Duplex {
_onReadable() {
while (!this._readingPaused) {
// Are there bytes remaining on a pending message
if (this._pendingBodyLengthRemaining && this._appendPending()) {
return;
if (this._pendingBodyLengthRemaining) {
this._appendPending();
}

let body: Buffer;
if (this._pendingBody.length) {
// Read message from the pending body
body = this._pendingBody;
// If there's no bytes remaining on a pending message, but there are in the pendingBody
if (!this._pendingBodyLengthRemaining && this._pendingBody.length) {
// Read message from the pending body, and send it
this._pushMessage(this._pendingBody);
this._pendingBody = Buffer.from([]);
} else {
// Read message length header
const lenBuf = this._socket.read(HEADER_LENGTH);
if (!lenBuf) return;
const bodyLength = lenBuf.readInt32LE() - HEADER_LENGTH;

// If expected body is longer than readable buffer
if (bodyLength > this._socket.readableLength) {
this._pendingBodyLengthRemaining = bodyLength;
this._appendPending();
return; // wait for another _onReadable
}

// Read message body
body = this._socket.read(bodyLength);

if (!body) {
// Put header back in read buffer
this._socket.unshift(lenBuf);
return;
}
return;
}

const message: SimConnectMessage = {
// Mandatory fields
protocolVersion: body.readInt32LE(0),
packetTypeId: body.readInt32LE(4),
data: new RawBuffer(body.slice(8)),
};
// Read message length header
const lenBuf = this._socket.read(HEADER_LENGTH);
if (!lenBuf) return;
const bodyLength = lenBuf.readInt32LE() - HEADER_LENGTH;

// If expected body is longer than readable buffer
if (bodyLength > this._socket.readableLength) {
this._pendingBodyLengthRemaining = bodyLength;
this._appendPending();
return; // wait for another _onReadable
}

// Add object to read buffer
const pushOk = this.push(message);
// Read message body
const body = this._socket.read(bodyLength);

if (!body) {
// Put header back in read buffer
this._socket.unshift(lenBuf);
return;
}

// Pause reading if consumer is slow
if (!pushOk) this._readingPaused = true;
this._pushMessage(body);
}
}

_appendPending() {
const readLength = Math.min(this._pendingBodyLengthRemaining, this._socket.readableLength);
this._pendingBody = Buffer.concat([this._pendingBody, this._socket.read(readLength)]);

if (!readLength) return;

const appendBuf = this._socket.read(readLength);

if (!appendBuf) return;

this._pendingBody = Buffer.concat([this._pendingBody, appendBuf]);
this._pendingBodyLengthRemaining -= readLength;
}

_pushMessage(body: Buffer) {
const message: SimConnectMessage = {
// Mandatory fields
protocolVersion: body.readInt32LE(0),
packetTypeId: body.readInt32LE(4),
data: new RawBuffer(body.slice(8)),
};

// Add object to read buffer
const pushOk = this.push(message);

// If there are still more to read, wait for another _onReadable
return !!this._pendingBodyLengthRemaining;
// Pause reading if consumer is slow
if (!pushOk) this._readingPaused = true;
}

_read() {
Expand Down

0 comments on commit b0a53d7

Please sign in to comment.