Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-954, DTP-956] Add support for applying incoming state operations outside of STATE_SYNC sequence #1897

Merged
merged 11 commits into from
Nov 8, 2024
Merged
5 changes: 4 additions & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { gzip } from 'zlib';
import Table from 'cli-table';

// The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel)
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 };
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 101, gzip: 31 };

const baseClientNames = ['BaseRest', 'BaseRealtime'];

Expand Down Expand Up @@ -310,12 +310,15 @@ async function checkLiveObjectsPluginFiles() {
// These are the files that are allowed to contribute >= `threshold` bytes to the LiveObjects bundle.
const allowedFiles = new Set([
'src/plugins/liveobjects/index.ts',
'src/plugins/liveobjects/livecounter.ts',
'src/plugins/liveobjects/livemap.ts',
'src/plugins/liveobjects/liveobject.ts',
'src/plugins/liveobjects/liveobjects.ts',
'src/plugins/liveobjects/liveobjectspool.ts',
'src/plugins/liveobjects/objectid.ts',
'src/plugins/liveobjects/statemessage.ts',
'src/plugins/liveobjects/syncliveobjectsdatapool.ts',
'src/plugins/liveobjects/timeserial.ts',
]);

return checkBundleFiles(pluginBundleInfo, allowedFiles, 100);
Expand Down
35 changes: 34 additions & 1 deletion src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,39 @@ class RealtimeChannel extends EventEmitter {
break;
}

case actions.STATE: {
if (!this._liveObjects) {
return;
}

const { id, connectionId, timestamp } = message;
const options = this.channelOptions;

const stateMessages = message.state ?? [];
owenpearson marked this conversation as resolved.
Show resolved Hide resolved
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}

this._liveObjects.handleStateMessages(stateMessages);

break;
}
VeskeR marked this conversation as resolved.
Show resolved Hide resolved

case actions.STATE_SYNC: {
if (!this._liveObjects) {
return;
Expand Down Expand Up @@ -649,7 +682,7 @@ class RealtimeChannel extends EventEmitter {
}
}

this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial);
this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial);

break;
}
Expand Down
106 changes: 106 additions & 0 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,121 @@
import { LiveObject, LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { StateCounter, StateCounterOp, StateOperation, StateOperationAction } from './statemessage';

export interface LiveCounterData extends LiveObjectData {
data: number;
}

export class LiveCounter extends LiveObject<LiveCounterData> {
constructor(
liveObjects: LiveObjects,
private _created: boolean,
initialData?: LiveCounterData | null,
objectId?: string,
) {
super(liveObjects, initialData, objectId);
}

/**
* Returns a {@link LiveCounter} instance with a 0 value.
*
* @internal
*/
static zeroValue(liveobjects: LiveObjects, isCreated: boolean, objectId?: string): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId);
}

value(): number {
return this._dataRef.data;
}

/**
* @internal
*/
isCreated(): boolean {
return this._created;
}

/**
* @internal
*/
setCreated(created: boolean): void {
this._created = created;
}

/**
* @internal
*/
applyOperation(op: StateOperation): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
50000,
500,
);
}

switch (op.action) {
case StateOperationAction.COUNTER_CREATE:
this._applyCounterCreate(op.counter);
break;

case StateOperationAction.COUNTER_INC:
if (this._client.Utils.isNil(op.counterOp)) {
owenpearson marked this conversation as resolved.
Show resolved Hide resolved
this._throwNoPayloadError(op);
} else {
this._applyCounterInc(op.counterOp);
}
break;

default:
throw new this._client.ErrorInfo(
`Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
50000,
500,
);
}
}

protected _getZeroValueData(): LiveCounterData {
return { data: 0 };
}

private _throwNoPayloadError(op: StateOperation): void {
throw new this._client.ErrorInfo(
`No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
50000,
500,
);
}

private _applyCounterCreate(op: StateCounter | undefined): void {
if (this.isCreated()) {
// skip COUNTER_CREATE op if this counter is already created
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter._applyCounterCreate()',
`skipping applying COUNTER_CREATE op on a counter instance as it is already created; objectId=${this._objectId}`,
);
return;
}

if (this._client.Utils.isNil(op)) {
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
// we need to SUM the initial value to the current value due to the reasons below, but since it's a 0, we can skip addition operation
this.setCreated(true);
return;
}

// note that it is intentional to SUM the incoming count from the create op.
// if we get here, it means that current counter instance wasn't initialized from the COUNTER_CREATE op,
// so it is missing the initial value that we're going to add now.
this._dataRef.data += op.count ?? 0;
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
this.setCreated(true);
}

private _applyCounterInc(op: StateCounterOp): void {
this._dataRef.data += op.amount;
}
}
Loading
Loading