diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index d9cbabd81..4cdcd2312 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -621,6 +621,39 @@ class RealtimeChannel extends EventEmitter { break; } + case actions.STATE: { + if (!this._liveObjects) { + return; + } + + const { id, connectionId, timestamp } = message; + const options = this.channelOptions; + + const stateMessages = message.state ?? []; + for (let i = 0; i < stateMessages.length; i++) { + try { + const stateMessage = stateMessages[i]; + + await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); + + if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; + if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; + if (!stateMessage.id) stateMessage.id = id + ':' + i; + } catch (e) { + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimeChannel.processMessage()', + (e as Error).toString(), + ); + } + } + + this._liveObjects.handleStateMessages(stateMessages); + + break; + } + case actions.STATE_SYNC: { if (!this._liveObjects) { return; diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index a8e8ab885..0fb0886ed 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -1,7 +1,7 @@ import type BaseClient from 'common/lib/client/baseclient'; import type RealtimeChannel from 'common/lib/client/realtimechannel'; -import type * as API from '../../../ably'; import type EventEmitter from 'common/lib/util/eventemitter'; +import type * as API from '../../../ably'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; @@ -81,6 +81,17 @@ export class LiveObjects { } } + /** + * @internal + */ + handleStateMessages(stateMessages: StateMessage[]): void { + if (this._syncInProgress) { + // TODO: handle buffering of state messages during SYNC + } + + this._liveObjectsPool.applyStateMessages(stateMessages); + } + /** * @internal */ @@ -131,6 +142,8 @@ export class LiveObjects { } private _endSync(): void { + // TODO: handle applying buffered state messages when SYNC is finished + this._applySync(); this._syncLiveObjectsDataPool.reset(); this._currentSyncId = undefined; diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 959411a73..02123f17c 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,10 +1,11 @@ import type BaseClient from 'common/lib/client/baseclient'; +import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; -import { MapSemantics } from './statemessage'; +import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage'; export const ROOT_OBJECT_ID = 'root'; @@ -13,10 +14,12 @@ export const ROOT_OBJECT_ID = 'root'; */ export class LiveObjectsPool { private _client: BaseClient; + private _channel: RealtimeChannel; private _pool: Map; constructor(private _liveObjects: LiveObjects) { this._client = this._liveObjects.getClient(); + this._channel = this._liveObjects.getChannel(); this._pool = this._getInitialPool(); } @@ -63,10 +66,102 @@ export class LiveObjectsPool { this.set(objectId, zeroValueObject); } + applyStateMessages(stateMessages: StateMessage[]): void { + for (const stateMessage of stateMessages) { + if (!stateMessage.operation) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects.LiveObjectsPool.applyStateMessages()', + `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + const stateOperation = stateMessage.operation; + + switch (stateOperation.action) { + case StateOperationAction.MAP_CREATE: + case StateOperationAction.COUNTER_CREATE: + if (this.get(stateOperation.objectId)) { + // object wich such id already exists (we may have created a zero-value object before, or this is a duplicate *_CREATE op), + // so delegate application of the op to that object + // TODO: invoke subscription callbacks for an object when applied + this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + break; + } + + // otherwise we can create new objects in the pool + if (stateOperation.action === StateOperationAction.MAP_CREATE) { + this._handleMapCreate(stateOperation); + } + + if (stateOperation.action === StateOperationAction.COUNTER_CREATE) { + this._handleCounterCreate(stateOperation); + } + break; + + case StateOperationAction.MAP_SET: + case StateOperationAction.MAP_REMOVE: + case StateOperationAction.COUNTER_INC: + // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, + // we create a zero-value object for the provided object id, and apply operation for that zero-value object. + // when we eventually receive a corresponding *_CREATE op for that object, its application will be handled by that zero-value object. + this.createZeroValueObjectIfNotExists(stateOperation.objectId); + // TODO: invoke subscription callbacks for an object when applied + this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + break; + + default: + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects.LiveObjectsPool.applyStateMessages()', + `received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + } + } + } + private _getInitialPool(): Map { const pool = new Map(); const root = new LiveMap(this._liveObjects, MapSemantics.LWW, null, ROOT_OBJECT_ID); pool.set(root.getObjectId(), root); return pool; } + + private _handleCounterCreate(stateOperation: StateOperation): void { + let counter: LiveCounter; + if (this._client.Utils.isNil(stateOperation.counter)) { + // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. + counter = new LiveCounter(this._liveObjects, true, { data: 0 }, stateOperation.objectId); + } else { + counter = new LiveCounter( + this._liveObjects, + true, + { data: stateOperation.counter.count ?? 0 }, + stateOperation.objectId, + ); + } + + this.set(stateOperation.objectId, counter); + } + + private _handleMapCreate(stateOperation: StateOperation): void { + let map: LiveMap; + if (this._client.Utils.isNil(stateOperation.map)) { + // if a map object is missing for the MAP_CREATE op, the initial value is implicitly an empty map. + map = new LiveMap(this._liveObjects, MapSemantics.LWW, null, stateOperation.objectId); + } else { + const objectData = LiveMap.liveMapDataFromMapEntries(this._client, stateOperation.map.entries ?? {}); + map = new LiveMap( + this._liveObjects, + stateOperation.map.semantics ?? MapSemantics.LWW, + objectData, + stateOperation.objectId, + ); + } + + this.set(stateOperation.objectId, map); + } }