Skip to content

Commit

Permalink
Merge pull request #1926 from ably/DTP-1078/lexico-timeserials
Browse files Browse the repository at this point in the history
[DTP-1078] Use lexico timeserials and `siteCode` field in StateMessages
  • Loading branch information
VeskeR authored Nov 29, 2024
2 parents 40e258d + 9988504 commit 8cdf89e
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 430 deletions.
1 change: 0 additions & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ async function checkLiveObjectsPluginFiles() {
'src/plugins/liveobjects/objectid.ts',
'src/plugins/liveobjects/statemessage.ts',
'src/plugins/liveobjects/syncliveobjectsdatapool.ts',
'src/plugins/liveobjects/timeserial.ts',
]);

return checkBundleFiles(pluginBundleInfo, allowedFiles, 100);
Expand Down
13 changes: 7 additions & 6 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage';
import { DefaultTimeserial } from './timeserial';

export interface LiveCounterData extends LiveObjectData {
data: number;
Expand Down Expand Up @@ -49,19 +48,20 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
);
}

const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial);
if (!this._canApplyOperation(opOriginTimeserial)) {
const opOriginTimeserial = msg.serial!;
const opSiteCode = msg.siteCode!;
if (!this._canApplyOperation(opOriginTimeserial, opSiteCode)) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter.applyOperation()',
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`,
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this._objectId}`,
);
return;
}
// should update stored site timeserial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial;
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

let update: LiveCounterUpdate | LiveObjectUpdateNoop;
switch (op.action) {
Expand Down Expand Up @@ -125,7 +125,8 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials);
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
Expand Down
73 changes: 47 additions & 26 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
StateOperationAction,
StateValue,
} from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';

export interface ObjectIdStateData {
/** A reference to another state object, used to support composable state objects. */
Expand All @@ -34,7 +33,7 @@ export type StateData = ObjectIdStateData | ValueStateData;

export interface MapEntry {
tombstone: boolean;
timeserial: Timeserial;
timeserial: string | undefined;
data: StateData | undefined;
}

Expand Down Expand Up @@ -131,19 +130,20 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
);
}

const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial);
if (!this._canApplyOperation(opOriginTimeserial)) {
const opOriginTimeserial = msg.serial!;
const opSiteCode = msg.siteCode!;
if (!this._canApplyOperation(opOriginTimeserial, opSiteCode)) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap.applyOperation()',
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`,
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opSiteCode]?.toString()}; objectId=${this._objectId}`,
);
return;
}
// should update stored site timeserial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial;
this._siteTimeserials[opSiteCode] = opOriginTimeserial;

let update: LiveMapUpdate | LiveObjectUpdateNoop;
switch (op.action) {
Expand Down Expand Up @@ -233,7 +233,8 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = this._liveMapDataFromMapEntries(stateObject.map?.entries ?? {});
this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials);
// should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object
this._siteTimeserials = stateObject.siteTimeserials ?? {};
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}
Expand Down Expand Up @@ -311,9 +312,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
// we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations.
Object.entries(stateOperation.map.entries ?? {}).forEach(([key, entry]) => {
// for MAP_CREATE op we must use dedicated timeserial field available on an entry, instead of a timeserial on a message
const opOriginTimeserial = entry.timeserial
? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial)
: DefaultTimeserial.zeroValueTimeserial(this._client);
const opOriginTimeserial = entry.timeserial;
let update: LiveMapUpdate | LiveObjectUpdateNoop;
if (entry.tombstone === true) {
// entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op
Expand Down Expand Up @@ -370,20 +369,17 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return this._mergeInitialDataFromCreateOperation(op);
}

private _applyMapSet(op: StateMapOp, opOriginTimeserial: Timeserial): LiveMapUpdate | LiveObjectUpdateNoop {
private _applyMapSet(op: StateMapOp, opOriginTimeserial: string | undefined): LiveMapUpdate | LiveObjectUpdateNoop {
const { ErrorInfo, Utils } = this._client;

const existingEntry = this._dataRef.data.get(op.key);
if (
existingEntry &&
(opOriginTimeserial.before(existingEntry.timeserial) || opOriginTimeserial.equal(existingEntry.timeserial))
) {
if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opOriginTimeserial)) {
// the operation's origin timeserial <= the entry's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapSet()',
`skipping update for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`,
`skipping update for key="${op.key}": op timeserial ${opOriginTimeserial?.toString()} <= entry timeserial ${existingEntry.timeserial?.toString()}; objectId=${this._objectId}`,
);
return { noop: true };
}
Expand Down Expand Up @@ -423,18 +419,18 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return { update: { [op.key]: 'updated' } };
}

