Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [DTP-1104] Surface live objects to the end users only when they become valid #1932

Draft
wants to merge 3 commits into
base: integration/liveobjects
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2103,12 +2103,12 @@ export type DefaultRoot =
*/
export declare interface LiveMap<T extends LiveMapType> extends LiveObject<LiveMapUpdate> {
/**
* Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map.
* Returns the value associated with a given key. Returns `undefined` if the key doesn't exist in a map or if the associated {@link LiveObject} doesn't exist.
*
* @param key - The key to retrieve the value for.
* @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map.
* @returns A {@link LiveObject}, a primitive type (string, number, boolean, or binary data) or `undefined` if the key doesn't exist in a map or the associated {@link LiveObject} doesn't exist.
*/
get<TKey extends keyof T & string>(key: TKey): T[TKey];
get<TKey extends keyof T & string>(key: TKey): T[TKey] extends StateValue ? T[TKey] : T[TKey] | undefined;

/**
* Returns the number of key/value pairs in the map.
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._setCreateOperationIsMerged(false);
this._dataRef = { data: stateObject.counter?.count ?? 0 };
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
this._siteTimeserials = stateObject.siteTimeserials ?? {};
Expand All @@ -149,7 +149,7 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
// if we got here, it means that current counter instance is missing the initial value in its data reference,
// which we're going to add now.
this._dataRef.data += stateOperation.counter?.count ?? 0;
this._createOperationIsMerged = true;
this._setCreateOperationIsMerged(true);

return { update: { inc: stateOperation.counter?.count ?? 0 } };
}
Expand Down
90 changes: 74 additions & 16 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import deepEqual from 'deep-equal';

import type * as API from '../../../ably';
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { BufferedOperation, LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import {
MapSemantics,
Expand Down Expand Up @@ -77,13 +77,15 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

/**
* Returns the value associated with the specified key in the underlying Map object.
* If no element is associated with the specified key, undefined is returned.
* If the value that is associated to the provided key is an objectId string of another Live Object,
* then you will get a reference to that Live Object if it exists in the local pool, or undefined otherwise.
* If the value is not an objectId, then you will get that value.
*
* - If no entry is associated with the specified key, `undefined` is returned.
* - If map entry is tombstoned (deleted), `undefined` is returned.
* - If the value associated with the provided key is an objectId string of another Live Object, a reference to that Live Object
* is returned, provided it exists in the local pool and is valid. Otherwise, `undefined` is returned.
* - If the value is not an objectId, then that value is returned.
*/
// force the key to be of type string as we only allow strings as key in a map
get<TKey extends keyof T & string>(key: TKey): T[TKey] {
get<TKey extends keyof T & string>(key: TKey): T[TKey] extends StateValue ? T[TKey] : T[TKey] | undefined {
const element = this._dataRef.data.get(key);

if (element === undefined) {
Expand All @@ -94,14 +96,26 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return undefined as T[TKey];
}

// data exists for non-tombstoned elements
// data always exists for non-tombstoned elements
const data = element.data!;

if ('value' in data) {
// map entry has a primitive type value, just return it as is.
return data.value as T[TKey];
} else {
return this._liveObjects.getPool().get(data.objectId) as T[TKey];
}

// map entry points to another object, get it from the pool
const refObject: LiveObject | undefined = this._liveObjects.getPool().get(data.objectId);
if (!refObject) {
return undefined as T[TKey];
}

if (!refObject.isValid()) {
// non-valid objects must not be surfaced to the end users
return undefined as T[TKey];
}

return refObject as API.LiveObject as T[TKey];
Comment on lines +107 to +118
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure safe handling of undefined references in get method

In the get method, the non-null assertion operator ! is used when accessing the refObject. Although createZeroValueObjectIfNotExists ensures the object exists, it's prudent to handle the possibility of undefined to prevent runtime errors.

Consider updating the code to handle potential undefined values gracefully:

- const refObject: LiveObject | undefined = this._liveObjects.getPool().get(data.objectId);
- if (!refObject) {
-   return undefined as T[TKey];
- }
-
- if (!refObject.isValid()) {
-   // non-valid objects must not be surfaced to the end users
-   return undefined as T[TKey];
- }
-
- return refObject as API.LiveObject as T[TKey];
+ const refObject = this._liveObjects.getPool().get(data.objectId);
+ if (refObject && refObject.isValid()) {
+   return refObject as API.LiveObject as T[TKey];
+ } else {
+   // Non-valid or undefined objects must not be surfaced to the end users
+   return undefined as T[TKey];
+ }

This change ensures that refObject is both defined and valid before returning it.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// map entry points to another object, get it from the pool
const refObject: LiveObject | undefined = this._liveObjects.getPool().get(data.objectId);
if (!refObject) {
return undefined as T[TKey];
}
if (!refObject.isValid()) {
// non-valid objects must not be surfaced to the end users
return undefined as T[TKey];
}
return refObject as API.LiveObject as T[TKey];
// map entry points to another object, get it from the pool
const refObject = this._liveObjects.getPool().get(data.objectId);
if (refObject && refObject.isValid()) {
return refObject as API.LiveObject as T[TKey];
} else {
// Non-valid or undefined objects must not be surfaced to the end users
return undefined as T[TKey];
}

}

size(): number {
Expand All @@ -112,6 +126,13 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
continue;
}

// data always exists for non-tombstoned elements
const data = value.data!;
if ('objectId' in data && !this._liveObjects.getPool().get(data.objectId)?.isValid()) {
// should not count non-valid objects
continue;
}

size++;
}

Expand Down Expand Up @@ -145,6 +166,16 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

if (msg.isMapSetWithObjectIdReference() && !this._liveObjects.getPool().get(op.mapOp?.data?.objectId!)?.isValid()) {
// invalid objects must not be surfaced to the end users, so we cannot apply this MAP_SET operation on the map yet,
// as it will set the key to point to the invalid object. we also can't just update the key on a map right now, as
// that would require us to send an update event for the key, and the user will end up with a key on map which got
// updated to return undefined, which is undesired. instead we should buffer the MAP_SET operation until referenced
// object becomes valid
this._handleMapSetWithInvalidObjectReference(op.mapOp!, opOriginTimeserial);
return;
}

let update: LiveMapUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.MAP_CREATE:
Expand Down Expand Up @@ -231,7 +262,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._setCreateOperationIsMerged(false);
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
this._siteTimeserials = stateObject.siteTimeserials ?? {};
Expand Down Expand Up @@ -331,7 +362,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
Object.assign(aggregatedUpdate.update, update.update);
});

