From 28b8b4a26294ab6209d81c470b1f018f165d961d Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Thu, 10 Oct 2024 08:11:18 +0100 Subject: [PATCH] Implement LiveObjects pool init from state SYNC sequence Resolves DTP-949 --- src/plugins/liveobjects/liveobjects.ts | 9 +- src/plugins/liveobjects/statemessage.ts | 1 + .../liveobjects/syncliveobjectsdatapool.ts | 88 ++++++++++++++++++- 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index f83cc91b3..127980308 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -38,6 +38,13 @@ export class LiveObjects { return this._liveObjectsPool; } + /** + * @internal + */ + getChannel(): RealtimeChannel { + return this._channel; + } + /** * @internal */ @@ -54,7 +61,7 @@ export class LiveObjects { this._startNewSync(syncId, syncCursor); } - // TODO: delegate state messages to _syncLiveObjectsDataPool and create new live and data objects + this._syncLiveObjectsDataPool.applyStateMessages(stateMessages); // if this is the last (or only) message in a sequence of sync updates, end the sync if (!syncCursor) { diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index e885b3ed1..98689c216 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -60,6 +60,7 @@ export interface StateMap { semantics: MapSemantics; // The map entries, indexed by key. entries: Record; + // TODO: should here also be 'created' field like for StateCounter? } /** A Counter object represents an incrementable and decrementable value */ diff --git a/src/plugins/liveobjects/syncliveobjectsdatapool.ts b/src/plugins/liveobjects/syncliveobjectsdatapool.ts index 3c787eb09..731647059 100644 --- a/src/plugins/liveobjects/syncliveobjectsdatapool.ts +++ b/src/plugins/liveobjects/syncliveobjectsdatapool.ts @@ -1,6 +1,10 @@ +import type BaseClient from 'common/lib/client/baseclient'; +import RealtimeChannel from 'common/lib/client/realtimechannel'; +import { LiveCounterData } from './livecounter'; +import { LiveMapData, MapEntry, ObjectIdStateData, StateData, ValueStateData } from './livemap'; import { LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; -import { MapSemantics } from './statemessage'; +import { MapSemantics, StateMessage, StateObject } from './statemessage'; export interface LiveObjectDataEntry { objectData: LiveObjectData; @@ -24,9 +28,13 @@ export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry; * @internal */ export class SyncLiveObjectsDataPool { + private _client: BaseClient; + private _channel: RealtimeChannel; private _pool: Map; constructor(private _liveObjects: LiveObjects) { + this._client = this._liveObjects.getClient(); + this._channel = this._liveObjects.getChannel(); this._pool = new Map(); } @@ -45,4 +53,82 @@ export class SyncLiveObjectsDataPool { reset(): void { this._pool = new Map(); } + + applyStateMessages(stateMessages: StateMessage[]): void { + for (const stateMessage of stateMessages) { + if (!stateMessage.object) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()', + `state message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + const stateObject = stateMessage.object; + + if (stateObject.counter) { + this._pool.set(stateObject.objectId, this._createLiveCounterDataEntry(stateObject)); + } else if (stateObject.map) { + this._pool.set(stateObject.objectId, this._createLiveMapDataEntry(stateObject)); + } else { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MINOR, + 'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()', + `received unsupported state object message during SYNC, expected 'counter' or 'map' to be present; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + } + } + } + + private _createLiveMapDataEntry(stateObject: StateObject): LiveCounterDataEntry { + const counter = stateObject.counter!; + + const objectData: LiveCounterData = { + data: counter.count, + }; + const newEntry: LiveCounterDataEntry = { + created: counter.created, + objectData, + objectType: 'LiveCounter', + regionalTimeserial: stateObject.regionalTimeserial, + }; + + return newEntry; + } + + private _createLiveCounterDataEntry(stateObject: StateObject): LiveMapDataEntry { + const map = stateObject.map!; + + const objectData: LiveMapData = { + data: new Map(), + }; + // need to iterate over entries manually to work around optional parameters from state object entries type + Object.entries(map.entries).forEach(([key, entryFromMessage]) => { + let liveData: StateData; + if (typeof entryFromMessage.data.objectId !== 'undefined') { + liveData = { objectId: entryFromMessage.data.objectId } as ObjectIdStateData; + } else { + liveData = { encoding: entryFromMessage.data.encoding, value: entryFromMessage.data.value } as ValueStateData; + } + + const liveDataEntry: MapEntry = { + ...entryFromMessage, + data: liveData, + }; + + objectData.data.set(key, liveDataEntry); + }); + + const newEntry: LiveMapDataEntry = { + objectData, + objectType: 'LiveMap', + regionalTimeserial: stateObject.regionalTimeserial, + semantics: map.semantics, + }; + + return newEntry; + } }