private _applyMapRemove(op: StateMapOp, opOriginTimeserial: Timeserial): LiveMapUpdate | LiveObjectUpdateNoop {
private _applyMapRemove(
op: StateMapOp,
opOriginTimeserial: string | undefined,
): LiveMapUpdate | LiveObjectUpdateNoop {
const existingEntry = this._dataRef.data.get(op.key);
if (
existingEntry &&
(opOriginTimeserial.before(existingEntry.timeserial) || opOriginTimeserial.equal(existingEntry.timeserial))
) {
if (existingEntry && !this._canApplyMapOperation(existingEntry.timeserial, opOriginTimeserial)) {
// the operation's origin timeserial <= the entry's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapRemove()',
`skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`,
`skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial?.toString()} <= entry timeserial ${existingEntry.timeserial?.toString()}; objectId=${this._objectId}`,
);
return { noop: true };
}
Expand All @@ -455,6 +451,34 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
return { update: { [op.key]: 'removed' } };
}

/**
* Returns true if the origin timeserials of the given operation and entry indicate that
* the operation should be applied to the entry, following the CRDT semantics of this LiveMap.
*/
private _canApplyMapOperation(entryTimeserial: string | undefined, opTimeserial: string | undefined): boolean {
// for LWW CRDT semantics (the only supported LiveMap semantic) an operation
// should only be applied if its timeserial is strictly greater ("after") than an entry's timeserial.

if (!entryTimeserial && !opTimeserial) {
// if both timeserials are nullish or emptry strings, we treat them as the "earliest possible" timeserials,
// in which case they are "equal", so the operation should not be applied
return false;
}

if (!entryTimeserial) {
// any op timeserial is greater than non-existing entry timeserial
return true;
}

if (!opTimeserial) {
// non-existing op timeserial is lower than any entry timeserial
return false;
}

// if both timeserials exist, compare them lexicographically
return opTimeserial > entryTimeserial;
}

private _liveMapDataFromMapEntries(entries: Record<string, StateMapEntry>): LiveMapData {
const liveMapData: LiveMapData = {
data: new Map<string, MapEntry>(),
Expand All @@ -470,10 +494,7 @@ export class LiveMap<T extends API.LiveMapType> extends LiveObject<LiveMapData,
}

const liveDataEntry: MapEntry = {
...entry,
timeserial: entry.timeserial
? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial)
: DefaultTimeserial.zeroValueTimeserial(this._client),
timeserial: entry.timeserial,
// true only if we received explicit true. otherwise always false
tombstone: entry.tombstone === true,
data: liveData,
Expand Down
28 changes: 11 additions & 17 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type BaseClient from 'common/lib/client/baseclient';
import type EventEmitter from 'common/lib/util/eventemitter';
import { LiveObjects } from './liveobjects';
import { StateMessage, StateObject, StateOperation } from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';

enum LiveObjectEvents {
Updated = 'Updated',
Expand Down Expand Up @@ -38,7 +37,7 @@ export abstract class LiveObject<
* and all state operations applied to the object.
*/
protected _dataRef: TData;
protected _siteTimeserials: Record<string, Timeserial>;
protected _siteTimeserials: Record<string, string>;
protected _createOperationIsMerged: boolean;

protected constructor(
Expand Down Expand Up @@ -106,22 +105,17 @@ export abstract class LiveObject<
* An operation should be applied if the origin timeserial is strictly greater than the timeserial in the site timeserials for the same site.
* If the site timeserials do not contain a timeserial for the site of the origin timeserial, the operation should be applied.
*/
protected _canApplyOperation(opOriginTimeserial: Timeserial): boolean {
const siteTimeserial = this._siteTimeserials[opOriginTimeserial.siteCode];
return !siteTimeserial || opOriginTimeserial.after(siteTimeserial);
}
protected _canApplyOperation(opOriginTimeserial: string | undefined, opSiteCode: string | undefined): boolean {
if (!opOriginTimeserial) {
throw new this._client.ErrorInfo(`Invalid timeserial: ${opOriginTimeserial}`, 50000, 500);
}

if (!opSiteCode) {
throw new this._client.ErrorInfo(`Invalid site code: ${opSiteCode}`, 50000, 500);
}

protected _timeserialMapFromStringMap(stringTimeserialsMap: Record<string, string>): Record<string, Timeserial> {
const objTimeserialsMap = Object.entries(stringTimeserialsMap).reduce(
(acc, v) => {
const [key, timeserialString] = v;
acc[key] = DefaultTimeserial.calculateTimeserial(this._client, timeserialString);
return acc;
},
{} as Record<string, Timeserial>,
);

return objTimeserialsMap;
const siteTimeserial = this._siteTimeserials[opSiteCode];
return !siteTimeserial || opOriginTimeserial > siteTimeserial;
}

private _createObjectId(): string {
Expand Down
8 changes: 6 additions & 2 deletions src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ export interface StateMapEntry {
/**
* The *origin* timeserial of the last operation that was applied to the map entry.
*
* It is optional in a MAP_CREATE operation and might be missing, in which case the client should default to using zero-value timeserial,
* which is the "earliest possible" timeserial. This will allow any other operation to update the field based on a timeserial comparison.
* It is optional in a MAP_CREATE operation and might be missing, in which case the client should use a nullish value for it
* and treat it as the "earliest possible" timeserial for comparison purposes.
*/
timeserial?: string;
/** The data that represents the value of the map entry. */
Expand Down Expand Up @@ -140,6 +140,8 @@ export class StateMessage {
object?: StateObject;
/** Timeserial format. Contains the origin timeserial for this state message. */
serial?: string;
/** Site code corresponding to this message's timeserial */
siteCode?: string;

constructor(private _platform: typeof Platform) {}

Expand Down Expand Up @@ -357,12 +359,14 @@ export class StateMessage {
if (this.timestamp) result += '; timestamp=' + this.timestamp;
if (this.clientId) result += '; clientId=' + this.clientId;
if (this.connectionId) result += '; connectionId=' + this.connectionId;
if (this.channel) result += '; channel=' + this.channel;
// TODO: prettify output for operation and object and encode buffers.
// see examples for data in Message and PresenceMessage
if (this.operation) result += '; operation=' + JSON.stringify(this.operation);
if (this.object) result += '; object=' + JSON.stringify(this.object);
if (this.extras) result += '; extras=' + JSON.stringify(this.extras);
if (this.serial) result += '; serial=' + this.serial;
if (this.siteCode) result += '; siteCode=' + this.siteCode;

result += ']';

Expand Down
Loading

0 comments on commit 8cdf89e

Please sign in to comment.