this._createOperationIsMerged = true;
this._setCreateOperationIsMerged(true);

return aggregatedUpdate;
}
Expand All @@ -344,6 +375,38 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
);
}

private _handleMapSetWithInvalidObjectReference(op: StateMapOp, opOriginTimeserial: string | undefined): void {
const refObjectId = op?.data?.objectId!;
// ensure referenced object always exist so we can subscribe to it becoming valid
this._liveObjects.getPool().createZeroValueObjectIfNotExists(refObjectId);

// wait until the referenced object becomes valid, then apply MAP_SET operation,
// as it will now point to the existing valid object
const { off } = this._liveObjects
.getPool()
.get(refObjectId)!
.onceValid(() => {
try {
const update = this._applyMapSet(op, opOriginTimeserial);
this.notifyUpdated(update);
} catch (error) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_ERROR,
`LiveMap._handleMapSetWithInvalidObjectReference()`,
`error applying buffered MAP_SET operation: ${this._client.Utils.inspectError(error)}`,
);
} finally {
this._bufferedOperations.delete(bufferedOperation);
}
});

const bufferedOperation: BufferedOperation = {
cancel: () => off(),
};
this._bufferedOperations.add(bufferedOperation);
}
Comment on lines +378 to +408
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle potential null references in _handleMapSetWithInvalidObjectReference

In _handleMapSetWithInvalidObjectReference, the non-null assertion operator ! is used when accessing the refObject. To enhance reliability, consider adding checks to confirm that refObject is defined before proceeding.

Modify the code to include a check for refObject:

