From 8e31a6a6d702213804fbe32f109226850ea3198a Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 18 Oct 2024 06:46:10 +0100 Subject: [PATCH 01/11] Add Timeserial class to LiveObjects plugin This is a copy of Timeserial implementation from chat-js repo [1], with some changes: - inversion of control change to pass in the reference to Ably BaseClient, as we cannot import it directly in a plugin - seriesId comparison fix for empty seriesId. This is based on the identical fix in realtime [2] - slightly better error handling [1] https://github.com/ably/ably-chat-js/blob/main/src/core/timeserial.ts [2] https://github.com/ably/realtime/pull/6678/files#diff-31896aac1d68683fb340b9ac488b1cfd5b96eb9c0b79f2260e637c5004721e98R134-R143 --- src/plugins/liveobjects/timeserial.ts | 179 ++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 src/plugins/liveobjects/timeserial.ts diff --git a/src/plugins/liveobjects/timeserial.ts b/src/plugins/liveobjects/timeserial.ts new file mode 100644 index 000000000..553f960a5 --- /dev/null +++ b/src/plugins/liveobjects/timeserial.ts @@ -0,0 +1,179 @@ +import type BaseClient from 'common/lib/client/baseclient'; + +/** + * Represents a parsed timeserial. + */ +export interface Timeserial { + /** + * The series ID of the timeserial. + */ + readonly seriesId: string; + + /** + * The timestamp of the timeserial. + */ + readonly timestamp: number; + + /** + * The counter of the timeserial. + */ + readonly counter: number; + + /** + * The index of the timeserial. + */ + readonly index?: number; + + toString(): string; + + before(timeserial: Timeserial | string): boolean; + + after(timeserial: Timeserial | string): boolean; + + equal(timeserial: Timeserial | string): boolean; +} + +/** + * Default implementation of the Timeserial interface. Used internally to parse and compare timeserials. + * + * @internal + */ +export class DefaultTimeserial implements Timeserial { + public readonly seriesId: string; + public readonly timestamp: number; + public readonly counter: number; + public readonly index?: number; + + private constructor( + private _client: BaseClient, + seriesId: string, + timestamp: number, + counter: number, + index?: number, + ) { + this.seriesId = seriesId; + this.timestamp = timestamp; + this.counter = counter; + this.index = index; + } + + /** + * Returns the string representation of the timeserial object. + * @returns The timeserial string. + */ + toString(): string { + return `${this.seriesId}@${this.timestamp.toString()}-${this.counter.toString()}${this.index ? `:${this.index.toString()}` : ''}`; + } + + /** + * Calculate the timeserial object from a timeserial string. + * + * @param timeserial The timeserial string to parse. + * @returns The parsed timeserial object. + * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if timeserial is invalid. + */ + static calculateTimeserial(client: BaseClient, timeserial: string | null | undefined): Timeserial { + if (client.Utils.isNil(timeserial)) { + throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500); + } + + const [seriesId, rest] = timeserial.split('@'); + if (!seriesId || !rest) { + throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500); + } + + const [timestamp, counterAndIndex] = rest.split('-'); + if (!timestamp || !counterAndIndex) { + throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500); + } + + const [counter, index] = counterAndIndex.split(':'); + if (!counter) { + throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500); + } + + return new DefaultTimeserial( + client, + seriesId, + Number(timestamp), + Number(counter), + index ? Number(index) : undefined, + ); + } + + /** + * Compares this timeserial to the supplied timeserial, returning a number indicating their relative order. + * @param timeserialToCompare The timeserial to compare against. Can be a string or a Timeserial object. + * @returns 0 if the timeserials are equal, <0 if the first timeserial is less than the second, >0 if the first timeserial is greater than the second. + * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if comparison timeserial is invalid. + */ + private _timeserialCompare(timeserialToCompare: string | Timeserial): number { + const secondTimeserial = + typeof timeserialToCompare === 'string' + ? DefaultTimeserial.calculateTimeserial(this._client, timeserialToCompare) + : timeserialToCompare; + + // Compare the timestamp + const timestampDiff = this.timestamp - secondTimeserial.timestamp; + if (timestampDiff) { + return timestampDiff; + } + + // Compare the counter + const counterDiff = this.counter - secondTimeserial.counter; + if (counterDiff) { + return counterDiff; + } + + // Compare the seriesId + // An empty seriesId is considered less than a non-empty one + if (!this.seriesId && secondTimeserial.seriesId) { + return -1; + } + if (this.seriesId && !secondTimeserial.seriesId) { + return 1; + } + // Otherwise compare seriesId lexicographically + const seriesIdDiff = + this.seriesId === secondTimeserial.seriesId ? 0 : this.seriesId < secondTimeserial.seriesId ? -1 : 1; + + if (seriesIdDiff) { + return seriesIdDiff; + } + + // Compare the index, if present + return this.index !== undefined && secondTimeserial.index !== undefined ? this.index - secondTimeserial.index : 0; + } + + /** + * Determines if this timeserial occurs logically before the given timeserial. + * + * @param timeserial The timeserial to compare against. Can be a string or a Timeserial object. + * @returns true if this timeserial precedes the given timeserial, in global order. + * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if the given timeserial is invalid. + */ + before(timeserial: Timeserial | string): boolean { + return this._timeserialCompare(timeserial) < 0; + } + + /** + * Determines if this timeserial occurs logically after the given timeserial. + * + * @param timeserial The timeserial to compare against. Can be a string or a Timeserial object. + * @returns true if this timeserial follows the given timeserial, in global order. + * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if the given timeserial is invalid. + */ + after(timeserial: Timeserial | string): boolean { + return this._timeserialCompare(timeserial) > 0; + } + + /** + * Determines if this timeserial is equal to the given timeserial. + * @param timeserial The timeserial to compare against. Can be a string or a Timeserial object. + * @returns true if this timeserial is equal to the given timeserial. + * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if the given timeserial is invalid. + */ + equal(timeserial: Timeserial | string): boolean { + return this._timeserialCompare(timeserial) === 0; + } +} From 692f6eddd689d37e7d994c4f7ada784930f2a0ef Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 18 Oct 2024 08:23:20 +0100 Subject: [PATCH 02/11] Add ObjectId class to parse object id strings --- src/plugins/liveobjects/objectid.ts | 35 +++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 src/plugins/liveobjects/objectid.ts diff --git a/src/plugins/liveobjects/objectid.ts b/src/plugins/liveobjects/objectid.ts new file mode 100644 index 000000000..c968eefd6 --- /dev/null +++ b/src/plugins/liveobjects/objectid.ts @@ -0,0 +1,35 @@ +import type BaseClient from 'common/lib/client/baseclient'; + +export type LiveObjectType = 'map' | 'counter'; + +/** + * Represents a parsed object id. + * + * @internal + */ +export class ObjectId { + private constructor( + readonly type: LiveObjectType, + readonly hash: string, + ) {} + + /** + * Create ObjectId instance from hashed object id string. + */ + static fromString(client: BaseClient, objectId: string | null | undefined): ObjectId { + if (client.Utils.isNil(objectId)) { + throw new client.ErrorInfo('Invalid object id string', 50000, 500); + } + + const [type, hash] = objectId.split(':'); + if (!type || !hash) { + throw new client.ErrorInfo('Invalid object id string', 50000, 500); + } + + if (!['map', 'counter'].includes(type)) { + throw new client.ErrorInfo(`Invalid object type in object id: ${objectId}`, 50000, 500); + } + + return new ObjectId(type as LiveObjectType, hash); + } +} From 131fc4e1a97aea5ea8ed07f3d232a06a87ad6915 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 18 Oct 2024 08:44:15 +0100 Subject: [PATCH 03/11] Preparation for CRDT operations implementation - Add handling of `created` field to LiveCounter - Add handling of `tombstone` field on entries to LiveMap - Change `timeserial` in LiveMap entries to be of type Timeserial - Change `data` in LiveMap entries to be optionally undefined --- src/plugins/liveobjects/livecounter.ts | 24 +++++++++++++++ src/plugins/liveobjects/livemap.ts | 30 +++++++++++++++---- src/plugins/liveobjects/liveobjects.ts | 7 +++-- .../liveobjects/syncliveobjectsdatapool.ts | 4 ++- 4 files changed, 56 insertions(+), 9 deletions(-) diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index 06398d6e9..48dfa5365 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,14 +1,38 @@ import { LiveObject, LiveObjectData } from './liveobject'; +import { LiveObjects } from './liveobjects'; export interface LiveCounterData extends LiveObjectData { data: number; } export class LiveCounter extends LiveObject { + constructor( + liveObjects: LiveObjects, + private _created: boolean, + initialData?: LiveCounterData | null, + objectId?: string, + ) { + super(liveObjects, initialData, objectId); + } + value(): number { return this._dataRef.data; } + /** + * @internal + */ + isCreated(): boolean { + return this._created; + } + + /** + * @internal + */ + setCreated(created: boolean): void { + this._created = created; + } + protected _getZeroValueData(): LiveCounterData { return { data: 0 }; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 06cf160f4..8c13b6d40 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,6 +1,7 @@ import { LiveObject, LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, StateValue } from './statemessage'; +import { Timeserial } from './timeserial'; export interface ObjectIdStateData { /** A reference to another state object, used to support composable state objects. */ @@ -21,8 +22,8 @@ export type StateData = ObjectIdStateData | ValueStateData; export interface MapEntry { tombstone: boolean; - timeserial: string; - data: StateData; + timeserial: Timeserial; + data: StateData | undefined; } export interface LiveMapData extends LiveObjectData { @@ -53,15 +54,32 @@ export class LiveMap extends LiveObject { return undefined; } - if ('value' in element.data) { - return element.data.value; + if (element.tombstone === true) { + return undefined; + } + + // data exists for non-tombstoned elements + const data = element.data!; + + if ('value' in data) { + return data.value; } else { - return this._liveObjects.getPool().get(element.data.objectId); + return this._liveObjects.getPool().get(data.objectId); } } size(): number { - return this._dataRef.data.size; + let size = 0; + for (const value of this._dataRef.data.values()) { + if (value.tombstone === true) { + // should not count deleted entries + continue; + } + + size++; + } + + return size; } protected _getZeroValueData(): LiveMapData { diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index e67ee350e..11a5ef3a6 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -7,7 +7,7 @@ import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; import { StateMessage } from './statemessage'; -import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; +import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; enum LiveObjectsEvents { SyncCompleted = 'SyncCompleted', @@ -171,6 +171,9 @@ export class LiveObjects { if (existingObject) { existingObject.setData(entry.objectData); existingObject.setRegionalTimeserial(entry.regionalTimeserial); + if (existingObject instanceof LiveCounter) { + existingObject.setCreated((entry as LiveCounterDataEntry).created); + } continue; } @@ -179,7 +182,7 @@ export class LiveObjects { const objectType = entry.objectType; switch (objectType) { case 'LiveCounter': - newObject = new LiveCounter(this, entry.objectData, objectId); + newObject = new LiveCounter(this, entry.created, entry.objectData, objectId); break; case 'LiveMap': diff --git a/src/plugins/liveobjects/syncliveobjectsdatapool.ts b/src/plugins/liveobjects/syncliveobjectsdatapool.ts index 73de256ff..22e83c7a1 100644 --- a/src/plugins/liveobjects/syncliveobjectsdatapool.ts +++ b/src/plugins/liveobjects/syncliveobjectsdatapool.ts @@ -5,6 +5,7 @@ import { LiveMapData, MapEntry, ObjectIdStateData, StateData, ValueStateData } f import { LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, StateMessage, StateObject } from './statemessage'; +import { DefaultTimeserial } from './timeserial'; export interface LiveObjectDataEntry { objectData: LiveObjectData; @@ -90,10 +91,10 @@ export class SyncLiveObjectsDataPool { data: counter.count ?? 0, }; const newEntry: LiveCounterDataEntry = { - created: counter.created, objectData, objectType: 'LiveCounter', regionalTimeserial: stateObject.regionalTimeserial, + created: counter.created, }; return newEntry; @@ -116,6 +117,7 @@ export class SyncLiveObjectsDataPool { const liveDataEntry: MapEntry = { ...entryFromMessage, + timeserial: DefaultTimeserial.calculateTimeserial(this._client, entryFromMessage.timeserial), // true only if we received explicit true. otherwise always false tombstone: entryFromMessage.tombstone === true, data: liveData, From 9b83e941c00ea92db4934b7f34d883ef4bb311c0 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 18 Oct 2024 06:18:55 +0100 Subject: [PATCH 04/11] Implement support for applying incoming operations to LiveMap/LiveCounter This adds implementation for CRDT operations for LiveMap/LiveCounter classes to be able to handle incoming state operation messages. Resolves DTP-954 --- src/plugins/liveobjects/livecounter.ts | 73 +++++++++ src/plugins/liveobjects/livemap.ts | 170 ++++++++++++++++++++- src/plugins/liveobjects/liveobject.ts | 8 + src/plugins/liveobjects/liveobjectspool.ts | 23 +++ 4 files changed, 272 insertions(+), 2 deletions(-) diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index 48dfa5365..b82b9f852 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,5 +1,6 @@ import { LiveObject, LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; +import { StateCounter, StateCounterOp, StateOperation, StateOperationAction } from './statemessage'; export interface LiveCounterData extends LiveObjectData { data: number; @@ -33,7 +34,79 @@ export class LiveCounter extends LiveObject { this._created = created; } + /** + * @internal + */ + applyOperation(op: StateOperation): void { + if (op.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + switch (op.action) { + case StateOperationAction.COUNTER_CREATE: + this._applyCounterCreate(op.counter); + break; + + case StateOperationAction.COUNTER_INC: + if (this._client.Utils.isNil(op.counterOp)) { + this._throwNoPayloadError(op); + } else { + this._applyCounterInc(op.counterOp); + } + break; + + default: + throw new this._client.ErrorInfo( + `Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + } + protected _getZeroValueData(): LiveCounterData { return { data: 0 }; } + + private _throwNoPayloadError(op: StateOperation): void { + throw new this._client.ErrorInfo( + `No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + private _applyCounterCreate(op: StateCounter | undefined): void { + if (this.isCreated()) { + // skip COUNTER_CREATE op if this counter is already created + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveCounter._applyCounterCreate()', + `skipping applying COUNTER_CREATE op on a counter instance as it is already created; objectId=${this._objectId}`, + ); + return; + } + + if (this._client.Utils.isNil(op)) { + // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. + // we need to SUM the initial value to the current value due to the reasons below, but since it's a 0, we can skip addition operation + this.setCreated(true); + return; + } + + // note that it is intentional to SUM the incoming count from the create op. + // if we get here, it means that current counter instance wasn't initialized from the COUNTER_CREATE op, + // so it is missing the initial value that we're going to add now. + this._dataRef.data += op.count ?? 0; + this.setCreated(true); + } + + private _applyCounterInc(op: StateCounterOp): void { + this._dataRef.data += op.amount; + } } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 8c13b6d40..30c943c3a 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,7 +1,15 @@ import { LiveObject, LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; -import { MapSemantics, StateValue } from './statemessage'; -import { Timeserial } from './timeserial'; +import { + MapSemantics, + StateMap, + StateMapOp, + StateMessage, + StateOperation, + StateOperationAction, + StateValue, +} from './statemessage'; +import { DefaultTimeserial, Timeserial } from './timeserial'; export interface ObjectIdStateData { /** A reference to another state object, used to support composable state objects. */ @@ -82,7 +90,165 @@ export class LiveMap extends LiveObject { return size; } + /** + * @internal + */ + applyOperation(op: StateOperation, msg: StateMessage): void { + if (op.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Cannot apply state operation with objectId=${op.objectId}, to this LiveMap with objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + switch (op.action) { + case StateOperationAction.MAP_CREATE: + this._applyMapCreate(op.map); + break; + + case StateOperationAction.MAP_SET: + if (this._client.Utils.isNil(op.mapOp)) { + this._throwNoPayloadError(op); + } else { + this._applyMapSet(op.mapOp, msg.serial); + } + break; + + case StateOperationAction.MAP_REMOVE: + if (this._client.Utils.isNil(op.mapOp)) { + this._throwNoPayloadError(op); + } else { + this._applyMapRemove(op.mapOp, msg.serial); + } + break; + + default: + throw new this._client.ErrorInfo( + `Invalid ${op.action} op for LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + } + protected _getZeroValueData(): LiveMapData { return { data: new Map() }; } + + private _throwNoPayloadError(op: StateOperation): void { + throw new this._client.ErrorInfo( + `No payload found for ${op.action} op for LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + private _applyMapCreate(op: StateMap | undefined): void { + if (this._client.Utils.isNil(op)) { + // if a map object is missing for the MAP_CREATE op, the initial value is implicitly an empty map. + // in this case there is nothing to merge into the current map, so we can just end processing the op. + return; + } + + if (this._semantics !== op.semantics) { + throw new this._client.ErrorInfo( + `Cannot apply MAP_CREATE op on LiveMap objectId=${this.getObjectId()}; map's semantics=${this._semantics}, but op expected ${op.semantics}`, + 50000, + 500, + ); + } + + // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. + // we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations. + Object.entries(op.entries ?? {}).forEach(([key, entry]) => { + // for MAP_CREATE op we must use dedicated timeserial field available on an entry, instead of a timeserial on a message + const opOriginTimeserial = entry.timeserial; + if (entry.tombstone === true) { + // entry in MAP_CREATE op is deleted, try to apply MAP_REMOVE op + this._applyMapRemove({ key }, opOriginTimeserial); + } else { + // entry in MAP_CREATE op is not deleted, try to set it via MAP_SET op + this._applyMapSet({ key, data: entry.data }, opOriginTimeserial); + } + }); + } + + private _applyMapSet(op: StateMapOp, opOriginTimeserialStr: string | undefined): void { + const { ErrorInfo, Utils } = this._client; + + const opTimeserial = DefaultTimeserial.calculateTimeserial(this._client, opOriginTimeserialStr); + const existingEntry = this._dataRef.data.get(op.key); + if (existingEntry && opTimeserial.before(existingEntry.timeserial)) { + // the operation's origin timeserial < the entry's timeserial, ignore the operation. + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveMap._applyMapSet()', + `skipping updating key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserialStr}; objectId=${this._objectId}`, + ); + return; + } + + if (Utils.isNil(op.data) || (Utils.isNil(op.data.value) && Utils.isNil(op.data.objectId))) { + throw new ErrorInfo( + `Invalid state data for MAP_SET op on objectId=${this.getObjectId()} on key=${op.key}`, + 50000, + 500, + ); + } + + let liveData: StateData; + if (!Utils.isNil(op.data.objectId)) { + liveData = { objectId: op.data.objectId } as ObjectIdStateData; + // this MAP_SET op is setting a key to point to another object via its object id, + // but it is possible that we don't have the corresponding object in the pool yet (for example, we haven't seen the *_CREATE op for it). + // we don't want to return undefined from this map's .get() method even if we don't have the object, + // so instead we create a zero-value object for that object id if it not exists. + this._liveObjects.getPool().createZeroValueObjectIfNotExists(op.data.objectId); + } else { + liveData = { encoding: op.data.encoding, value: op.data.value } as ValueStateData; + } + + if (existingEntry) { + existingEntry.tombstone = false; + existingEntry.timeserial = opTimeserial; + existingEntry.data = liveData; + } else { + const newEntry: MapEntry = { + tombstone: false, + timeserial: opTimeserial, + data: liveData, + }; + this._dataRef.data.set(op.key, newEntry); + } + } + + private _applyMapRemove(op: StateMapOp, opOriginTimeserialStr: string | undefined): void { + const opTimeserial = DefaultTimeserial.calculateTimeserial(this._client, opOriginTimeserialStr); + const existingEntry = this._dataRef.data.get(op.key); + if (existingEntry && opTimeserial.before(existingEntry.timeserial)) { + // the operation's origin timeserial < the entry's timeserial, ignore the operation. + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveMap._applyMapRemove()', + `skipping removing key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserialStr}; objectId=${this._objectId}`, + ); + return; + } + + if (existingEntry) { + existingEntry.tombstone = true; + existingEntry.timeserial = opTimeserial; + existingEntry.data = undefined; + } else { + const newEntry: MapEntry = { + tombstone: true, + timeserial: opTimeserial, + data: undefined, + }; + this._dataRef.data.set(op.key, newEntry); + } + } } diff --git a/src/plugins/liveobjects/liveobject.ts b/src/plugins/liveobjects/liveobject.ts index d062fec37..2e33cb0b2 100644 --- a/src/plugins/liveobjects/liveobject.ts +++ b/src/plugins/liveobjects/liveobject.ts @@ -1,10 +1,13 @@ +import type BaseClient from 'common/lib/client/baseclient'; import { LiveObjects } from './liveobjects'; +import { StateMessage, StateOperation } from './statemessage'; export interface LiveObjectData { data: any; } export abstract class LiveObject { + protected _client: BaseClient; protected _dataRef: T; protected _objectId: string; protected _regionalTimeserial?: string; @@ -14,6 +17,7 @@ export abstract class LiveObject { initialData?: T | null, objectId?: string, ) { + this._client = this._liveObjects.getClient(); this._dataRef = initialData ?? this._getZeroValueData(); this._objectId = objectId ?? this._createObjectId(); } @@ -51,5 +55,9 @@ export abstract class LiveObject { return Math.random().toString().substring(2); } + /** + * @internal + */ + abstract applyOperation(op: StateOperation, msg: StateMessage): void; protected abstract _getZeroValueData(): T; } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index f4345d61e..959411a73 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,7 +1,9 @@ import type BaseClient from 'common/lib/client/baseclient'; +import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; +import { ObjectId } from './objectid'; import { MapSemantics } from './statemessage'; export const ROOT_OBJECT_ID = 'root'; @@ -40,6 +42,27 @@ export class LiveObjectsPool { this._pool = this._getInitialPool(); } + createZeroValueObjectIfNotExists(objectId: string): void { + if (this.get(objectId)) { + return; + } + + const parsedObjectId = ObjectId.fromString(this._client, objectId); + let zeroValueObject: LiveObject; + switch (parsedObjectId.type) { + case 'map': { + zeroValueObject = new LiveMap(this._liveObjects, MapSemantics.LWW, null, objectId); + break; + } + + case 'counter': + zeroValueObject = new LiveCounter(this._liveObjects, false, null, objectId); + break; + } + + this.set(objectId, zeroValueObject); + } + private _getInitialPool(): Map { const pool = new Map(); const root = new LiveMap(this._liveObjects, MapSemantics.LWW, null, ROOT_OBJECT_ID); From 9f98ef68ae043dcadde4e2d84b4d30bdbf9c02ee Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Mon, 21 Oct 2024 13:20:17 +0100 Subject: [PATCH 05/11] Preparation for applying incoming state operations - add static LiveMap.liveMapDataFromMapEntries() method with logic previously used in SyncLiveObjectsDataPool. This method will be used in other places to create LiveMapData - minor refactoring and log improvements --- src/common/lib/client/realtimechannel.ts | 2 +- src/plugins/liveobjects/livemap.ts | 30 ++++++++++++++ src/plugins/liveobjects/liveobjects.ts | 8 ++-- src/plugins/liveobjects/statemessage.ts | 2 +- .../liveobjects/syncliveobjectsdatapool.ts | 41 ++++--------------- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 928650ad6..d9cbabd81 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter { } } - this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial); + this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial); break; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 30c943c3a..90582d3a7 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,8 +1,10 @@ +import type BaseClient from 'common/lib/client/baseclient'; import { LiveObject, LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, StateMap, + StateMapEntry, StateMapOp, StateMessage, StateOperation, @@ -48,6 +50,34 @@ export class LiveMap extends LiveObject { super(liveObjects, initialData, objectId); } + static liveMapDataFromMapEntries(client: BaseClient, entries: Record): LiveMapData { + const liveMapData: LiveMapData = { + data: new Map(), + }; + + // need to iterate over entries manually to work around optional parameters from state object entries type + Object.entries(entries ?? {}).forEach(([key, entry]) => { + let liveData: StateData; + if (typeof entry.data.objectId !== 'undefined') { + liveData = { objectId: entry.data.objectId } as ObjectIdStateData; + } else { + liveData = { encoding: entry.data.encoding, value: entry.data.value } as ValueStateData; + } + + const liveDataEntry: MapEntry = { + ...entry, + timeserial: DefaultTimeserial.calculateTimeserial(client, entry.timeserial), + // true only if we received explicit true. otherwise always false + tombstone: entry.tombstone === true, + data: liveData, + }; + + liveMapData.data.set(key, liveDataEntry); + }); + + return liveMapData; + } + /** * Returns the value associated with the specified key in the underlying Map object. * If no element is associated with the specified key, undefined is returned. diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 11a5ef3a6..a8e8ab885 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -67,13 +67,13 @@ export class LiveObjects { /** * @internal */ - handleStateSyncMessage(stateMessages: StateMessage[], syncChannelSerial: string | null | undefined): void { + handleStateSyncMessages(stateMessages: StateMessage[], syncChannelSerial: string | null | undefined): void { const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial); if (this._currentSyncId !== syncId) { this._startNewSync(syncId, syncCursor); } - this._syncLiveObjectsDataPool.applyStateMessages(stateMessages); + this._syncLiveObjectsDataPool.applyStateSyncMessages(stateMessages); // if this is the last (or only) message in a sequence of sync updates, end the sync if (!syncCursor) { @@ -93,7 +93,7 @@ export class LiveObjects { ); if (hasState) { - this._startNewSync(undefined); + this._startNewSync(); } else { // no HAS_STATE flag received on attach, can end SYNC sequence immediately // and treat it as no state on a channel @@ -190,7 +190,7 @@ export class LiveObjects { break; default: - throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 40000, 400); + throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 50000, 500); } newObject.setRegionalTimeserial(entry.regionalTimeserial); diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index 858f87ea1..2fd293a33 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -127,7 +127,7 @@ export class StateMessage { operation?: StateOperation; /** Describes the instantaneous state of an object. */ object?: StateObject; - /** Timeserial format */ + /** Timeserial format. Contains the origin timeserial for this state message. */ serial?: string; constructor(private _platform: typeof Platform) {} diff --git a/src/plugins/liveobjects/syncliveobjectsdatapool.ts b/src/plugins/liveobjects/syncliveobjectsdatapool.ts index 22e83c7a1..8f0f8d648 100644 --- a/src/plugins/liveobjects/syncliveobjectsdatapool.ts +++ b/src/plugins/liveobjects/syncliveobjectsdatapool.ts @@ -1,11 +1,10 @@ import type BaseClient from 'common/lib/client/baseclient'; -import RealtimeChannel from 'common/lib/client/realtimechannel'; +import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounterData } from './livecounter'; -import { LiveMapData, MapEntry, ObjectIdStateData, StateData, ValueStateData } from './livemap'; +import { LiveMap } from './livemap'; import { LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, StateMessage, StateObject } from './statemessage'; -import { DefaultTimeserial } from './timeserial'; export interface LiveObjectDataEntry { objectData: LiveObjectData; @@ -55,14 +54,14 @@ export class SyncLiveObjectsDataPool { this._pool.clear(); } - applyStateMessages(stateMessages: StateMessage[]): void { + applyStateSyncMessages(stateMessages: StateMessage[]): void { for (const stateMessage of stateMessages) { if (!stateMessage.object) { this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MAJOR, - 'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()', - `state message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + 'LiveObjects.SyncLiveObjectsDataPool.applyStateSyncMessages()', + `state object message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, ); continue; } @@ -76,9 +75,9 @@ export class SyncLiveObjectsDataPool { } else { this._client.Logger.logAction( this._client.logger, - this._client.Logger.LOG_MINOR, - 'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()', - `received unsupported state object message during SYNC, expected 'counter' or 'map' to be present; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + this._client.Logger.LOG_MAJOR, + 'LiveObjects.SyncLiveObjectsDataPool.applyStateSyncMessages()', + `received unsupported state object message during SYNC, expected 'counter' or 'map' to be present, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, ); } } @@ -102,29 +101,7 @@ export class SyncLiveObjectsDataPool { private _createLiveMapDataEntry(stateObject: StateObject): LiveMapDataEntry { const map = stateObject.map!; - - const objectData: LiveMapData = { - data: new Map(), - }; - // need to iterate over entries manually to work around optional parameters from state object entries type - Object.entries(map.entries ?? {}).forEach(([key, entryFromMessage]) => { - let liveData: StateData; - if (typeof entryFromMessage.data.objectId !== 'undefined') { - liveData = { objectId: entryFromMessage.data.objectId } as ObjectIdStateData; - } else { - liveData = { encoding: entryFromMessage.data.encoding, value: entryFromMessage.data.value } as ValueStateData; - } - - const liveDataEntry: MapEntry = { - ...entryFromMessage, - timeserial: DefaultTimeserial.calculateTimeserial(this._client, entryFromMessage.timeserial), - // true only if we received explicit true. otherwise always false - tombstone: entryFromMessage.tombstone === true, - data: liveData, - }; - - objectData.data.set(key, liveDataEntry); - }); + const objectData = LiveMap.liveMapDataFromMapEntries(this._client, map.entries ?? {}); const newEntry: LiveMapDataEntry = { objectData, From 0b646e2e8cdcad7da7c8544b6f8505a3cb38b98a Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Mon, 21 Oct 2024 13:17:54 +0100 Subject: [PATCH 06/11] Implement application of incoming state operation messages outside of sync sequence Resolves DTP-956 --- src/common/lib/client/realtimechannel.ts | 33 ++++++++ src/plugins/liveobjects/liveobjects.ts | 15 +++- src/plugins/liveobjects/liveobjectspool.ts | 97 +++++++++++++++++++++- 3 files changed, 143 insertions(+), 2 deletions(-) diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index d9cbabd81..4cdcd2312 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -621,6 +621,39 @@ class RealtimeChannel extends EventEmitter { break; } + case actions.STATE: { + if (!this._liveObjects) { + return; + } + + const { id, connectionId, timestamp } = message; + const options = this.channelOptions; + + const stateMessages = message.state ?? []; + for (let i = 0; i < stateMessages.length; i++) { + try { + const stateMessage = stateMessages[i]; + + await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); + + if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; + if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; + if (!stateMessage.id) stateMessage.id = id + ':' + i; + } catch (e) { + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimeChannel.processMessage()', + (e as Error).toString(), + ); + } + } + + this._liveObjects.handleStateMessages(stateMessages); + + break; + } + case actions.STATE_SYNC: { if (!this._liveObjects) { return; diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index a8e8ab885..0fb0886ed 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -1,7 +1,7 @@ import type BaseClient from 'common/lib/client/baseclient'; import type RealtimeChannel from 'common/lib/client/realtimechannel'; -import type * as API from '../../../ably'; import type EventEmitter from 'common/lib/util/eventemitter'; +import type * as API from '../../../ably'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; @@ -81,6 +81,17 @@ export class LiveObjects { } } + /** + * @internal + */ + handleStateMessages(stateMessages: StateMessage[]): void { + if (this._syncInProgress) { + // TODO: handle buffering of state messages during SYNC + } + + this._liveObjectsPool.applyStateMessages(stateMessages); + } + /** * @internal */ @@ -131,6 +142,8 @@ export class LiveObjects { } private _endSync(): void { + // TODO: handle applying buffered state messages when SYNC is finished + this._applySync(); this._syncLiveObjectsDataPool.reset(); this._currentSyncId = undefined; diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 959411a73..02123f17c 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,10 +1,11 @@ import type BaseClient from 'common/lib/client/baseclient'; +import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; -import { MapSemantics } from './statemessage'; +import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage'; export const ROOT_OBJECT_ID = 'root'; @@ -13,10 +14,12 @@ export const ROOT_OBJECT_ID = 'root'; */ export class LiveObjectsPool { private _client: BaseClient; + private _channel: RealtimeChannel; private _pool: Map; constructor(private _liveObjects: LiveObjects) { this._client = this._liveObjects.getClient(); + this._channel = this._liveObjects.getChannel(); this._pool = this._getInitialPool(); } @@ -63,10 +66,102 @@ export class LiveObjectsPool { this.set(objectId, zeroValueObject); } + applyStateMessages(stateMessages: StateMessage[]): void { + for (const stateMessage of stateMessages) { + if (!stateMessage.operation) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects.LiveObjectsPool.applyStateMessages()', + `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + const stateOperation = stateMessage.operation; + + switch (stateOperation.action) { + case StateOperationAction.MAP_CREATE: + case StateOperationAction.COUNTER_CREATE: + if (this.get(stateOperation.objectId)) { + // object wich such id already exists (we may have created a zero-value object before, or this is a duplicate *_CREATE op), + // so delegate application of the op to that object + // TODO: invoke subscription callbacks for an object when applied + this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + break; + } + + // otherwise we can create new objects in the pool + if (stateOperation.action === StateOperationAction.MAP_CREATE) { + this._handleMapCreate(stateOperation); + } + + if (stateOperation.action === StateOperationAction.COUNTER_CREATE) { + this._handleCounterCreate(stateOperation); + } + break; + + case StateOperationAction.MAP_SET: + case StateOperationAction.MAP_REMOVE: + case StateOperationAction.COUNTER_INC: + // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, + // we create a zero-value object for the provided object id, and apply operation for that zero-value object. + // when we eventually receive a corresponding *_CREATE op for that object, its application will be handled by that zero-value object. + this.createZeroValueObjectIfNotExists(stateOperation.objectId); + // TODO: invoke subscription callbacks for an object when applied + this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + break; + + default: + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects.LiveObjectsPool.applyStateMessages()', + `received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + } + } + } + private _getInitialPool(): Map { const pool = new Map(); const root = new LiveMap(this._liveObjects, MapSemantics.LWW, null, ROOT_OBJECT_ID); pool.set(root.getObjectId(), root); return pool; } + + private _handleCounterCreate(stateOperation: StateOperation): void { + let counter: LiveCounter; + if (this._client.Utils.isNil(stateOperation.counter)) { + // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. + counter = new LiveCounter(this._liveObjects, true, { data: 0 }, stateOperation.objectId); + } else { + counter = new LiveCounter( + this._liveObjects, + true, + { data: stateOperation.counter.count ?? 0 }, + stateOperation.objectId, + ); + } + + this.set(stateOperation.objectId, counter); + } + + private _handleMapCreate(stateOperation: StateOperation): void { + let map: LiveMap; + if (this._client.Utils.isNil(stateOperation.map)) { + // if a map object is missing for the MAP_CREATE op, the initial value is implicitly an empty map. + map = new LiveMap(this._liveObjects, MapSemantics.LWW, null, stateOperation.objectId); + } else { + const objectData = LiveMap.liveMapDataFromMapEntries(this._client, stateOperation.map.entries ?? {}); + map = new LiveMap( + this._liveObjects, + stateOperation.map.semantics ?? MapSemantics.LWW, + objectData, + stateOperation.objectId, + ); + } + + this.set(stateOperation.objectId, map); + } } From ef54890d5445d26cbee319c797416f0a1c1589e1 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Mon, 21 Oct 2024 15:50:28 +0100 Subject: [PATCH 07/11] Preparation for LiveObjects tests for applying incoming operations - LiveObjectsHelper refactoring - timeserials format fix in existing LiveObjects tests (add seriesId part) --- test/common/modules/live_objects_helper.js | 92 ++++++++++++++-------- test/realtime/live_objects.test.js | 16 ++-- 2 files changed, 71 insertions(+), 37 deletions(-) diff --git a/test/common/modules/live_objects_helper.js b/test/common/modules/live_objects_helper.js index 2f6ffc384..b2793804e 100644 --- a/test/common/modules/live_objects_helper.js +++ b/test/common/modules/live_objects_helper.js @@ -17,6 +17,10 @@ define(['shared_helper'], function (Helper) { } class LiveObjectsHelper { + constructor(helper) { + this._rest = helper.AblyRest({ useBinaryProtocol: false }); + } + /** * Creates next LiveObjects state tree on a provided channel name: * @@ -27,39 +31,37 @@ define(['shared_helper'], function (Helper) { * root "initialValueCounter" -> Counter#2 count=10 * root "referencedCounter" -> Counter#3 count=20 */ - async initForChannel(helper, channelName) { - const rest = helper.AblyRest({ useBinaryProtocol: false }); - - const emptyCounter = await this._createAndSetOnMap(rest, channelName, { + async initForChannel(channelName) { + const emptyCounter = await this.createAndSetOnMap(channelName, { mapObjectId: 'root', key: 'emptyCounter', - createOp: this._counterCreateOp(), + createOp: this.counterCreateOp(), }); - const initialValueCounter = await this._createAndSetOnMap(rest, channelName, { + const initialValueCounter = await this.createAndSetOnMap(channelName, { mapObjectId: 'root', key: 'initialValueCounter', - createOp: this._counterCreateOp({ count: 10 }), + createOp: this.counterCreateOp({ count: 10 }), }); - const referencedCounter = await this._createAndSetOnMap(rest, channelName, { + const referencedCounter = await this.createAndSetOnMap(channelName, { mapObjectId: 'root', key: 'referencedCounter', - createOp: this._counterCreateOp({ count: 20 }), + createOp: this.counterCreateOp({ count: 20 }), }); - const emptyMap = await this._createAndSetOnMap(rest, channelName, { + const emptyMap = await this.createAndSetOnMap(channelName, { mapObjectId: 'root', key: 'emptyMap', - createOp: this._mapCreateOp(), + createOp: this.mapCreateOp(), }); - const referencedMap = await this._createAndSetOnMap(rest, channelName, { + const referencedMap = await this.createAndSetOnMap(channelName, { mapObjectId: 'root', key: 'referencedMap', - createOp: this._mapCreateOp({ entries: { counterKey: { data: { objectId: referencedCounter.objectId } } } }), + createOp: this.mapCreateOp({ entries: { counterKey: { data: { objectId: referencedCounter.objectId } } } }), }); - const valuesMap = await this._createAndSetOnMap(rest, channelName, { + const valuesMap = await this.createAndSetOnMap(channelName, { mapObjectId: 'root', key: 'valuesMap', - createOp: this._mapCreateOp({ + createOp: this.mapCreateOp({ entries: { stringKey: { data: { value: 'stringValue' } }, emptyStringKey: { data: { value: '' } }, @@ -77,20 +79,19 @@ define(['shared_helper'], function (Helper) { }); } - async _createAndSetOnMap(rest, channelName, opts) { + async createAndSetOnMap(channelName, opts) { const { mapObjectId, key, createOp } = opts; - const createResult = await this._stateRequest(rest, channelName, createOp); - await this._stateRequest( - rest, + const createResult = await this.stateRequest(channelName, createOp); + await this.stateRequest( channelName, - this._mapSetOp({ objectId: mapObjectId, key, data: { objectId: createResult.objectId } }), + this.mapSetOp({ objectId: mapObjectId, key, data: { objectId: createResult.objectId } }), ); return createResult; } - _mapCreateOp(opts) { + mapCreateOp(opts) { const { objectId, entries } = opts ?? {}; const op = { operation: { @@ -107,26 +108,38 @@ define(['shared_helper'], function (Helper) { return op; } - _mapSetOp(opts) { + mapSetOp(opts) { const { objectId, key, data } = opts ?? {}; const op = { operation: { action: ACTIONS.MAP_SET, objectId, + mapOp: { + key, + data, + }, }, }; - if (key && data) { - op.operation.mapOp = { - key, - data, - }; - } + return op; + } + + mapRemoveOp(opts) { + const { objectId, key } = opts ?? {}; + const op = { + operation: { + action: ACTIONS.MAP_REMOVE, + objectId, + mapOp: { + key, + }, + }, + }; return op; } - _counterCreateOp(opts) { + counterCreateOp(opts) { const { objectId, count } = opts ?? {}; const op = { operation: { @@ -143,7 +156,22 @@ define(['shared_helper'], function (Helper) { return op; } - async _stateRequest(rest, channelName, opBody) { + counterIncOp(opts) { + const { objectId, amount } = opts ?? {}; + const op = { + operation: { + action: ACTIONS.COUNTER_INC, + objectId, + counterOp: { + amount, + }, + }, + }; + + return op; + } + + async stateRequest(channelName, opBody) { if (Array.isArray(opBody)) { throw new Error(`Only single object state requests are supported`); } @@ -151,7 +179,7 @@ define(['shared_helper'], function (Helper) { const method = 'post'; const path = `/channels/${channelName}/state`; - const response = await rest.request(method, path, 3, null, opBody, null); + const response = await this._rest.request(method, path, 3, null, opBody, null); if (response.success) { // only one operation in request, so need only first item. @@ -167,5 +195,5 @@ define(['shared_helper'], function (Helper) { } } - return (module.exports = new LiveObjectsHelper()); + return (module.exports = LiveObjectsHelper); }); diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index e3c0d078e..ae2515738 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -36,7 +36,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], return; } - LiveObjectsHelper.initForChannel(helper, liveObjectsFixturesChannel) + new LiveObjectsHelper(helper) + .initForChannel(liveObjectsFixturesChannel) .then(done) .catch((err) => done(err)); }); @@ -77,7 +78,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], { object: { objectId: 'root', - regionalTimeserial: '@0-0', + regionalTimeserial: 'a@0-0', map: {}, }, }, @@ -220,7 +221,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // wait for initial STATE_SYNC sequence to complete await liveObjects.getRoot(); - // inject STATE_SYNC message to emulate start of new sequence + // inject STATE_SYNC message to emulate start of a new sequence helper.recordPrivateApi('call.channel.processMessage'); helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); await channel.processMessage( @@ -259,11 +260,11 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], { object: { objectId: 'root', - regionalTimeserial: '@0-0', + regionalTimeserial: 'a@0-0', map: { entries: { key: { - timeserial: '@0-0', + timeserial: 'a@0-0', data: { value: 1, }, @@ -285,6 +286,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, client); }); + /** @nospec */ it('builds state object tree from STATE_SYNC sequence on channel attachment', async function () { const helper = this.test.helper; const client = RealtimeWithLiveObjects(helper); @@ -338,6 +340,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, client); }); + /** @nospec */ it('LiveCounter is initialized with initial value from STATE_SYNC sequence', async function () { const helper = this.test.helper; const client = RealtimeWithLiveObjects(helper); @@ -362,6 +365,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, client); }); + /** @nospec */ it('LiveMap is initialized with initial value from STATE_SYNC sequence', async function () { const helper = this.test.helper; const client = RealtimeWithLiveObjects(helper); @@ -408,6 +412,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, client); }); + /** @nospec */ it('LiveMaps can reference the same object in their keys', async function () { const helper = this.test.helper; const client = RealtimeWithLiveObjects(helper); @@ -447,6 +452,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }); }); + /** @nospec */ it('can attach to channel with LiveObjects state modes', async function () { const helper = this.test.helper; const client = helper.AblyRealtime(); From bda102caab6e67219d8a38374f6b773ab8dd0362 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Tue, 22 Oct 2024 06:34:25 +0100 Subject: [PATCH 08/11] Add LiveObjects tests for applying incoming operation messages outside sync sequence --- test/realtime/live_objects.test.js | 499 ++++++++++++++++++++++++++++- 1 file changed, 498 insertions(+), 1 deletion(-) diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index ae2515738..c0da311f3 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -53,7 +53,51 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }); /** @nospec */ - it('doesn’t break when it receives a STATE_SYNC ProtocolMessage', async function () { + it(`doesn't break when it receives a STATE ProtocolMessage`, async function () { + const helper = this.test.helper; + const testClient = helper.AblyRealtime(); + + await helper.monitorConnectionThenCloseAndFinish(async () => { + const testChannel = testClient.channels.get('channel'); + await testChannel.attach(); + + const receivedMessagePromise = new Promise((resolve) => testChannel.subscribe(resolve)); + + const publishClient = helper.AblyRealtime(); + + await helper.monitorConnectionThenCloseAndFinish(async () => { + // inject STATE message that should be ignored and not break anything without LiveObjects plugin + helper.recordPrivateApi('call.channel.processMessage'); + helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); + await testChannel.processMessage( + createPM({ + action: 19, + channel: 'channel', + channelSerial: 'serial:', + state: [ + { + operation: { + action: 1, + objectId: 'root', + mapOp: { key: 'stringKey', data: { value: 'stringValue' } }, + }, + serial: 'a@0-0', + }, + ], + }), + ); + + const publishChannel = publishClient.channels.get('channel'); + await publishChannel.publish(null, 'test'); + + // regular message subscriptions should still work after processing STATE_SYNC message without LiveObjects plugin + await receivedMessagePromise; + }, publishClient); + }, testClient); + }); + + /** @nospec */ + it(`doesn't break when it receives a STATE_SYNC ProtocolMessage`, async function () { const helper = this.test.helper; const testClient = helper.AblyRealtime(); @@ -450,6 +494,459 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], ); }, client); }); + + const primitiveKeyData = [ + { key: 'stringKey', data: { value: 'stringValue' } }, + { key: 'emptyStringKey', data: { value: '' } }, + { + key: 'bytesKey', + data: { value: 'eyJwcm9kdWN0SWQiOiAiMDAxIiwgInByb2R1Y3ROYW1lIjogImNhciJ9', encoding: 'base64' }, + }, + { key: 'emptyBytesKey', data: { value: '', encoding: 'base64' } }, + { key: 'numberKey', data: { value: 1 } }, + { key: 'zeroKey', data: { value: 0 } }, + { key: 'trueKey', data: { value: true } }, + { key: 'falseKey', data: { value: false } }, + ]; + const primitiveMapsFixtures = [ + { name: 'emptyMap' }, + { + name: 'valuesMap', + entries: primitiveKeyData.reduce((acc, v) => { + acc[v.key] = { data: v.data }; + return acc; + }, {}), + }, + ]; + const countersFixtures = [ + { name: 'emptyCounter' }, + { name: 'zeroCounter', count: 0 }, + { name: 'valueCounter', count: 10 }, + { name: 'negativeValueCounter', count: -10 }, + { name: 'maxSafeIntegerCounter', count: Number.MAX_SAFE_INTEGER }, + { name: 'negativeMaxSafeIntegerCounter', count: -Number.MAX_SAFE_INTEGER }, + ]; + const applyOperationsScenarios = [ + { + description: 'MAP_CREATE with primitives', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + // LiveObjects public API allows us to check value of objects we've created based on MAP_CREATE ops + // if we assign those objects to another map (root for example), as there is no way to access those objects from the internal pool directly. + // however, in this test we put heavy focus on the data that is being created as the result of the MAP_CREATE op. + + // check no maps exist on root + primitiveMapsFixtures.forEach((fixture) => { + const key = fixture.name; + expect(root.get(key, `Check "${key}" key doesn't exist on root before applying MAP_CREATE ops`)).to.not + .exist; + }); + + // create new maps and set on root + await Promise.all( + primitiveMapsFixtures.map((fixture) => + liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: fixture.name, + createOp: liveObjectsHelper.mapCreateOp({ entries: fixture.entries }), + }), + ), + ); + + // check created maps + primitiveMapsFixtures.forEach((fixture) => { + const key = fixture.name; + const mapObj = root.get(key); + + // check all maps exist on root + expect(mapObj, `Check map at "${key}" key in root exists`).to.exist; + expect(mapObj.constructor.name).to.equal( + 'LiveMap', + `Check map at "${key}" key in root is of type LiveMap`, + ); + + // check primitive maps have correct values + expect(mapObj.size()).to.equal( + Object.keys(fixture.entries ?? {}).length, + `Check map "${key}" has correct number of keys`, + ); + + Object.entries(fixture.entries ?? {}).forEach(([key, keyData]) => { + if (keyData.data.encoding) { + expect( + BufferUtils.areBuffersEqual(mapObj.get(key), BufferUtils.base64Decode(keyData.data.value)), + `Check map "${key}" has correct value for "${key}" key`, + ).to.be.true; + } else { + expect(mapObj.get(key)).to.equal( + keyData.data.value, + `Check map "${key}" has correct value for "${key}" key`, + ); + } + }); + }); + }, + }, + + { + description: 'MAP_CREATE with object ids', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + const withReferencesMapKey = 'withReferencesMap'; + + // LiveObjects public API allows us to check value of objects we've created based on MAP_CREATE ops + // if we assign those objects to another map (root for example), as there is no way to access those objects from the internal pool directly. + // however, in this test we put heavy focus on the data that is being created as the result of the MAP_CREATE op. + + // check map does not exist on root + expect( + root.get( + withReferencesMapKey, + `Check "${withReferencesMapKey}" key doesn't exist on root before applying MAP_CREATE ops`, + ), + ).to.not.exist; + + // create map with references. need to created referenced objects first to obtain their object ids + const { objectId: referencedMapObjectId } = await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapCreateOp({ entries: { stringKey: { data: { value: 'stringValue' } } } }), + ); + const { objectId: referencedCounterObjectId } = await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.counterCreateOp({ count: 1 }), + ); + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: withReferencesMapKey, + createOp: liveObjectsHelper.mapCreateOp({ + entries: { + mapReference: { data: { objectId: referencedMapObjectId } }, + counterReference: { data: { objectId: referencedCounterObjectId } }, + }, + }), + }); + + // check map with references exist on root + const withReferencesMap = root.get(withReferencesMapKey); + expect(withReferencesMap, `Check map at "${withReferencesMapKey}" key in root exists`).to.exist; + expect(withReferencesMap.constructor.name).to.equal( + 'LiveMap', + `Check map at "${withReferencesMapKey}" key in root is of type LiveMap`, + ); + + // check map with references has correct values + expect(withReferencesMap.size()).to.equal( + 2, + `Check map "${withReferencesMapKey}" has correct number of keys`, + ); + + const referencedCounter = withReferencesMap.get('counterReference'); + const referencedMap = withReferencesMap.get('mapReference'); + + expect(referencedCounter, `Check counter at "counterReference" exists`).to.exist; + expect(referencedCounter.constructor.name).to.equal( + 'LiveCounter', + `Check counter at "counterReference" key is of type LiveCounter`, + ); + expect(referencedCounter.value()).to.equal(1, 'Check counter at "counterReference" key has correct value'); + + expect(referencedMap, `Check map at "mapReference" key exists`).to.exist; + expect(referencedMap.constructor.name).to.equal( + 'LiveMap', + `Check map at "mapReference" key is of type LiveMap`, + ); + expect(referencedMap.size()).to.equal(1, 'Check map at "mapReference" key has correct number of keys'); + expect(referencedMap.get('stringKey')).to.equal( + 'stringValue', + 'Check map at "mapReference" key has correct "stringKey" value', + ); + }, + }, + + { + description: 'MAP_SET with primitives', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + // check root is empty before ops + primitiveKeyData.forEach((keyData) => { + expect( + root.get(keyData.key, `Check "${keyData.key}" key doesn't exist on root before applying MAP_SET ops`), + ).to.not.exist; + }); + + // apply MAP_SET ops + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: 'root', + key: keyData.key, + data: keyData.data, + }), + ), + ), + ); + + // check everything is applied correctly + primitiveKeyData.forEach((keyData) => { + if (keyData.data.encoding) { + expect( + BufferUtils.areBuffersEqual(root.get(keyData.key), BufferUtils.base64Decode(keyData.data.value)), + `Check root has correct value for "${keyData.key}" key after MAP_SET op`, + ).to.be.true; + } else { + expect(root.get(keyData.key)).to.equal( + keyData.data.value, + `Check root has correct value for "${keyData.key}" key after MAP_SET op`, + ); + } + }); + }, + }, + + { + description: 'MAP_SET with object ids', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + // check no object ids are set on root + expect( + root.get('keyToCounter', `Check "keyToCounter" key doesn't exist on root before applying MAP_SET ops`), + ).to.not.exist; + expect(root.get('keyToMap', `Check "keyToMap" key doesn't exist on root before applying MAP_SET ops`)).to + .not.exist; + + // create new objects and set on root + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'keyToCounter', + createOp: liveObjectsHelper.counterCreateOp({ count: 1 }), + }); + + await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: 'keyToMap', + createOp: liveObjectsHelper.mapCreateOp({ + entries: { + stringKey: { data: { value: 'stringValue' } }, + }, + }), + }); + + // check root has refs to new objects and they are not zero-value + const counter = root.get('keyToCounter'); + const map = root.get('keyToMap'); + + expect(counter, 'Check counter at "keyToCounter" key in root exists').to.exist; + expect(counter.constructor.name).to.equal( + 'LiveCounter', + 'Check counter at "keyToCounter" key in root is of type LiveCounter', + ); + expect(counter.value()).to.equal(1, 'Check counter at "keyToCounter" key in root has correct value'); + + expect(map, 'Check map at "keyToMap" key in root exists').to.exist; + expect(map.constructor.name).to.equal('LiveMap', 'Check map at "keyToMap" key in root is of type LiveMap'); + expect(map.size()).to.equal(1, 'Check map at "keyToMap" key in root has correct number of keys'); + expect(map.get('stringKey')).to.equal( + 'stringValue', + 'Check map at "keyToMap" key in root has correct "stringKey" value', + ); + }, + }, + + { + description: 'MAP_REMOVE', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + const mapKey = 'map'; + + // create new map and set on root + const { objectId: mapObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: mapKey, + createOp: liveObjectsHelper.mapCreateOp({ + entries: { + shouldStay: { data: { value: 'foo' } }, + shouldDelete: { data: { value: 'bar' } }, + }, + }), + }); + + const map = root.get(mapKey); + // check map has expected keys before MAP_REMOVE ops + expect(map.size()).to.equal( + 2, + `Check map at "${mapKey}" key in root has correct number of keys before MAP_REMOVE`, + ); + expect(map.get('shouldStay')).to.equal( + 'foo', + `Check map at "${mapKey}" key in root has correct "shouldStay" value before MAP_REMOVE`, + ); + expect(map.get('shouldDelete')).to.equal( + 'bar', + `Check map at "${mapKey}" key in root has correct "shouldDelete" value before MAP_REMOVE`, + ); + + // send MAP_REMOVE op + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapRemoveOp({ + objectId: mapObjectId, + key: 'shouldDelete', + }), + ); + + // check map has correct keys after MAP_REMOVE ops + expect(map.size()).to.equal( + 1, + `Check map at "${mapKey}" key in root has correct number of keys after MAP_REMOVE`, + ); + expect(map.get('shouldStay')).to.equal( + 'foo', + `Check map at "${mapKey}" key in root has correct "shouldStay" value after MAP_REMOVE`, + ); + expect( + map.get('shouldDelete'), + `Check map at "${mapKey}" key in root has no "shouldDelete" key after MAP_REMOVE`, + ).to.not.exist; + }, + }, + + { + description: 'COUNTER_CREATE', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + + // LiveObjects public API allows us to check value of objects we've created based on COUNTER_CREATE ops + // if we assign those objects to another map (root for example), as there is no way to access those objects from the internal pool directly. + // however, in this test we put heavy focus on the data that is being created as the result of the COUNTER_CREATE op. + + // check no counters exist on root + countersFixtures.forEach((fixture) => { + const key = fixture.name; + expect(root.get(key, `Check "${key}" key doesn't exist on root before applying COUNTER_CREATE ops`)).to + .not.exist; + }); + + // create new counters and set on root + await Promise.all( + countersFixtures.map((fixture) => + liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: fixture.name, + createOp: liveObjectsHelper.counterCreateOp({ count: fixture.count }), + }), + ), + ); + + // check created counters + countersFixtures.forEach((fixture) => { + const key = fixture.name; + const counterObj = root.get(key); + + // check all counters exist on root + expect(counterObj, `Check counter at "${key}" key in root exists`).to.exist; + expect(counterObj.constructor.name).to.equal( + 'LiveCounter', + `Check counter at "${key}" key in root is of type LiveCounter`, + ); + + // check counters have correct values + expect(counterObj.value()).to.equal( + // if count was not set, should default to 0 + fixture.count ?? 0, + `Check counter at "${key}" key in root has correct value`, + ); + }); + }, + }, + + { + description: 'COUNTER_INC', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName } = ctx; + const counterKey = 'counter'; + let expectedCounterValue = 0; + + // create new counter and set on root + const { objectId: counterObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: counterKey, + createOp: liveObjectsHelper.counterCreateOp({ count: expectedCounterValue }), + }); + + const counter = root.get(counterKey); + // check counter has expected value before COUNTER_INC + expect(counter.value()).to.equal( + expectedCounterValue, + `Check counter at "${counterKey}" key in root has correct value before COUNTER_INC`, + ); + + const increments = [ + 1, // value=1 + 10, // value=11 + 100, // value=111 + 1000000, // value=1000111 + -1000111, // value=0 + -1, // value=-1 + -10, // value=-11 + -100, // value=-111 + -1000000, // value=-1000111 + 1000111, // value=0 + Number.MAX_SAFE_INTEGER, // value=9007199254740991 + // do next decrements in 2 steps as opposed to multiplying by -2 to prevent overflow + -Number.MAX_SAFE_INTEGER, // value=0 + -Number.MAX_SAFE_INTEGER, // value=-9007199254740991 + ]; + + // send increments one at a time and check expected value + for (let i = 0; i < increments.length; i++) { + const increment = increments[i]; + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.counterIncOp({ + objectId: counterObjectId, + amount: increment, + }), + ); + expectedCounterValue += increment; + + expect(counter.value()).to.equal( + expectedCounterValue, + `Check counter at "${counterKey}" key in root has correct value after ${i + 1} COUNTER_INC ops`, + ); + } + }, + }, + ]; + + for (const scenario of applyOperationsScenarios) { + if (scenario.skip === true) { + continue; + } + + /** @nospec */ + it(`can apply ${scenario.description} state operation messages`, async function () { + const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); + + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = `channel_can_apply_${scenario.description}`; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; + + await channel.attach(); + const root = await liveObjects.getRoot(); + + await scenario.action({ root, liveObjectsHelper, channelName }); + }, client); + }); + } }); /** @nospec */ From 396085263e2d44e694d61359f19b71e402c718f4 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Tue, 22 Oct 2024 10:09:20 +0100 Subject: [PATCH 09/11] Update `moduleReport` config with latest bundle info --- scripts/moduleReport.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/scripts/moduleReport.ts b/scripts/moduleReport.ts index e260f0783..083f18ab9 100644 --- a/scripts/moduleReport.ts +++ b/scripts/moduleReport.ts @@ -6,7 +6,7 @@ import { gzip } from 'zlib'; import Table from 'cli-table'; // The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel) -const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 }; +const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 101, gzip: 31 }; const baseClientNames = ['BaseRest', 'BaseRealtime']; @@ -310,12 +310,15 @@ async function checkLiveObjectsPluginFiles() { // These are the files that are allowed to contribute >= `threshold` bytes to the LiveObjects bundle. const allowedFiles = new Set([ 'src/plugins/liveobjects/index.ts', + 'src/plugins/liveobjects/livecounter.ts', 'src/plugins/liveobjects/livemap.ts', 'src/plugins/liveobjects/liveobject.ts', 'src/plugins/liveobjects/liveobjects.ts', 'src/plugins/liveobjects/liveobjectspool.ts', + 'src/plugins/liveobjects/objectid.ts', 'src/plugins/liveobjects/statemessage.ts', 'src/plugins/liveobjects/syncliveobjectsdatapool.ts', + 'src/plugins/liveobjects/timeserial.ts', ]); return checkBundleFiles(pluginBundleInfo, allowedFiles, 100); From 3ed285d2d31a12ab6112cd366ef3e5ab5d864df9 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 25 Oct 2024 08:36:28 +0100 Subject: [PATCH 10/11] Add `zeroValue` static method to LiveMap and LiveCounter --- src/plugins/liveobjects/livecounter.ts | 9 +++++++++ src/plugins/liveobjects/livemap.ts | 9 +++++++++ src/plugins/liveobjects/liveobjectspool.ts | 14 +++++++------- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index b82b9f852..a8c3db683 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -16,6 +16,15 @@ export class LiveCounter extends LiveObject { super(liveObjects, initialData, objectId); } + /** + * Returns a {@link LiveCounter} instance with a 0 value. + * + * @internal + */ + static zeroValue(liveobjects: LiveObjects, isCreated: boolean, objectId?: string): LiveCounter { + return new LiveCounter(liveobjects, isCreated, null, objectId); + } + value(): number { return this._dataRef.data; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 90582d3a7..2b8c27c55 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -50,6 +50,15 @@ export class LiveMap extends LiveObject { super(liveObjects, initialData, objectId); } + /** + * Returns a {@link LiveMap} instance with an empty map data. + * + * @internal + */ + static zeroValue(liveobjects: LiveObjects, objectId?: string): LiveMap { + return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId); + } + static liveMapDataFromMapEntries(client: BaseClient, entries: Record): LiveMapData { const liveMapData: LiveMapData = { data: new Map(), diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 02123f17c..6db02c78e 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -54,12 +54,12 @@ export class LiveObjectsPool { let zeroValueObject: LiveObject; switch (parsedObjectId.type) { case 'map': { - zeroValueObject = new LiveMap(this._liveObjects, MapSemantics.LWW, null, objectId); + zeroValueObject = LiveMap.zeroValue(this._liveObjects, objectId); break; } case 'counter': - zeroValueObject = new LiveCounter(this._liveObjects, false, null, objectId); + zeroValueObject = LiveCounter.zeroValue(this._liveObjects, false, objectId); break; } @@ -125,7 +125,7 @@ export class LiveObjectsPool { private _getInitialPool(): Map { const pool = new Map(); - const root = new LiveMap(this._liveObjects, MapSemantics.LWW, null, ROOT_OBJECT_ID); + const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID); pool.set(root.getObjectId(), root); return pool; } @@ -133,8 +133,8 @@ export class LiveObjectsPool { private _handleCounterCreate(stateOperation: StateOperation): void { let counter: LiveCounter; if (this._client.Utils.isNil(stateOperation.counter)) { - // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. - counter = new LiveCounter(this._liveObjects, true, { data: 0 }, stateOperation.objectId); + // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly a zero-value counter. + counter = LiveCounter.zeroValue(this._liveObjects, true, stateOperation.objectId); } else { counter = new LiveCounter( this._liveObjects, @@ -150,8 +150,8 @@ export class LiveObjectsPool { private _handleMapCreate(stateOperation: StateOperation): void { let map: LiveMap; if (this._client.Utils.isNil(stateOperation.map)) { - // if a map object is missing for the MAP_CREATE op, the initial value is implicitly an empty map. - map = new LiveMap(this._liveObjects, MapSemantics.LWW, null, stateOperation.objectId); + // if a map object is missing for the MAP_CREATE op, the initial value is implicitly a zero-value map. + map = LiveMap.zeroValue(this._liveObjects, stateOperation.objectId); } else { const objectData = LiveMap.liveMapDataFromMapEntries(this._client, stateOperation.map.entries ?? {}); map = new LiveMap( From b988c35ec5e39e0489a773a27ca19edf2b376a75 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Wed, 30 Oct 2024 11:38:20 +0000 Subject: [PATCH 11/11] Fix LiveObjects tests fail due to class name changes with static properties --- test/realtime/live_objects.test.js | 51 +++++++++++++++--------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index c0da311f3..885760acf 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -24,6 +24,12 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }; } + function expectInstanceOf(object, className, msg) { + // esbuild changes the name for classes with static method to include an underscore as prefix. + // so LiveMap becomes _LiveMap. we account for it here. + expect(object.constructor.name).to.match(new RegExp(`_?${className}`), msg); + } + describe('realtime/live_objects', function () { this.timeout(60 * 1000); @@ -146,7 +152,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const helper = this.test.helper; const client = RealtimeWithLiveObjects(helper, { autoConnect: false }); const channel = client.channels.get('channel'); - expect(channel.liveObjects.constructor.name).to.equal('LiveObjects'); + expectInstanceOf(channel.liveObjects, 'LiveObjects'); }); /** @nospec */ @@ -161,7 +167,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await channel.attach(); const root = await liveObjects.getRoot(); - expect(root.constructor.name).to.equal('LiveMap'); + expectInstanceOf(root, 'LiveMap', 'root object should be of LiveMap type'); }, client); }); @@ -178,7 +184,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const root = await liveObjects.getRoot(); helper.recordPrivateApi('call.LiveObject.getObjectId'); - expect(root.getObjectId()).to.equal('root'); + expect(root.getObjectId()).to.equal('root', 'root object should have an object id "root"'); }, client); }); @@ -352,16 +358,13 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], counterKeys.forEach((key) => { const counter = root.get(key); expect(counter, `Check counter at key="${key}" in root exists`).to.exist; - expect(counter.constructor.name).to.equal( - 'LiveCounter', - `Check counter at key="${key}" in root is of type LiveCounter`, - ); + expectInstanceOf(counter, 'LiveCounter', `Check counter at key="${key}" in root is of type LiveCounter`); }); mapKeys.forEach((key) => { const map = root.get(key); expect(map, `Check map at key="${key}" in root exists`).to.exist; - expect(map.constructor.name).to.equal('LiveMap', `Check map at key="${key}" in root is of type LiveMap`); + expectInstanceOf(map, 'LiveMap', `Check map at key="${key}" in root is of type LiveMap`); }); const valuesMap = root.get('valuesMap'); @@ -474,10 +477,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const counterFromReferencedMap = referencedMap.get('counterKey'); expect(counterFromReferencedMap, 'Check nested counter exists at a key in a map').to.exist; - expect(counterFromReferencedMap.constructor.name).to.equal( - 'LiveCounter', - 'Check nested counter is of type LiveCounter', - ); + expectInstanceOf(counterFromReferencedMap, 'LiveCounter', 'Check nested counter is of type LiveCounter'); expect(counterFromReferencedMap).to.equal( referencedCounter, 'Check nested counter is the same object instance as counter on the root', @@ -486,7 +486,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const mapFromValuesMap = valuesMap.get('mapKey'); expect(mapFromValuesMap, 'Check nested map exists at a key in a map').to.exist; - expect(mapFromValuesMap.constructor.name).to.equal('LiveMap', 'Check nested map is of type LiveMap'); + expectInstanceOf(mapFromValuesMap, 'LiveMap', 'Check nested map is of type LiveMap'); expect(mapFromValuesMap.size()).to.equal(1, 'Check nested map has correct number of keys'); expect(mapFromValuesMap).to.equal( referencedMap, @@ -561,10 +561,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // check all maps exist on root expect(mapObj, `Check map at "${key}" key in root exists`).to.exist; - expect(mapObj.constructor.name).to.equal( - 'LiveMap', - `Check map at "${key}" key in root is of type LiveMap`, - ); + expectInstanceOf(mapObj, 'LiveMap', `Check map at "${key}" key in root is of type LiveMap`); // check primitive maps have correct values expect(mapObj.size()).to.equal( @@ -630,7 +627,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // check map with references exist on root const withReferencesMap = root.get(withReferencesMapKey); expect(withReferencesMap, `Check map at "${withReferencesMapKey}" key in root exists`).to.exist; - expect(withReferencesMap.constructor.name).to.equal( + expectInstanceOf( + withReferencesMap, 'LiveMap', `Check map at "${withReferencesMapKey}" key in root is of type LiveMap`, ); @@ -645,17 +643,16 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const referencedMap = withReferencesMap.get('mapReference'); expect(referencedCounter, `Check counter at "counterReference" exists`).to.exist; - expect(referencedCounter.constructor.name).to.equal( + expectInstanceOf( + referencedCounter, 'LiveCounter', `Check counter at "counterReference" key is of type LiveCounter`, ); expect(referencedCounter.value()).to.equal(1, 'Check counter at "counterReference" key has correct value'); expect(referencedMap, `Check map at "mapReference" key exists`).to.exist; - expect(referencedMap.constructor.name).to.equal( - 'LiveMap', - `Check map at "mapReference" key is of type LiveMap`, - ); + expectInstanceOf(referencedMap, 'LiveMap', `Check map at "mapReference" key is of type LiveMap`); + expect(referencedMap.size()).to.equal(1, 'Check map at "mapReference" key has correct number of keys'); expect(referencedMap.get('stringKey')).to.equal( 'stringValue', @@ -741,14 +738,15 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const map = root.get('keyToMap'); expect(counter, 'Check counter at "keyToCounter" key in root exists').to.exist; - expect(counter.constructor.name).to.equal( + expectInstanceOf( + counter, 'LiveCounter', 'Check counter at "keyToCounter" key in root is of type LiveCounter', ); expect(counter.value()).to.equal(1, 'Check counter at "keyToCounter" key in root has correct value'); expect(map, 'Check map at "keyToMap" key in root exists').to.exist; - expect(map.constructor.name).to.equal('LiveMap', 'Check map at "keyToMap" key in root is of type LiveMap'); + expectInstanceOf(map, 'LiveMap', 'Check map at "keyToMap" key in root is of type LiveMap'); expect(map.size()).to.equal(1, 'Check map at "keyToMap" key in root has correct number of keys'); expect(map.get('stringKey')).to.equal( 'stringValue', @@ -849,7 +847,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // check all counters exist on root expect(counterObj, `Check counter at "${key}" key in root exists`).to.exist; - expect(counterObj.constructor.name).to.equal( + expectInstanceOf( + counterObj, 'LiveCounter', `Check counter at "${key}" key in root is of type LiveCounter`, );