From 42dc15f43680d9db80fe82384ce2a04580836ce4 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Wed, 9 Oct 2024 10:45:59 +0100 Subject: [PATCH] Implement handling of the server-initiated state sync sequence STATE_SYNC message processing in `RealtimeChannel.processMessage` is based on the process for `PRESENCE` message. Resolves DTP-950 --- src/common/lib/client/realtimechannel.ts | 49 +++++- src/plugins/liveobjects/livecounter.ts | 4 +- src/plugins/liveobjects/livemap.ts | 7 +- src/plugins/liveobjects/liveobject.ts | 24 ++- src/plugins/liveobjects/liveobjects.ts | 150 ++++++++++++++++++ src/plugins/liveobjects/liveobjectspool.ts | 33 +++- .../liveobjects/syncliveobjectsdatapool.ts | 35 ++++ 7 files changed, 288 insertions(+), 14 deletions(-) create mode 100644 src/plugins/liveobjects/syncliveobjectsdatapool.ts diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 1a2554f6b..08e1e6fe5 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -12,6 +12,7 @@ import Message, { fromValuesArray as messagesFromValuesArray, encodeArray as encodeMessagesArray, decode as decodeMessage, + decodeData, getMessagesSize, CipherOptions, EncodingDecodingContext, @@ -533,12 +534,18 @@ class RealtimeChannel extends EventEmitter { const resumed = message.hasFlag('RESUMED'); const hasPresence = message.hasFlag('HAS_PRESENCE'); const hasBacklog = message.hasFlag('HAS_BACKLOG'); + const hasState = message.hasFlag('HAS_STATE'); if (this.state === 'attached') { if (!resumed) { - /* On a loss of continuity, the presence set needs to be re-synced */ + // we have lost continuity. + // the presence set needs to be re-synced if (this._presence) { this._presence.onAttached(hasPresence); } + // the Live Objects state needs to be re-synced + if (this._liveObjects) { + this._liveObjects.onAttached(hasState); + } } const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error); this._allChannelChanges.emit('update', change); @@ -549,7 +556,7 @@ class RealtimeChannel extends EventEmitter { /* RTL5i: re-send DETACH and remain in the 'detaching' state */ this.checkPendingState(); } else { - this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog); + this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog, hasState); } break; } @@ -613,6 +620,40 @@ class RealtimeChannel extends EventEmitter { } break; } + + case actions.STATE_SYNC: { + const { id, connectionId, timestamp } = message; + const options = this.channelOptions; + + const stateMessages = message.state ?? []; + for (let i = 0; i < stateMessages.length; i++) { + try { + const stateMessage = stateMessages[i]; + + await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); + + if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; + if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; + if (!stateMessage.id) stateMessage.id = id + ':' + i; + + stateMessages.push(stateMessage); + } catch (e) { + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimeChannel.processMessage()', + (e as Error).toString(), + ); + } + } + + if (this._liveObjects) { + this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial); + } + + break; + } + case actions.MESSAGE: { //RTL17 if (this.state !== 'attached') { @@ -743,6 +784,7 @@ class RealtimeChannel extends EventEmitter { resumed?: boolean, hasPresence?: boolean, hasBacklog?: boolean, + hasState?: boolean, ): void { Logger.logAction( this.logger, @@ -763,6 +805,9 @@ class RealtimeChannel extends EventEmitter { if (this._presence) { this._presence.actOnChannelState(state, hasPresence, reason); } + if (this._liveObjects) { + this._liveObjects.actOnChannelState(state, hasState, reason); + } if (state === 'suspended' && this.connectionManager.state.sendEvents) { this.startRetryTimer(); } else { diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index fbc9ac7d9..06398d6e9 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,6 +1,6 @@ -import { LiveObject } from './liveobject'; +import { LiveObject, LiveObjectData } from './liveobject'; -export interface LiveCounterData { +export interface LiveCounterData extends LiveObjectData { data: number; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 8e2696219..b147dcf80 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,6 +1,5 @@ -import { LiveObject } from './liveobject'; - -export type StateValue = string | number | boolean | Uint8Array; +import { LiveObject, LiveObjectData } from './liveobject'; +import { StateValue } from './statemessage'; export interface ObjectIdStateData { /** @@ -23,7 +22,7 @@ export interface MapEntry { data: StateData; } -export interface LiveMapData { +export interface LiveMapData extends LiveObjectData { data: Map; } diff --git a/src/plugins/liveobjects/liveobject.ts b/src/plugins/liveobjects/liveobject.ts index 945e09ced..d062fec37 100644 --- a/src/plugins/liveobjects/liveobject.ts +++ b/src/plugins/liveobjects/liveobject.ts @@ -1,12 +1,13 @@ import { LiveObjects } from './liveobjects'; -interface LiveObjectData { +export interface LiveObjectData { data: any; } export abstract class LiveObject { protected _dataRef: T; protected _objectId: string; + protected _regionalTimeserial?: string; constructor( protected _liveObjects: LiveObjects, @@ -24,6 +25,27 @@ export abstract class LiveObject { return this._objectId; } + /** + * @internal + */ + getRegionalTimeserial(): string | undefined { + return this._regionalTimeserial; + } + + /** + * @internal + */ + setData(newDataRef: T): void { + this._dataRef = newDataRef; + } + + /** + * @internal + */ + setRegionalTimeserial(regionalTimeserial: string): void { + this._regionalTimeserial = regionalTimeserial; + } + private _createObjectId(): string { // TODO: implement object id generation based on live object type and initial value return Math.random().toString().substring(2); diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 9ade0b6b2..e5a21b7ef 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -1,17 +1,29 @@ import type BaseClient from 'common/lib/client/baseclient'; import type RealtimeChannel from 'common/lib/client/realtimechannel'; +import type ErrorInfo from 'common/lib/types/errorinfo'; +import type * as API from '../../../ably'; +import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; +import { LiveObject } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; +import { StateMessage } from './statemessage'; +import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; export class LiveObjects { private _client: BaseClient; private _channel: RealtimeChannel; private _liveObjectsPool: LiveObjectsPool; + private _syncLiveObjectsDataPool: SyncLiveObjectsDataPool; + private _syncInProgress: boolean; + private _currentSyncId: string | undefined; + private _currentSyncCursor: string | undefined; constructor(channel: RealtimeChannel) { this._channel = channel; this._client = channel.client; this._liveObjectsPool = new LiveObjectsPool(this); + this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this); + this._syncInProgress = true; } async getRoot(): Promise { @@ -25,4 +37,142 @@ export class LiveObjects { getPool(): LiveObjectsPool { return this._liveObjectsPool; } + + /** + * @internal + */ + getClient(): BaseClient { + return this._client; + } + + /** + * @internal + */ + handleStateSyncMessage(stateMessages: StateMessage[], syncChannelSerial: string | null | undefined): void { + const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial); + if (this._currentSyncId !== syncId) { + this._startNewSync(syncId, syncCursor); + } + + // TODO: delegate state messages to _syncLiveObjectsDataPool and create new live and data objects + + // if this is the last (or only) message in a sequence of sync updates, end the sync + if (!syncCursor) { + this._endSync(); + } + } + + /** + * @internal + */ + onAttached(hasState?: boolean): void { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MINOR, + 'LiveObjects.onAttached()', + 'channel = ' + this._channel.name + ', hasState = ' + hasState, + ); + + if (hasState) { + this._startNewSync(undefined); + } else { + // no HAS_STATE flag received on attach, can end SYNC sequence immediately + // and treat it as no state on a channel + this._liveObjectsPool.reset(); + this._syncLiveObjectsDataPool.reset(); + this._endSync(); + } + } + + /** + * @internal + */ + actOnChannelState(state: API.ChannelState, hasState?: boolean, stateReason?: ErrorInfo | null): void { + switch (state) { + case 'attached': + this.onAttached(hasState); + break; + + case 'detached': + case 'failed': + // TODO: do something + break; + + case 'suspended': + // TODO: do something + break; + } + } + + private _startNewSync(syncId?: string, syncCursor?: string): void { + this._syncLiveObjectsDataPool.reset(); + this._currentSyncId = syncId; + this._currentSyncCursor = syncCursor; + this._syncInProgress = true; + } + + private _endSync(): void { + this._applySync(); + this._syncLiveObjectsDataPool.reset(); + this._currentSyncId = undefined; + this._currentSyncCursor = undefined; + this._syncInProgress = false; + } + + private _parseSyncChannelSerial(syncChannelSerial: string | null | undefined): { + syncId: string | undefined; + syncCursor: string | undefined; + } { + let match: RegExpMatchArray | null; + let syncId: string | undefined = undefined; + let syncCursor: string | undefined = undefined; + if (syncChannelSerial && (match = syncChannelSerial.match(/^([\w-]+):(.*)$/))) { + syncId = match[1]; + syncCursor = match[2]; + } + + return { + syncId, + syncCursor, + }; + } + + private _applySync(): void { + if (this._syncLiveObjectsDataPool.isEmpty()) { + return; + } + + const receivedObjectIds = new Set(); + + for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) { + receivedObjectIds.add(objectId); + const existingObject = this._liveObjectsPool.get(objectId); + + if (existingObject) { + existingObject.setData(entry.objectData); + existingObject.setRegionalTimeserial(entry.regionalTimeserial); + continue; + } + + let newObject: LiveObject; + switch (entry.objectType) { + case 'LiveCounter': + newObject = new LiveCounter(this, entry.objectData, objectId); + break; + + case 'LiveMap': + newObject = new LiveMap(this, entry.objectData, objectId); + break; + + default: + throw new this._client.ErrorInfo(`Unknown live object type: ${entry.objectType}`, 40000, 400); + } + newObject.setRegionalTimeserial(entry.regionalTimeserial); + + this._liveObjectsPool.set(objectId, newObject); + } + + // need to remove LiveObject instances from the LiveObjectsPool for which objectIds were not received during the SYNC sequence + this._liveObjectsPool.deleteExtraObjectIds([...receivedObjectIds]); + } } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 3431992c1..5f46c1a0a 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,23 +1,46 @@ +import type BaseClient from 'common/lib/client/baseclient'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; -export type ObjectId = string; export const ROOT_OBJECT_ID = 'root'; +/** + * @internal + */ export class LiveObjectsPool { - private _pool: Map; + private _client: BaseClient; + private _pool: Map; constructor(private _liveObjects: LiveObjects) { + this._client = this._liveObjects.getClient(); this._pool = this._getInitialPool(); } - get(objectId: ObjectId): LiveObject | undefined { + get(objectId: string): LiveObject | undefined { return this._pool.get(objectId); } - private _getInitialPool(): Map { - const pool = new Map(); + /** + * Deletes objects from the pool for which object ids are not found in the provided array of ids. + */ + deleteExtraObjectIds(objectIds: string[]): void { + const poolObjectIds = [...this._pool.keys()]; + const extraObjectIds = this._client.Utils.arrSubtract(poolObjectIds, objectIds); + + extraObjectIds.forEach((x) => this._pool.delete(x)); + } + + set(objectId: string, liveObject: LiveObject): void { + this._pool.set(objectId, liveObject); + } + + reset(): void { + this._pool = this._getInitialPool(); + } + + private _getInitialPool(): Map { + const pool = new Map(); const root = new LiveMap(this._liveObjects, null, ROOT_OBJECT_ID); pool.set(root.getObjectId(), root); return pool; diff --git a/src/plugins/liveobjects/syncliveobjectsdatapool.ts b/src/plugins/liveobjects/syncliveobjectsdatapool.ts new file mode 100644 index 000000000..1d30c5ad6 --- /dev/null +++ b/src/plugins/liveobjects/syncliveobjectsdatapool.ts @@ -0,0 +1,35 @@ +import { LiveObjectData } from './liveobject'; +import { LiveObjects } from './liveobjects'; + +export interface LiveObjectDataEntry { + objectData: LiveObjectData; + regionalTimeserial: string; + objectType: 'LiveMap' | 'LiveCounter'; +} + +/** + * @internal + */ +export class SyncLiveObjectsDataPool { + private _pool: Map; + + constructor(private _liveObjects: LiveObjects) { + this._pool = new Map(); + } + + entries() { + return this._pool.entries(); + } + + size(): number { + return this._pool.size; + } + + isEmpty(): boolean { + return this.size() === 0; + } + + reset(): void { + this._pool = new Map(); + } +}