diff --git a/scripts/moduleReport.ts b/scripts/moduleReport.ts index 0eae4017a..8d1921789 100644 --- a/scripts/moduleReport.ts +++ b/scripts/moduleReport.ts @@ -332,7 +332,6 @@ async function checkLiveObjectsPluginFiles() { 'src/plugins/liveobjects/objectid.ts', 'src/plugins/liveobjects/statemessage.ts', 'src/plugins/liveobjects/syncliveobjectsdatapool.ts', - 'src/plugins/liveobjects/timeserial.ts', ]); return checkBundleFiles(pluginBundleInfo, allowedFiles, 100); diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index bfa3a99cc..c645e1cba 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,7 +1,6 @@ import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage'; -import { DefaultTimeserial } from './timeserial'; export interface LiveCounterData extends LiveObjectData { data: number; @@ -49,19 +48,20 @@ export class LiveCounter extends LiveObject ); } - const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial); - if (!this._canApplyOperation(opOriginTimeserial)) { + const opOriginTimeserial = msg.serial!; + const opSiteCode = msg.siteCode!; + if (!this._canApplyOperation(opOriginTimeserial, opSiteCode)) { this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, 'LiveCounter.applyOperation()', - `skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`, + `skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this._objectId}`, ); return; } // should update stored site timeserial immediately. doesn't matter if we successfully apply the op, // as it's important to mark that the op was processed by the object - this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial; + this._siteTimeserials[opSiteCode] = opOriginTimeserial; let update: LiveCounterUpdate | LiveObjectUpdateNoop; switch (op.action) { @@ -125,7 +125,7 @@ export class LiveCounter extends LiveObject // override all relevant data for this object with data from the state object this._createOperationIsMerged = false; this._dataRef = { data: stateObject.counter?.count ?? 0 }; - this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials); + this._siteTimeserials = stateObject.siteTimeserials; if (!this._client.Utils.isNil(stateObject.createOp)) { this._mergeInitialDataFromCreateOperation(stateObject.createOp); } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 3d5109436..e6897aea6 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -13,7 +13,6 @@ import { StateOperationAction, StateValue, } from './statemessage'; -import { DefaultTimeserial, Timeserial } from './timeserial'; export interface ObjectIdStateData { /** A reference to another state object, used to support composable state objects. */ @@ -34,7 +33,7 @@ export type StateData = ObjectIdStateData | ValueStateData; export interface MapEntry { tombstone: boolean; - timeserial: Timeserial; + timeserial: string | undefined; data: StateData | undefined; } @@ -131,19 +130,20 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject { // for MAP_CREATE op we must use dedicated timeserial field available on an entry, instead of a timeserial on a message - const opOriginTimeserial = entry.timeserial - ? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial) - : DefaultTimeserial.zeroValueTimeserial(this._client); + const opOriginTimeserial = entry.timeserial; let update: LiveMapUpdate | LiveObjectUpdateNoop; if (entry.tombstone === true) { // entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op @@ -370,20 +368,17 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject entryTimeserial; + } + private _liveMapDataFromMapEntries(entries: Record): LiveMapData { const liveMapData: LiveMapData = { data: new Map(), @@ -470,10 +496,7 @@ export class LiveMap extends LiveObject; + protected _siteTimeserials: Record; protected _createOperationIsMerged: boolean; protected constructor( @@ -106,22 +105,17 @@ export abstract class LiveObject< * An operation should be applied if the origin timeserial is strictly greater than the timeserial in the site timeserials for the same site. * If the site timeserials do not contain a timeserial for the site of the origin timeserial, the operation should be applied. */ - protected _canApplyOperation(opOriginTimeserial: Timeserial): boolean { - const siteTimeserial = this._siteTimeserials[opOriginTimeserial.siteCode]; - return !siteTimeserial || opOriginTimeserial.after(siteTimeserial); - } + protected _canApplyOperation(opOriginTimeserial: string | undefined, opSiteCode: string | undefined): boolean { + if (!opOriginTimeserial) { + throw new this._client.ErrorInfo(`Invalid timeserial: ${opOriginTimeserial}`, 50000, 500); + } + + if (!opSiteCode) { + throw new this._client.ErrorInfo(`Invalid site code: ${opSiteCode}`, 50000, 500); + } - protected _timeserialMapFromStringMap(stringTimeserialsMap: Record): Record { - const objTimeserialsMap = Object.entries(stringTimeserialsMap).reduce( - (acc, v) => { - const [key, timeserialString] = v; - acc[key] = DefaultTimeserial.calculateTimeserial(this._client, timeserialString); - return acc; - }, - {} as Record, - ); - - return objTimeserialsMap; + const siteTimeserial = this._siteTimeserials[opSiteCode]; + return !siteTimeserial || opOriginTimeserial > siteTimeserial; } private _createObjectId(): string { diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index cadcccca8..74b385630 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -51,8 +51,8 @@ export interface StateMapEntry { /** * The *origin* timeserial of the last operation that was applied to the map entry. * - * It is optional in a MAP_CREATE operation and might be missing, in which case the client should default to using zero-value timeserial, - * which is the "earliest possible" timeserial. This will allow any other operation to update the field based on a timeserial comparison. + * It is optional in a MAP_CREATE operation and might be missing, in which case the client should use a nullish value for it + * and treat it as the "earliest possible" timeserial for comparison purposes. */ timeserial?: string; /** The data that represents the value of the map entry. */ @@ -140,6 +140,8 @@ export class StateMessage { object?: StateObject; /** Timeserial format. Contains the origin timeserial for this state message. */ serial?: string; + /** Site code corresponding to this message's timeserial */ + siteCode?: string; constructor(private _platform: typeof Platform) {} @@ -357,12 +359,14 @@ export class StateMessage { if (this.timestamp) result += '; timestamp=' + this.timestamp; if (this.clientId) result += '; clientId=' + this.clientId; if (this.connectionId) result += '; connectionId=' + this.connectionId; + if (this.channel) result += '; channel=' + this.channel; // TODO: prettify output for operation and object and encode buffers. // see examples for data in Message and PresenceMessage if (this.operation) result += '; operation=' + JSON.stringify(this.operation); if (this.object) result += '; object=' + JSON.stringify(this.object); if (this.extras) result += '; extras=' + JSON.stringify(this.extras); if (this.serial) result += '; serial=' + this.serial; + if (this.siteCode) result += '; siteCode=' + this.siteCode; result += ']'; diff --git a/src/plugins/liveobjects/timeserial.ts b/src/plugins/liveobjects/timeserial.ts deleted file mode 100644 index bc5c53550..000000000 --- a/src/plugins/liveobjects/timeserial.ts +++ /dev/null @@ -1,190 +0,0 @@ -import type BaseClient from 'common/lib/client/baseclient'; - -/** - * Represents a parsed timeserial. - */ -export interface Timeserial { - /** - * The series ID of the timeserial. - */ - readonly seriesId: string; - - /** - * The site code of the timeserial. - */ - readonly siteCode: string; - - /** - * The timestamp of the timeserial. - */ - readonly timestamp: number; - - /** - * The counter of the timeserial. - */ - readonly counter: number; - - /** - * The index of the timeserial. - */ - readonly index?: number; - - toString(): string; - - before(timeserial: Timeserial | string): boolean; - - after(timeserial: Timeserial | string): boolean; - - equal(timeserial: Timeserial | string): boolean; -} - -/** - * Default implementation of the Timeserial interface. Used internally to parse and compare timeserials. - * - * @internal - */ -export class DefaultTimeserial implements Timeserial { - public readonly seriesId: string; - public readonly siteCode: string; - public readonly timestamp: number; - public readonly counter: number; - public readonly index?: number; - - private constructor( - private _client: BaseClient, - seriesId: string, - timestamp: number, - counter: number, - index?: number, - ) { - this.seriesId = seriesId; - this.timestamp = timestamp; - this.counter = counter; - this.index = index; - // TODO: will be removed once https://ably.atlassian.net/browse/DTP-1078 is implemented on the realtime - this.siteCode = this.seriesId.slice(0, 3); // site code is stored in the first 3 letters of the epoch, which is stored in the series id field - } - - /** - * Returns the string representation of the timeserial object. - * @returns The timeserial string. - */ - toString(): string { - return `${this.seriesId}@${this.timestamp.toString()}-${this.counter.toString()}${this.index ? `:${this.index.toString()}` : ''}`; - } - - /** - * Calculate the timeserial object from a timeserial string. - * - * @param timeserial The timeserial string to parse. - * @returns The parsed timeserial object. - * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if timeserial is invalid. - */ - static calculateTimeserial(client: BaseClient, timeserial: string | null | undefined): Timeserial { - if (client.Utils.isNil(timeserial)) { - throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500); - } - - const [seriesId, rest] = timeserial.split('@'); - if (!rest) { - throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500); - } - - const [timestamp, counterAndIndex] = rest.split('-'); - if (!timestamp || !counterAndIndex) { - throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500); - } - - const [counter, index] = counterAndIndex.split(':'); - if (!counter) { - throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500); - } - - return new DefaultTimeserial( - client, - seriesId, - Number(timestamp), - Number(counter), - index ? Number(index) : undefined, - ); - } - - /** - * Returns a zero-value Timeserial `@0-0` - "earliest possible" timeserial. - * - * @returns The timeserial object. - */ - static zeroValueTimeserial(client: BaseClient): Timeserial { - return new DefaultTimeserial(client, '', 0, 0); // @0-0 - } - - /** - * Compares this timeserial to the supplied timeserial, returning a number indicating their relative order. - * @param timeserialToCompare The timeserial to compare against. Can be a string or a Timeserial object. - * @returns 0 if the timeserials are equal, <0 if the first timeserial is less than the second, >0 if the first timeserial is greater than the second. - * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if comparison timeserial is invalid. - */ - private _timeserialCompare(timeserialToCompare: string | Timeserial): number { - const secondTimeserial = - typeof timeserialToCompare === 'string' - ? DefaultTimeserial.calculateTimeserial(this._client, timeserialToCompare) - : timeserialToCompare; - - // Compare the timestamp - const timestampDiff = this.timestamp - secondTimeserial.timestamp; - if (timestampDiff) { - return timestampDiff; - } - - // Compare the counter - const counterDiff = this.counter - secondTimeserial.counter; - if (counterDiff) { - return counterDiff; - } - - // Compare the seriesId lexicographically, but only if both seriesId exist - const seriesComparison = - this.seriesId && - secondTimeserial.seriesId && - this.seriesId !== secondTimeserial.seriesId && - (this.seriesId > secondTimeserial.seriesId ? 1 : -1); - if (seriesComparison) { - return seriesComparison; - } - - // Compare the index, if present - return this.index !== undefined && secondTimeserial.index !== undefined ? this.index - secondTimeserial.index : 0; - } - - /** - * Determines if this timeserial occurs logically before the given timeserial. - * - * @param timeserial The timeserial to compare against. Can be a string or a Timeserial object. - * @returns true if this timeserial precedes the given timeserial, in global order. - * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if the given timeserial is invalid. - */ - before(timeserial: Timeserial | string): boolean { - return this._timeserialCompare(timeserial) < 0; - } - - /** - * Determines if this timeserial occurs logically after the given timeserial. - * - * @param timeserial The timeserial to compare against. Can be a string or a Timeserial object. - * @returns true if this timeserial follows the given timeserial, in global order. - * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if the given timeserial is invalid. - */ - after(timeserial: Timeserial | string): boolean { - return this._timeserialCompare(timeserial) > 0; - } - - /** - * Determines if this timeserial is equal to the given timeserial. - * @param timeserial The timeserial to compare against. Can be a string or a Timeserial object. - * @returns true if this timeserial is equal to the given timeserial. - * @throws {@link BaseClient.ErrorInfo | ErrorInfo} if the given timeserial is invalid. - */ - equal(timeserial: Timeserial | string): boolean { - return this._timeserialCompare(timeserial) === 0; - } -} diff --git a/test/common/modules/live_objects_helper.js b/test/common/modules/live_objects_helper.js index 273a2f499..a5c7a10f3 100644 --- a/test/common/modules/live_objects_helper.js +++ b/test/common/modules/live_objects_helper.js @@ -220,9 +220,12 @@ define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveOb } stateOperationMessage(opts) { - const { channelName, serial, state } = opts; + const { channelName, serial, siteCode, state } = opts; - state?.forEach((x, i) => (x.serial = `${serial}:${i}`)); + state?.forEach((stateMessage, i) => { + stateMessage.serial = serial; + stateMessage.siteCode = siteCode; + }); return { action: 19, // STATE diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index ead316add..6b6cd54b4 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -31,20 +31,30 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], } function forScenarios(scenarios, testFn) { - // if there are scenarios marked as "only", run only them. - // otherwise go over every scenario - const onlyScenarios = scenarios.filter((x) => x.only === true); - const scenariosToRun = onlyScenarios.length > 0 ? onlyScenarios : scenarios; + for (const scenario of scenarios) { + const itFn = scenario.skip ? it.skip : scenario.only ? it.only : it; - for (const scenario of scenariosToRun) { - if (scenario.skip === true) { - continue; - } - - testFn(scenario); + itFn(scenario.description, async function () { + const helper = this.test.helper; + await testFn(helper, scenario); + }); } } + function lexicoTimeserial(seriesId, timestamp, counter, index) { + const paddedTimestamp = timestamp.toString().padStart(14, '0'); + const paddedCounter = counter.toString().padStart(3, '0'); + const paddedIndex = index != null ? index.toString().padStart(3, '0') : undefined; + + // Example: + // + // 01726585978590-001@abcdefghij:001 + // |____________| |_| |________| |_| + // | | | | + // timestamp counter seriesId idx + return `${paddedTimestamp}-${paddedCounter}@${seriesId}` + (paddedIndex ? `:${paddedIndex}` : ''); + } + describe('realtime/live_objects', function () { this.timeout(60 * 1000); @@ -91,7 +101,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // inject STATE message that should be ignored and not break anything without LiveObjects plugin await liveObjectsHelper.processStateOperationMessageOnChannel({ channel: testChannel, - serial: '@0-0', + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', state: [ liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'stringKey', data: { value: 'stringValue' } }), ], @@ -125,7 +136,12 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await liveObjectsHelper.processStateObjectMessageOnChannel({ channel: testChannel, syncSerial: 'serial:', - state: [liveObjectsHelper.mapObject({ objectId: 'root', siteTimeserials: { '000': '000@0-0' } })], + state: [ + liveObjectsHelper.mapObject({ + objectId: 'root', + siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) }, + }), + ], }); const publishChannel = publishClient.channels.get('channel'); @@ -292,8 +308,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], state: [ liveObjectsHelper.mapObject({ objectId: 'root', - siteTimeserials: { '000': '000@0-0' }, - initialEntries: { key: { timeserial: '000@0-0', data: { value: 1 } } }, + siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) }, + initialEntries: { key: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { value: 1 } } }, }), ], }); @@ -653,28 +669,31 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // send a MAP_SET op first to create a zero-value map with forged site timeserials vector (from the op), and set it on a root. await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: 'bbb@1-0', + serial: lexicoTimeserial('bbb', 1, 0), + siteCode: 'bbb', state: [liveObjectsHelper.mapSetOp({ objectId: mapId, key: 'foo', data: { value: 'bar' } })], }); await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: `aaa@${i}-0`, + serial: lexicoTimeserial('aaa', i, 0), + siteCode: 'aaa', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: mapId, data: { objectId: mapId } })], }); }), ); // inject operations with various timeserial values - for (const [i, serial] of [ - 'bbb@0-0', // existing site, earlier CGO, not applied - 'bbb@1-0', // existing site, same CGO, not applied - 'bbb@2-0', // existing site, later CGO, applied - 'aaa@0-0', // different site, earlier CGO, applied - 'ccc@9-0', // different site, later CGO, applied + for (const [i, { serial, siteCode }] of [ + { serial: lexicoTimeserial('bbb', 0, 0), siteCode: 'bbb' }, // existing site, earlier CGO, not applied + { serial: lexicoTimeserial('bbb', 1, 0), siteCode: 'bbb' }, // existing site, same CGO, not applied + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // existing site, later CGO, applied + { serial: lexicoTimeserial('aaa', 0, 0), siteCode: 'aaa' }, // different site, earlier CGO, applied + { serial: lexicoTimeserial('ccc', 9, 0), siteCode: 'ccc' }, // different site, later CGO, applied ].entries()) { await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, serial, + siteCode, state: [ liveObjectsHelper.mapCreateOp({ objectId: mapIds[i], @@ -817,39 +836,42 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const mapId = liveObjectsHelper.fakeMapObjectId(); await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: 'bbb@1-0', + serial: lexicoTimeserial('bbb', 1, 0), + siteCode: 'bbb', state: [ liveObjectsHelper.mapCreateOp({ objectId: mapId, entries: { - foo1: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo2: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo3: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo4: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo5: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo6: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo1: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo2: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo3: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo4: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo5: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo6: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, }, }), ], }); await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: 'aaa@0-0', + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'map', data: { objectId: mapId } })], }); // inject operations with various timeserial values - for (const [i, serial] of [ - 'bbb@0-0', // existing site, earlier site CGO, not applied - 'bbb@1-0', // existing site, same site CGO, not applied - 'bbb@2-0', // existing site, later site CGO, applied, site timeserials updated - 'bbb@2-0', // existing site, same site CGO (updated from last op), not applied - 'aaa@0-0', // different site, earlier entry CGO, not applied - 'ccc@9-0', // different site, later entry CGO, applied + for (const [i, { serial, siteCode }] of [ + { serial: lexicoTimeserial('bbb', 0, 0), siteCode: 'bbb' }, // existing site, earlier site CGO, not applied + { serial: lexicoTimeserial('bbb', 1, 0), siteCode: 'bbb' }, // existing site, same site CGO, not applied + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // existing site, later site CGO, applied, site timeserials updated + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // existing site, same site CGO (updated from last op), not applied + { serial: lexicoTimeserial('aaa', 0, 0), siteCode: 'aaa' }, // different site, earlier entry CGO, not applied + { serial: lexicoTimeserial('ccc', 9, 0), siteCode: 'ccc' }, // different site, later entry CGO, applied ].entries()) { await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, serial, + siteCode, state: [liveObjectsHelper.mapSetOp({ objectId: mapId, key: `foo${i + 1}`, data: { value: 'baz' } })], }); } @@ -941,39 +963,42 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const mapId = liveObjectsHelper.fakeMapObjectId(); await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: 'bbb@1-0', + serial: lexicoTimeserial('bbb', 1, 0), + siteCode: 'bbb', state: [ liveObjectsHelper.mapCreateOp({ objectId: mapId, entries: { - foo1: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo2: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo3: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo4: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo5: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo6: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, + foo1: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo2: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo3: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo4: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo5: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo6: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, }, }), ], }); await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: 'aaa@0-0', + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'map', data: { objectId: mapId } })], }); // inject operations with various timeserial values - for (const [i, serial] of [ - 'bbb@0-0', // existing site, earlier site CGO, not applied - 'bbb@1-0', // existing site, same site CGO, not applied - 'bbb@2-0', // existing site, later site CGO, applied, site timeserials updated - 'bbb@2-0', // existing site, same site CGO (updated from last op), not applied - 'aaa@0-0', // different site, earlier entry CGO, not applied - 'ccc@9-0', // different site, later entry CGO, applied + for (const [i, { serial, siteCode }] of [ + { serial: lexicoTimeserial('bbb', 0, 0), siteCode: 'bbb' }, // existing site, earlier site CGO, not applied + { serial: lexicoTimeserial('bbb', 1, 0), siteCode: 'bbb' }, // existing site, same site CGO, not applied + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // existing site, later site CGO, applied, site timeserials updated + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // existing site, same site CGO (updated from last op), not applied + { serial: lexicoTimeserial('aaa', 0, 0), siteCode: 'aaa' }, // different site, earlier entry CGO, not applied + { serial: lexicoTimeserial('ccc', 9, 0), siteCode: 'ccc' }, // different site, later entry CGO, applied ].entries()) { await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, serial, + siteCode, state: [liveObjectsHelper.mapRemoveOp({ objectId: mapId, key: `foo${i + 1}` })], }); } @@ -1069,12 +1094,14 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // send a COUNTER_INC op first to create a zero-value counter with forged site timeserials vector (from the op), and set it on a root. await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: 'bbb@1-0', + serial: lexicoTimeserial('bbb', 1, 0), + siteCode: 'bbb', state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], }); await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: `aaa@${i}-0`, + serial: lexicoTimeserial('aaa', i, 0), + siteCode: 'aaa', state: [ liveObjectsHelper.mapSetOp({ objectId: 'root', key: counterId, data: { objectId: counterId } }), ], @@ -1083,16 +1110,17 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], ); // inject operations with various timeserial values - for (const [i, serial] of [ - 'bbb@0-0', // existing site, earlier CGO, not applied - 'bbb@1-0', // existing site, same CGO, not applied - 'bbb@2-0', // existing site, later CGO, applied - 'aaa@0-0', // different site, earlier CGO, applied - 'ccc@9-0', // different site, later CGO, applied + for (const [i, { serial, siteCode }] of [ + { serial: lexicoTimeserial('bbb', 0, 0), siteCode: 'bbb' }, // existing site, earlier CGO, not applied + { serial: lexicoTimeserial('bbb', 1, 0), siteCode: 'bbb' }, // existing site, same CGO, not applied + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // existing site, later CGO, applied + { serial: lexicoTimeserial('aaa', 0, 0), siteCode: 'aaa' }, // different site, earlier CGO, applied + { serial: lexicoTimeserial('ccc', 9, 0), siteCode: 'ccc' }, // different site, later CGO, applied ].entries()) { await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, serial, + siteCode, state: [liveObjectsHelper.counterCreateOp({ objectId: counterIds[i], count: 10 })], }); } @@ -1186,27 +1214,30 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], const counterId = liveObjectsHelper.fakeCounterObjectId(); await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: 'bbb@1-0', + serial: lexicoTimeserial('bbb', 1, 0), + siteCode: 'bbb', state: [liveObjectsHelper.counterCreateOp({ objectId: counterId, count: 1 })], }); await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: 'aaa@0-0', + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], }); // inject operations with various timeserial values - for (const [i, serial] of [ - 'bbb@0-0', // +10 existing site, earlier CGO, not applied - 'bbb@1-0', // +100 existing site, same CGO, not applied - 'bbb@2-0', // +1000 existing site, later CGO, applied, site timeserials updated - 'bbb@2-0', // +10000 existing site, same CGO (updated from last op), not applied - 'aaa@0-0', // +100000 different site, earlier CGO, applied - 'ccc@9-0', // +1000000 different site, later CGO, applied + for (const [i, { serial, siteCode }] of [ + { serial: lexicoTimeserial('bbb', 0, 0), siteCode: 'bbb' }, // +10 existing site, earlier CGO, not applied + { serial: lexicoTimeserial('bbb', 1, 0), siteCode: 'bbb' }, // +100 existing site, same CGO, not applied + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // +1000 existing site, later CGO, applied, site timeserials updated + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // +10000 existing site, same CGO (updated from last op), not applied + { serial: lexicoTimeserial('aaa', 0, 0), siteCode: 'aaa' }, // +100000 different site, earlier CGO, applied + { serial: lexicoTimeserial('ccc', 9, 0), siteCode: 'ccc' }, // +1000000 different site, later CGO, applied ].entries()) { await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, serial, + siteCode, state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: Math.pow(10, i + 1) })], }); } @@ -1220,25 +1251,22 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, ]; - forScenarios(applyOperationsScenarios, (scenario) => - /** @nospec */ - it(scenario.description, async function () { - const helper = this.test.helper; - const liveObjectsHelper = new LiveObjectsHelper(helper); - const client = RealtimeWithLiveObjects(helper); + /** @nospec */ + forScenarios(applyOperationsScenarios, async function (helper, scenario) { + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); - await helper.monitorConnectionThenCloseAndFinish(async () => { - const channelName = scenario.description; - const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); - const liveObjects = channel.liveObjects; + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = scenario.description; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; - await channel.attach(); - const root = await liveObjects.getRoot(); + await channel.attach(); + const root = await liveObjects.getRoot(); - await scenario.action({ root, liveObjectsHelper, channelName, channel }); - }, client); - }), - ); + await scenario.action({ root, liveObjectsHelper, channelName, channel }); + }, client); + }); const applyOperationsDuringSyncScenarios = [ { @@ -1257,7 +1285,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], primitiveKeyData.map((keyData) => liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: '@0-0', + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], }), ), @@ -1287,7 +1316,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], primitiveKeyData.map((keyData, i) => liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: `aaa@${i}-0`, + serial: lexicoTimeserial('aaa', i, 0), + siteCode: 'aaa', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], }), ), @@ -1329,10 +1359,11 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // inject operations, expect them to be discarded when sync with new sequence id starts await Promise.all( - primitiveKeyData.map((keyData) => + primitiveKeyData.map((keyData, i) => liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: '@0-0', + serial: lexicoTimeserial('aaa', i, 0), + siteCode: 'aaa', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], }), ), @@ -1347,7 +1378,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // inject another operation that should be applied when latest sync ends await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: '@0-0', + serial: lexicoTimeserial('bbb', 0, 0), + siteCode: 'bbb', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'foo', data: { value: 'bar' } })], }); @@ -1391,34 +1423,34 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], liveObjectsHelper.mapObject({ objectId: mapId, siteTimeserials: { - bbb: 'bbb@2-0', - ccc: 'ccc@5-0', + bbb: lexicoTimeserial('bbb', 2, 0), + ccc: lexicoTimeserial('ccc', 5, 0), }, materialisedEntries: { - foo1: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo2: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo3: { timeserial: 'ccc@5-0', data: { value: 'bar' } }, - foo4: { timeserial: 'bbb@0-0', data: { value: 'bar' } }, - foo5: { timeserial: 'bbb@2-0', data: { value: 'bar' } }, - foo6: { timeserial: 'ccc@2-0', data: { value: 'bar' } }, - foo7: { timeserial: 'ccc@0-0', data: { value: 'bar' } }, - foo8: { timeserial: 'ccc@0-0', data: { value: 'bar' } }, + foo1: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo2: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo3: { timeserial: lexicoTimeserial('ccc', 5, 0), data: { value: 'bar' } }, + foo4: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, + foo5: { timeserial: lexicoTimeserial('bbb', 2, 0), data: { value: 'bar' } }, + foo6: { timeserial: lexicoTimeserial('ccc', 2, 0), data: { value: 'bar' } }, + foo7: { timeserial: lexicoTimeserial('ccc', 0, 0), data: { value: 'bar' } }, + foo8: { timeserial: lexicoTimeserial('ccc', 0, 0), data: { value: 'bar' } }, }, }), liveObjectsHelper.counterObject({ objectId: counterId, siteTimeserials: { - bbb: 'bbb@1-0', + bbb: lexicoTimeserial('bbb', 1, 0), }, initialCount: 1, }), // add objects to the root so they're discoverable in the state tree liveObjectsHelper.mapObject({ objectId: 'root', - siteTimeserials: { '000': '000@0-0' }, + siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) }, initialEntries: { - map: { timeserial: '000@0-0', data: { objectId: mapId } }, - counter: { timeserial: '000@0-0', data: { objectId: counterId } }, + map: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { objectId: mapId } }, + counter: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { objectId: counterId } }, }, }), ], @@ -1426,37 +1458,39 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], // inject operations with various timeserial values // Map: - for (const [i, serial] of [ - 'bbb@1-0', // existing site, earlier site CGO, not applied - 'bbb@2-0', // existing site, same site CGO, not applied - 'bbb@3-0', // existing site, later site CGO, earlier entry CGO, not applied but site timeserial updated + for (const [i, { serial, siteCode }] of [ + { serial: lexicoTimeserial('bbb', 1, 0), siteCode: 'bbb' }, // existing site, earlier site CGO, not applied + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // existing site, same site CGO, not applied + { serial: lexicoTimeserial('bbb', 3, 0), siteCode: 'bbb' }, // existing site, later site CGO, earlier entry CGO, not applied but site timeserial updated // message with later site CGO, same entry CGO case is not possible, as timeserial from entry would be set for the corresponding site code or be less than that - 'bbb@3-0', // existing site, same site CGO (updated from last op), later entry CGO, not applied - 'bbb@4-0', // existing site, later site CGO, later entry CGO, applied - 'aaa@1-0', // different site, earlier entry CGO, not applied but site timeserial updated - 'aaa@1-0', // different site, same site CGO (updated from last op), later entry CGO, not applied + { serial: lexicoTimeserial('bbb', 3, 0), siteCode: 'bbb' }, // existing site, same site CGO (updated from last op), later entry CGO, not applied + { serial: lexicoTimeserial('bbb', 4, 0), siteCode: 'bbb' }, // existing site, later site CGO, later entry CGO, applied + { serial: lexicoTimeserial('aaa', 1, 0), siteCode: 'aaa' }, // different site, earlier entry CGO, not applied but site timeserial updated + { serial: lexicoTimeserial('aaa', 1, 0), siteCode: 'aaa' }, // different site, same site CGO (updated from last op), later entry CGO, not applied // different site with matching entry CGO case is not possible, as matching entry timeserial means that that timeserial is in the site timeserials vector - 'ddd@1-0', // different site, later entry CGO, applied + { serial: lexicoTimeserial('ddd', 1, 0), siteCode: 'ddd' }, // different site, later entry CGO, applied ].entries()) { await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, serial, + siteCode, state: [liveObjectsHelper.mapSetOp({ objectId: mapId, key: `foo${i + 1}`, data: { value: 'baz' } })], }); } // Counter: - for (const [i, serial] of [ - 'bbb@0-0', // +10 existing site, earlier CGO, not applied - 'bbb@1-0', // +100 existing site, same CGO, not applied - 'bbb@2-0', // +1000 existing site, later CGO, applied, site timeserials updated - 'bbb@2-0', // +10000 existing site, same CGO (updated from last op), not applied - 'aaa@0-0', // +100000 different site, earlier CGO, applied - 'ccc@9-0', // +1000000 different site, later CGO, applied + for (const [i, { serial, siteCode }] of [ + { serial: lexicoTimeserial('bbb', 0, 0), siteCode: 'bbb' }, // +10 existing site, earlier CGO, not applied + { serial: lexicoTimeserial('bbb', 1, 0), siteCode: 'bbb' }, // +100 existing site, same CGO, not applied + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // +1000 existing site, later CGO, applied, site timeserials updated + { serial: lexicoTimeserial('bbb', 2, 0), siteCode: 'bbb' }, // +10000 existing site, same CGO (updated from last op), not applied + { serial: lexicoTimeserial('aaa', 0, 0), siteCode: 'aaa' }, // +100000 different site, earlier CGO, applied + { serial: lexicoTimeserial('ccc', 9, 0), siteCode: 'ccc' }, // +1000000 different site, later CGO, applied ].entries()) { await liveObjectsHelper.processStateOperationMessageOnChannel({ channel, serial, + siteCode, state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: Math.pow(10, i + 1) })], }); } @@ -1510,7 +1544,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], primitiveKeyData.map((keyData, i) => liveObjectsHelper.processStateOperationMessageOnChannel({ channel, - serial: `aaa@${i}-0`, + serial: lexicoTimeserial('aaa', i, 0), + siteCode: 'aaa', state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], }), ), @@ -1554,27 +1589,24 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, ]; - forScenarios(applyOperationsDuringSyncScenarios, (scenario) => - /** @nospec */ - it(scenario.description, async function () { - const helper = this.test.helper; - const liveObjectsHelper = new LiveObjectsHelper(helper); - const client = RealtimeWithLiveObjects(helper); + /** @nospec */ + forScenarios(applyOperationsDuringSyncScenarios, async function (helper, scenario) { + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); - await helper.monitorConnectionThenCloseAndFinish(async () => { - const channelName = scenario.description; - const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); - const liveObjects = channel.liveObjects; + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = scenario.description; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; - await channel.attach(); - // wait for getRoot() to resolve so the initial SYNC sequence is completed, - // as we're going to initiate a new one to test applying operations during SYNC sequence. - const root = await liveObjects.getRoot(); + await channel.attach(); + // wait for getRoot() to resolve so the initial SYNC sequence is completed, + // as we're going to initiate a new one to test applying operations during SYNC sequence. + const root = await liveObjects.getRoot(); - await scenario.action({ root, liveObjectsHelper, channelName, channel }); - }, client); - }), - ); + await scenario.action({ root, liveObjectsHelper, channelName, channel }); + }, client); + }); const subscriptionCallbacksScenarios = [ { @@ -2043,49 +2075,46 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, ]; - forScenarios(subscriptionCallbacksScenarios, (scenario) => - /** @nospec */ - it(scenario.description, async function () { - const helper = this.test.helper; - const liveObjectsHelper = new LiveObjectsHelper(helper); - const client = RealtimeWithLiveObjects(helper); + /** @nospec */ + forScenarios(subscriptionCallbacksScenarios, async function (helper, scenario) { + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); - await helper.monitorConnectionThenCloseAndFinish(async () => { - const channelName = scenario.description; - const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); - const liveObjects = channel.liveObjects; + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = scenario.description; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; - await channel.attach(); - const root = await liveObjects.getRoot(); + await channel.attach(); + const root = await liveObjects.getRoot(); - const sampleMapKey = 'sampleMap'; - const sampleCounterKey = 'sampleCounter'; + const sampleMapKey = 'sampleMap'; + const sampleCounterKey = 'sampleCounter'; - // prepare map and counter objects for use by the scenario - const { objectId: sampleMapObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { - mapObjectId: 'root', - key: sampleMapKey, - createOp: liveObjectsHelper.mapCreateOp(), - }); - const { objectId: sampleCounterObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { - mapObjectId: 'root', - key: sampleCounterKey, - createOp: liveObjectsHelper.counterCreateOp(), - }); + // prepare map and counter objects for use by the scenario + const { objectId: sampleMapObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: sampleMapKey, + createOp: liveObjectsHelper.mapCreateOp(), + }); + const { objectId: sampleCounterObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: sampleCounterKey, + createOp: liveObjectsHelper.counterCreateOp(), + }); - await scenario.action({ - root, - liveObjectsHelper, - channelName, - channel, - sampleMapKey, - sampleMapObjectId, - sampleCounterKey, - sampleCounterObjectId, - }); - }, client); - }), - ); + await scenario.action({ + root, + liveObjectsHelper, + channelName, + channel, + sampleMapKey, + sampleMapObjectId, + sampleCounterKey, + sampleCounterObjectId, + }); + }, client); + }); }); /** @nospec */