diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index b7ea4e577..a3b7a5d13 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -30,7 +30,7 @@ import { ChannelOptions } from '../../types/channel'; import { normaliseChannelOptions } from '../util/defaults'; import { PaginatedResult } from './paginatedresource'; import type { PushChannel } from 'plugins/push'; -import type { LiveObjects } from 'plugins/liveobjects'; +import type { LiveObjects, StateMessage } from 'plugins/liveobjects'; interface RealtimeHistoryParams { start?: number; @@ -589,100 +589,42 @@ class RealtimeChannel extends EventEmitter { if (!message.presence) break; // eslint-disable-next-line no-fallthrough case actions.PRESENCE: { - const presence = message.presence; + const presenceMessages = message.presence; - if (!presence) { + if (!presenceMessages) { break; } - const { id, connectionId, timestamp } = message; - - const options = this.channelOptions; - let presenceMsg: PresenceMessage; - for (let i = 0; i < presence.length; i++) { - try { - presenceMsg = presence[i]; - await decodePresenceMessage(presenceMsg, options); - if (!presenceMsg.connectionId) presenceMsg.connectionId = connectionId; - if (!presenceMsg.timestamp) presenceMsg.timestamp = timestamp; - if (!presenceMsg.id) presenceMsg.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } - } - if (this._presence) { - this._presence.setPresence(presence, isSync, syncChannelSerial as any); - } - break; - } - - case actions.STATE: { - if (!this._liveObjects) { - return; - } - - const { id, connectionId, timestamp } = message; const options = this.channelOptions; + await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options)); - const stateMessages = message.state ?? []; - for (let i = 0; i < stateMessages.length; i++) { - try { - const stateMessage = stateMessages[i]; - - await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); - - if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; - if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; - if (!stateMessage.id) stateMessage.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } + if (this._presence) { + this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any); } - - this._liveObjects.handleStateMessages(stateMessages, message.channelSerial); - break; } + // STATE and STATE_SYNC message processing share most of the logic, so group them together + case actions.STATE: case actions.STATE_SYNC: { if (!this._liveObjects) { return; } - const { id, connectionId, timestamp } = message; + const stateMessages = message.state ?? []; const options = this.channelOptions; - const stateMessages = message.state ?? []; - for (let i = 0; i < stateMessages.length; i++) { - try { - const stateMessage = stateMessages[i]; - - await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); - - if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; - if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; - if (!stateMessage.id) stateMessage.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } - } + await this._decodeAndPrepareMessages(message, stateMessages, (msg) => + this.client._LiveObjectsPlugin + ? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, decodeData) + : Promise.resolve(), + ); - this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial); + if (message.action === actions.STATE) { + this._liveObjects.handleStateMessages(stateMessages, message.channelSerial); + } else { + this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial); + } break; } @@ -707,10 +649,7 @@ class RealtimeChannel extends EventEmitter { const messages = message.messages as Array, firstMessage = messages[0], - lastMessage = messages[messages.length - 1], - id = message.id, - connectionId = message.connectionId, - timestamp = message.timestamp; + lastMessage = messages[messages.length - 1]; if ( firstMessage.extras && @@ -728,36 +667,37 @@ class RealtimeChannel extends EventEmitter { break; } - for (let i = 0; i < messages.length; i++) { - const msg = messages[i]; - try { - await decodeMessage(msg, this._decodingContext); - } catch (e) { + const { unrecoverableError } = await this._decodeAndPrepareMessages( + message, + messages, + (msg) => decodeMessage(msg, this._decodingContext), + (e) => { /* decrypt failed .. the most likely cause is that we have the wrong key */ - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - switch ((e as ErrorInfo).code) { + const errorInfo = e as ErrorInfo; + + switch (errorInfo.code) { case 40018: /* decode failure */ - this._startDecodeFailureRecovery(e as ErrorInfo); - return; + this._startDecodeFailureRecovery(errorInfo); + return { unrecoverableError: true }; + case 40019: /* No vcdiff plugin passed in - no point recovering, give up */ // eslint-disable-next-line no-fallthrough case 40021: /* Browser does not support deltas, similarly no point recovering */ - this.notifyState('failed', e as ErrorInfo); - return; + this.notifyState('failed', errorInfo); + return { unrecoverableError: true }; + + default: + return { unrecoverableError: false }; } - } - if (!msg.connectionId) msg.connectionId = connectionId; - if (!msg.timestamp) msg.timestamp = timestamp; - if (!msg.id) msg.id = id + ':' + i; + }, + ); + if (unrecoverableError) { + return; } + this._lastPayload.messageId = lastMessage.id; this._lastPayload.protocolMessageChannelSerial = message.channelSerial; this.onEvent(messages); @@ -787,6 +727,51 @@ class RealtimeChannel extends EventEmitter { } } + /** + * Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data. + * + * @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding + * and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided + */ + private async _decodeAndPrepareMessages( + protocolMessage: ProtocolMessage, + messages: T[], + decodeFn: (msg: T) => Promise, + decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean }, + ): Promise<{ unrecoverableError: boolean }> { + const { id, connectionId, timestamp } = protocolMessage; + + for (let i = 0; i < messages.length; i++) { + try { + const msg = messages[i]; + + // decode underlying data for a message + await decodeFn(msg); + + if (!msg.connectionId) msg.connectionId = connectionId; + if (!msg.timestamp) msg.timestamp = timestamp; + if (!msg.id) msg.id = id + ':' + i; + } catch (e) { + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimeChannel.decodeAndPrepareMessages()', + (e as Error).toString(), + ); + + if (decodeErrorRecoveryHandler) { + const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error); + if (unrecoverableError) { + // break out of for loop by returning + return { unrecoverableError: true }; + } + } + } + } + + return { unrecoverableError: false }; + } + _startDecodeFailureRecovery(reason: ErrorInfo): void { if (!this._lastPayload.decodeFailureRecoveryInProgress) { Logger.logAction(