Skip to content

Commit

Permalink
Surface live objects to the end users only when they become valid
Browse files Browse the repository at this point in the history
Valid live objects are those for which the realtime client has seen the
corresponding CREATE operation.

Resolves DTP-1104
  • Loading branch information
VeskeR committed Dec 5, 2024
1 parent 74b34e0 commit 92c8f11
Show file tree
Hide file tree
Showing 7 changed files with 424 additions and 45 deletions.
4 changes: 2 additions & 2 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._setCreateOperationIsMerged(false);
this._dataRef = { data: stateObject.counter?.count ?? 0 };
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
this._siteTimeserials = stateObject.siteTimeserials ?? {};
Expand All @@ -149,7 +149,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
// if we got here, it means that current counter instance is missing the initial value in its data reference,
// which we're going to add now.
this._dataRef.data += stateOperation.counter?.count ?? 0;
this._createOperationIsMerged = true;
this._setCreateOperationIsMerged(true);

return { update: { inc: stateOperation.counter?.count ?? 0 } };
}
Expand Down
88 changes: 73 additions & 15 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import deepEqual from 'deep-equal';

import type * as API from '../../../ably';
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { BufferedOperation, LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import {
MapSemantics,
Expand Down Expand Up @@ -77,10 +77,12 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

/**
* Returns the value associated with the specified key in the underlying Map object.
* If no element is associated with the specified key, undefined is returned.
* If the value that is associated to the provided key is an objectId string of another Live Object,
* then you will get a reference to that Live Object if it exists in the local pool, or undefined otherwise.
* If the value is not an objectId, then you will get that value.
*
* - If no entry is associated with the specified key, `undefined` is returned.
* - If map entry is tombstoned (deleted), `undefined` is returned.
* - If the value associated with the provided key is an objectId string of another Live Object, a reference to that Live Object
* is returned, provided it exists in the local pool and is valid. Otherwise, `undefined` is returned.
* - If the value is not an objectId, then that value is returned.
*/
// force the key to be of type string as we only allow strings as key in a map
get<TKey extends keyof T & string>(key: TKey): T[TKey] {
Expand All @@ -94,14 +96,26 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return undefined as T[TKey];
}

// data exists for non-tombstoned elements
// data always exists for non-tombstoned elements
const data = element.data!;

if ('value' in data) {
// map entry has a primitive type value, just return it as is.
return data.value as T[TKey];
} else {
return this._liveObjects.getPool().get(data.objectId) as T[TKey];
}

// map entry points to another object, get it from the pool
const refObject: LiveObject | undefined = this._liveObjects.getPool().get(data.objectId);
if (!refObject) {
return undefined as T[TKey];
}

if (!refObject.isValid()) {
// non-valid objects must not be surfaced to the end users
return undefined as T[TKey];
}

return refObject as API.LiveObject as T[TKey];
}

size(): number {
Expand All @@ -112,6 +126,13 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
continue;
}

// data always exists for non-tombstoned elements
const data = value.data!;
if ('objectId' in data && !this._liveObjects.getPool().get(data.objectId)?.isValid()) {
// should not count non-valid objects
continue;
}

size++;
}

Expand Down Expand Up @@ -145,6 +166,16 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

if (msg.isMapSetWithObjectIdReference() && !this._liveObjects.getPool().get(op.mapOp?.data?.objectId!)?.isValid()) {
// invalid objects must not be surfaced to the end users, so we cannot apply this MAP_SET operation on the map yet,
// as it will set the key to point to the invalid object. we also can't just update the key on a map right now, as
// that would require us to send an update event for the key, and the user will end up with a key on map which got
// updated to return undefined, which is undesired. instead we should buffer the MAP_SET operation until referenced
// object becomes valid
this._handleMapSetWithInvalidObjectReference(op.mapOp!, opOriginTimeserial);
return;
}

let update: LiveMapUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.MAP_CREATE:
Expand Down Expand Up @@ -231,7 +262,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._setCreateOperationIsMerged(false);
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
this._siteTimeserials = stateObject.siteTimeserials ?? {};
Expand Down Expand Up @@ -331,7 +362,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
Object.assign(aggregatedUpdate.update, update.update);
});

this._createOperationIsMerged = true;
this._setCreateOperationIsMerged(true);

return aggregatedUpdate;
}
Expand All @@ -344,6 +375,38 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
);
}

private _handleMapSetWithInvalidObjectReference(op: StateMapOp, opOriginTimeserial: string | undefined): void {
const refObjectId = op?.data?.objectId!;
// ensure referenced object always exist so we can subscribe to it becoming valid
this._liveObjects.getPool().createZeroValueObjectIfNotExists(refObjectId);

// wait until the referenced object becomes valid, then apply MAP_SET operation,
// as it will now point to the existing valid object
const { off } = this._liveObjects
.getPool()
.get(refObjectId)!
.onceValid(() => {
try {
const update = this._applyMapSet(op, opOriginTimeserial);
this.notifyUpdated(update);
} catch (error) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_ERROR,
`LiveMap._handleMapSetWithInvalidObjectReference()`,
`error applying buffered MAP_SET operation: ${this._client.Utils.inspectError(error)}`,
);
} finally {
this._bufferedOperations.delete(bufferedOperation);
}
});

const bufferedOperation: BufferedOperation = {
cancel: () => off(),
};
this._bufferedOperations.add(bufferedOperation);
}

