From 7b9cfa9d1bf5c0b66fe07a3945be0950632f2826 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 25 Oct 2024 11:08:35 +0100 Subject: [PATCH] Extract common code for message processing in RealtimeChannel to a separate function Resolves #1907 --- scripts/moduleReport.ts | 2 +- src/common/lib/client/realtimechannel.ts | 118 ++++++++++++++--------- 2 files changed, 73 insertions(+), 47 deletions(-) diff --git a/scripts/moduleReport.ts b/scripts/moduleReport.ts index 25daba894..6014d6ee4 100644 --- a/scripts/moduleReport.ts +++ b/scripts/moduleReport.ts @@ -6,7 +6,7 @@ import { gzip } from 'zlib'; import Table from 'cli-table'; // The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel) -const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 98, gzip: 30 }; +const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 99, gzip: 30 }; const baseClientNames = ['BaseRest', 'BaseRealtime']; diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 24ffe62cf..0c10adedf 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -569,34 +569,17 @@ class RealtimeChannel extends EventEmitter { if (!message.presence) break; // eslint-disable-next-line no-fallthrough case actions.PRESENCE: { - const presence = message.presence; + const presenceMessages = message.presence; - if (!presence) { + if (!presenceMessages) { break; } - const { id, connectionId, timestamp } = message; - const options = this.channelOptions; - let presenceMsg: PresenceMessage; - for (let i = 0; i < presence.length; i++) { - try { - presenceMsg = presence[i]; - await decodePresenceMessage(presenceMsg, options); - if (!presenceMsg.connectionId) presenceMsg.connectionId = connectionId; - if (!presenceMsg.timestamp) presenceMsg.timestamp = timestamp; - if (!presenceMsg.id) presenceMsg.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } - } + await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options)); + if (this._presence) { - this._presence.setPresence(presence, isSync, syncChannelSerial as any); + this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any); } break; } @@ -620,10 +603,7 @@ class RealtimeChannel extends EventEmitter { const messages = message.messages as Array, firstMessage = messages[0], - lastMessage = messages[messages.length - 1], - id = message.id, - connectionId = message.connectionId, - timestamp = message.timestamp; + lastMessage = messages[messages.length - 1]; if ( firstMessage.extras && @@ -641,36 +621,37 @@ class RealtimeChannel extends EventEmitter { break; } - for (let i = 0; i < messages.length; i++) { - const msg = messages[i]; - try { - await decodeMessage(msg, this._decodingContext); - } catch (e) { + const { unrecoverableError } = await this._decodeAndPrepareMessages( + message, + messages, + (msg) => decodeMessage(msg, this._decodingContext), + (e) => { /* decrypt failed .. the most likely cause is that we have the wrong key */ - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - switch ((e as ErrorInfo).code) { + const errorInfo = e as ErrorInfo; + + switch (errorInfo.code) { case 40018: /* decode failure */ - this._startDecodeFailureRecovery(e as ErrorInfo); - return; + this._startDecodeFailureRecovery(errorInfo); + return { unrecoverableError: true }; + case 40019: /* No vcdiff plugin passed in - no point recovering, give up */ // eslint-disable-next-line no-fallthrough case 40021: /* Browser does not support deltas, similarly no point recovering */ - this.notifyState('failed', e as ErrorInfo); - return; + this.notifyState('failed', errorInfo); + return { unrecoverableError: true }; + + default: + return { unrecoverableError: false }; } - } - if (!msg.connectionId) msg.connectionId = connectionId; - if (!msg.timestamp) msg.timestamp = timestamp; - if (!msg.id) msg.id = id + ':' + i; + }, + ); + if (unrecoverableError) { + return; } + this._lastPayload.messageId = lastMessage.id; this._lastPayload.protocolMessageChannelSerial = message.channelSerial; this.onEvent(messages); @@ -700,6 +681,51 @@ class RealtimeChannel extends EventEmitter { } } + /** + * Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data. + * + * @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding + * and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided + */ + private async _decodeAndPrepareMessages( + protocolMessage: ProtocolMessage, + messages: T[], + decodeFn: (msg: T) => Promise, + decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean }, + ): Promise<{ unrecoverableError: boolean }> { + const { id, connectionId, timestamp } = protocolMessage; + + for (let i = 0; i < messages.length; i++) { + try { + const msg = messages[i]; + + // decode underlying data for a message + await decodeFn(msg); + + if (!msg.connectionId) msg.connectionId = connectionId; + if (!msg.timestamp) msg.timestamp = timestamp; + if (!msg.id) msg.id = id + ':' + i; + } catch (e) { + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimeChannel.decodeAndPrepareMessages()', + (e as Error).toString(), + ); + + if (decodeErrorRecoveryHandler) { + const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error); + if (unrecoverableError) { + // break out of for loop by returning + return { unrecoverableError: true }; + } + } + } + } + + return { unrecoverableError: false }; + } + _startDecodeFailureRecovery(reason: ErrorInfo): void { if (!this._lastPayload.decodeFailureRecoveryInProgress) { Logger.logAction(