From c8713d1984a2661e055f8603fd1b6e7bf02fa620 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 22 Nov 2024 08:10:32 +0000 Subject: [PATCH] Handle `createOp` field on StateObject messages Refactors LiveMap/LiveCounter object creation and moves most of the creation related busy work inside those classes. Resolves DTP-1076 --- src/plugins/liveobjects/livecounter.ts | 128 ++++++----- src/plugins/liveobjects/livemap.ts | 204 ++++++++++++------ src/plugins/liveobjects/liveobject.ts | 83 ++++--- src/plugins/liveobjects/liveobjects.ts | 19 +- src/plugins/liveobjects/liveobjectspool.ts | 75 +------ src/plugins/liveobjects/statemessage.ts | 35 ++- .../liveobjects/syncliveobjectsdatapool.ts | 41 +--- test/common/modules/live_objects_helper.js | 23 +- test/realtime/live_objects.test.js | 8 +- 9 files changed, 343 insertions(+), 273 deletions(-) diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index 0327adafc..bfa3a99cc 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,7 +1,7 @@ import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; -import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage'; -import { DefaultTimeserial, Timeserial } from './timeserial'; +import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage'; +import { DefaultTimeserial } from './timeserial'; export interface LiveCounterData extends LiveObjectData { data: number; @@ -12,46 +12,29 @@ export interface LiveCounterUpdate extends LiveObjectUpdate { } export class LiveCounter extends LiveObject { - constructor( - liveObjects: LiveObjects, - private _created: boolean, - initialData?: LiveCounterData | null, - objectId?: string, - siteTimeserials?: Record, - ) { - super(liveObjects, initialData, objectId, siteTimeserials); - } - /** * Returns a {@link LiveCounter} instance with a 0 value. * * @internal */ - static zeroValue( - liveobjects: LiveObjects, - isCreated: boolean, - objectId?: string, - siteTimeserials?: Record, - ): LiveCounter { - return new LiveCounter(liveobjects, isCreated, null, objectId, siteTimeserials); - } - - value(): number { - return this._dataRef.data; + static zeroValue(liveobjects: LiveObjects, objectId: string): LiveCounter { + return new LiveCounter(liveobjects, objectId); } /** + * Returns a {@link LiveCounter} instance based on the provided state object. + * The provided state object must hold a valid counter object data. + * * @internal */ - isCreated(): boolean { - return this._created; + static fromStateObject(liveobjects: LiveObjects, stateObject: StateObject): LiveCounter { + const obj = new LiveCounter(liveobjects, stateObject.objectId); + obj.overrideWithStateObject(stateObject); + return obj; } - /** - * @internal - */ - setCreated(created: boolean): void { - this._created = created; + value(): number { + return this._dataRef.data; } /** @@ -83,7 +66,7 @@ export class LiveCounter extends LiveObject let update: LiveCounterUpdate | LiveObjectUpdateNoop; switch (op.action) { case StateOperationAction.COUNTER_CREATE: - update = this._applyCounterCreate(op.counter); + update = this._applyCounterCreate(op); break; case StateOperationAction.COUNTER_INC: @@ -107,15 +90,69 @@ export class LiveCounter extends LiveObject this.notifyUpdated(update); } + /** + * @internal + */ + overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate { + if (stateObject.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Invalid state object: state object objectId=${stateObject.objectId}; LiveCounter objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (!this._client.Utils.isNil(stateObject.createOp)) { + // it is expected that create operation can be missing in the state object, so only validate it when it exists + if (stateObject.createOp.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp objectId=${stateObject.createOp?.objectId}; LiveCounter objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (stateObject.createOp.action !== StateOperationAction.COUNTER_CREATE) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp action=${stateObject.createOp?.action}; LiveCounter objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + } + + const previousDataRef = this._dataRef; + // override all relevant data for this object with data from the state object + this._createOperationIsMerged = false; + this._dataRef = { data: stateObject.counter?.count ?? 0 }; + this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials); + if (!this._client.Utils.isNil(stateObject.createOp)) { + this._mergeInitialDataFromCreateOperation(stateObject.createOp); + } + + return this._updateFromDataDiff(previousDataRef, this._dataRef); + } + protected _getZeroValueData(): LiveCounterData { return { data: 0 }; } - protected _updateFromDataDiff(currentDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate { - const counterDiff = newDataRef.data - currentDataRef.data; + protected _updateFromDataDiff(prevDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate { + const counterDiff = newDataRef.data - prevDataRef.data; return { update: { inc: counterDiff } }; } + protected _mergeInitialDataFromCreateOperation(stateOperation: StateOperation): LiveCounterUpdate { + // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. + // note that it is intentional to SUM the incoming count from the create op. + // if we got here, it means that current counter instance is missing the initial value in its data reference, + // which we're going to add now. + this._dataRef.data += stateOperation.counter?.count ?? 0; + this._createOperationIsMerged = true; + + return { update: { inc: stateOperation.counter?.count ?? 0 } }; + } + private _throwNoPayloadError(op: StateOperation): void { throw new this._client.ErrorInfo( `No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`, @@ -124,32 +161,21 @@ export class LiveCounter extends LiveObject ); } - private _applyCounterCreate(op: StateCounter | undefined): LiveCounterUpdate | LiveObjectUpdateNoop { - if (this.isCreated()) { - // skip COUNTER_CREATE op if this counter is already created + private _applyCounterCreate(op: StateOperation): LiveCounterUpdate | LiveObjectUpdateNoop { + if (this._createOperationIsMerged) { + // There can't be two different create operation for the same object id, because the object id + // fully encodes that operation. This means we can safely ignore any new incoming create operations + // if we already merged it once. this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, 'LiveCounter._applyCounterCreate()', - `skipping applying COUNTER_CREATE op on a counter instance as it is already created; objectId=${this._objectId}`, + `skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this._objectId}`, ); return { noop: true }; } - if (this._client.Utils.isNil(op)) { - // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. - // we need to SUM the initial value to the current value due to the reasons below, but since it's a 0, we can skip addition operation - this.setCreated(true); - return { update: { inc: 0 } }; - } - - // note that it is intentional to SUM the incoming count from the create op. - // if we get here, it means that current counter instance wasn't initialized from the COUNTER_CREATE op, - // so it is missing the initial value that we're going to add now. - this._dataRef.data += op.count ?? 0; - this.setCreated(true); - - return { update: { inc: op.count ?? 0 } }; + return this._mergeInitialDataFromCreateOperation(op); } private _applyCounterInc(op: StateCounterOp): LiveCounterUpdate { diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index de19baa96..b29654c02 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,14 +1,13 @@ import deepEqual from 'deep-equal'; -import type BaseClient from 'common/lib/client/baseclient'; import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, - StateMap, StateMapEntry, StateMapOp, StateMessage, + StateObject, StateOperation, StateOperationAction, StateValue, @@ -50,11 +49,9 @@ export class LiveMap extends LiveObject { constructor( liveObjects: LiveObjects, private _semantics: MapSemantics, - initialData?: LiveMapData | null, - objectId?: string, - siteTimeserials?: Record, + objectId: string, ) { - super(liveObjects, initialData, objectId, siteTimeserials); + super(liveObjects, objectId); } /** @@ -62,41 +59,20 @@ export class LiveMap extends LiveObject { * * @internal */ - static zeroValue(liveobjects: LiveObjects, objectId?: string, siteTimeserials?: Record): LiveMap { - return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId, siteTimeserials); + static zeroValue(liveobjects: LiveObjects, objectId: string): LiveMap { + return new LiveMap(liveobjects, MapSemantics.LWW, objectId); } /** + * Returns a {@link LiveMap} instance based on the provided state object. + * The provided state object must hold a valid map object data. + * * @internal */ - static liveMapDataFromMapEntries(client: BaseClient, entries: Record): LiveMapData { - const liveMapData: LiveMapData = { - data: new Map(), - }; - - // need to iterate over entries manually to work around optional parameters from state object entries type - Object.entries(entries ?? {}).forEach(([key, entry]) => { - let liveData: StateData; - if (typeof entry.data.objectId !== 'undefined') { - liveData = { objectId: entry.data.objectId } as ObjectIdStateData; - } else { - liveData = { encoding: entry.data.encoding, value: entry.data.value } as ValueStateData; - } - - const liveDataEntry: MapEntry = { - ...entry, - timeserial: entry.timeserial - ? DefaultTimeserial.calculateTimeserial(client, entry.timeserial) - : DefaultTimeserial.zeroValueTimeserial(client), - // true only if we received explicit true. otherwise always false - tombstone: entry.tombstone === true, - data: liveData, - }; - - liveMapData.data.set(key, liveDataEntry); - }); - - return liveMapData; + static fromStateObject(liveobjects: LiveObjects, stateObject: StateObject): LiveMap { + const obj = new LiveMap(liveobjects, stateObject.map?.semantics!, stateObject.objectId); + obj.overrideWithStateObject(stateObject); + return obj; } /** @@ -170,7 +146,7 @@ export class LiveMap extends LiveObject { let update: LiveMapUpdate | LiveObjectUpdateNoop; switch (op.action) { case StateOperationAction.MAP_CREATE: - update = this._applyMapCreate(op.map); + update = this._applyMapCreate(op); break; case StateOperationAction.MAP_SET: @@ -204,14 +180,73 @@ export class LiveMap extends LiveObject { this.notifyUpdated(update); } + /** + * @internal + */ + overrideWithStateObject(stateObject: StateObject): LiveMapUpdate { + if (stateObject.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Invalid state object: state object objectId=${stateObject.objectId}; LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (stateObject.map?.semantics !== this._semantics) { + throw new this._client.ErrorInfo( + `Invalid state object: state object map semantics=${stateObject.map?.semantics}; LiveMap semantics=${this._semantics}`, + 50000, + 500, + ); + } + + if (!this._client.Utils.isNil(stateObject.createOp)) { + // it is expected that create operation can be missing in the state object, so only validate it when it exists + if (stateObject.createOp.objectId !== this.getObjectId()) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp objectId=${stateObject.createOp?.objectId}; LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (stateObject.createOp.action !== StateOperationAction.MAP_CREATE) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp action=${stateObject.createOp?.action}; LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + if (stateObject.createOp.map?.semantics !== this._semantics) { + throw new this._client.ErrorInfo( + `Invalid state object: state object createOp map semantics=${stateObject.createOp.map?.semantics}; LiveMap semantics=${this._semantics}`, + 50000, + 500, + ); + } + } + + const previousDataRef = this._dataRef; + // override all relevant data for this object with data from the state object + this._createOperationIsMerged = false; + this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {}); + this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials); + if (!this._client.Utils.isNil(stateObject.createOp)) { + this._mergeInitialDataFromCreateOperation(stateObject.createOp); + } + + return this._updateFromDataDiff(previousDataRef, this._dataRef); + } + protected _getZeroValueData(): LiveMapData { return { data: new Map() }; } - protected _updateFromDataDiff(currentDataRef: LiveMapData, newDataRef: LiveMapData): LiveMapUpdate { + protected _updateFromDataDiff(prevDataRef: LiveMapData, newDataRef: LiveMapData): LiveMapUpdate { const update: LiveMapUpdate = { update: {} }; - for (const [key, currentEntry] of currentDataRef.data.entries()) { + for (const [key, currentEntry] of prevDataRef.data.entries()) { // any non-tombstoned properties that exist on a current map, but not in the new data - got removed if (currentEntry.tombstone === false && !newDataRef.data.has(key)) { update.update[key] = 'removed'; @@ -219,7 +254,7 @@ export class LiveMap extends LiveObject { } for (const [key, newEntry] of newDataRef.data.entries()) { - if (!currentDataRef.data.has(key)) { + if (!prevDataRef.data.has(key)) { // if property does not exist in the current map, but new data has it as a non-tombstoned property - got updated if (newEntry.tombstone === false) { update.update[key] = 'updated'; @@ -233,7 +268,7 @@ export class LiveMap extends LiveObject { } // properties that exist both in current and new map data need to have their values compared to decide on the update type - const currentEntry = currentDataRef.data.get(key)!; + const currentEntry = prevDataRef.data.get(key)!; // compare tombstones first if (currentEntry.tombstone === true && newEntry.tombstone === false) { @@ -262,33 +297,17 @@ export class LiveMap extends LiveObject { return update; } - private _throwNoPayloadError(op: StateOperation): void { - throw new this._client.ErrorInfo( - `No payload found for ${op.action} op for LiveMap objectId=${this.getObjectId()}`, - 50000, - 500, - ); - } - - private _applyMapCreate(op: StateMap | undefined): LiveMapUpdate | LiveObjectUpdateNoop { - if (this._client.Utils.isNil(op)) { + protected _mergeInitialDataFromCreateOperation(stateOperation: StateOperation): LiveMapUpdate { + if (this._client.Utils.isNil(stateOperation.map)) { // if a map object is missing for the MAP_CREATE op, the initial value is implicitly an empty map. // in this case there is nothing to merge into the current map, so we can just end processing the op. return { update: {} }; } - if (this._semantics !== op.semantics) { - throw new this._client.ErrorInfo( - `Cannot apply MAP_CREATE op on LiveMap objectId=${this.getObjectId()}; map's semantics=${this._semantics}, but op expected ${op.semantics}`, - 50000, - 500, - ); - } - const aggregatedUpdate: LiveMapUpdate | LiveObjectUpdateNoop = { update: {} }; // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. // we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations. - Object.entries(op.entries ?? {}).forEach(([key, entry]) => { + Object.entries(stateOperation.map.entries ?? {}).forEach(([key, entry]) => { // for MAP_CREATE op we must use dedicated timeserial field available on an entry, instead of a timeserial on a message const opOriginTimeserial = entry.timeserial ? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial) @@ -311,9 +330,44 @@ export class LiveMap extends LiveObject { Object.assign(aggregatedUpdate.update, update.update); }); + this._createOperationIsMerged = true; + return aggregatedUpdate; } + private _throwNoPayloadError(op: StateOperation): void { + throw new this._client.ErrorInfo( + `No payload found for ${op.action} op for LiveMap objectId=${this.getObjectId()}`, + 50000, + 500, + ); + } + + private _applyMapCreate(op: StateOperation): LiveMapUpdate | LiveObjectUpdateNoop { + if (this._createOperationIsMerged) { + // There can't be two different create operation for the same object id, because the object id + // fully encodes that operation. This means we can safely ignore any new incoming create operations + // if we already merged it once. + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveMap._applyMapCreate()', + `skipping applying MAP_CREATE op on a map instance as it was already applied before; objectId=${this._objectId}`, + ); + return { noop: true }; + } + + if (this._semantics !== op.map?.semantics) { + throw new this._client.ErrorInfo( + `Cannot apply MAP_CREATE op on LiveMap objectId=${this.getObjectId()}; map's semantics=${this._semantics}, but op expected ${op.map?.semantics}`, + 50000, + 500, + ); + } + + return this._mergeInitialDataFromCreateOperation(op); + } + private _applyMapSet(op: StateMapOp, opOriginTimeserial: Timeserial): LiveMapUpdate | LiveObjectUpdateNoop { const { ErrorInfo, Utils } = this._client; @@ -398,4 +452,34 @@ export class LiveMap extends LiveObject { return { update: { [op.key]: 'removed' } }; } + + private _liveMapDataFromMapEntries(entries: Record): LiveMapData { + const liveMapData: LiveMapData = { + data: new Map(), + }; + + // need to iterate over entries manually to work around optional parameters from state object entries type + Object.entries(entries ?? {}).forEach(([key, entry]) => { + let liveData: StateData; + if (typeof entry.data.objectId !== 'undefined') { + liveData = { objectId: entry.data.objectId } as ObjectIdStateData; + } else { + liveData = { encoding: entry.data.encoding, value: entry.data.value } as ValueStateData; + } + + const liveDataEntry: MapEntry = { + ...entry, + timeserial: entry.timeserial + ? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial) + : DefaultTimeserial.zeroValueTimeserial(this._client), + // true only if we received explicit true. otherwise always false + tombstone: entry.tombstone === true, + data: liveData, + }; + + liveMapData.data.set(key, liveDataEntry); + }); + + return liveMapData; + } } diff --git a/src/plugins/liveobjects/liveobject.ts b/src/plugins/liveobjects/liveobject.ts index e18762dbd..04ae7d4e2 100644 --- a/src/plugins/liveobjects/liveobject.ts +++ b/src/plugins/liveobjects/liveobject.ts @@ -1,8 +1,8 @@ import type BaseClient from 'common/lib/client/baseclient'; import type EventEmitter from 'common/lib/util/eventemitter'; import { LiveObjects } from './liveobjects'; -import { StateMessage, StateOperation } from './statemessage'; -import { Timeserial } from './timeserial'; +import { StateMessage, StateObject, StateOperation } from './statemessage'; +import { DefaultTimeserial, Timeserial } from './timeserial'; enum LiveObjectEvents { Updated = 'Updated', @@ -32,22 +32,26 @@ export abstract class LiveObject< > { protected _client: BaseClient; protected _eventEmitter: EventEmitter; - protected _dataRef: TData; protected _objectId: string; + /** + * Represents an aggregated value for an object, which combines the initial value for an object from the create operation, + * and all state operations applied to the object. + */ + protected _dataRef: TData; protected _siteTimeserials: Record; + protected _createOperationIsMerged: boolean; - constructor( + protected constructor( protected _liveObjects: LiveObjects, - initialData?: TData | null, - objectId?: string, - siteTimeserials?: Record, + objectId: string, ) { this._client = this._liveObjects.getClient(); this._eventEmitter = new this._client.EventEmitter(this._client.logger); - this._dataRef = initialData ?? this._getZeroValueData(); - this._objectId = objectId ?? this._createObjectId(); + this._dataRef = this._getZeroValueData(); + this._createOperationIsMerged = false; + this._objectId = objectId; // use empty timeserials vector by default, so any future operation can be applied to this object - this._siteTimeserials = siteTimeserials ?? {}; + this._siteTimeserials = {}; } subscribe(listener: (update: TUpdate) => void): SubscribeResponse { @@ -83,26 +87,10 @@ export abstract class LiveObject< } /** - * Sets a new data reference for the LiveObject and returns an update object that describes the changes applied based on the object's previous value. + * Emits the {@link LiveObjectEvents.Updated} event with provided update object if it isn't a noop. * * @internal */ - setData(newDataRef: TData): TUpdate { - const update = this._updateFromDataDiff(this._dataRef, newDataRef); - this._dataRef = newDataRef; - return update; - } - - /** - * @internal - */ - setSiteTimeserials(siteTimeserials: Record): void { - this._siteTimeserials = siteTimeserials; - } - - /** - * @internal - */ notifyUpdated(update: TUpdate | LiveObjectUpdateNoop): void { // should not emit update event if update was noop if ((update as LiveObjectUpdateNoop).noop) { @@ -123,18 +111,57 @@ export abstract class LiveObject< return !siteTimeserial || opOriginTimeserial.after(siteTimeserial); } + protected _timeserialMapFromStringMap(stringTimeserialsMap: Record): Record { + const objTimeserialsMap = Object.entries(stringTimeserialsMap).reduce( + (acc, v) => { + const [key, timeserialString] = v; + acc[key] = DefaultTimeserial.calculateTimeserial(this._client, timeserialString); + return acc; + }, + {} as Record, + ); + + return objTimeserialsMap; + } + private _createObjectId(): string { // TODO: implement object id generation based on live object type and initial value return Math.random().toString().substring(2); } /** + * Apply state operation message on live object. + * * @internal */ abstract applyOperation(op: StateOperation, msg: StateMessage): void; + /** + * Overrides internal data for live object with data from the given state object. + * Provided state object should hold a valid data for current live object, e.g. counter data for LiveCounter, map data for LiveMap. + * + * State objects are received during SYNC sequence, and SYNC sequence is a source of truth for the current state of the objects, + * so we can use the data received from the SYNC sequence directly and override any data values or site timeserials this live object has + * without the need to merge them. + * + * Returns an update object that describes the changes applied based on the object's previous value. + * + * @internal + */ + abstract overrideWithStateObject(stateObject: StateObject): TUpdate; protected abstract _getZeroValueData(): TData; /** * Calculate the update object based on the current Live Object data and incoming new data. */ - protected abstract _updateFromDataDiff(currentDataRef: TData, newDataRef: TData): TUpdate; + protected abstract _updateFromDataDiff(prevDataRef: TData, newDataRef: TData): TUpdate; + /** + * Merges the initial data from the create operation into the live object state. + * + * Client SDKs do not need to keep around the state operation that created the object, + * so we can merge the initial data the first time we receive it for the object, + * and work with aggregated value after that. + * + * This saves us from needing to merge the initial value with operations applied to + * the object every time the object is read. + */ + protected abstract _mergeInitialDataFromCreateOperation(stateOperation: StateOperation): TUpdate; } diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 2310b8691..49ad4eae3 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -7,7 +7,7 @@ import { LiveMap } from './livemap'; import { LiveObject, LiveObjectUpdate } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; import { StateMessage } from './statemessage'; -import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; +import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; enum LiveObjectsEvents { SyncCompleted = 'SyncCompleted', @@ -196,16 +196,9 @@ export class LiveObjects { const regionalTimeserialObj = DefaultTimeserial.calculateTimeserial(this._client, entry.regionalTimeserial); if (existingObject) { - // SYNC sequence is a source of truth for the current state of the objects, - // so we can use the data received from the SYNC sequence directly - // without the need to merge data values or site timeserials. - const update = existingObject.setData(entry.objectData); - existingObject.setSiteTimeserials(entry.siteTimeserials); - if (existingObject instanceof LiveCounter) { - existingObject.setCreated((entry as LiveCounterDataEntry).created); - } - // store updates for existing objects to call subscription callbacks for all of them once the SYNC sequence is completed. - // this will ensure that clients get notified about changes only once everything was applied. + const update = existingObject.overrideWithStateObject(entry.stateObject); + // store updates to call subscription callbacks for all of them once the SYNC sequence is completed. + // this will ensure that clients get notified about the changes only once everything has been applied. existingObjectUpdates.push({ object: existingObject, update }); continue; } @@ -215,11 +208,11 @@ export class LiveObjects { const objectType = entry.objectType; switch (objectType) { case 'LiveCounter': - newObject = new LiveCounter(this, entry.created, entry.objectData, objectId, entry.siteTimeserials); + newObject = LiveCounter.fromStateObject(this, entry.stateObject); break; case 'LiveMap': - newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId, entry.siteTimeserials); + newObject = LiveMap.fromStateObject(this, entry.stateObject); break; default: diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 22f1edb94..a0ee92462 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -5,8 +5,7 @@ import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; -import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage'; -import { DefaultTimeserial, Timeserial } from './timeserial'; +import { StateMessage, StateOperationAction } from './statemessage'; export const ROOT_OBJECT_ID = 'root'; @@ -60,7 +59,7 @@ export class LiveObjectsPool { } case 'counter': - zeroValueObject = LiveCounter.zeroValue(this._liveObjects, false, objectId); + zeroValueObject = LiveCounter.zeroValue(this._liveObjects, objectId); break; } @@ -79,35 +78,20 @@ export class LiveObjectsPool { continue; } - const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, stateMessage.serial); const stateOperation = stateMessage.operation; switch (stateOperation.action) { case StateOperationAction.MAP_CREATE: case StateOperationAction.COUNTER_CREATE: - if (this.get(stateOperation.objectId)) { - // object wich such id already exists (we may have created a zero-value object before, or this is a duplicate *_CREATE op), - // so delegate application of the op to that object - this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); - break; - } - - // otherwise we can create new objects in the pool - if (stateOperation.action === StateOperationAction.MAP_CREATE) { - this._handleMapCreate(stateOperation, opOriginTimeserial); - } - - if (stateOperation.action === StateOperationAction.COUNTER_CREATE) { - this._handleCounterCreate(stateOperation, opOriginTimeserial); - } - break; - case StateOperationAction.MAP_SET: case StateOperationAction.MAP_REMOVE: case StateOperationAction.COUNTER_INC: // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, - // we create a zero-value object for the provided object id, and apply operation for that zero-value object. - // when we eventually receive a corresponding *_CREATE op for that object, its application will be handled by that zero-value object. + // we can create a zero-value object for the provided object id, and apply operation for that zero-value object. + // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, + // since they need to ba able to eventually initialize themselves from that *_CREATE op. + // so to simplify operations handling, we always try to create a zero-value object in the pool first, + // and then we can always apply the operation on the existing object in the pool. this.createZeroValueObjectIfNotExists(stateOperation.objectId); this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); break; @@ -129,49 +113,4 @@ export class LiveObjectsPool { pool.set(root.getObjectId(), root); return pool; } - - private _handleCounterCreate(stateOperation: StateOperation, opOriginTimeserial: Timeserial): void { - // should use op's origin timeserial as the initial value for the object's site timeserials vector - const siteTimeserials = { - [opOriginTimeserial.siteCode]: opOriginTimeserial, - }; - let counter: LiveCounter; - if (this._client.Utils.isNil(stateOperation.counter)) { - // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly a zero-value counter. - counter = LiveCounter.zeroValue(this._liveObjects, true, stateOperation.objectId, siteTimeserials); - } else { - counter = new LiveCounter( - this._liveObjects, - true, - { data: stateOperation.counter.count ?? 0 }, - stateOperation.objectId, - siteTimeserials, - ); - } - - this.set(stateOperation.objectId, counter); - } - - private _handleMapCreate(stateOperation: StateOperation, opOriginTimeserial: Timeserial): void { - // should use op's origin timeserial as the initial value for the object's site timeserials vector - const siteTimeserials = { - [opOriginTimeserial.siteCode]: opOriginTimeserial, - }; - let map: LiveMap; - if (this._client.Utils.isNil(stateOperation.map)) { - // if a map object is missing for the MAP_CREATE op, the initial value is implicitly a zero-value map. - map = LiveMap.zeroValue(this._liveObjects, stateOperation.objectId, siteTimeserials); - } else { - const objectData = LiveMap.liveMapDataFromMapEntries(this._client, stateOperation.map.entries ?? {}); - map = new LiveMap( - this._liveObjects, - stateOperation.map.semantics ?? MapSemantics.LWW, - objectData, - stateOperation.objectId, - siteTimeserials, - ); - } - - this.set(stateOperation.objectId, map); - } } diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index 08bbb2fa1..f271ef59e 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -71,12 +71,6 @@ export interface StateMap { export interface StateCounter { /** The value of the counter */ count?: number; - /** - * Indicates (true) if the counter has seen an explicit create operation - * and false if the counter was created with a default value when - * processing a regular operation. - */ - created: boolean; } /** A StateOperation describes an operation to be applied to a state object. */ @@ -112,9 +106,21 @@ export interface StateObject { objectId: string; /** A vector of origin timeserials keyed by site code of the last operation that was applied to this state object. */ siteTimeserials: Record; - /** The data that represents the state of the object if it is a Map object type. */ + /** + * The operation that created the state object. + * + * Can be missing if create operation for the object is not known at this point. + */ + createOp?: StateOperation; + /** + * The data that represents the result of applying all operations to a Map object + * excluding the initial value from the create operation if it is a Map object type. + */ map?: StateMap; - /** The data that represents the state of the object if it is a Counter object type. */ + /** + * The data that represents the result of applying all operations to a Counter object + * excluding the initial value from the create operation if it is a Counter object type. + */ counter?: StateCounter; } @@ -148,6 +154,14 @@ export class StateMessage { await this._decodeMapEntries(message.object.map.entries, inputContext, decodeDataFn); } + if (message.object?.createOp?.map?.entries) { + await this._decodeMapEntries(message.object.createOp.map.entries, inputContext, decodeDataFn); + } + + if (message.object?.createOp?.mapOp?.data && 'value' in message.object.createOp.mapOp.data) { + await this._decodeStateData(message.object.createOp.mapOp.data, inputContext, decodeDataFn); + } + if (message.operation?.map?.entries) { await this._decodeMapEntries(message.operation.map.entries, inputContext, decodeDataFn); } @@ -240,6 +254,11 @@ export class StateMessage { }); } + if (stateObjectCopy.createOp) { + // use original "stateObject" object when encoding values, so we have access to original buffer values. + stateObjectCopy.createOp = this._encodeStateOperation(platform, stateObject.createOp!, withBase64Encoding); + } + return stateObjectCopy; } diff --git a/src/plugins/liveobjects/syncliveobjectsdatapool.ts b/src/plugins/liveobjects/syncliveobjectsdatapool.ts index 194c3165c..663a1bb21 100644 --- a/src/plugins/liveobjects/syncliveobjectsdatapool.ts +++ b/src/plugins/liveobjects/syncliveobjectsdatapool.ts @@ -1,30 +1,24 @@ import type BaseClient from 'common/lib/client/baseclient'; import type RealtimeChannel from 'common/lib/client/realtimechannel'; -import { LiveCounterData } from './livecounter'; -import { LiveMap } from './livemap'; -import { LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; -import { MapSemantics, StateMessage, StateObject } from './statemessage'; -import { DefaultTimeserial, Timeserial } from './timeserial'; +import { StateMessage, StateObject } from './statemessage'; export interface LiveObjectDataEntry { - objectData: LiveObjectData; - siteTimeserials: Record; + stateObject: StateObject; objectType: 'LiveMap' | 'LiveCounter'; } export interface LiveCounterDataEntry extends LiveObjectDataEntry { - created: boolean; objectType: 'LiveCounter'; } export interface LiveMapDataEntry extends LiveObjectDataEntry { objectType: 'LiveMap'; - semantics: MapSemantics; } export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry; +// TODO: investigate if this class is still needed after changes with createOp. objects are now initialized from the stateObject and this class does minimal processing /** * @internal */ @@ -85,45 +79,20 @@ export class SyncLiveObjectsDataPool { } private _createLiveCounterDataEntry(stateObject: StateObject): LiveCounterDataEntry { - const counter = stateObject.counter!; - - const objectData: LiveCounterData = { - data: counter.count ?? 0, - }; const newEntry: LiveCounterDataEntry = { - objectData, + stateObject, objectType: 'LiveCounter', - siteTimeserials: this._timeserialMapFromStringMap(stateObject.siteTimeserials), - created: counter.created, }; return newEntry; } private _createLiveMapDataEntry(stateObject: StateObject): LiveMapDataEntry { - const map = stateObject.map!; - const objectData = LiveMap.liveMapDataFromMapEntries(this._client, map.entries ?? {}); - const newEntry: LiveMapDataEntry = { - objectData, + stateObject, objectType: 'LiveMap', - siteTimeserials: this._timeserialMapFromStringMap(stateObject.siteTimeserials), - semantics: map.semantics ?? MapSemantics.LWW, }; return newEntry; } - - private _timeserialMapFromStringMap(stringTimeserialsMap: Record): Record { - const objTimeserialsMap = Object.entries(stringTimeserialsMap).reduce( - (acc, v) => { - const [key, timeserialString] = v; - acc[key] = DefaultTimeserial.calculateTimeserial(this._client, timeserialString); - return acc; - }, - {} as Record, - ); - - return objTimeserialsMap; - } } diff --git a/test/common/modules/live_objects_helper.js b/test/common/modules/live_objects_helper.js index 9f8e98783..6fb99b02f 100644 --- a/test/common/modules/live_objects_helper.js +++ b/test/common/modules/live_objects_helper.js @@ -101,6 +101,9 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb action: ACTIONS.MAP_CREATE, nonce: nonce(), objectId, + map: { + semantics: 0, + }, }, }; @@ -175,31 +178,41 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb } mapObject(opts) { - const { objectId, siteTimeserials, entries } = opts; + const { objectId, siteTimeserials, initialEntries, materialisedEntries } = opts; const obj = { object: { objectId, siteTimeserials, - map: { entries }, + map: { + semantics: 0, + entries: materialisedEntries, + }, }, }; + if (initialEntries) { + obj.object.createOp = this.mapCreateOp({ objectId, entries: initialEntries }); + } + return obj; } counterObject(opts) { - const { objectId, siteTimeserials, count } = opts; + const { objectId, siteTimeserials, initialCount, materialisedCount } = opts; const obj = { object: { objectId, siteTimeserials, counter: { - created: true, - count, + count: materialisedCount, }, }, }; + if (initialCount != null) { + obj.object.createOp = this.counterCreateOp({ objectId, count: initialCount }); + } + return obj; } diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 44c99bdaa..0b8011669 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -278,7 +278,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], liveObjectsHelper.mapObject({ objectId: 'root', siteTimeserials: { '000': '000@0-0' }, - entries: { key: { timeserial: '000@0-0', data: { value: 1 } } }, + initialEntries: { key: { timeserial: '000@0-0', data: { value: 1 } } }, }), ], }); @@ -1383,7 +1383,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], bbb: 'bbb@2-0', ccc: 'ccc@5-0', }, - entries: { + materialisedEntries: { foo1: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, foo2: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, foo3: { timeserial: 'ccc@5-0', data: { value: 'bar' } }, @@ -1399,13 +1399,13 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], siteTimeserials: { bbb: 'bbb@1-0', }, - count: 1, + initialCount: 1, }), // add objects to the root so they're discoverable in the state tree liveObjectsHelper.mapObject({ objectId: 'root', siteTimeserials: { '000': '000@0-0' }, - entries: { + initialEntries: { map: { timeserial: '000@0-0', data: { objectId: mapId } }, counter: { timeserial: '000@0-0', data: { objectId: counterId } }, },