private _applyMapCreate(op: StateOperation): LiveMapUpdate | LiveObjectUpdateNoop {
if (this._createOperationIsMerged) {
// There can't be two different create operation for the same object id, because the object id
Expand Down Expand Up @@ -395,11 +458,6 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
let liveData: StateData;
if (!Utils.isNil(op.data.objectId)) {
liveData = { objectId: op.data.objectId } as ObjectIdStateData;
// this MAP_SET op is setting a key to point to another object via its object id,
// but it is possible that we don't have the corresponding object in the pool yet (for example, we haven't seen the *_CREATE op for it).
// we don't want to return undefined from this map's .get() method even if we don't have the object,
// so instead we create a zero-value object for that object id if it not exists.
this._liveObjects.getPool().createZeroValueObjectIfNotExists(op.data.objectId);
} else {
liveData = { encoding: op.data.encoding, value: op.data.value } as ValueStateData;
}
Expand Down
60 changes: 60 additions & 0 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { StateMessage, StateObject, StateOperation } from './statemessage';

enum LiveObjectEvents {
Updated = 'Updated',
Valid = 'Valid',
}

export interface LiveObjectData {
Expand All @@ -25,6 +26,17 @@ export interface SubscribeResponse {
unsubscribe(): void;
}

export interface OnEventResponse {
off(): void;
}

/**
* Provides an interface for a buffered operation with the ability to cancel it, regardless of the buffering mechanism used
*/
export interface BufferedOperation {
cancel(): void;
}

export abstract class LiveObject<
TData extends LiveObjectData = LiveObjectData,
TUpdate extends LiveObjectUpdate = LiveObjectUpdate,
Expand All @@ -39,6 +51,7 @@ export abstract class LiveObject<
protected _dataRef: TData;
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;
protected _bufferedOperations: Set<BufferedOperation>;

protected constructor(
protected _liveObjects: LiveObjects,
Expand All @@ -51,6 +64,7 @@ export abstract class LiveObject<
this._objectId = objectId;
// use empty timeserials vector by default, so any future operation can be applied to this object
this._siteTimeserials = {};
this._bufferedOperations = new Set();
}

subscribe(listener: (update: TUpdate) => void): SubscribeResponse {
Expand Down Expand Up @@ -99,6 +113,42 @@ export abstract class LiveObject<
this._eventEmitter.emit(LiveObjectEvents.Updated, update);
}

/**
* Object is considered a "valid object" if we have seen the create operation for that object.
*
* Non-valid objects should be treated as though they don't exist from the perspective of the public API for the end users,
* i.e. the public access API that would return this object instead should return an `undefined`. In other words, non-valid
* objects are not surfaced to the end users and they're not able to interact with it.
*
* Once the create operation for the object has been seen and merged, the object becomes valid and can be exposed to the end users.
*
* @internal
*/
isValid(): boolean {
return this._createOperationIsMerged;
}

/**
* @internal
*/
onceValid(listener: () => void): OnEventResponse {
this._eventEmitter.once(LiveObjectEvents.Valid, listener);

const off = () => {
this._eventEmitter.off(LiveObjectEvents.Valid, listener);
};

return { off };
}

/**
* @internal
*/
cancelBufferedOperations(): void {
this._bufferedOperations.forEach((x) => x.cancel());
this._bufferedOperations.clear();
}

/**
* Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object.
*
Expand All @@ -118,6 +168,16 @@ export abstract class LiveObject<
return !siteTimeserial || opOriginTimeserial > siteTimeserial;
}

protected _setCreateOperationIsMerged(createOperationIsMerged: boolean): void {
const shouldNotifyValid =
createOperationIsMerged === true && this._createOperationIsMerged !== createOperationIsMerged;
this._createOperationIsMerged = createOperationIsMerged;

if (shouldNotifyValid) {
this._eventEmitter.emit(LiveObjectEvents.Valid);
}
}

private _createObjectId(): string {
// TODO: implement object id generation based on live object type and initial value
return Math.random().toString().substring(2);
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ export class LiveObjects {
private _startNewSync(syncId?: string, syncCursor?: string): void {
// need to discard all buffered state operation messages on new sync start
this._bufferedStateOperations = [];
// cancel any buffered operations for all objects in the pool, as we're overriding the current state and they will no longer be valid
this._liveObjectsPool.cancelBufferedOperations();
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = syncId;
this._currentSyncCursor = syncCursor;
Expand Down
4 changes: 4 additions & 0 deletions src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ export class LiveObjectsPool {
this.set(objectId, zeroValueObject);
}

cancelBufferedOperations(): void {
this._pool.forEach((x) => x.cancelBufferedOperations());
}

private _getInitialPool(): Map<string, LiveObject> {
const pool = new Map<string, LiveObject>();
const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID);
Expand Down
7 changes: 7 additions & 0 deletions src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ export class StateMessage {
};
}

/**
* Returns true if this state message is a state operation with `MAP_SET` action and it sets a map entry to point to another objectId.
*/
isMapSetWithObjectIdReference(): boolean {
return this.operation?.action === StateOperationAction.MAP_SET && this.operation.mapOp?.data?.objectId != null;
}

/**
* Overload toJSON() to intercept JSON.stringify()
* @return {*}
Expand Down
Loading

0 comments on commit 92c8f11

Please sign in to comment.