diff --git a/ably.d.ts b/ably.d.ts index 18b7ee720..2f3dd499a 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -871,6 +871,14 @@ declare namespace ChannelModes { * The client can receive presence messages. */ type PRESENCE_SUBSCRIBE = 'PRESENCE_SUBSCRIBE'; + /** + * The client can publish LiveObjects state messages. + */ + type STATE_PUBLISH = 'STATE_PUBLISH'; + /** + * The client can receive LiveObjects state messages. + */ + type STATE_SUBSCRIBE = 'STATE_SUBSCRIBE'; /** * The client is resuming an existing connection. */ @@ -885,6 +893,8 @@ export type ChannelMode = | ChannelModes.SUBSCRIBE | ChannelModes.PRESENCE | ChannelModes.PRESENCE_SUBSCRIBE + | ChannelModes.STATE_PUBLISH + | ChannelModes.STATE_SUBSCRIBE | ChannelModes.ATTACH_RESUME; /** diff --git a/scripts/moduleReport.ts b/scripts/moduleReport.ts index 671f75ce8..e260f0783 100644 --- a/scripts/moduleReport.ts +++ b/scripts/moduleReport.ts @@ -6,7 +6,7 @@ import { gzip } from 'zlib'; import Table from 'cli-table'; // The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel) -const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 99, gzip: 30 }; +const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 }; const baseClientNames = ['BaseRest', 'BaseRealtime']; @@ -314,6 +314,8 @@ async function checkLiveObjectsPluginFiles() { 'src/plugins/liveobjects/liveobject.ts', 'src/plugins/liveobjects/liveobjects.ts', 'src/plugins/liveobjects/liveobjectspool.ts', + 'src/plugins/liveobjects/statemessage.ts', + 'src/plugins/liveobjects/syncliveobjectsdatapool.ts', ]); return checkBundleFiles(pluginBundleInfo, allowedFiles, 100); diff --git a/src/common/lib/client/baserealtime.ts b/src/common/lib/client/baserealtime.ts index 8afbd429d..d6b7c35e9 100644 --- a/src/common/lib/client/baserealtime.ts +++ b/src/common/lib/client/baserealtime.ts @@ -13,12 +13,14 @@ import { ModularPlugins, RealtimePresencePlugin } from './modularplugins'; import { TransportNames } from 'common/constants/TransportName'; import { TransportImplementations } from 'common/platform'; import Defaults from '../util/defaults'; +import type * as LiveObjectsPlugin from 'plugins/liveobjects'; /** `BaseRealtime` is an export of the tree-shakable version of the SDK, and acts as the base class for the `DefaultRealtime` class exported by the non tree-shakable version. */ class BaseRealtime extends BaseClient { readonly _RealtimePresence: RealtimePresencePlugin | null; + readonly _LiveObjectsPlugin: typeof LiveObjectsPlugin | null; // Extra transport implementations available to this client, in addition to those in Platform.Transports.bundledImplementations readonly _additionalTransportImplementations: TransportImplementations; _channels: any; @@ -58,6 +60,7 @@ class BaseRealtime extends BaseClient { this._additionalTransportImplementations = BaseRealtime.transportImplementationsFromPlugins(this.options.plugins); this._RealtimePresence = this.options.plugins?.RealtimePresence ?? null; + this._LiveObjectsPlugin = this.options.plugins?.LiveObjects ?? null; this.connection = new Connection(this, this.options); this._channels = new Channels(this); if (this.options.autoConnect !== false) this.connect(); diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 1a2554f6b..928650ad6 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -12,6 +12,7 @@ import Message, { fromValuesArray as messagesFromValuesArray, encodeArray as encodeMessagesArray, decode as decodeMessage, + decodeData, getMessagesSize, CipherOptions, EncodingDecodingContext, @@ -533,12 +534,18 @@ class RealtimeChannel extends EventEmitter { const resumed = message.hasFlag('RESUMED'); const hasPresence = message.hasFlag('HAS_PRESENCE'); const hasBacklog = message.hasFlag('HAS_BACKLOG'); + const hasState = message.hasFlag('HAS_STATE'); if (this.state === 'attached') { if (!resumed) { - /* On a loss of continuity, the presence set needs to be re-synced */ + // we have lost continuity. + // the presence set needs to be re-synced if (this._presence) { this._presence.onAttached(hasPresence); } + // the Live Objects state needs to be re-synced + if (this._liveObjects) { + this._liveObjects.onAttached(hasState); + } } const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error); this._allChannelChanges.emit('update', change); @@ -549,7 +556,7 @@ class RealtimeChannel extends EventEmitter { /* RTL5i: re-send DETACH and remain in the 'detaching' state */ this.checkPendingState(); } else { - this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog); + this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog, hasState); } break; } @@ -613,6 +620,40 @@ class RealtimeChannel extends EventEmitter { } break; } + + case actions.STATE_SYNC: { + if (!this._liveObjects) { + return; + } + + const { id, connectionId, timestamp } = message; + const options = this.channelOptions; + + const stateMessages = message.state ?? []; + for (let i = 0; i < stateMessages.length; i++) { + try { + const stateMessage = stateMessages[i]; + + await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); + + if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; + if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; + if (!stateMessage.id) stateMessage.id = id + ':' + i; + } catch (e) { + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimeChannel.processMessage()', + (e as Error).toString(), + ); + } + } + + this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial); + + break; + } + case actions.MESSAGE: { //RTL17 if (this.state !== 'attached') { @@ -743,6 +784,7 @@ class RealtimeChannel extends EventEmitter { resumed?: boolean, hasPresence?: boolean, hasBacklog?: boolean, + hasState?: boolean, ): void { Logger.logAction( this.logger, @@ -763,6 +805,9 @@ class RealtimeChannel extends EventEmitter { if (this._presence) { this._presence.actOnChannelState(state, hasPresence, reason); } + if (this._liveObjects) { + this._liveObjects.actOnChannelState(state, hasState); + } if (state === 'suspended' && this.connectionManager.state.sendEvents) { this.startRetryTimer(); } else { diff --git a/src/common/lib/transport/comettransport.ts b/src/common/lib/transport/comettransport.ts index ab468b6cc..00ecb804d 100644 --- a/src/common/lib/transport/comettransport.ts +++ b/src/common/lib/transport/comettransport.ts @@ -353,7 +353,11 @@ abstract class CometTransport extends Transport { if (items && items.length) for (let i = 0; i < items.length; i++) this.onProtocolMessage( - protocolMessageFromDeserialized(items[i], this.connectionManager.realtime._RealtimePresence), + protocolMessageFromDeserialized( + items[i], + this.connectionManager.realtime._RealtimePresence, + this.connectionManager.realtime._LiveObjectsPlugin, + ), ); } catch (e) { Logger.logAction( diff --git a/src/common/lib/transport/connectionmanager.ts b/src/common/lib/transport/connectionmanager.ts index 8ef56dcc2..56364d6c9 100644 --- a/src/common/lib/transport/connectionmanager.ts +++ b/src/common/lib/transport/connectionmanager.ts @@ -1805,7 +1805,8 @@ class ConnectionManager extends EventEmitter { Logger.LOG_MICRO, 'ConnectionManager.send()', - 'queueing msg; ' + stringifyProtocolMessage(msg, this.realtime._RealtimePresence), + 'queueing msg; ' + + stringifyProtocolMessage(msg, this.realtime._RealtimePresence, this.realtime._LiveObjectsPlugin), ); } this.queue(msg, callback); diff --git a/src/common/lib/transport/protocol.ts b/src/common/lib/transport/protocol.ts index cb91a1db2..88a5947e4 100644 --- a/src/common/lib/transport/protocol.ts +++ b/src/common/lib/transport/protocol.ts @@ -77,7 +77,11 @@ class Protocol extends EventEmitter { Logger.LOG_MICRO, 'Protocol.send()', 'sending msg; ' + - stringifyProtocolMessage(pendingMessage.message, this.transport.connectionManager.realtime._RealtimePresence), + stringifyProtocolMessage( + pendingMessage.message, + this.transport.connectionManager.realtime._RealtimePresence, + this.transport.connectionManager.realtime._LiveObjectsPlugin, + ), ); } pendingMessage.sendAttempted = true; diff --git a/src/common/lib/transport/transport.ts b/src/common/lib/transport/transport.ts index 23ad1ec05..3f298e24f 100644 --- a/src/common/lib/transport/transport.ts +++ b/src/common/lib/transport/transport.ts @@ -128,7 +128,11 @@ abstract class Transport extends EventEmitter { 'received on ' + this.shortName + ': ' + - stringifyProtocolMessage(message, this.connectionManager.realtime._RealtimePresence) + + stringifyProtocolMessage( + message, + this.connectionManager.realtime._RealtimePresence, + this.connectionManager.realtime._LiveObjectsPlugin, + ) + '; connectionId = ' + this.connectionManager.connectionId, ); diff --git a/src/common/lib/transport/websockettransport.ts b/src/common/lib/transport/websockettransport.ts index 3e12f1d67..a85a6f077 100644 --- a/src/common/lib/transport/websockettransport.ts +++ b/src/common/lib/transport/websockettransport.ts @@ -140,6 +140,7 @@ class WebSocketTransport extends Transport { data, this.connectionManager.realtime._MsgPack, this.connectionManager.realtime._RealtimePresence, + this.connectionManager.realtime._LiveObjectsPlugin, this.format, ), ); diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 7cc8b80ac..c2a1f1cdb 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -154,17 +154,36 @@ export async function decode( message: Message | PresenceMessage, inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, ): Promise { + const { data, encoding, error } = await decodeData(message.data, message.encoding, inputContext); + message.data = data; + message.encoding = encoding; + + if (error) { + throw error; + } +} + +export async function decodeData( + data: any, + encoding: string | null | undefined, + inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions, +): Promise<{ + error?: ErrorInfo; + data: any; + encoding: string | null | undefined; +}> { const context = normaliseContext(inputContext); + let lastPayload = data; + let decodedData = data; + let finalEncoding: string | null | undefined = encoding; + let decodingError: ErrorInfo | undefined = undefined; - let lastPayload = message.data; - const encoding = message.encoding; if (encoding) { const xforms = encoding.split('/'); - let lastProcessedEncodingIndex, - encodingsToProcess = xforms.length, - data = message.data; - + let lastProcessedEncodingIndex; + let encodingsToProcess = xforms.length; let xform = ''; + try { while ((lastProcessedEncodingIndex = encodingsToProcess) > 0) { // eslint-disable-next-line security/detect-unsafe-regex @@ -173,16 +192,16 @@ export async function decode( xform = match[1]; switch (xform) { case 'base64': - data = Platform.BufferUtils.base64Decode(String(data)); + decodedData = Platform.BufferUtils.base64Decode(String(decodedData)); if (lastProcessedEncodingIndex == xforms.length) { - lastPayload = data; + lastPayload = decodedData; } continue; case 'utf-8': - data = Platform.BufferUtils.utf8Decode(data); + decodedData = Platform.BufferUtils.utf8Decode(decodedData); continue; case 'json': - data = JSON.parse(data); + decodedData = JSON.parse(decodedData); continue; case 'cipher': if ( @@ -196,7 +215,7 @@ export async function decode( if (xformAlgorithm != cipher.algorithm) { throw new Error('Unable to decrypt message with given cipher; incompatible cipher params'); } - data = await cipher.decrypt(data); + decodedData = await cipher.decrypt(decodedData); continue; } else { throw new Error('Unable to decrypt message; not an encrypted channel'); @@ -220,10 +239,12 @@ export async function decode( // vcdiff expects Uint8Arrays, can't copy with ArrayBuffers. const deltaBaseBuffer = Platform.BufferUtils.toBuffer(deltaBase as Buffer); - data = Platform.BufferUtils.toBuffer(data); + decodedData = Platform.BufferUtils.toBuffer(decodedData); - data = Platform.BufferUtils.arrayBufferViewToBuffer(context.plugins.vcdiff.decode(data, deltaBaseBuffer)); - lastPayload = data; + decodedData = Platform.BufferUtils.arrayBufferViewToBuffer( + context.plugins.vcdiff.decode(decodedData, deltaBaseBuffer), + ); + lastPayload = decodedData; } catch (e) { throw new ErrorInfo('Vcdiff delta decode failed with ' + e, 40018, 400); } @@ -234,18 +255,30 @@ export async function decode( } } catch (e) { const err = e as ErrorInfo; - throw new ErrorInfo( - 'Error processing the ' + xform + ' encoding, decoder returned ‘' + err.message + '’', + decodingError = new ErrorInfo( + `Error processing the ${xform} encoding, decoder returned ‘${err.message}’`, err.code || 40013, 400, ); } finally { - message.encoding = + finalEncoding = (lastProcessedEncodingIndex as number) <= 0 ? null : xforms.slice(0, lastProcessedEncodingIndex).join('/'); - message.data = data; } } + + if (decodingError) { + return { + error: decodingError, + data: decodedData, + encoding: finalEncoding, + }; + } + context.baseEncodedPreviousPayload = lastPayload; + return { + data: decodedData, + encoding: finalEncoding, + }; } export async function fromResponseBody( diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index eaa622a8d..74a1e64db 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -8,6 +8,8 @@ import PresenceMessage, { fromValues as presenceMessageFromValues, fromValuesArray as presenceMessagesFromValuesArray, } from './presencemessage'; +import type * as LiveObjectsPlugin from 'plugins/liveobjects'; +import Platform from '../../platform'; export const actions = { HEARTBEAT: 0, @@ -29,6 +31,8 @@ export const actions = { SYNC: 16, AUTH: 17, ACTIVATE: 18, + STATE: 19, + STATE_SYNC: 20, }; export const ActionName: string[] = []; @@ -48,9 +52,18 @@ const flags: { [key: string]: number } = { PUBLISH: 1 << 17, SUBSCRIBE: 1 << 18, PRESENCE_SUBSCRIBE: 1 << 19, + STATE_SUBSCRIBE: 1 << 24, + STATE_PUBLISH: 1 << 25, + HAS_STATE: 1 << 26, }; const flagNames = Object.keys(flags); -flags.MODE_ALL = flags.PRESENCE | flags.PUBLISH | flags.SUBSCRIBE | flags.PRESENCE_SUBSCRIBE; +flags.MODE_ALL = + flags.PRESENCE | + flags.PUBLISH | + flags.SUBSCRIBE | + flags.PRESENCE_SUBSCRIBE | + flags.STATE_SUBSCRIBE | + flags.STATE_PUBLISH; function toStringArray(array?: any[]): string { const result = []; @@ -62,7 +75,14 @@ function toStringArray(array?: any[]): string { return '[ ' + result.join(', ') + ' ]'; } -export const channelModes = ['PRESENCE', 'PUBLISH', 'SUBSCRIBE', 'PRESENCE_SUBSCRIBE']; +export const channelModes = [ + 'PRESENCE', + 'PUBLISH', + 'SUBSCRIBE', + 'PRESENCE_SUBSCRIBE', + 'STATE_SUBSCRIBE', + 'STATE_PUBLISH', +]; export const serialize = Utils.encodeBody; @@ -70,15 +90,17 @@ export function deserialize( serialized: unknown, MsgPack: MsgPack | null, presenceMessagePlugin: PresenceMessagePlugin | null, + liveObjectsPlugin: typeof LiveObjectsPlugin | null, format?: Utils.Format, ): ProtocolMessage { const deserialized = Utils.decodeBody>(serialized, MsgPack, format); - return fromDeserialized(deserialized, presenceMessagePlugin); + return fromDeserialized(deserialized, presenceMessagePlugin, liveObjectsPlugin); } export function fromDeserialized( deserialized: Record, presenceMessagePlugin: PresenceMessagePlugin | null, + liveObjectsPlugin: typeof LiveObjectsPlugin | null, ): ProtocolMessage { const error = deserialized.error; if (error) deserialized.error = ErrorInfo.fromValues(error as ErrorInfo); @@ -91,21 +113,47 @@ export function fromDeserialized( for (let i = 0; i < presence.length; i++) presence[i] = presenceMessagePlugin.presenceMessageFromValues(presence[i], true); } - return Object.assign(new ProtocolMessage(), { ...deserialized, presence }); + + let state: LiveObjectsPlugin.StateMessage[] | undefined = undefined; + if (liveObjectsPlugin) { + state = deserialized.state as LiveObjectsPlugin.StateMessage[]; + if (state) { + for (let i = 0; i < state.length; i++) { + state[i] = liveObjectsPlugin.StateMessage.fromValues(state[i], Platform); + } + } + } + + return Object.assign(new ProtocolMessage(), { ...deserialized, presence, state }); } /** - * Used by the tests. + * Used internally by the tests. + * + * LiveObjectsPlugin code can't be included as part of the core library to prevent size growth, + * so if a test needs to build Live Object state messages, then it must provide LiveObjectsPlugin. */ -export function fromDeserializedIncludingDependencies(deserialized: Record): ProtocolMessage { - return fromDeserialized(deserialized, { presenceMessageFromValues, presenceMessagesFromValuesArray }); +export function makeFromDeserializedWithDependencies(dependencies?: { + LiveObjectsPlugin: typeof LiveObjectsPlugin | null; +}) { + return (deserialized: Record): ProtocolMessage => { + return fromDeserialized( + deserialized, + { presenceMessageFromValues, presenceMessagesFromValuesArray }, + dependencies?.LiveObjectsPlugin ?? null, + ); + }; } export function fromValues(values: unknown): ProtocolMessage { return Object.assign(new ProtocolMessage(), values); } -export function stringify(msg: any, presenceMessagePlugin: PresenceMessagePlugin | null): string { +export function stringify( + msg: any, + presenceMessagePlugin: PresenceMessagePlugin | null, + liveObjectsPlugin: typeof LiveObjectsPlugin | null, +): string { let result = '[ProtocolMessage'; if (msg.action !== undefined) result += '; action=' + ActionName[msg.action] || msg.action; @@ -119,6 +167,9 @@ export function stringify(msg: any, presenceMessagePlugin: PresenceMessagePlugin if (msg.messages) result += '; messages=' + toStringArray(messagesFromValuesArray(msg.messages)); if (msg.presence && presenceMessagePlugin) result += '; presence=' + toStringArray(presenceMessagePlugin.presenceMessagesFromValuesArray(msg.presence)); + if (msg.state && liveObjectsPlugin) { + result += '; state=' + toStringArray(liveObjectsPlugin.StateMessage.fromValuesArray(msg.state, Platform)); + } if (msg.error) result += '; error=' + ErrorInfo.fromValues(msg.error).toString(); if (msg.auth && msg.auth.accessToken) result += '; token=' + msg.auth.accessToken; if (msg.flags) result += '; flags=' + flagNames.filter(msg.hasFlag).join(','); @@ -150,8 +201,14 @@ class ProtocolMessage { channelSerial?: string | null; msgSerial?: number; messages?: Message[]; - // This will be undefined if we skipped decoding this property due to user not requesting presence functionality — see `fromDeserialized` + /** + * This will be undefined if we skipped decoding this property due to user not requesting Presence functionality — see {@link fromDeserialized} + */ presence?: PresenceMessage[]; + /** + * This will be undefined if we skipped decoding this property due to user not requesting LiveObjects functionality — see {@link fromDeserialized} + */ + state?: LiveObjectsPlugin.StateMessage[]; auth?: unknown; connectionDetails?: Record; diff --git a/src/platform/nativescript/index.ts b/src/platform/nativescript/index.ts index 5a57dbe07..448d513d7 100644 --- a/src/platform/nativescript/index.ts +++ b/src/platform/nativescript/index.ts @@ -3,7 +3,7 @@ import { DefaultRest } from '../../common/lib/client/defaultrest'; import { DefaultRealtime } from '../../common/lib/client/defaultrealtime'; import Platform from '../../common/platform'; import ErrorInfo from '../../common/lib/types/errorinfo'; -import { fromDeserializedIncludingDependencies as protocolMessageFromDeserialized } from '../../common/lib/types/protocolmessage'; +import { makeFromDeserializedWithDependencies as makeProtocolMessageFromDeserialized } from '../../common/lib/types/protocolmessage'; // Platform Specific import BufferUtils from '../web/lib/util/bufferutils'; @@ -52,5 +52,5 @@ export default { Rest: DefaultRest, Realtime: DefaultRealtime, msgpack, - protocolMessageFromDeserialized, + makeProtocolMessageFromDeserialized, }; diff --git a/src/platform/nodejs/index.ts b/src/platform/nodejs/index.ts index 057d412e6..cca312e1e 100644 --- a/src/platform/nodejs/index.ts +++ b/src/platform/nodejs/index.ts @@ -3,7 +3,7 @@ import { DefaultRest } from '../../common/lib/client/defaultrest'; import { DefaultRealtime } from '../../common/lib/client/defaultrealtime'; import Platform from '../../common/platform'; import ErrorInfo from '../../common/lib/types/errorinfo'; -import { fromDeserializedIncludingDependencies as protocolMessageFromDeserialized } from '../../common/lib/types/protocolmessage'; +import { makeFromDeserializedWithDependencies as makeProtocolMessageFromDeserialized } from '../../common/lib/types/protocolmessage'; // Platform Specific import BufferUtils from './lib/util/bufferutils'; @@ -46,5 +46,5 @@ module.exports = { Rest: DefaultRest, Realtime: DefaultRealtime, msgpack: null, - protocolMessageFromDeserialized, + makeProtocolMessageFromDeserialized, }; diff --git a/src/platform/react-native/index.ts b/src/platform/react-native/index.ts index 4153714d3..79914a5b6 100644 --- a/src/platform/react-native/index.ts +++ b/src/platform/react-native/index.ts @@ -3,7 +3,7 @@ import { DefaultRest } from '../../common/lib/client/defaultrest'; import { DefaultRealtime } from '../../common/lib/client/defaultrealtime'; import Platform from '../../common/platform'; import ErrorInfo from '../../common/lib/types/errorinfo'; -import { fromDeserializedIncludingDependencies as protocolMessageFromDeserialized } from '../../common/lib/types/protocolmessage'; +import { makeFromDeserializedWithDependencies as makeProtocolMessageFromDeserialized } from '../../common/lib/types/protocolmessage'; // Platform Specific import BufferUtils from '../web/lib/util/bufferutils'; @@ -55,5 +55,5 @@ export default { Rest: DefaultRest, Realtime: DefaultRealtime, msgpack, - protocolMessageFromDeserialized, + makeProtocolMessageFromDeserialized, }; diff --git a/src/platform/web/index.ts b/src/platform/web/index.ts index f262373c8..f1c0ddd57 100644 --- a/src/platform/web/index.ts +++ b/src/platform/web/index.ts @@ -3,7 +3,7 @@ import { DefaultRest } from '../../common/lib/client/defaultrest'; import { DefaultRealtime } from '../../common/lib/client/defaultrealtime'; import Platform from '../../common/platform'; import ErrorInfo from '../../common/lib/types/errorinfo'; -import { fromDeserializedIncludingDependencies as protocolMessageFromDeserialized } from '../../common/lib/types/protocolmessage'; +import { makeFromDeserializedWithDependencies as makeProtocolMessageFromDeserialized } from '../../common/lib/types/protocolmessage'; // Platform Specific import BufferUtils from './lib/util/bufferutils'; @@ -45,11 +45,12 @@ if (Platform.Config.agent) { Platform.Defaults.agent += ' ' + Platform.Config.agent; } -export { DefaultRest as Rest, DefaultRealtime as Realtime, msgpack, protocolMessageFromDeserialized, ErrorInfo }; +export { DefaultRest as Rest, DefaultRealtime as Realtime, msgpack, makeProtocolMessageFromDeserialized, ErrorInfo }; export default { ErrorInfo, Rest: DefaultRest, Realtime: DefaultRealtime, msgpack, + makeProtocolMessageFromDeserialized, }; diff --git a/src/plugins/liveobjects/index.ts b/src/plugins/liveobjects/index.ts index ff1d234a7..350024ae9 100644 --- a/src/plugins/liveobjects/index.ts +++ b/src/plugins/liveobjects/index.ts @@ -1,7 +1,9 @@ import { LiveObjects } from './liveobjects'; +import { StateMessage } from './statemessage'; -export { LiveObjects }; +export { LiveObjects, StateMessage }; export default { LiveObjects, + StateMessage, }; diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index fbc9ac7d9..06398d6e9 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,6 +1,6 @@ -import { LiveObject } from './liveobject'; +import { LiveObject, LiveObjectData } from './liveobject'; -export interface LiveCounterData { +export interface LiveCounterData extends LiveObjectData { data: number; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 8e2696219..b147dcf80 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,6 +1,5 @@ -import { LiveObject } from './liveobject'; - -export type StateValue = string | number | boolean | Uint8Array; +import { LiveObject, LiveObjectData } from './liveobject'; +import { StateValue } from './statemessage'; export interface ObjectIdStateData { /** @@ -23,7 +22,7 @@ export interface MapEntry { data: StateData; } -export interface LiveMapData { +export interface LiveMapData extends LiveObjectData { data: Map; } diff --git a/src/plugins/liveobjects/liveobject.ts b/src/plugins/liveobjects/liveobject.ts index 945e09ced..d062fec37 100644 --- a/src/plugins/liveobjects/liveobject.ts +++ b/src/plugins/liveobjects/liveobject.ts @@ -1,12 +1,13 @@ import { LiveObjects } from './liveobjects'; -interface LiveObjectData { +export interface LiveObjectData { data: any; } export abstract class LiveObject { protected _dataRef: T; protected _objectId: string; + protected _regionalTimeserial?: string; constructor( protected _liveObjects: LiveObjects, @@ -24,6 +25,27 @@ export abstract class LiveObject { return this._objectId; } + /** + * @internal + */ + getRegionalTimeserial(): string | undefined { + return this._regionalTimeserial; + } + + /** + * @internal + */ + setData(newDataRef: T): void { + this._dataRef = newDataRef; + } + + /** + * @internal + */ + setRegionalTimeserial(regionalTimeserial: string): void { + this._regionalTimeserial = regionalTimeserial; + } + private _createObjectId(): string { // TODO: implement object id generation based on live object type and initial value return Math.random().toString().substring(2); diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 9ade0b6b2..bc7ce74a6 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -1,17 +1,28 @@ import type BaseClient from 'common/lib/client/baseclient'; import type RealtimeChannel from 'common/lib/client/realtimechannel'; +import type * as API from '../../../ably'; +import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; +import { LiveObject } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; +import { StateMessage } from './statemessage'; +import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; export class LiveObjects { private _client: BaseClient; private _channel: RealtimeChannel; private _liveObjectsPool: LiveObjectsPool; + private _syncLiveObjectsDataPool: SyncLiveObjectsDataPool; + private _syncInProgress: boolean; + private _currentSyncId: string | undefined; + private _currentSyncCursor: string | undefined; constructor(channel: RealtimeChannel) { this._channel = channel; this._client = channel.client; this._liveObjectsPool = new LiveObjectsPool(this); + this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this); + this._syncInProgress = true; } async getRoot(): Promise { @@ -25,4 +36,142 @@ export class LiveObjects { getPool(): LiveObjectsPool { return this._liveObjectsPool; } + + /** + * @internal + */ + getClient(): BaseClient { + return this._client; + } + + /** + * @internal + */ + handleStateSyncMessage(stateMessages: StateMessage[], syncChannelSerial: string | null | undefined): void { + const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial); + if (this._currentSyncId !== syncId) { + this._startNewSync(syncId, syncCursor); + } + + // TODO: delegate state messages to _syncLiveObjectsDataPool and create new live and data objects + + // if this is the last (or only) message in a sequence of sync updates, end the sync + if (!syncCursor) { + this._endSync(); + } + } + + /** + * @internal + */ + onAttached(hasState?: boolean): void { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MINOR, + 'LiveObjects.onAttached()', + 'channel = ' + this._channel.name + ', hasState = ' + hasState, + ); + + if (hasState) { + this._startNewSync(undefined); + } else { + // no HAS_STATE flag received on attach, can end SYNC sequence immediately + // and treat it as no state on a channel + this._liveObjectsPool.reset(); + this._syncLiveObjectsDataPool.reset(); + this._endSync(); + } + } + + /** + * @internal + */ + actOnChannelState(state: API.ChannelState, hasState?: boolean): void { + switch (state) { + case 'attached': + this.onAttached(hasState); + break; + + case 'detached': + case 'failed': + // TODO: do something + break; + + case 'suspended': + // TODO: do something + break; + } + } + + private _startNewSync(syncId?: string, syncCursor?: string): void { + this._syncLiveObjectsDataPool.reset(); + this._currentSyncId = syncId; + this._currentSyncCursor = syncCursor; + this._syncInProgress = true; + } + + private _endSync(): void { + this._applySync(); + this._syncLiveObjectsDataPool.reset(); + this._currentSyncId = undefined; + this._currentSyncCursor = undefined; + this._syncInProgress = false; + } + + private _parseSyncChannelSerial(syncChannelSerial: string | null | undefined): { + syncId: string | undefined; + syncCursor: string | undefined; + } { + let match: RegExpMatchArray | null; + let syncId: string | undefined = undefined; + let syncCursor: string | undefined = undefined; + if (syncChannelSerial && (match = syncChannelSerial.match(/^([\w-]+):(.*)$/))) { + syncId = match[1]; + syncCursor = match[2]; + } + + return { + syncId, + syncCursor, + }; + } + + private _applySync(): void { + if (this._syncLiveObjectsDataPool.isEmpty()) { + return; + } + + const receivedObjectIds = new Set(); + + for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) { + receivedObjectIds.add(objectId); + const existingObject = this._liveObjectsPool.get(objectId); + + if (existingObject) { + existingObject.setData(entry.objectData); + existingObject.setRegionalTimeserial(entry.regionalTimeserial); + continue; + } + + let newObject: LiveObject; + switch (entry.objectType) { + case 'LiveCounter': + newObject = new LiveCounter(this, entry.objectData, objectId); + break; + + case 'LiveMap': + newObject = new LiveMap(this, entry.objectData, objectId); + break; + + default: + throw new this._client.ErrorInfo(`Unknown live object type: ${entry.objectType}`, 40000, 400); + } + newObject.setRegionalTimeserial(entry.regionalTimeserial); + + this._liveObjectsPool.set(objectId, newObject); + } + + // need to remove LiveObject instances from the LiveObjectsPool for which objectIds were not received during the SYNC sequence + this._liveObjectsPool.deleteExtraObjectIds([...receivedObjectIds]); + } } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 3431992c1..5f46c1a0a 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,23 +1,46 @@ +import type BaseClient from 'common/lib/client/baseclient'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; -export type ObjectId = string; export const ROOT_OBJECT_ID = 'root'; +/** + * @internal + */ export class LiveObjectsPool { - private _pool: Map; + private _client: BaseClient; + private _pool: Map; constructor(private _liveObjects: LiveObjects) { + this._client = this._liveObjects.getClient(); this._pool = this._getInitialPool(); } - get(objectId: ObjectId): LiveObject | undefined { + get(objectId: string): LiveObject | undefined { return this._pool.get(objectId); } - private _getInitialPool(): Map { - const pool = new Map(); + /** + * Deletes objects from the pool for which object ids are not found in the provided array of ids. + */ + deleteExtraObjectIds(objectIds: string[]): void { + const poolObjectIds = [...this._pool.keys()]; + const extraObjectIds = this._client.Utils.arrSubtract(poolObjectIds, objectIds); + + extraObjectIds.forEach((x) => this._pool.delete(x)); + } + + set(objectId: string, liveObject: LiveObject): void { + this._pool.set(objectId, liveObject); + } + + reset(): void { + this._pool = this._getInitialPool(); + } + + private _getInitialPool(): Map { + const pool = new Map(); const root = new LiveMap(this._liveObjects, null, ROOT_OBJECT_ID); pool.set(root.getObjectId(), root); return pool; diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts new file mode 100644 index 000000000..c8063e455 --- /dev/null +++ b/src/plugins/liveobjects/statemessage.ts @@ -0,0 +1,309 @@ +import type { decodeData } from 'common/lib/types/message'; +import type Platform from 'common/platform'; +import type { ChannelOptions } from 'common/types/channel'; + +export enum StateOperationAction { + MAP_CREATE = 0, + MAP_SET = 1, + MAP_REMOVE = 2, + COUNTER_CREATE = 3, + COUNTER_INC = 4, +} + +export enum MapSemantics { + LWW = 0, +} + +/** A StateValue represents a concrete leaf value in a state object graph. */ +export type StateValue = string | number | boolean | Buffer | Uint8Array; + +/** StateData captures a value in a state object. */ +export interface StateData { + /** A reference to another state object, used to support composable state objects. */ + objectId?: string; + /** + * The encoding the client should use to interpret the value. + * Analogous to the `encoding` field on the `Message` and `PresenceMessage` types. + */ + encoding?: string; + /** A concrete leaf value in the state object graph. */ + value?: StateValue; +} + +/** A StateMapOp describes an operation to be applied to a Map object. */ +export interface StateMapOp { + /** The key of the map entry to which the operation should be applied. */ + key: string; + /** The data that the map entry should contain if the operation is a MAP_SET operation. */ + data?: StateData; +} + +/** A StateCounterOp describes an operation to be applied to a Counter object. */ +export interface StateCounterOp { + /** The data value that should be added to the counter */ + amount: number; +} + +/** A MapEntry represents the value at a given key in a Map object. */ +export interface StateMapEntry { + /** Indicates whether the map entry has been removed. */ + tombstone?: boolean; + /** The *origin* timeserial of the last operation that was applied to the map entry. */ + timeserial: string; + /** The data that represents the value of the map entry. */ + data: StateData; +} + +/** A Map object represents a map of key-value pairs. */ +export interface StateMap { + /** The conflict-resolution semantics used by the map object. */ + semantics?: MapSemantics; + // The map entries, indexed by key. + entries?: Record; +} + +/** A Counter object represents an incrementable and decrementable value */ +export interface StateCounter { + /** The value of the counter */ + count?: number; + /** + * Indicates (true) if the counter has seen an explicit create operation + * and false if the counter was created with a default value when + * processing a regular operation. + */ + created: boolean; +} + +/** A StateOperation describes an operation to be applied to a state object. */ +export interface StateOperation { + /** Defines the operation to be applied to the state object. */ + action: StateOperationAction; + /** The object ID of the state object to which the operation should be applied. */ + objectId: string; + /** The payload for the operation if it is an operation on a Map object type. */ + mapOp?: StateMapOp; + /** The payload for the operation if it is an operation on a Counter object type. */ + counterOp?: StateCounterOp; + /** + * The payload for the operation if the operation is MAP_CREATE. + * Defines the initial value for the map object. + */ + map?: StateMap; + /** + * The payload for the operation if the operation is COUNTER_CREATE. + * Defines the initial value for the counter object. + */ + counter?: StateCounter; + /** + * The nonce, must be present on create operations. This is the random part + * that has been hashed with the type and initial value to create the object ID. + */ + nonce?: string; +} + +/** A StateObject describes the instantaneous state of an object. */ +export interface StateObject { + /** The identifier of the state object. */ + objectId: string; + /** The *regional* timeserial of the last operation that was applied to this state object. */ + regionalTimeserial: string; + /** The data that represents the state of the object if it is a Map object type. */ + map?: StateMap; + /** The data that represents the state of the object if it is a Counter object type. */ + counter?: StateCounter; +} + +/** + * @internal + */ +export class StateMessage { + id?: string; + timestamp?: number; + clientId?: string; + connectionId?: string; + channel?: string; + extras?: any; + /** Describes an operation to be applied to a state object. */ + operation?: StateOperation; + /** Describes the instantaneous state of an object. */ + object?: StateObject; + /** Timeserial format */ + serial?: string; + + constructor(private _platform: typeof Platform) {} + + static async decode( + message: StateMessage, + inputContext: ChannelOptions, + decodeDataFn: typeof decodeData, + ): Promise { + // TODO: decide how to handle individual errors from decoding values. currently we throw first ever error we get + + const decodeMapEntry = async ( + entry: StateMapEntry, + ctx: ChannelOptions, + decode: typeof decodeData, + ): Promise => { + const { data, encoding, error } = await decode(entry.data.value, entry.data.encoding, ctx); + entry.data.value = data; + entry.data.encoding = encoding ?? undefined; + + if (error) { + throw error; + } + }; + + if (message.object?.map?.entries) { + for (const entry of Object.values(message.object.map.entries)) { + decodeMapEntry(entry, inputContext, decodeDataFn); + } + } + + if (message.operation?.map) { + for (const entry of Object.values(message.operation.map)) { + decodeMapEntry(entry, inputContext, decodeDataFn); + } + } + + if (message.operation?.mapOp?.data && 'value' in message.operation?.mapOp?.data) { + const mapOpData = message.operation.mapOp.data; + const { data, encoding, error } = await decodeDataFn(mapOpData.value, mapOpData.encoding, inputContext); + mapOpData.value = data; + mapOpData.encoding = encoding ?? undefined; + + if (error) { + throw error; + } + } + } + + static fromValues(values: StateMessage | Record, platform: typeof Platform): StateMessage { + return Object.assign(new StateMessage(platform), values); + } + + static fromValuesArray(values: unknown[], platform: typeof Platform): StateMessage[] { + const count = values.length; + const result = new Array(count); + + for (let i = 0; i < count; i++) { + result[i] = this.fromValues(values[i] as Record, platform); + } + + return result; + } + + /** + * Overload toJSON() to intercept JSON.stringify() + * @return {*} + */ + toJSON(): { + id?: string; + clientId?: string; + operation?: StateOperation; + object?: StateObject; + extras?: any; + } { + // need to encode buffer data to base64 if present and if we're returning a real JSON. + // although msgpack also calls toJSON() directly, + // we know it is a JSON.stringify() call if we have a non-empty arguments list. + // if withBase64Encoding = true - JSON.stringify() call + // if withBase64Encoding = false - we were called by msgpack + const withBase64Encoding = arguments.length > 0; + + let operationCopy: StateOperation | undefined = undefined; + if (this.operation) { + // deep copy "operation" prop so we can modify it here. + // buffer values won't be correctly copied, so we will need to set them again explictly + operationCopy = JSON.parse(JSON.stringify(this.operation)) as StateOperation; + + if (operationCopy.mapOp?.data && 'value' in operationCopy.mapOp.data) { + // use original "operation" prop when encoding values, so we have access to original buffer values. + operationCopy.mapOp.data = this._encodeStateData(this.operation.mapOp?.data!, withBase64Encoding); + } + + if (operationCopy.map?.entries) { + Object.entries(operationCopy.map.entries).forEach(([key, entry]) => { + // use original "operation" prop when encoding values, so we have access to original buffer values. + entry.data = this._encodeStateData(this.operation?.map?.entries?.[key].data!, withBase64Encoding); + }); + } + } + + let object: StateObject | undefined = undefined; + if (this.object) { + // deep copy "object" prop so we can modify it here. + // buffer values won't be correctly copied, so we will need to set them again explictly + object = JSON.parse(JSON.stringify(this.object)) as StateObject; + + if (object.map?.entries) { + Object.entries(object.map.entries).forEach(([key, entry]) => { + // use original "object" prop when encoding values, so we have access to original buffer values. + entry.data = this._encodeStateData(this.object?.map?.entries?.[key].data!, withBase64Encoding); + }); + } + } + + return { + id: this.id, + clientId: this.clientId, + operation: operationCopy, + object: object, + extras: this.extras, + }; + } + + toString(): string { + let result = '[StateMessage'; + + if (this.id) result += '; id=' + this.id; + if (this.timestamp) result += '; timestamp=' + this.timestamp; + if (this.clientId) result += '; clientId=' + this.clientId; + if (this.connectionId) result += '; connectionId=' + this.connectionId; + // TODO: prettify output for operation and object and encode buffers. + // see examples for data in Message and PresenceMessage + if (this.operation) result += '; operation=' + JSON.stringify(this.operation); + if (this.object) result += '; object=' + JSON.stringify(this.object); + if (this.extras) result += '; extras=' + JSON.stringify(this.extras); + if (this.serial) result += '; serial=' + this.serial; + + result += ']'; + + return result; + } + + private _encodeStateData(data: StateData, withBase64Encoding: boolean): StateData { + const { value, encoding } = this._encodeStateValue(data?.value, data?.encoding, withBase64Encoding); + return { + ...data, + value, + encoding, + }; + } + + private _encodeStateValue( + value: StateValue | undefined, + encoding: string | undefined, + withBase64Encoding: boolean, + ): { + value: StateValue | undefined; + encoding: string | undefined; + } { + if (!value || !this._platform.BufferUtils.isBuffer(value)) { + return { value, encoding }; + } + + if (withBase64Encoding) { + return { + value: this._platform.BufferUtils.base64Encode(value), + encoding: encoding ? encoding + '/base64' : 'base64', + }; + } + + // toBuffer returns a datatype understandable by + // that platform's msgpack implementation (Buffer in node, Uint8Array in browsers) + return { + value: this._platform.BufferUtils.toBuffer(value), + encoding, + }; + } +} diff --git a/src/plugins/liveobjects/syncliveobjectsdatapool.ts b/src/plugins/liveobjects/syncliveobjectsdatapool.ts new file mode 100644 index 000000000..1d30c5ad6 --- /dev/null +++ b/src/plugins/liveobjects/syncliveobjectsdatapool.ts @@ -0,0 +1,35 @@ +import { LiveObjectData } from './liveobject'; +import { LiveObjects } from './liveobjects'; + +export interface LiveObjectDataEntry { + objectData: LiveObjectData; + regionalTimeserial: string; + objectType: 'LiveMap' | 'LiveCounter'; +} + +/** + * @internal + */ +export class SyncLiveObjectsDataPool { + private _pool: Map; + + constructor(private _liveObjects: LiveObjects) { + this._pool = new Map(); + } + + entries() { + return this._pool.entries(); + } + + size(): number { + return this._pool.size; + } + + isEmpty(): boolean { + return this.size() === 0; + } + + reset(): void { + this._pool = new Map(); + } +} diff --git a/test/common/modules/private_api_recorder.js b/test/common/modules/private_api_recorder.js index 848004242..af478b713 100644 --- a/test/common/modules/private_api_recorder.js +++ b/test/common/modules/private_api_recorder.js @@ -49,7 +49,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths) 'call.msgpack.encode', 'call.presence._myMembers.put', 'call.presence.waitSync', - 'call.protocolMessageFromDeserialized', + 'call.makeProtocolMessageFromDeserialized', 'call.realtime.baseUri', 'call.rest.baseUri', 'call.rest.http.do', diff --git a/test/realtime/channel.test.js b/test/realtime/channel.test.js index 7d2f5a76f..92c1777a3 100644 --- a/test/realtime/channel.test.js +++ b/test/realtime/channel.test.js @@ -4,7 +4,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async var exports = {}; var _exports = {}; var expect = chai.expect; - var createPM = Ably.protocolMessageFromDeserialized; + var createPM = Ably.makeProtocolMessageFromDeserialized(); function checkCanSubscribe(channel, testChannel) { return function (callback) { @@ -1259,7 +1259,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }); helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); var transport = realtime.connection.connectionManager.activeProtocol.getTransport(); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.transport.onProtocolMessage'); transport.onProtocolMessage( createPM({ @@ -1309,7 +1309,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async } helper.recordPrivateApi('call.Platform.nextTick'); Ably.Realtime.Platform.Config.nextTick(function () { - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.transport.onProtocolMessage'); transport.onProtocolMessage( createPM({ @@ -1360,7 +1360,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }); helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); var transport = realtime.connection.connectionManager.activeProtocol.getTransport(); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.transport.onProtocolMessage'); transport.onProtocolMessage( createPM({ @@ -1408,7 +1408,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }); helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); var transport = realtime.connection.connectionManager.activeProtocol.getTransport(); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.transport.onProtocolMessage'); transport.onProtocolMessage( createPM({ @@ -1614,7 +1614,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async setTimeout(function () { helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); var transport = realtime.connection.connectionManager.activeProtocol.getTransport(); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.transport.onProtocolMessage'); transport.onProtocolMessage(createPM({ action: 11, channel: channelName })); }, 0); diff --git a/test/realtime/connection.test.js b/test/realtime/connection.test.js index c8551286c..82c506ae5 100644 --- a/test/realtime/connection.test.js +++ b/test/realtime/connection.test.js @@ -2,7 +2,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { var expect = chai.expect; - var createPM = Ably.protocolMessageFromDeserialized; + var createPM = Ably.makeProtocolMessageFromDeserialized(); describe('realtime/connection', function () { this.timeout(60 * 1000); @@ -336,7 +336,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async }); helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); helper.recordPrivateApi('call.transport.onProtocolMessage'); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); connectionManager.activeProtocol.getTransport().onProtocolMessage( createPM({ action: 4, diff --git a/test/realtime/failure.test.js b/test/realtime/failure.test.js index 970e7b90e..8b59c51e7 100644 --- a/test/realtime/failure.test.js +++ b/test/realtime/failure.test.js @@ -3,7 +3,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { var expect = chai.expect; var noop = function () {}; - var createPM = Ably.protocolMessageFromDeserialized; + var createPM = Ably.makeProtocolMessageFromDeserialized(); describe('realtime/failure', function () { this.timeout(60 * 1000); @@ -644,7 +644,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.closeAndFinish(done, realtime); }); }); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); helper.recordPrivateApi('call.transport.onProtocolMessage'); connectionManager.activeProtocol.getTransport().onProtocolMessage( diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index e66a882a8..ad6ae3895 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -8,7 +8,7 @@ define(['ably', 'shared_helper', 'async', 'chai', 'live_objects'], function ( LiveObjectsPlugin, ) { var expect = chai.expect; - var createPM = Ably.protocolMessageFromDeserialized; + var createPM = Ably.makeProtocolMessageFromDeserialized({ LiveObjectsPlugin }); function LiveObjectsRealtime(helper, options) { return helper.AblyRealtime({ ...options, plugins: { LiveObjects: LiveObjectsPlugin } }); diff --git a/test/realtime/message.test.js b/test/realtime/message.test.js index 6d0fe8f7c..f56946a20 100644 --- a/test/realtime/message.test.js +++ b/test/realtime/message.test.js @@ -3,7 +3,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { var expect = chai.expect; let config = Ably.Realtime.Platform.Config; - var createPM = Ably.protocolMessageFromDeserialized; + var createPM = Ably.makeProtocolMessageFromDeserialized(); var publishIntervalHelper = function (currentMessageNum, channel, dataFn, onPublish) { return function () { @@ -1146,7 +1146,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('write.connectionManager.connectionDetails.maxMessageSize'); connectionDetails.maxMessageSize = 64; helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport'); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.transport.onProtocolMessage'); connectionManager.activeProtocol.getTransport().onProtocolMessage( createPM({ diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index f2de186fd..2be0762da 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -2,7 +2,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { var expect = chai.expect; - var createPM = Ably.protocolMessageFromDeserialized; + var createPM = Ably.makeProtocolMessageFromDeserialized(); var PresenceMessage = Ably.Realtime.PresenceMessage; function extractClientIds(presenceSet) { @@ -2096,7 +2096,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async cb(); }); /* Inject an ATTACHED with RESUMED and HAS_PRESENCE both false */ - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); channel.processMessage( createPM({ action: 11, diff --git a/test/realtime/sync.test.js b/test/realtime/sync.test.js index dccbdeff5..2f3a4a9af 100644 --- a/test/realtime/sync.test.js +++ b/test/realtime/sync.test.js @@ -2,7 +2,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { var expect = chai.expect; - var createPM = Ably.protocolMessageFromDeserialized; + var createPM = Ably.makeProtocolMessageFromDeserialized(); describe('realtime/sync', function () { this.timeout(60 * 1000); @@ -50,7 +50,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async channelName = 'syncexistingset', channel = realtime.channels.get(channelName); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.channel.processMessage'); await channel.processMessage( createPM({ @@ -179,7 +179,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async channelName = 'sync_member_arrives_in_middle', channel = realtime.channels.get(channelName); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.channel.processMessage'); await channel.processMessage( createPM({ @@ -291,7 +291,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async channelName = 'sync_member_arrives_normally_after_came_in_sync', channel = realtime.channels.get(channelName); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.channel.processMessage'); await channel.processMessage( createPM({ @@ -383,7 +383,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async channelName = 'sync_member_arrives_normally_before_comes_in_sync', channel = realtime.channels.get(channelName); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.channel.processMessage'); await channel.processMessage( createPM({ @@ -479,7 +479,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async channelName = 'sync_ordering', channel = realtime.channels.get(channelName); - helper.recordPrivateApi('call.protocolMessageFromDeserialized'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); helper.recordPrivateApi('call.channel.processMessage'); await channel.processMessage( createPM({