Skip to content

Commit

Permalink
Implement application of incoming state operation messages outside of…
Browse files Browse the repository at this point in the history
… sync sequence

Resolves DTP-956
  • Loading branch information
VeskeR committed Oct 22, 2024
1 parent 9149321 commit cd6be9a
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 2 deletions.
33 changes: 33 additions & 0 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,39 @@ class RealtimeChannel extends EventEmitter {
break;
}

case actions.STATE: {
if (!this._liveObjects) {
return;
}

const { id, connectionId, timestamp } = message;
const options = this.channelOptions;

const stateMessages = message.state ?? [];
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}

this._liveObjects.handleStateMessage(stateMessages);

break;
}

case actions.STATE_SYNC: {
if (!this._liveObjects) {
return;
Expand Down
15 changes: 14 additions & 1 deletion src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type BaseClient from 'common/lib/client/baseclient';
import type RealtimeChannel from 'common/lib/client/realtimechannel';
import type * as API from '../../../ably';
import type EventEmitter from 'common/lib/util/eventemitter';
import type * as API from '../../../ably';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
Expand Down Expand Up @@ -81,6 +81,17 @@ export class LiveObjects {
}
}

/**
* @internal
*/
handleStateMessage(stateMessages: StateMessage[]): void {
if (this._syncInProgress) {
// TODO: handle buffering of state messages during SYNC
}

this._liveObjectsPool.applyStateMessages(stateMessages);
}

/**
* @internal
*/
Expand Down Expand Up @@ -131,6 +142,8 @@ export class LiveObjects {
}

private _endSync(): void {
// TODO: handle applying buffered state messages when SYNC is finished

this._applySync();
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = undefined;
Expand Down
85 changes: 84 additions & 1 deletion src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type BaseClient from 'common/lib/client/baseclient';
import type RealtimeChannel from 'common/lib/client/realtimechannel';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { LiveObjects } from './liveobjects';
import { ObjectId } from './objectid';
import { MapSemantics } from './statemessage';
import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage';

export const ROOT_OBJECT_ID = 'root';

Expand All @@ -13,10 +14,12 @@ export const ROOT_OBJECT_ID = 'root';
*/
export class LiveObjectsPool {
private _client: BaseClient;
private _channel: RealtimeChannel;
private _pool: Map<string, LiveObject>;

constructor(private _liveObjects: LiveObjects) {
this._client = this._liveObjects.getClient();
this._channel = this._liveObjects.getChannel();
this._pool = this._getInitialPool();
}

Expand Down Expand Up @@ -66,10 +69,90 @@ export class LiveObjectsPool {
this.set(objectId, zeroValueObject);
}

applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.operation) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.LiveObjectsPool.applyStateMessages()',
`state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateOperation = stateMessage.operation;

switch (stateOperation.action) {
case StateOperationAction.MAP_CREATE:
case StateOperationAction.COUNTER_CREATE:
if (this.get(stateOperation.objectId)) {
// object wich such id already exists (we may have created a zero-value object before, or this is a duplicate *_CREATE op),
// so delegate application of the op to that object
// TODO: invoke subscription callbacks for an object when applied
this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
break;
}

// otherwise we can create new objects in the pool
if (stateOperation.counter) {
this._handleCounterCreate(stateOperation);
} else if (stateOperation.map) {
this._handleMapCreate(stateOperation);
} else {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.LiveObjectsPool.applyStateMessages()',
`received unsupported operation in state operation message, expected 'counter' or 'map' to be present, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
break;

case StateOperationAction.MAP_SET:
case StateOperationAction.MAP_REMOVE:
case StateOperationAction.COUNTER_INC:
// we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we create a zero-value object for the provided object id, and apply operation for that zero-value object.
// when we eventually receive a corresponding *_CREATE op for that object, its application will be handled by that zero-value object.
this.createZeroValueObjectIfNotExists(stateOperation.objectId);
// TODO: invoke subscription callbacks for an object when applied
this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
break;

default:
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.LiveObjectsPool.applyStateMessages()',
`received unsupported action in state operation message: ${stateOperation.action}, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}

private _getInitialPool(): Map<string, LiveObject> {
const pool = new Map<string, LiveObject>();
const root = new LiveMap(this._liveObjects, MapSemantics.LWW, null, ROOT_OBJECT_ID);
pool.set(root.getObjectId(), root);
return pool;
}

private _handleCounterCreate(stateOperation: StateOperation): void {
const counter = stateOperation.counter!;
this.set(
stateOperation.objectId,
new LiveCounter(this._liveObjects, true, { data: counter.count ?? 0 }, stateOperation.objectId),
);
}

private _handleMapCreate(stateOperation: StateOperation): void {
const map = stateOperation.map!;
const objectData = LiveMap.liveMapDataFromMapEntries(this._client, map.entries ?? {});

this.set(
stateOperation.objectId,
new LiveMap(this._liveObjects, map.semantics ?? MapSemantics.LWW, objectData, stateOperation.objectId),
);
}
}

0 comments on commit cd6be9a

Please sign in to comment.