From 8c6d3b04fc34509f56299dc71f3aff9b747b8020 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Tue, 5 Nov 2024 04:25:35 +0000 Subject: [PATCH] Implement direct object subscription for Live Objects - subscription callback is invoked when the live object data is updated via incoming state operation - subscription callback is invoked when the live object data is updated via a sync sequence (once sync sequence is applied to all objects) - update object is passed to a callback function that describes a granular update made to the live object Resolves DTP-958 --- src/plugins/liveobjects/livecounter.ts | 32 +++++-- src/plugins/liveobjects/livemap.ts | 103 ++++++++++++++++++--- src/plugins/liveobjects/liveobject.ts | 72 +++++++++++++- src/plugins/liveobjects/liveobjects.ts | 11 ++- src/plugins/liveobjects/liveobjectspool.ts | 2 - 5 files changed, 190 insertions(+), 30 deletions(-) diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index 61d17d9a2..6f4f8e58f 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,4 +1,4 @@ -import { LiveObject, LiveObjectData } from './liveobject'; +import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage'; import { Timeserial } from './timeserial'; @@ -7,7 +7,11 @@ export interface LiveCounterData extends LiveObjectData { data: number; } -export class LiveCounter extends LiveObject { +export interface LiveCounterUpdate extends LiveObjectUpdate { + update: { inc: number }; +} + +export class LiveCounter extends LiveObject { constructor( liveObjects: LiveObjects, private _created: boolean, @@ -62,16 +66,19 @@ export class LiveCounter extends LiveObject { ); } + let update: LiveCounterUpdate | LiveObjectUpdateNoop; switch (op.action) { case StateOperationAction.COUNTER_CREATE: - this._applyCounterCreate(op.counter); + update = this._applyCounterCreate(op.counter); break; case StateOperationAction.COUNTER_INC: if (this._client.Utils.isNil(op.counterOp)) { this._throwNoPayloadError(op); + // leave an explicit return here, so that TS knows that update object is always set after the switch statement. + return; } else { - this._applyCounterInc(op.counterOp); + update = this._applyCounterInc(op.counterOp); } break; @@ -84,12 +91,18 @@ export class LiveCounter extends LiveObject { } this.setRegionalTimeserial(opRegionalTimeserial); + this.notifyUpdated(update); } protected _getZeroValueData(): LiveCounterData { return { data: 0 }; } + protected _updateFromDataDiff(newDataRef: LiveCounterData): LiveCounterUpdate { + const counterDiff = newDataRef.data - this._dataRef.data; + return { update: { inc: counterDiff } }; + } + private _throwNoPayloadError(op: StateOperation): void { throw new this._client.ErrorInfo( `No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`, @@ -98,7 +111,7 @@ export class LiveCounter extends LiveObject { ); } - private _applyCounterCreate(op: StateCounter | undefined): void { + private _applyCounterCreate(op: StateCounter | undefined): LiveCounterUpdate | LiveObjectUpdateNoop { if (this.isCreated()) { // skip COUNTER_CREATE op if this counter is already created this._client.Logger.logAction( @@ -107,14 +120,14 @@ export class LiveCounter extends LiveObject { 'LiveCounter._applyCounterCreate()', `skipping applying COUNTER_CREATE op on a counter instance as it is already created; objectId=${this._objectId}`, ); - return; + return { noop: true }; } if (this._client.Utils.isNil(op)) { // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. // we need to SUM the initial value to the current value due to the reasons below, but since it's a 0, we can skip addition operation this.setCreated(true); - return; + return { update: { inc: 0 } }; } // note that it is intentional to SUM the incoming count from the create op. @@ -122,9 +135,12 @@ export class LiveCounter extends LiveObject { // so it is missing the initial value that we're going to add now. this._dataRef.data += op.count ?? 0; this.setCreated(true); + + return { update: { inc: op.count ?? 0 } }; } - private _applyCounterInc(op: StateCounterOp): void { + private _applyCounterInc(op: StateCounterOp): LiveCounterUpdate { this._dataRef.data += op.amount; + return { update: { inc: op.amount } }; } } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index b0427a9a0..1bd5ec705 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,5 +1,7 @@ +import deepEqual from 'deep-equal'; + import type BaseClient from 'common/lib/client/baseclient'; -import { LiveObject, LiveObjectData } from './liveobject'; +import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, @@ -40,7 +42,11 @@ export interface LiveMapData extends LiveObjectData { data: Map; } -export class LiveMap extends LiveObject { +export interface LiveMapUpdate extends LiveObjectUpdate { + update: { [keyName: string]: 'updated' | 'deleted' }; +} + +export class LiveMap extends LiveObject { constructor( liveObjects: LiveObjects, private _semantics: MapSemantics, @@ -144,24 +150,29 @@ export class LiveMap extends LiveObject { ); } + let update: LiveMapUpdate | LiveObjectUpdateNoop; switch (op.action) { case StateOperationAction.MAP_CREATE: - this._applyMapCreate(op.map); + update = this._applyMapCreate(op.map); break; case StateOperationAction.MAP_SET: if (this._client.Utils.isNil(op.mapOp)) { this._throwNoPayloadError(op); + // leave an explicit return here, so that TS knows that update object is always set after the switch statement. + return; } else { - this._applyMapSet(op.mapOp, DefaultTimeserial.calculateTimeserial(this._client, msg.serial)); + update = this._applyMapSet(op.mapOp, DefaultTimeserial.calculateTimeserial(this._client, msg.serial)); } break; case StateOperationAction.MAP_REMOVE: if (this._client.Utils.isNil(op.mapOp)) { this._throwNoPayloadError(op); + // leave an explicit return here, so that TS knows that update object is always set after the switch statement. + return; } else { - this._applyMapRemove(op.mapOp, DefaultTimeserial.calculateTimeserial(this._client, msg.serial)); + update = this._applyMapRemove(op.mapOp, DefaultTimeserial.calculateTimeserial(this._client, msg.serial)); } break; @@ -174,12 +185,63 @@ export class LiveMap extends LiveObject { } this.setRegionalTimeserial(opRegionalTimeserial); + this.notifyUpdated(update); } protected _getZeroValueData(): LiveMapData { return { data: new Map() }; } + protected _updateFromDataDiff(newDataRef: LiveMapData): LiveMapUpdate { + const update: LiveMapUpdate = { update: {} }; + + for (const [key, currentEntry] of this._dataRef.data.entries()) { + // any non-tombstoned properties that exist on a current map, but not in the new data - got deleted + if (currentEntry.tombstone === false && !newDataRef.data.has(key)) { + update.update[key] = 'deleted'; + } + } + + for (const [key, newEntry] of newDataRef.data.entries()) { + if (!this._dataRef.data.has(key)) { + // if property does not exist in the current map, but new data has it as a non-tombstoned property - got updated + if (newEntry.tombstone === false) { + update.update[key] = 'updated'; + continue; + } + + // otherwise, if new data has this prop tombstoned - do nothing, as property didn't exist anyway + if (newEntry.tombstone === true) { + continue; + } + } + + // properties that exist both in current and new map data need to have their values compared to decide on the update type + const currentEntry = this._dataRef.data.get(key)!; + + // compare tombstones first + if (currentEntry.tombstone === true && newEntry.tombstone === false) { + // current prop is tombstoned, but new is not. it means prop was updated to a meaningful value + update.update[key] = 'updated'; + continue; + } + if (currentEntry.tombstone === false && newEntry.tombstone === true) { + // current prop is not tombstoned, but new is. it means prop was deleted + update.update[key] = 'deleted'; + continue; + } + + // if tombstones are matching, need to compare values with deep equals to see if it was changed + const valueChanged = !deepEqual(currentEntry.data, newEntry.data, { strict: true }); + if (valueChanged) { + update.update[key] = 'updated'; + continue; + } + } + + return update; + } + private _throwNoPayloadError(op: StateOperation): void { throw new this._client.ErrorInfo( `No payload found for ${op.action} op for LiveMap objectId=${this.getObjectId()}`, @@ -188,11 +250,11 @@ export class LiveMap extends LiveObject { ); } - private _applyMapCreate(op: StateMap | undefined): void { + private _applyMapCreate(op: StateMap | undefined): LiveMapUpdate | LiveObjectUpdateNoop { if (this._client.Utils.isNil(op)) { // if a map object is missing for the MAP_CREATE op, the initial value is implicitly an empty map. // in this case there is nothing to merge into the current map, so we can just end processing the op. - return; + return { update: {} }; } if (this._semantics !== op.semantics) { @@ -203,6 +265,7 @@ export class LiveMap extends LiveObject { ); } + const aggregatedUpdate: LiveMapUpdate | LiveObjectUpdateNoop = { update: {} }; // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. // we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations. Object.entries(op.entries ?? {}).forEach(([key, entry]) => { @@ -210,17 +273,28 @@ export class LiveMap extends LiveObject { const opOriginTimeserial = entry.timeserial ? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial) : DefaultTimeserial.zeroValueTimeserial(this._client); + let update: LiveMapUpdate | LiveObjectUpdateNoop; if (entry.tombstone === true) { // entry in MAP_CREATE op is deleted, try to apply MAP_REMOVE op - this._applyMapRemove({ key }, opOriginTimeserial); + update = this._applyMapRemove({ key }, opOriginTimeserial); } else { // entry in MAP_CREATE op is not deleted, try to set it via MAP_SET op - this._applyMapSet({ key, data: entry.data }, opOriginTimeserial); + update = this._applyMapSet({ key, data: entry.data }, opOriginTimeserial); + } + + // skip noop updates + if ((update as LiveObjectUpdateNoop).noop) { + return; } + + // otherwise copy update data to aggregated update + Object.assign(aggregatedUpdate.update, update.update); }); + + return aggregatedUpdate; } - private _applyMapSet(op: StateMapOp, opOriginTimeserial: Timeserial): void { + private _applyMapSet(op: StateMapOp, opOriginTimeserial: Timeserial): LiveMapUpdate | LiveObjectUpdateNoop { const { ErrorInfo, Utils } = this._client; const existingEntry = this._dataRef.data.get(op.key); @@ -235,7 +309,7 @@ export class LiveMap extends LiveObject { 'LiveMap._applyMapSet()', `skipping update for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`, ); - return; + return { noop: true }; } if (Utils.isNil(op.data) || (Utils.isNil(op.data.value) && Utils.isNil(op.data.objectId))) { @@ -270,9 +344,10 @@ export class LiveMap extends LiveObject { }; this._dataRef.data.set(op.key, newEntry); } + return { update: { [op.key]: 'updated' } }; } - private _applyMapRemove(op: StateMapOp, opOriginTimeserial: Timeserial): void { + private _applyMapRemove(op: StateMapOp, opOriginTimeserial: Timeserial): LiveMapUpdate | LiveObjectUpdateNoop { const existingEntry = this._dataRef.data.get(op.key); if ( existingEntry && @@ -285,7 +360,7 @@ export class LiveMap extends LiveObject { 'LiveMap._applyMapRemove()', `skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`, ); - return; + return { noop: true }; } if (existingEntry) { @@ -300,5 +375,7 @@ export class LiveMap extends LiveObject { }; this._dataRef.data.set(op.key, newEntry); } + + return { update: { [op.key]: 'deleted' } }; } } diff --git a/src/plugins/liveobjects/liveobject.ts b/src/plugins/liveobjects/liveobject.ts index 70a294779..6682332b9 100644 --- a/src/plugins/liveobjects/liveobject.ts +++ b/src/plugins/liveobjects/liveobject.ts @@ -1,31 +1,76 @@ import type BaseClient from 'common/lib/client/baseclient'; +import type EventEmitter from 'common/lib/util/eventemitter'; import { LiveObjects } from './liveobjects'; import { StateMessage, StateOperation } from './statemessage'; import { DefaultTimeserial, Timeserial } from './timeserial'; +enum LiveObjectEvents { + Updated = 'Updated', +} + export interface LiveObjectData { data: any; } -export abstract class LiveObject { +export interface LiveObjectUpdate { + update: any; +} + +export interface LiveObjectUpdateNoop { + // have optional update field with undefined type so it's not possible to create a noop object with a meaningful update property. + update?: undefined; + noop: true; +} + +export interface SubscribeResponse { + unsubscribe(): void; +} + +export abstract class LiveObject< + TData extends LiveObjectData = LiveObjectData, + TUpdate extends LiveObjectUpdate = LiveObjectUpdate, +> { protected _client: BaseClient; - protected _dataRef: T; + protected _eventEmitter: EventEmitter; + protected _dataRef: TData; protected _objectId: string; protected _regionalTimeserial: Timeserial; constructor( protected _liveObjects: LiveObjects, - initialData?: T | null, + initialData?: TData | null, objectId?: string, regionalTimeserial?: Timeserial, ) { this._client = this._liveObjects.getClient(); + this._eventEmitter = new this._client.EventEmitter(this._client.logger); this._dataRef = initialData ?? this._getZeroValueData(); this._objectId = objectId ?? this._createObjectId(); // use zero value timeserial by default, so any future operation can be applied for this object this._regionalTimeserial = regionalTimeserial ?? DefaultTimeserial.zeroValueTimeserial(this._client); } + subscribe(listener: (update: TUpdate) => void): SubscribeResponse { + this._eventEmitter.on(LiveObjectEvents.Updated, listener); + + const unsubscribe = () => { + this._eventEmitter.off(LiveObjectEvents.Updated, listener); + }; + + return { unsubscribe }; + } + + unsubscribe(listener: (update: TUpdate) => void): void { + // current implementation of the EventEmitter will remove all listeners if .off is called without arguments or with nullish arguments. + // or when called with just an event argument, it will remove all listeners for the event. + // thus we need to check that listener does actually exist before calling .off. + if (this._client.Utils.isNil(listener)) { + return; + } + + this._eventEmitter.off(LiveObjectEvents.Updated, listener); + } + /** * @internal */ @@ -41,10 +86,14 @@ export abstract class LiveObject { } /** + * Sets a new data reference for the LiveObject and returns an update object that describes the changes applied based on the object's previous value. + * * @internal */ - setData(newDataRef: T): void { + setData(newDataRef: TData): TUpdate { + const update = this._updateFromDataDiff(newDataRef); this._dataRef = newDataRef; + return update; } /** @@ -54,6 +103,18 @@ export abstract class LiveObject { this._regionalTimeserial = regionalTimeserial; } + /** + * @internal + */ + notifyUpdated(update: TUpdate | LiveObjectUpdateNoop): void { + // should not emit update event if update was noop + if ((update as LiveObjectUpdateNoop).noop) { + return; + } + + this._eventEmitter.emit(LiveObjectEvents.Updated, update); + } + private _createObjectId(): string { // TODO: implement object id generation based on live object type and initial value return Math.random().toString().substring(2); @@ -63,5 +124,6 @@ export abstract class LiveObject { * @internal */ abstract applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void; - protected abstract _getZeroValueData(): T; + protected abstract _getZeroValueData(): TData; + protected abstract _updateFromDataDiff(newDataRef: TData): TUpdate; } diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index f541a56e0..6ff6b2239 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -4,7 +4,7 @@ import type EventEmitter from 'common/lib/util/eventemitter'; import type * as API from '../../../ably'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; -import { LiveObject } from './liveobject'; +import { LiveObject, LiveObjectUpdate } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; import { StateMessage } from './statemessage'; import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; @@ -197,6 +197,7 @@ export class LiveObjects { } const receivedObjectIds = new Set(); + const existingObjectUpdates: { object: LiveObject; update: LiveObjectUpdate }[] = []; for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) { receivedObjectIds.add(objectId); @@ -204,11 +205,14 @@ export class LiveObjects { const regionalTimeserialObj = DefaultTimeserial.calculateTimeserial(this._client, entry.regionalTimeserial); if (existingObject) { - existingObject.setData(entry.objectData); + const update = existingObject.setData(entry.objectData); existingObject.setRegionalTimeserial(regionalTimeserialObj); if (existingObject instanceof LiveCounter) { existingObject.setCreated((entry as LiveCounterDataEntry).created); } + // store updates for existing objects to call subscription callbacks for all of them once the SYNC sequence is completed. + // this will ensure that clients get notified about changes only once everything was applied. + existingObjectUpdates.push({ object: existingObject, update }); continue; } @@ -233,5 +237,8 @@ export class LiveObjects { // need to remove LiveObject instances from the LiveObjectsPool for which objectIds were not received during the SYNC sequence this._liveObjectsPool.deleteExtraObjectIds([...receivedObjectIds]); + + // call subscription callbacks for all updated existing objects + existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update)); } } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index aef14cc8d..a4ea8c92e 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -89,7 +89,6 @@ export class LiveObjectsPool { if (this.get(stateOperation.objectId)) { // object wich such id already exists (we may have created a zero-value object before, or this is a duplicate *_CREATE op), // so delegate application of the op to that object - // TODO: invoke subscription callbacks for an object when applied this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage, regionalTimeserial); break; } @@ -111,7 +110,6 @@ export class LiveObjectsPool { // we create a zero-value object for the provided object id, and apply operation for that zero-value object. // when we eventually receive a corresponding *_CREATE op for that object, its application will be handled by that zero-value object. this.createZeroValueObjectIfNotExists(stateOperation.objectId); - // TODO: invoke subscription callbacks for an object when applied this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage, regionalTimeserial); break;