- const { off } = this._liveObjects
-   .getPool()
-   .get(refObjectId)!
-   .onceValid(() => {
+ const refObject = this._liveObjects.getPool().get(refObjectId);
+ if (!refObject) {
+   return;
+ }
+ const { off } = refObject.onceValid(() => {
    // ... rest of the code remains the same
  });

This ensures that the onceValid method is only called on a defined refObject, preventing potential runtime errors.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private _handleMapSetWithInvalidObjectReference(op: StateMapOp, opOriginTimeserial: string | undefined): void {
const refObjectId = op?.data?.objectId!;
// ensure referenced object always exist so we can subscribe to it becoming valid
this._liveObjects.getPool().createZeroValueObjectIfNotExists(refObjectId);
// wait until the referenced object becomes valid, then apply MAP_SET operation,
// as it will now point to the existing valid object
const { off } = this._liveObjects
.getPool()
.get(refObjectId)!
.onceValid(() => {
try {
const update = this._applyMapSet(op, opOriginTimeserial);
this.notifyUpdated(update);
} catch (error) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_ERROR,
`LiveMap._handleMapSetWithInvalidObjectReference()`,
`error applying buffered MAP_SET operation: ${this._client.Utils.inspectError(error)}`,
);
} finally {
this._bufferedOperations.delete(bufferedOperation);
}
});
const bufferedOperation: BufferedOperation = {
cancel: () => off(),
};
this._bufferedOperations.add(bufferedOperation);
}
private _handleMapSetWithInvalidObjectReference(op: StateMapOp, opOriginTimeserial: string | undefined): void {
const refObjectId = op?.data?.objectId!;
// ensure referenced object always exist so we can subscribe to it becoming valid
this._liveObjects.getPool().createZeroValueObjectIfNotExists(refObjectId);
// wait until the referenced object becomes valid, then apply MAP_SET operation,
// as it will now point to the existing valid object
const refObject = this._liveObjects.getPool().get(refObjectId);
if (!refObject) {
return;
}
const { off } = refObject.onceValid(() => {
try {
const update = this._applyMapSet(op, opOriginTimeserial);
this.notifyUpdated(update);
} catch (error) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_ERROR,
`LiveMap._handleMapSetWithInvalidObjectReference()`,
`error applying buffered MAP_SET operation: ${this._client.Utils.inspectError(error)}`,
);
} finally {
this._bufferedOperations.delete(bufferedOperation);
}
});
const bufferedOperation: BufferedOperation = {
cancel: () => off(),
};
this._bufferedOperations.add(bufferedOperation);
}


