diff --git a/src/plugins/liveobjects/defaults.ts b/src/plugins/liveobjects/defaults.ts new file mode 100644 index 000000000..dd6ea3d51 --- /dev/null +++ b/src/plugins/liveobjects/defaults.ts @@ -0,0 +1,10 @@ +export const DEFAULTS = { + gcInterval: 1000 * 60 * 5, // 5 minutes + /** + * Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation + * with an earlier origin timeserial that would not have been applied if the tombstone still existed. + * + * Applies both for map entries tombstones and object tombstones. + */ + gcGracePeriod: 1000 * 60 * 2.5, // 2.5 minutes +}; diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index c96d59afe..95b8dbe52 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -157,6 +157,14 @@ export class LiveCounter extends LiveObject return this._updateFromDataDiff(previousDataRef, this._dataRef); } + /** + * @internal + */ + onGCInterval(): void { + // nothing to GC for a counter object + return; + } + protected _getZeroValueData(): LiveCounterData { return { data: 0 }; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 3051e89da..935e5dbff 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,6 +1,7 @@ import deepEqual from 'deep-equal'; import type * as API from '../../../ably'; +import { DEFAULTS } from './defaults'; import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; import { @@ -33,6 +34,10 @@ export type StateData = ObjectIdStateData | ValueStateData; export interface MapEntry { tombstone: boolean; + /** + * Can't use timeserial from the operation that deleted the entry for the same reason as for {@link LiveObject} tombstones, see explanation there. + */ + tombstonedAt: number | undefined; timeserial: string | undefined; data: StateData | undefined; } @@ -291,6 +296,22 @@ export class LiveMap extends LiveObject= DEFAULTS.gcGracePeriod) { + keysToDelete.push(key); + } + } + + keysToDelete.forEach((x) => this._dataRef.data.delete(x)); + } + protected _getZeroValueData(): LiveMapData { return { data: new Map() }; } @@ -455,11 +476,13 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject; protected _createOperationIsMerged: boolean; private _tombstone: boolean; + /** + * Even though the `timeserial` from the operation that deleted the object contains the timestamp value, + * the `timeserial` should be treated as an opaque string on the client, meaning we should not attempt to parse it. + * + * Therefore, we need to set our own timestamp when the object is deleted client-side. Strictly speaking, this is + * slightly less precise, as we will GC the object later than the server, but it is an acceptable compromise. + */ + private _tombstonedAt: number | undefined; protected constructor( protected _liveObjects: LiveObjects, @@ -108,6 +116,7 @@ export abstract class LiveObject< */ tombstone(): void { this._tombstone = true; + this._tombstonedAt = Date.now(); this._dataRef = this._getZeroValueData(); // TODO: emit "deleted" event so that end users get notified about this object getting deleted } @@ -119,6 +128,13 @@ export abstract class LiveObject< return this._tombstone; } + /** + * @internal + */ + tombstonedAt(): number | undefined { + return this._tombstonedAt; + } + /** * Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object. * @@ -168,6 +184,11 @@ export abstract class LiveObject< * @internal */ abstract overrideWithStateObject(stateObject: StateObject): TUpdate | LiveObjectUpdateNoop; + /** + * @internal + */ + abstract onGCInterval(): void; + protected abstract _getZeroValueData(): TData; /** * Calculate the update object based on the current Live Object data and incoming new data. diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 6ba3a0f2c..06bfe2158 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -2,6 +2,7 @@ import type BaseClient from 'common/lib/client/baseclient'; import type RealtimeChannel from 'common/lib/client/realtimechannel'; import type EventEmitter from 'common/lib/util/eventemitter'; import type * as API from '../../../ably'; +import { DEFAULTS } from './defaults'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; @@ -26,6 +27,9 @@ export class LiveObjects { private _currentSyncCursor: string | undefined; private _bufferedStateOperations: StateMessage[]; + // Used by tests + static _DEFAULTS = DEFAULTS; + constructor(channel: RealtimeChannel) { this._channel = channel; this._client = channel.client; diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index eb42d47b4..94f667fdb 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -1,4 +1,5 @@ import type BaseClient from 'common/lib/client/baseclient'; +import { DEFAULTS } from './defaults'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; @@ -13,10 +14,16 @@ export const ROOT_OBJECT_ID = 'root'; export class LiveObjectsPool { private _client: BaseClient; private _pool: Map; + private _gcInterval: ReturnType; constructor(private _liveObjects: LiveObjects) { this._client = this._liveObjects.getClient(); this._pool = this._getInitialPool(); + this._gcInterval = setInterval(() => { + this._onGCInterval(); + }, DEFAULTS.gcInterval); + // call nodejs's Timeout.unref to not require Node.js event loop to remain active due to this interval. see https://nodejs.org/api/timers.html#timeoutunref + this._gcInterval.unref?.(); } get(objectId: string): LiveObject | undefined { @@ -68,4 +75,21 @@ export class LiveObjectsPool { pool.set(root.getObjectId(), root); return pool; } + + private _onGCInterval(): void { + const toDelete: string[] = []; + for (const [objectId, obj] of this._pool.entries()) { + // tombstoned objects should be removed from the pool if they have been tombstoned for longer than grace period. + // by removing them from the local pool, LiveObjects plugin no longer keeps a reference to those objects, allowing JS's + // Garbage Collection to eventually free the memory for those objects, provided the user no longer references them either. + if (obj.isTombstoned() && Date.now() - obj.tombstonedAt()! >= DEFAULTS.gcGracePeriod) { + toDelete.push(objectId); + continue; + } + + obj.onGCInterval(); + } + + toDelete.forEach((x) => this._pool.delete(x)); + } } diff --git a/test/common/modules/private_api_recorder.js b/test/common/modules/private_api_recorder.js index af478b713..74158c910 100644 --- a/test/common/modules/private_api_recorder.js +++ b/test/common/modules/private_api_recorder.js @@ -16,6 +16,9 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths) 'call.Defaults.getPort', 'call.Defaults.normaliseOptions', 'call.EventEmitter.emit', + 'call.LiveObject.isTombstoned', + 'call.LiveObjects._liveObjectsPool._onGCInterval', + 'call.LiveObjects._liveObjectsPool.get', 'call.Message.decode', 'call.Message.encode', 'call.Platform.Config.push.storage.clear', @@ -72,6 +75,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths) 'pass.clientOption.webSocketSlowTimeout', 'pass.clientOption.wsConnectivityCheckUrl', // actually ably-js public API (i.e. it’s in the TypeScript typings) but no other SDK has it. At the same time it's not entirely clear if websocket connectivity check should be considered an ably-js-specific functionality (as for other params above), so for the time being we consider it as private API 'read.Defaults.version', + 'read.LiveMap._dataRef.data', 'read.EventEmitter.events', 'read.Platform.Config.push', 'read.Realtime._transports', @@ -112,6 +116,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths) 'read.transport.params.mode', 'read.transport.recvRequest.recvUri', 'read.transport.uri', + 'replace.LiveObjects._liveObjectsPool._onGCInterval', 'replace.channel.attachImpl', 'replace.channel.processMessage', 'replace.channel.sendMessage', @@ -128,6 +133,8 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths) 'serialize.recoveryKey', 'write.Defaults.ENVIRONMENT', 'write.Defaults.wsConnectivityCheckUrl', + 'write.LiveObjects._DEFAULTS.gcGracePeriod', + 'write.LiveObjects._DEFAULTS.gcInterval', 'write.Platform.Config.push', // This implies using a mock implementation of the internal IPlatformPushConfig interface. Our mock (in push_channel_transport.js) then interacts with internal objects and private APIs of public objects to implement this interface; I haven’t added annotations for that private API usage, since there wasn’t an easy way to pass test context information into the mock. I think that for now we can just say that if we wanted to get rid of this private API usage, then we’d need to remove this mock entirely. 'write.auth.authOptions.requestHeaders', 'write.auth.key', diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 22164ef79..d1b3551ff 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -12,6 +12,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const createPM = Ably.makeProtocolMessageFromDeserialized({ LiveObjectsPlugin }); const liveObjectsFixturesChannel = 'liveobjects_fixtures'; const nextTick = Ably.Realtime.Platform.Config.nextTick; + const gcIntervalOriginal = LiveObjectsPlugin.LiveObjects._DEFAULTS.gcInterval; + const gcGracePeriodOriginal = LiveObjectsPlugin.LiveObjects._DEFAULTS.gcGracePeriod; function RealtimeWithLiveObjects(helper, options) { return helper.AblyRealtime({ ...options, plugins: { LiveObjects: LiveObjectsPlugin } }); @@ -2585,6 +2587,157 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }); }, client); }); + + const tombstonesGCScenarios = [ + // for the next tests we need to access the private API of LiveObjects plugin in order to verify that tombstoned entities were indeed deleted after the GC grace period. + // public API hides that kind of information from the user and returns undefined for tombstoned entities even if realtime client still keeps a reference to them. + { + description: 'tombstoned object is removed from the pool after the GC grace period', + action: async (ctx) => { + const { liveObjectsHelper, channelName, channel, liveObjects, helper, waitForGCCycles } = ctx; + + // send a CREATE op, this add an object to the pool + const { objectId } = await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.counterCreateOp({ count: 1 }), + ); + + expect(liveObjects._liveObjectsPool.get(objectId), 'Check object exists in the pool after creation').to + .exist; + + // inject OBJECT_DELETE for the object. this should tombstone the object and make it inaccessible to the end user, but still keep it in memory in the local pool + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.objectDeleteOp({ objectId })], + }); + + helper.recordPrivateApi('call.LiveObjects._liveObjectsPool.get'); + expect( + liveObjects._liveObjectsPool.get(objectId), + 'Check object exists in the pool immediately after OBJECT_DELETE', + ).to.exist; + helper.recordPrivateApi('call.LiveObjects._liveObjectsPool.get'); + helper.recordPrivateApi('call.LiveObject.isTombstoned'); + expect(liveObjects._liveObjectsPool.get(objectId).isTombstoned()).to.equal( + true, + `Check object's "tombstone" flag is set to "true" after OBJECT_DELETE`, + ); + + // we expect 2 cycles to guarantee that grace period has expired, which will always be true based on the test config used + await waitForGCCycles(2); + + // object should be removed from the local pool entirely now, as the GC grace period has passed + helper.recordPrivateApi('call.LiveObjects._liveObjectsPool.get'); + expect( + liveObjects._liveObjectsPool.get(objectId), + 'Check object exists does not exist in the pool after the GC grace period expiration', + ).to.not.exist; + }, + }, + + { + description: 'tombstoned map entry is removed from the LiveMap after the GC grace period', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, helper, waitForGCCycles } = ctx; + + // set a key on a root + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'foo', data: { value: 'bar' } }), + ); + + expect(root.get('foo')).to.equal('bar', 'Check key "foo" exists on root after MAP_SET'); + + // remove the key from the root. this should tombstone the map entry and make it inaccessible to the end user, but still keep it in memory in the underlying map + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapRemoveOp({ objectId: 'root', key: 'foo' }), + ); + + expect(root.get('foo'), 'Check key "foo" is inaccessible via public API on root after MAP_REMOVE').to.not + .exist; + helper.recordPrivateApi('read.LiveMap._dataRef.data'); + expect( + root._dataRef.data.get('foo'), + 'Check map entry for "foo" exists on root in the underlying data immediately after MAP_REMOVE', + ).to.exist; + helper.recordPrivateApi('read.LiveMap._dataRef.data'); + expect( + root._dataRef.data.get('foo').tombstone, + 'Check map entry for "foo" on root has "tombstone" flag set to "true" after MAP_REMOVE', + ).to.exist; + + // we expect 2 cycles to guarantee that grace period has expired, which will always be true based on the test config used + await waitForGCCycles(2); + + // the entry should be removed from the underlying map now + helper.recordPrivateApi('read.LiveMap._dataRef.data'); + expect( + root._dataRef.data.get('foo'), + 'Check map entry for "foo" does not exist on root in the underlying data after the GC grace period expiration', + ).to.not.exist; + }, + }, + ]; + + /** @nospec */ + forScenarios(tombstonesGCScenarios, async function (helper, scenario) { + try { + helper.recordPrivateApi('write.LiveObjects._DEFAULTS.gcInterval'); + LiveObjectsPlugin.LiveObjects._DEFAULTS.gcInterval = 500; + helper.recordPrivateApi('write.LiveObjects._DEFAULTS.gcGracePeriod'); + LiveObjectsPlugin.LiveObjects._DEFAULTS.gcGracePeriod = 250; + + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); + + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = scenario.description; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; + + await channel.attach(); + const root = await liveObjects.getRoot(); + + // helper function to spy on the GC interval callback and wait for a specific number of GC cycles. + // returns a promise which will resolve when required number of cycles have happened. + const waitForGCCycles = (cycles) => { + const onGCIntervalOriginal = liveObjects._liveObjectsPool._onGCInterval; + let gcCalledTimes = 0; + return new Promise((resolve) => { + helper.recordPrivateApi('replace.LiveObjects._liveObjectsPool._onGCInterval'); + liveObjects._liveObjectsPool._onGCInterval = function () { + helper.recordPrivateApi('call.LiveObjects._liveObjectsPool._onGCInterval'); + onGCIntervalOriginal.call(this); + + gcCalledTimes++; + if (gcCalledTimes >= cycles) { + resolve(); + liveObjects._liveObjectsPool._onGCInterval = onGCIntervalOriginal; + } + }; + }); + }; + + await scenario.action({ + root, + liveObjectsHelper, + channelName, + channel, + liveObjects, + helper, + waitForGCCycles, + }); + }, client); + } finally { + helper.recordPrivateApi('write.LiveObjects._DEFAULTS.gcInterval'); + LiveObjectsPlugin.LiveObjects._DEFAULTS.gcInterval = gcIntervalOriginal; + helper.recordPrivateApi('write.LiveObjects._DEFAULTS.gcGracePeriod'); + LiveObjectsPlugin.LiveObjects._DEFAULTS.gcGracePeriod = gcGracePeriodOriginal; + } + }); }); /** @nospec */