Skip to content

Commit

Permalink
Merge pull request #1911 from ably/1907/extract-common-message-decoding
Browse files Browse the repository at this point in the history
[ECO-5054] Extract common code for message processing in RealtimeChannel to a separate function
  • Loading branch information
VeskeR authored Nov 29, 2024
2 parents 90ac849 + 282b188 commit 416c9b0
Showing 1 changed file with 72 additions and 46 deletions.
118 changes: 72 additions & 46 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -569,34 +569,17 @@ class RealtimeChannel extends EventEmitter {
if (!message.presence) break;
// eslint-disable-next-line no-fallthrough
case actions.PRESENCE: {
const presence = message.presence;
const presenceMessages = message.presence;

if (!presence) {
if (!presenceMessages) {
break;
}

const { id, connectionId, timestamp } = message;

const options = this.channelOptions;
let presenceMsg: PresenceMessage;
for (let i = 0; i < presence.length; i++) {
try {
presenceMsg = presence[i];
await decodePresenceMessage(presenceMsg, options);
if (!presenceMsg.connectionId) presenceMsg.connectionId = connectionId;
if (!presenceMsg.timestamp) presenceMsg.timestamp = timestamp;
if (!presenceMsg.id) presenceMsg.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}
await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options));

if (this._presence) {
this._presence.setPresence(presence, isSync, syncChannelSerial as any);
this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any);
}
break;
}
Expand All @@ -620,10 +603,7 @@ class RealtimeChannel extends EventEmitter {

const messages = message.messages as Array<Message>,
firstMessage = messages[0],
lastMessage = messages[messages.length - 1],
id = message.id,
connectionId = message.connectionId,
timestamp = message.timestamp;
lastMessage = messages[messages.length - 1];

if (
firstMessage.extras &&
Expand All @@ -641,36 +621,37 @@ class RealtimeChannel extends EventEmitter {
break;
}

for (let i = 0; i < messages.length; i++) {
const msg = messages[i];
try {
await decodeMessage(msg, this._decodingContext);
} catch (e) {
const { unrecoverableError } = await this._decodeAndPrepareMessages(
message,
messages,
(msg) => decodeMessage(msg, this._decodingContext),
(e) => {
/* decrypt failed .. the most likely cause is that we have the wrong key */
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
switch ((e as ErrorInfo).code) {
const errorInfo = e as ErrorInfo;

switch (errorInfo.code) {
case 40018:
/* decode failure */
this._startDecodeFailureRecovery(e as ErrorInfo);
return;
this._startDecodeFailureRecovery(errorInfo);
return { unrecoverableError: true };

case 40019:
/* No vcdiff plugin passed in - no point recovering, give up */
// eslint-disable-next-line no-fallthrough
case 40021:
/* Browser does not support deltas, similarly no point recovering */
this.notifyState('failed', e as ErrorInfo);
return;
this.notifyState('failed', errorInfo);
return { unrecoverableError: true };

default:
return { unrecoverableError: false };
}
}
if (!msg.connectionId) msg.connectionId = connectionId;
if (!msg.timestamp) msg.timestamp = timestamp;
if (!msg.id) msg.id = id + ':' + i;
},
);
if (unrecoverableError) {
return;
}

this._lastPayload.messageId = lastMessage.id;
this._lastPayload.protocolMessageChannelSerial = message.channelSerial;
this.onEvent(messages);
Expand Down Expand Up @@ -700,6 +681,51 @@ class RealtimeChannel extends EventEmitter {
}
}

/**
* Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data.
*
* @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding
* and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided
*/
private async _decodeAndPrepareMessages<T extends Message | PresenceMessage>(
protocolMessage: ProtocolMessage,
messages: T[],
decodeFn: (msg: T) => Promise<void>,
decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean },
): Promise<{ unrecoverableError: boolean }> {
const { id, connectionId, timestamp } = protocolMessage;

for (let i = 0; i < messages.length; i++) {
const msg = messages[i];

try {
// decode underlying data for a message
await decodeFn(msg);
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.decodeAndPrepareMessages()',
(e as Error).toString(),
);

if (decodeErrorRecoveryHandler) {
const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error);
if (unrecoverableError) {
// break out of for loop by returning
return { unrecoverableError: true };
}
}
}

if (!msg.connectionId) msg.connectionId = connectionId;
if (!msg.timestamp) msg.timestamp = timestamp;
if (!msg.id) msg.id = id + ':' + i;
}

return { unrecoverableError: false };
}

_startDecodeFailureRecovery(reason: ErrorInfo): void {
if (!this._lastPayload.decodeFailureRecoveryInProgress) {
Logger.logAction(
Expand Down

0 comments on commit 416c9b0

Please sign in to comment.