Skip to content

Commit

Permalink
Extract common code for message processing in RealtimeChannel to a se…
Browse files Browse the repository at this point in the history
…parate function

Resolves #1907
  • Loading branch information
VeskeR committed Oct 25, 2024
1 parent 0a3ff83 commit ac0d8c0
Showing 1 changed file with 86 additions and 101 deletions.
187 changes: 86 additions & 101 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { ChannelOptions } from '../../types/channel';
import { normaliseChannelOptions } from '../util/defaults';
import { PaginatedResult } from './paginatedresource';
import type { PushChannel } from 'plugins/push';
import type { LiveObjects } from 'plugins/liveobjects';
import type { LiveObjects, StateMessage } from 'plugins/liveobjects';

interface RealtimeHistoryParams {
start?: number;
Expand Down Expand Up @@ -589,100 +589,42 @@ class RealtimeChannel extends EventEmitter {
if (!message.presence) break;
// eslint-disable-next-line no-fallthrough
case actions.PRESENCE: {
const presence = message.presence;
const presenceMessages = message.presence;

if (!presence) {
if (!presenceMessages) {
break;
}

const { id, connectionId, timestamp } = message;

const options = this.channelOptions;
let presenceMsg: PresenceMessage;
for (let i = 0; i < presence.length; i++) {
try {
presenceMsg = presence[i];
await decodePresenceMessage(presenceMsg, options);
if (!presenceMsg.connectionId) presenceMsg.connectionId = connectionId;
if (!presenceMsg.timestamp) presenceMsg.timestamp = timestamp;
if (!presenceMsg.id) presenceMsg.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}
if (this._presence) {
this._presence.setPresence(presence, isSync, syncChannelSerial as any);
}
break;
}

case actions.STATE: {
if (!this._liveObjects) {
return;
}

const { id, connectionId, timestamp } = message;
const options = this.channelOptions;
await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options));

const stateMessages = message.state ?? [];
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
if (this._presence) {
this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any);
}

this._liveObjects.handleStateMessages(stateMessages, message.channelSerial);

break;
}

// STATE and STATE_SYNC message processing share most of the logic, so group them together
case actions.STATE:
case actions.STATE_SYNC: {
if (!this._liveObjects) {
return;
}

const { id, connectionId, timestamp } = message;
const stateMessages = message.state ?? [];
const options = this.channelOptions;

const stateMessages = message.state ?? [];
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}
await this._decodeAndPrepareMessages(message, stateMessages, (msg) =>
this.client._LiveObjectsPlugin
? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, decodeData)
: Promise.resolve(),
);

this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial);
if (message.action === actions.STATE) {
this._liveObjects.handleStateMessages(stateMessages, message.channelSerial);
} else {
this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial);
}

break;
}
Expand All @@ -707,10 +649,7 @@ class RealtimeChannel extends EventEmitter {

const messages = message.messages as Array<Message>,
firstMessage = messages[0],
lastMessage = messages[messages.length - 1],
id = message.id,
connectionId = message.connectionId,
timestamp = message.timestamp;
lastMessage = messages[messages.length - 1];

if (
firstMessage.extras &&
Expand All @@ -728,36 +667,37 @@ class RealtimeChannel extends EventEmitter {
break;
}

for (let i = 0; i < messages.length; i++) {
const msg = messages[i];
try {
await decodeMessage(msg, this._decodingContext);
} catch (e) {
const { unrecoverableError } = await this._decodeAndPrepareMessages(
message,
messages,
(msg) => decodeMessage(msg, this._decodingContext),
(e) => {
/* decrypt failed .. the most likely cause is that we have the wrong key */
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
switch ((e as ErrorInfo).code) {
const errorInfo = e as ErrorInfo;

switch (errorInfo.code) {
case 40018:
/* decode failure */
this._startDecodeFailureRecovery(e as ErrorInfo);
return;
this._startDecodeFailureRecovery(errorInfo);
return { unrecoverableError: true };

case 40019:
/* No vcdiff plugin passed in - no point recovering, give up */
// eslint-disable-next-line no-fallthrough
case 40021:
/* Browser does not support deltas, similarly no point recovering */
this.notifyState('failed', e as ErrorInfo);
return;
this.notifyState('failed', errorInfo);
return { unrecoverableError: true };

default:
return { unrecoverableError: false };
}
}
if (!msg.connectionId) msg.connectionId = connectionId;
if (!msg.timestamp) msg.timestamp = timestamp;
if (!msg.id) msg.id = id + ':' + i;
},
);
if (unrecoverableError) {
return;
}

this._lastPayload.messageId = lastMessage.id;
this._lastPayload.protocolMessageChannelSerial = message.channelSerial;
this.onEvent(messages);
Expand Down Expand Up @@ -787,6 +727,51 @@ class RealtimeChannel extends EventEmitter {
}
}

/**
* Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data.
*
* @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding
* and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided
*/
private async _decodeAndPrepareMessages<T extends Message | PresenceMessage | StateMessage>(
protocolMessage: ProtocolMessage,
messages: T[],
decodeFn: (msg: T) => Promise<void>,
decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean },
): Promise<{ unrecoverableError: boolean }> {
const { id, connectionId, timestamp } = protocolMessage;

for (let i = 0; i < messages.length; i++) {
try {
const msg = messages[i];

// decode underlying data for a message
await decodeFn(msg);

if (!msg.connectionId) msg.connectionId = connectionId;
if (!msg.timestamp) msg.timestamp = timestamp;
if (!msg.id) msg.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.decodeAndPrepareMessages()',
(e as Error).toString(),
);

if (decodeErrorRecoveryHandler) {
const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error);
if (unrecoverableError) {
// break out of for loop by returning
return { unrecoverableError: true };
}
}
}
}

return { unrecoverableError: false };
}

_startDecodeFailureRecovery(reason: ErrorInfo): void {
if (!this._lastPayload.decodeFailureRecoveryInProgress) {
Logger.logAction(
Expand Down

0 comments on commit ac0d8c0

Please sign in to comment.