From 74b34e006e71d93e5a4c8e74ba42b6498286b3ed Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Wed, 4 Dec 2024 05:43:32 +0000 Subject: [PATCH 1/3] Move `applyStateMessages` to LiveObjects class --- src/plugins/liveobjects/liveobjects.ts | 47 ++++++++++++++++++++-- src/plugins/liveobjects/liveobjectspool.ts | 45 --------------------- 2 files changed, 44 insertions(+), 48 deletions(-) diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 75b743d5b..aa064de78 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -6,7 +6,7 @@ import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject, LiveObjectUpdate } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; -import { StateMessage } from './statemessage'; +import { StateMessage, StateOperationAction } from './statemessage'; import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; enum LiveObjectsEvents { @@ -101,7 +101,7 @@ export class LiveObjects { return; } - this._liveObjectsPool.applyStateMessages(stateMessages); + this._applyStateMessages(stateMessages); } /** @@ -159,7 +159,7 @@ export class LiveObjects { this._applySync(); // should apply buffered state operations after we applied the SYNC data. // can use regular state messages application logic - this._liveObjectsPool.applyStateMessages(this._bufferedStateOperations); + this._applyStateMessages(this._bufferedStateOperations); this._bufferedStateOperations = []; this._syncLiveObjectsDataPool.reset(); @@ -232,4 +232,45 @@ export class LiveObjects { // call subscription callbacks for all updated existing objects existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update)); } + + private _applyStateMessages(stateMessages: StateMessage[]): void { + for (const stateMessage of stateMessages) { + if (!stateMessage.operation) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects._applyStateMessages()', + `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + const stateOperation = stateMessage.operation; + + switch (stateOperation.action) { + case StateOperationAction.MAP_CREATE: + case StateOperationAction.COUNTER_CREATE: + case StateOperationAction.MAP_SET: + case StateOperationAction.MAP_REMOVE: + case StateOperationAction.COUNTER_INC: + // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, + // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. + // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, + // since they need to be able to eventually initialize themselves from that *_CREATE op. + // so to simplify operations handling, we always try to create a zero-value object in the pool first, + // and then we can always apply the operation on the existing object in the pool. + this._liveObjectsPool.createZeroValueObjectIfNotExists(stateOperation.objectId); + this._liveObjectsPool.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + break; + + default: + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects._applyStateMessages()', + `received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + } + } + } } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 2c57f1084..eb42d47b4 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,11 +1,9 @@ import type BaseClient from 'common/lib/client/baseclient'; -import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; import { LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; -import { StateMessage, StateOperationAction } from './statemessage'; export const ROOT_OBJECT_ID = 'root'; @@ -14,12 +12,10 @@ export const ROOT_OBJECT_ID = 'root'; */ export class LiveObjectsPool { private _client: BaseClient; - private _channel: RealtimeChannel; private _pool: Map; constructor(private _liveObjects: LiveObjects) { this._client = this._liveObjects.getClient(); - this._channel = this._liveObjects.getChannel(); this._pool = this._getInitialPool(); } @@ -66,47 +62,6 @@ export class LiveObjectsPool { this.set(objectId, zeroValueObject); } - applyStateMessages(stateMessages: StateMessage[]): void { - for (const stateMessage of stateMessages) { - if (!stateMessage.operation) { - this._client.Logger.logAction( - this._client.logger, - this._client.Logger.LOG_MAJOR, - 'LiveObjects.LiveObjectsPool.applyStateMessages()', - `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, - ); - continue; - } - - const stateOperation = stateMessage.operation; - - switch (stateOperation.action) { - case StateOperationAction.MAP_CREATE: - case StateOperationAction.COUNTER_CREATE: - case StateOperationAction.MAP_SET: - case StateOperationAction.MAP_REMOVE: - case StateOperationAction.COUNTER_INC: - // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, - // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. - // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, - // since they need to be able to eventually initialize themselves from that *_CREATE op. - // so to simplify operations handling, we always try to create a zero-value object in the pool first, - // and then we can always apply the operation on the existing object in the pool. - this.createZeroValueObjectIfNotExists(stateOperation.objectId); - this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); - break; - - default: - this._client.Logger.logAction( - this._client.logger, - this._client.Logger.LOG_MAJOR, - 'LiveObjects.LiveObjectsPool.applyStateMessages()', - `received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, - ); - } - } - } - private _getInitialPool(): Map { const pool = new Map(); const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID); From 449ab52a163e98515db72a10e4aa12076058ed86 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Thu, 5 Dec 2024 04:26:15 +0000 Subject: [PATCH 2/3] Surface live objects to the end users only when they become valid Valid live objects are those for which the realtime client has seen the corresponding CREATE operation. Resolves DTP-1104 --- src/plugins/liveobjects/livecounter.ts | 4 +- src/plugins/liveobjects/livemap.ts | 88 +++++- src/plugins/liveobjects/liveobject.ts | 60 ++++ src/plugins/liveobjects/liveobjects.ts | 2 + src/plugins/liveobjects/liveobjectspool.ts | 4 + src/plugins/liveobjects/statemessage.ts | 7 + test/realtime/live_objects.test.js | 341 +++++++++++++++++++-- 7 files changed, 461 insertions(+), 45 deletions(-) diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index dc144c325..9cf085365 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -123,7 +123,7 @@ export class LiveCounter extends LiveObject const previousDataRef = this._dataRef; // override all relevant data for this object with data from the state object - this._createOperationIsMerged = false; + this._setCreateOperationIsMerged(false); this._dataRef = { data: stateObject.counter?.count ?? 0 }; // should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object this._siteTimeserials = stateObject.siteTimeserials ?? {}; @@ -149,7 +149,7 @@ export class LiveCounter extends LiveObject // if we got here, it means that current counter instance is missing the initial value in its data reference, // which we're going to add now. this._dataRef.data += stateOperation.counter?.count ?? 0; - this._createOperationIsMerged = true; + this._setCreateOperationIsMerged(true); return { update: { inc: stateOperation.counter?.count ?? 0 } }; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 1f3729c5f..e4b1200ef 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,7 +1,7 @@ import deepEqual from 'deep-equal'; import type * as API from '../../../ably'; -import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; +import { BufferedOperation, LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, @@ -77,10 +77,12 @@ export class LiveMap extends LiveObject(key: TKey): T[TKey] { @@ -94,14 +96,26 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject extends LiveObject extends LiveObject extends LiveObject { + try { + const update = this._applyMapSet(op, opOriginTimeserial); + this.notifyUpdated(update); + } catch (error) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_ERROR, + `LiveMap._handleMapSetWithInvalidObjectReference()`, + `error applying buffered MAP_SET operation: ${this._client.Utils.inspectError(error)}`, + ); + } finally { + this._bufferedOperations.delete(bufferedOperation); + } + }); + + const bufferedOperation: BufferedOperation = { + cancel: () => off(), + }; + this._bufferedOperations.add(bufferedOperation); + } + private _applyMapCreate(op: StateOperation): LiveMapUpdate | LiveObjectUpdateNoop { if (this._createOperationIsMerged) { // There can't be two different create operation for the same object id, because the object id @@ -395,11 +458,6 @@ export class LiveMap extends LiveObject; protected _createOperationIsMerged: boolean; + protected _bufferedOperations: Set; protected constructor( protected _liveObjects: LiveObjects, @@ -51,6 +64,7 @@ export abstract class LiveObject< this._objectId = objectId; // use empty timeserials vector by default, so any future operation can be applied to this object this._siteTimeserials = {}; + this._bufferedOperations = new Set(); } subscribe(listener: (update: TUpdate) => void): SubscribeResponse { @@ -99,6 +113,42 @@ export abstract class LiveObject< this._eventEmitter.emit(LiveObjectEvents.Updated, update); } + /** + * Object is considered a "valid object" if we have seen the create operation for that object. + * + * Non-valid objects should be treated as though they don't exist from the perspective of the public API for the end users, + * i.e. the public access API that would return this object instead should return an `undefined`. In other words, non-valid + * objects are not surfaced to the end users and they're not able to interact with it. + * + * Once the create operation for the object has been seen and merged, the object becomes valid and can be exposed to the end users. + * + * @internal + */ + isValid(): boolean { + return this._createOperationIsMerged; + } + + /** + * @internal + */ + onceValid(listener: () => void): OnEventResponse { + this._eventEmitter.once(LiveObjectEvents.Valid, listener); + + const off = () => { + this._eventEmitter.off(LiveObjectEvents.Valid, listener); + }; + + return { off }; + } + + /** + * @internal + */ + cancelBufferedOperations(): void { + this._bufferedOperations.forEach((x) => x.cancel()); + this._bufferedOperations.clear(); + } + /** * Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object. * @@ -118,6 +168,16 @@ export abstract class LiveObject< return !siteTimeserial || opOriginTimeserial > siteTimeserial; } + protected _setCreateOperationIsMerged(createOperationIsMerged: boolean): void { + const shouldNotifyValid = + createOperationIsMerged === true && this._createOperationIsMerged !== createOperationIsMerged; + this._createOperationIsMerged = createOperationIsMerged; + + if (shouldNotifyValid) { + this._eventEmitter.emit(LiveObjectEvents.Valid); + } + } + private _createObjectId(): string { // TODO: implement object id generation based on live object type and initial value return Math.random().toString().substring(2); diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index aa064de78..f3b07e395 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -149,6 +149,8 @@ export class LiveObjects { private _startNewSync(syncId?: string, syncCursor?: string): void { // need to discard all buffered state operation messages on new sync start this._bufferedStateOperations = []; + // cancel any buffered operations for all objects in the pool, as we're overriding the current state and they will no longer be valid + this._liveObjectsPool.cancelBufferedOperations(); this._syncLiveObjectsDataPool.reset(); this._currentSyncId = syncId; this._currentSyncCursor = syncCursor; diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index eb42d47b4..0b1fec6cf 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -62,6 +62,10 @@ export class LiveObjectsPool { this.set(objectId, zeroValueObject); } + cancelBufferedOperations(): void { + this._pool.forEach((x) => x.cancelBufferedOperations()); + } + private _getInitialPool(): Map { const pool = new Map(); const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID); diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index 74b385630..f6be7fa4b 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -318,6 +318,13 @@ export class StateMessage { }; } + /** + * Returns true if this state message is a state operation with `MAP_SET` action and it sets a map entry to point to another objectId. + */ + isMapSetWithObjectIdReference(): boolean { + return this.operation?.action === StateOperationAction.MAP_SET && this.operation.mapOp?.data?.objectId != null; + } + /** * Overload toJSON() to intercept JSON.stringify() * @return {*} diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 01eb376fc..b480f0d0f 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -704,28 +704,33 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], } // check only operations with correct timeserials were applied - const expectedMapValues = [ - { foo: 'bar' }, - { foo: 'bar' }, - { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE - { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE - { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE + const expectedMaps = [ + { exists: false }, // MAP_CREATE not applied, object is not valid and we should get undefined + { exists: false }, // MAP_CREATE not applied, object is not valid and we should get undefined + { exists: true, data: { foo: 'bar', baz: 'qux' } }, // applied MAP_CREATE, object is valid + { exists: true, data: { foo: 'bar', baz: 'qux' } }, // applied MAP_CREATE, object is valid + { exists: true, data: { foo: 'bar', baz: 'qux' } }, // applied MAP_CREATE, object is valid ]; for (const [i, mapId] of mapIds.entries()) { - const expectedMapValue = expectedMapValues[i]; - const expectedKeysCount = Object.keys(expectedMapValue).length; + const expectedMap = expectedMaps[i]; + if (!expectedMap.exists) { + expect(root.get(mapId), `Check map #${i + 1} does not exist on root as MAP_CREATE op was not applied`) + .to.not.exist; + } else { + const expectedKeysCount = Object.keys(expectedMap.data).length; - expect(root.get(mapId).size()).to.equal( - expectedKeysCount, - `Check map #${i + 1} has expected number of keys after MAP_CREATE ops`, - ); - Object.entries(expectedMapValue).forEach(([key, value]) => { - expect(root.get(mapId).get(key)).to.equal( - value, - `Check map #${i + 1} has expected value for "${key}" key after MAP_CREATE ops`, + expect(root.get(mapId).size()).to.equal( + expectedKeysCount, + `Check map #${i + 1} has expected number of keys after MAP_CREATE ops`, ); - }); + Object.entries(expectedMap.data).forEach(([key, value]) => { + expect(root.get(mapId).get(key)).to.equal( + value, + `Check map #${i + 1} has expected value for "${key}" key after MAP_CREATE ops`, + ); + }); + } } }, }, @@ -895,6 +900,278 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, }, + { + description: 'MAP_SET with reference to an invalid object is buffered until object becomes valid', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + const mapId = liveObjectsHelper.fakeMapObjectId(); + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET on root to non-valid object + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'map', data: { objectId: mapId } })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + expect(root.get('map'), 'Check map does not exist on root until map is valid').to.not.exist; + expect(root.get('counter'), 'Check counter does not exist on root until counter is valid').to.not.exist; + + // send CREATE ops which should make objects valid and add them to the root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 2, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapCreateOp({ objectId: mapId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 3, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + + expect( + root.get('map'), + 'Check map exists on root after MAP_CREATE was seen and buffered MAP_SET op was applied', + ).to.exist; + expect( + root.get('counter'), + 'Check counter exists on root after COUNTER_CREATE was seen and buffered MAP_SET op was applied', + ).to.exist; + }, + }, + + { + description: + 'MAP_SET with reference to an invalid object does not update the existing key until object becomes valid', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // set some initial value for a key on root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'foo', data: { value: 1 } })], + }); + + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET same key with non-valid object reference + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'foo', data: { objectId: counterId } })], + }); + + expect(root.get('foo')).to.equal( + 1, + 'Check key "foo" was not updated by MAP_SET op with reference to an invalid object', + ); + + // send CREATE ops which should make objects valid and add them to the root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 2, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + + expectInstanceOf( + root.get('foo'), + 'LiveCounter', + 'Check key "foo" was updated by buffered MAP_SET op once the referenced object became valid', + ); + }, + }, + + { + description: 'MAP_SET with reference to an invalid object triggers subscription callback only when applied', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // subscribe to updates on root. should only proc once CREATE ops are received for referenced objects + let subscribeCallbackCalledCount = 0; + const keyUpdated = { + map: false, + counter: false, + }; + root.subscribe(({ update }) => { + subscribeCallbackCalledCount++; + Object.keys(update).forEach((x) => (keyUpdated[x] = true)); + }); + + const mapId = liveObjectsHelper.fakeMapObjectId(); + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET on root to non-valid object + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'map', data: { objectId: mapId } })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + expect(subscribeCallbackCalledCount).to.equal( + 0, + `Check subscription callback on root wasn't called for MAP_SET operations with invalid objects`, + ); + expect(keyUpdated.map).to.equal( + false, + 'Check "map" key was not updated via a subscription callback on root', + ); + expect(keyUpdated.counter).to.equal( + false, + 'Check "counter" key was not updated via a subscription callback on root', + ); + + // send CREATE ops which should make objects valid and add them to the root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 2, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapCreateOp({ objectId: mapId })], + }); + + expect(subscribeCallbackCalledCount).to.equal( + 1, + `Check subscription callback for root is called correct number of times once MAP_SET ops are applied for valid objects`, + ); + expect(keyUpdated.map).to.equal(true, 'Check "map" key was updated via a subscription callback on root'); + + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 3, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + + expect(subscribeCallbackCalledCount).to.equal( + 2, + `Check subscription callback for root is called correct number of times once MAP_SET ops are applied for valid objects`, + ); + expect(keyUpdated.counter).to.equal( + true, + 'Check "counter" key was updated via a subscription callback on root', + ); + }, + }, + + { + description: + 'MAP_SET with reference to an invalid object is applied once object becomes valid even if site timeserials have updated', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET on root to non-valid object + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + // send another MAP_SET on root with higher timeserial than buffered MAP_SET + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 99, 0), // higher timeserial than buffered MAP_SET above + siteCode: 'aaa', + state: [ + liveObjectsHelper.mapSetOp({ + objectId: 'root', + key: 'otherKey', + data: { value: 1 }, + }), + ], + }); + + expect(root.get('otherKey')).to.equal( + 1, + 'Check another key was updated on root while a MAP_SET operation is buffered', + ); + expect(root.get('counter'), 'Check counter does not exist on root until counter is valid').to.not.exist; + + // send CREATE op which should make objects valid and add them to the root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('bbb', 0, 0), + siteCode: 'bbb', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + + expect( + root.get('counter'), + 'Check counter exists on root after COUNTER_CREATE was seen and buffered MAP_SET op was applied', + ).to.exist; + }, + }, + + { + description: + 'buffered MAP_SET with reference to an invalid object is discarded when new STATE_SYNC sequence starts', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET on root to non-valid object + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + expect(root.get('counter'), 'Check counter does not exist on root as counter is not valid').to.not.exist; + + // inject STATE_SYNC message with empty serial so it is ended immediately + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', + }); + + // send COUNTER_CREATE op and set it on another key on root. only this new MAP_SET op should be applied + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 2, 0), + siteCode: 'aaa', + state: [ + liveObjectsHelper.mapSetOp({ + objectId: 'root', + key: 'anotherCounterKey', + data: { objectId: counterId }, + }), + ], + }); + + expect( + root.get('counter'), + 'Check MAP_SET for "counter" key was discarded on new STATE_SYNC sequence and not applied on root even when counter became valid', + ).to.not.exist; + expect(root.get('anotherCounterKey'), 'Check valid counter was set on "anotherCounterKey" key on root').to + .exist; + }, + }, + { description: 'can apply MAP_REMOVE state operation messages', action: async (ctx) => { @@ -1126,21 +1403,28 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], } // check only operations with correct timeserials were applied - const expectedCounterValues = [ - 1, - 1, - 11, // applied COUNTER_CREATE - 11, // applied COUNTER_CREATE - 11, // applied COUNTER_CREATE + const expectedCounters = [ + { exists: false }, // COUNTER_CREATE not applied, object is not valid and we should get undefined + { exists: false }, // COUNTER_CREATE not applied, object is not valid and we should get undefined + { exists: true, value: 11 }, // applied COUNTER_CREATE + { exists: true, value: 11 }, // applied COUNTER_CREATE + { exists: true, value: 11 }, // applied COUNTER_CREATE ]; for (const [i, counterId] of counterIds.entries()) { - const expectedValue = expectedCounterValues[i]; + const expectedCounter = expectedCounters[i]; - expect(root.get(counterId).value()).to.equal( - expectedValue, - `Check counter #${i + 1} has expected value after COUNTER_CREATE ops`, - ); + if (!expectedCounter.exists) { + expect( + root.get(counterId), + `Check counter #${i + 1} does not exist on root as COUNTER_CREATE op was not applied`, + ).to.not.exist; + } else { + expect(root.get(counterId).value()).to.equal( + expectedCounter.value, + `Check counter #${i + 1} has expected value after COUNTER_CREATE ops`, + ); + } } }, }, @@ -1426,6 +1710,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], bbb: lexicoTimeserial('bbb', 2, 0), ccc: lexicoTimeserial('ccc', 5, 0), }, + initialEntries: {}, materialisedEntries: { foo1: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, foo2: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, From 6ef073dc6c7e0fcd2de8a55d1f244bc8bb2ad07d Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Thu, 5 Dec 2024 06:20:38 +0000 Subject: [PATCH 3/3] Update `LiveMap.get` to always return `undefined` value as an option for keys that reference other LiveObjects Referenced live objects can be not valid (haven't seen a create op) and we should not surface such objects to the end user and return `undefined` instead. --- ably.d.ts | 6 +++--- src/plugins/liveobjects/livemap.ts | 2 +- test/package/browser/template/src/ably.config.d.ts | 4 ++-- test/package/browser/template/src/index-liveobjects.ts | 4 +++- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index 5d1aa1f82..938de0d7c 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2103,12 +2103,12 @@ export type DefaultRoot = */ export declare interface LiveMap extends LiveObject { /** - * Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map. + * Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map or if the associated {@link LiveObject} doesn't exist. * * @param key - The key to retrieve the value for. - * @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map. + * @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map or the associated {@link LiveObject} doesn't exist. */ - get(key: TKey): T[TKey]; + get(key: TKey): T[TKey] extends StateValue ? T[TKey] : T[TKey] | undefined; /** * Returns the number of key/value pairs in the map. diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index e4b1200ef..43eef94f6 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -85,7 +85,7 @@ export class LiveMap extends LiveObject(key: TKey): T[TKey] { + get(key: TKey): T[TKey] extends StateValue ? T[TKey] : T[TKey] | undefined { const element = this._dataRef.data.get(key); if (element === undefined) { diff --git a/test/package/browser/template/src/ably.config.d.ts b/test/package/browser/template/src/ably.config.d.ts index e5bca7718..3b3c69ddb 100644 --- a/test/package/browser/template/src/ably.config.d.ts +++ b/test/package/browser/template/src/ably.config.d.ts @@ -5,13 +5,13 @@ type CustomRoot = { stringKey: string; booleanKey: boolean; couldBeUndefined?: string; - mapKey?: LiveMap<{ + mapKey: LiveMap<{ foo: 'bar'; nestedMap?: LiveMap<{ baz: 'qux'; }>; }>; - counterKey?: LiveCounter; + counterKey: LiveCounter; }; declare global { diff --git a/test/package/browser/template/src/index-liveobjects.ts b/test/package/browser/template/src/index-liveobjects.ts index 1cd27b021..6bd7b3547 100644 --- a/test/package/browser/template/src/index-liveobjects.ts +++ b/test/package/browser/template/src/index-liveobjects.ts @@ -30,8 +30,10 @@ globalThis.testAblyPackage = async function () { const aBoolean: boolean = root.get('booleanKey'); const couldBeUndefined: string | undefined = root.get('couldBeUndefined'); // live objects on a root: + // LiveMap.get can still return undefined for LiveObject typed properties even if custom typings have them as non-optional. + // objects can be non-valid and result in the undefined value const counter: Ably.LiveCounter | undefined = root.get('counterKey'); - const map: LiveObjectsTypes['root']['mapKey'] = root.get('mapKey'); + const map: LiveObjectsTypes['root']['mapKey'] | undefined = root.get('mapKey'); // check string literal types works // need to use nullish coalescing as we didn't actually create any data on the root, // so the next calls would fail. we only need to check that TypeScript types work