Skip to content

Commit

Permalink
Extract common code for message processing in RealtimeChannel to a se…
Browse files Browse the repository at this point in the history
…parate function

Resolves #1907
  • Loading branch information
VeskeR committed Oct 25, 2024
1 parent 0bd7aa5 commit 7b9cfa9
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 47 deletions.
2 changes: 1 addition & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { gzip } from 'zlib';
import Table from 'cli-table';

// The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel)
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 98, gzip: 30 };
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 99, gzip: 30 };

const baseClientNames = ['BaseRest', 'BaseRealtime'];

Expand Down
118 changes: 72 additions & 46 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -569,34 +569,17 @@ class RealtimeChannel extends EventEmitter {
if (!message.presence) break;
// eslint-disable-next-line no-fallthrough
case actions.PRESENCE: {
const presence = message.presence;
const presenceMessages = message.presence;

if (!presence) {
if (!presenceMessages) {
break;
}

const { id, connectionId, timestamp } = message;

const options = this.channelOptions;
let presenceMsg: PresenceMessage;
for (let i = 0; i < presence.length; i++) {
try {
presenceMsg = presence[i];
await decodePresenceMessage(presenceMsg, options);
if (!presenceMsg.connectionId) presenceMsg.connectionId = connectionId;
if (!presenceMsg.timestamp) presenceMsg.timestamp = timestamp;
if (!presenceMsg.id) presenceMsg.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}
await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options));

if (this._presence) {
this._presence.setPresence(presence, isSync, syncChannelSerial as any);
this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any);
}
break;
}
Expand All @@ -620,10 +603,7 @@ class RealtimeChannel extends EventEmitter {

const messages = message.messages as Array<Message>,
firstMessage = messages[0],
lastMessage = messages[messages.length - 1],
id = message.id,
connectionId = message.connectionId,
timestamp = message.timestamp;
lastMessage = messages[messages.length - 1];

if (
firstMessage.extras &&
Expand All @@ -641,36 +621,37 @@ class RealtimeChannel extends EventEmitter {
break;
}

for (let i = 0; i < messages.length; i++) {
const msg = messages[i];
try {
await decodeMessage(msg, this._decodingContext);
} catch (e) {
const { unrecoverableError } = await this._decodeAndPrepareMessages(
message,
messages,
(msg) => decodeMessage(msg, this._decodingContext),
(e) => {
/* decrypt failed .. the most likely cause is that we have the wrong key */
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
switch ((e as ErrorInfo).code) {
const errorInfo = e as ErrorInfo;

switch (errorInfo.code) {
case 40018:
/* decode failure */
this._startDecodeFailureRecovery(e as ErrorInfo);
return;
this._startDecodeFailureRecovery(errorInfo);
return { unrecoverableError: true };

case 40019:
/* No vcdiff plugin passed in - no point recovering, give up */
// eslint-disable-next-line no-fallthrough
case 40021:
/* Browser does not support deltas, similarly no point recovering */
this.notifyState('failed', e as ErrorInfo);
return;
this.notifyState('failed', errorInfo);
return { unrecoverableError: true };

default:
return { unrecoverableError: false };
}
}
if (!msg.connectionId) msg.connectionId = connectionId;
if (!msg.timestamp) msg.timestamp = timestamp;
if (!msg.id) msg.id = id + ':' + i;
},
);
if (unrecoverableError) {
return;
}

this._lastPayload.messageId = lastMessage.id;
this._lastPayload.protocolMessageChannelSerial = message.channelSerial;
this.onEvent(messages);
Expand Down Expand Up @@ -700,6 +681,51 @@ class RealtimeChannel extends EventEmitter {
}
}

/**
* Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data.
*
* @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding
* and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided
*/
private async _decodeAndPrepareMessages<T extends Message | PresenceMessage>(
protocolMessage: ProtocolMessage,
messages: T[],
decodeFn: (msg: T) => Promise<void>,
decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean },
): Promise<{ unrecoverableError: boolean }> {
const { id, connectionId, timestamp } = protocolMessage;

for (let i = 0; i < messages.length; i++) {
try {
const msg = messages[i];

// decode underlying data for a message
await decodeFn(msg);

if (!msg.connectionId) msg.connectionId = connectionId;
if (!msg.timestamp) msg.timestamp = timestamp;
if (!msg.id) msg.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.decodeAndPrepareMessages()',
(e as Error).toString(),
);

if (decodeErrorRecoveryHandler) {
const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error);
if (unrecoverableError) {
// break out of for loop by returning
return { unrecoverableError: true };
}
}
}
}

return { unrecoverableError: false };
}

_startDecodeFailureRecovery(reason: ErrorInfo): void {
if (!this._lastPayload.decodeFailureRecoveryInProgress) {
Logger.logAction(
Expand Down

0 comments on commit 7b9cfa9

Please sign in to comment.