private _applyMapCreate(op: StateOperation): LiveMapUpdate | LiveObjectUpdateNoop {
if (this._createOperationIsMerged) {
// There can't be two different create operation for the same object id, because the object id
Expand Down Expand Up @@ -395,11 +458,6 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
let liveData: StateData;
if (!Utils.isNil(op.data.objectId)) {
liveData = { objectId: op.data.objectId } as ObjectIdStateData;
// this MAP_SET op is setting a key to point to another object via its object id,
// but it is possible that we don't have the corresponding object in the pool yet (for example, we haven't seen the *_CREATE op for it).
// we don't want to return undefined from this map's .get() method even if we don't have the object,
// so instead we create a zero-value object for that object id if it not exists.
this._liveObjects.getPool().createZeroValueObjectIfNotExists(op.data.objectId);
} else {
liveData = { encoding: op.data.encoding, value: op.data.value } as ValueStateData;
}
Expand Down
60 changes: 60 additions & 0 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { StateMessage, StateObject, StateOperation } from './statemessage';

enum LiveObjectEvents {
Updated = 'Updated',
Valid = 'Valid',
}

export interface LiveObjectData {
Expand All @@ -25,6 +26,17 @@ export interface SubscribeResponse {
unsubscribe(): void;
}

export interface OnEventResponse {
off(): void;
}

/**
* Provides an interface for a buffered operation with the ability to cancel it, regardless of the buffering mechanism used
*/
export interface BufferedOperation {
cancel(): void;
}

export abstract class LiveObject<
TData extends LiveObjectData = LiveObjectData,
TUpdate extends LiveObjectUpdate = LiveObjectUpdate,
Expand All @@ -39,6 +51,7 @@ export abstract class LiveObject<
protected _dataRef: TData;
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;
protected _bufferedOperations: Set<BufferedOperation>;

protected constructor(
protected _liveObjects: LiveObjects,
Expand All @@ -51,6 +64,7 @@ export abstract class LiveObject<
this._objectId = objectId;
// use empty timeserials vector by default, so any future operation can be applied to this object
this._siteTimeserials = {};
this._bufferedOperations = new Set();
}

subscribe(listener: (update: TUpdate) => void): SubscribeResponse {
Expand Down Expand Up @@ -99,6 +113,42 @@ export abstract class LiveObject<
this._eventEmitter.emit(LiveObjectEvents.Updated, update);
}

/**
* Object is considered a "valid object" if we have seen the create operation for that object.
*
* Non-valid objects should be treated as though they don't exist from the perspective of the public API for the end users,
* i.e. the public access API that would return this object instead should return an `undefined`. In other words, non-valid
* objects are not surfaced to the end users and they're not able to interact with it.
*
* Once the create operation for the object has been seen and merged, the object becomes valid and can be exposed to the end users.
*
* @internal
*/
isValid(): boolean {
return this._createOperationIsMerged;
}

/**
* @internal
*/
onceValid(listener: () => void): OnEventResponse {
this._eventEmitter.once(LiveObjectEvents.Valid, listener);

const off = () => {
this._eventEmitter.off(LiveObjectEvents.Valid, listener);
};

return { off };
}

/**
* @internal
*/
cancelBufferedOperations(): void {
this._bufferedOperations.forEach((x) => x.cancel());
this._bufferedOperations.clear();
}

/**
* Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object.
*
Expand All @@ -118,6 +168,16 @@ export abstract class LiveObject<
return !siteTimeserial || opOriginTimeserial > siteTimeserial;
}

protected _setCreateOperationIsMerged(createOperationIsMerged: boolean): void {
const shouldNotifyValid =
createOperationIsMerged === true && this._createOperationIsMerged !== createOperationIsMerged;
this._createOperationIsMerged = createOperationIsMerged;

if (shouldNotifyValid) {
this._eventEmitter.emit(LiveObjectEvents.Valid);
}
}

private _createObjectId(): string {
// TODO: implement object id generation based on live object type and initial value
return Math.random().toString().substring(2);
Expand Down
49 changes: 46 additions & 3 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject, LiveObjectUpdate } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { StateMessage, StateOperationAction } from './statemessage';
import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';

enum LiveObjectsEvents {
Expand Down Expand Up @@ -101,7 +101,7 @@ export class LiveObjects {
return;
}

this._liveObjectsPool.applyStateMessages(stateMessages);
this._applyStateMessages(stateMessages);
}

/**
Expand Down Expand Up @@ -149,6 +149,8 @@ export class LiveObjects {
private _startNewSync(syncId?: string, syncCursor?: string): void {
// need to discard all buffered state operation messages on new sync start
this._bufferedStateOperations = [];
// cancel any buffered operations for all objects in the pool, as we're overriding the current state and they will no longer be valid
this._liveObjectsPool.cancelBufferedOperations();
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = syncId;
this._currentSyncCursor = syncCursor;
Expand All @@ -159,7 +161,7 @@ export class LiveObjects {
this._applySync();
// should apply buffered state operations after we applied the SYNC data.
// can use regular state messages application logic
this._liveObjectsPool.applyStateMessages(this._bufferedStateOperations);
this._applyStateMessages(this._bufferedStateOperations);

this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
Expand Down Expand Up @@ -232,4 +234,45 @@ export class LiveObjects {
// call subscription callbacks for all updated existing objects
existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update));
}

private _applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.operation) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects._applyStateMessages()',
`state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateOperation = stateMessage.operation;

switch (stateOperation.action) {
case StateOperationAction.MAP_CREATE:
case StateOperationAction.COUNTER_CREATE:
case StateOperationAction.MAP_SET:
case StateOperationAction.MAP_REMOVE:
case StateOperationAction.COUNTER_INC:
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
// since they need to be able to eventually initialize themselves from that *_CREATE op.
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
this._liveObjectsPool.createZeroValueObjectIfNotExists(stateOperation.objectId);
this._liveObjectsPool.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
break;

default:
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects._applyStateMessages()',
`received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}
}
Loading
Loading