Skip to content

Commit

Permalink
Implement handling of the server-initiated state sync sequence
Browse files Browse the repository at this point in the history
STATE_SYNC message processing in `RealtimeChannel.processMessage` is
based on the process for `PRESENCE` message.

Resolves DTP-950
  • Loading branch information
VeskeR committed Oct 9, 2024
1 parent c57c866 commit 42dc15f
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 14 deletions.
49 changes: 47 additions & 2 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Message, {
fromValuesArray as messagesFromValuesArray,
encodeArray as encodeMessagesArray,
decode as decodeMessage,
decodeData,
getMessagesSize,
CipherOptions,
EncodingDecodingContext,
Expand Down Expand Up @@ -533,12 +534,18 @@ class RealtimeChannel extends EventEmitter {
const resumed = message.hasFlag('RESUMED');
const hasPresence = message.hasFlag('HAS_PRESENCE');
const hasBacklog = message.hasFlag('HAS_BACKLOG');
const hasState = message.hasFlag('HAS_STATE');
if (this.state === 'attached') {
if (!resumed) {
/* On a loss of continuity, the presence set needs to be re-synced */
// we have lost continuity.
// the presence set needs to be re-synced
if (this._presence) {
this._presence.onAttached(hasPresence);
}
// the Live Objects state needs to be re-synced
if (this._liveObjects) {
this._liveObjects.onAttached(hasState);
}
}
const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error);
this._allChannelChanges.emit('update', change);
Expand All @@ -549,7 +556,7 @@ class RealtimeChannel extends EventEmitter {
/* RTL5i: re-send DETACH and remain in the 'detaching' state */
this.checkPendingState();
} else {
this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog);
this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog, hasState);
}
break;
}
Expand Down Expand Up @@ -613,6 +620,40 @@ class RealtimeChannel extends EventEmitter {
}
break;
}

case actions.STATE_SYNC: {
const { id, connectionId, timestamp } = message;
const options = this.channelOptions;

const stateMessages = message.state ?? [];
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;

stateMessages.push(stateMessage);
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}

if (this._liveObjects) {
this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial);
}

break;
}

case actions.MESSAGE: {
//RTL17
if (this.state !== 'attached') {
Expand Down Expand Up @@ -743,6 +784,7 @@ class RealtimeChannel extends EventEmitter {
resumed?: boolean,
hasPresence?: boolean,
hasBacklog?: boolean,
hasState?: boolean,
): void {
Logger.logAction(
this.logger,
Expand All @@ -763,6 +805,9 @@ class RealtimeChannel extends EventEmitter {
if (this._presence) {
this._presence.actOnChannelState(state, hasPresence, reason);
}
if (this._liveObjects) {
this._liveObjects.actOnChannelState(state, hasState, reason);
}
if (state === 'suspended' && this.connectionManager.state.sendEvents) {
this.startRetryTimer();
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { LiveObject } from './liveobject';
import { LiveObject, LiveObjectData } from './liveobject';

export interface LiveCounterData {
export interface LiveCounterData extends LiveObjectData {
data: number;
}

Expand Down
7 changes: 3 additions & 4 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { LiveObject } from './liveobject';

export type StateValue = string | number | boolean | Uint8Array;
import { LiveObject, LiveObjectData } from './liveobject';
import { StateValue } from './statemessage';

export interface ObjectIdStateData {
/**
Expand All @@ -23,7 +22,7 @@ export interface MapEntry {
data: StateData;
}

export interface LiveMapData {
export interface LiveMapData extends LiveObjectData {
data: Map<string, MapEntry>;
}

Expand Down
24 changes: 23 additions & 1 deletion src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { LiveObjects } from './liveobjects';

interface LiveObjectData {
export interface LiveObjectData {
data: any;
}

export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
protected _dataRef: T;
protected _objectId: string;
protected _regionalTimeserial?: string;

constructor(
protected _liveObjects: LiveObjects,
Expand All @@ -24,6 +25,27 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
return this._objectId;
}

/**
* @internal
*/
getRegionalTimeserial(): string | undefined {
return this._regionalTimeserial;
}

/**
* @internal
*/
setData(newDataRef: T): void {
this._dataRef = newDataRef;
}

/**
* @internal
*/
setRegionalTimeserial(regionalTimeserial: string): void {
this._regionalTimeserial = regionalTimeserial;
}

private _createObjectId(): string {
// TODO: implement object id generation based on live object type and initial value
return Math.random().toString().substring(2);
Expand Down
150 changes: 150 additions & 0 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
import type BaseClient from 'common/lib/client/baseclient';
import type RealtimeChannel from 'common/lib/client/realtimechannel';
import type ErrorInfo from 'common/lib/types/errorinfo';
import type * as API from '../../../ably';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';

export class LiveObjects {
private _client: BaseClient;
private _channel: RealtimeChannel;
private _liveObjectsPool: LiveObjectsPool;
private _syncLiveObjectsDataPool: SyncLiveObjectsDataPool;
private _syncInProgress: boolean;
private _currentSyncId: string | undefined;
private _currentSyncCursor: string | undefined;

constructor(channel: RealtimeChannel) {
this._channel = channel;
this._client = channel.client;
this._liveObjectsPool = new LiveObjectsPool(this);
this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this);
this._syncInProgress = true;
}

async getRoot(): Promise<LiveMap> {
Expand All @@ -25,4 +37,142 @@ export class LiveObjects {
getPool(): LiveObjectsPool {
return this._liveObjectsPool;
}

/**
* @internal
*/
getClient(): BaseClient {
return this._client;
}

/**
* @internal
*/
handleStateSyncMessage(stateMessages: StateMessage[], syncChannelSerial: string | null | undefined): void {
const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial);
if (this._currentSyncId !== syncId) {
this._startNewSync(syncId, syncCursor);
}

// TODO: delegate state messages to _syncLiveObjectsDataPool and create new live and data objects

// if this is the last (or only) message in a sequence of sync updates, end the sync
if (!syncCursor) {
this._endSync();
}
}

/**
* @internal
*/
onAttached(hasState?: boolean): void {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.onAttached()',
'channel = ' + this._channel.name + ', hasState = ' + hasState,
);

if (hasState) {
this._startNewSync(undefined);
} else {
// no HAS_STATE flag received on attach, can end SYNC sequence immediately
// and treat it as no state on a channel
this._liveObjectsPool.reset();
this._syncLiveObjectsDataPool.reset();
this._endSync();
}
}

/**
* @internal
*/
actOnChannelState(state: API.ChannelState, hasState?: boolean, stateReason?: ErrorInfo | null): void {

Check failure on line 90 in src/plugins/liveobjects/liveobjects.ts

View workflow job for this annotation

GitHub Actions / lint

'stateReason' is defined but never used
switch (state) {
case 'attached':
this.onAttached(hasState);
break;

case 'detached':
case 'failed':
// TODO: do something
break;

case 'suspended':
// TODO: do something
break;
}
}

private _startNewSync(syncId?: string, syncCursor?: string): void {
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = syncId;
this._currentSyncCursor = syncCursor;
this._syncInProgress = true;
}

private _endSync(): void {
this._applySync();
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = undefined;
this._currentSyncCursor = undefined;
this._syncInProgress = false;
}

private _parseSyncChannelSerial(syncChannelSerial: string | null | undefined): {
syncId: string | undefined;
syncCursor: string | undefined;
} {
let match: RegExpMatchArray | null;
let syncId: string | undefined = undefined;
let syncCursor: string | undefined = undefined;
if (syncChannelSerial && (match = syncChannelSerial.match(/^([\w-]+):(.*)$/))) {
syncId = match[1];
syncCursor = match[2];
}

return {
syncId,
syncCursor,
};
}

private _applySync(): void {
if (this._syncLiveObjectsDataPool.isEmpty()) {
return;
}

const receivedObjectIds = new Set<string>();

for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) {
receivedObjectIds.add(objectId);
const existingObject = this._liveObjectsPool.get(objectId);

if (existingObject) {
existingObject.setData(entry.objectData);
existingObject.setRegionalTimeserial(entry.regionalTimeserial);
continue;
}

let newObject: LiveObject;
switch (entry.objectType) {
case 'LiveCounter':
newObject = new LiveCounter(this, entry.objectData, objectId);
break;

case 'LiveMap':
newObject = new LiveMap(this, entry.objectData, objectId);
break;

default:
throw new this._client.ErrorInfo(`Unknown live object type: ${entry.objectType}`, 40000, 400);
}
newObject.setRegionalTimeserial(entry.regionalTimeserial);

this._liveObjectsPool.set(objectId, newObject);
}

// need to remove LiveObject instances from the LiveObjectsPool for which objectIds were not received during the SYNC sequence
this._liveObjectsPool.deleteExtraObjectIds([...receivedObjectIds]);
}
}
33 changes: 28 additions & 5 deletions src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
import type BaseClient from 'common/lib/client/baseclient';
import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { LiveObjects } from './liveobjects';

export type ObjectId = string;
export const ROOT_OBJECT_ID = 'root';

/**
* @internal
*/
export class LiveObjectsPool {
private _pool: Map<ObjectId, LiveObject>;
private _client: BaseClient;
private _pool: Map<string, LiveObject>;

constructor(private _liveObjects: LiveObjects) {
this._client = this._liveObjects.getClient();
this._pool = this._getInitialPool();
}

get(objectId: ObjectId): LiveObject | undefined {
get(objectId: string): LiveObject | undefined {
return this._pool.get(objectId);
}

private _getInitialPool(): Map<ObjectId, LiveObject> {
const pool = new Map<ObjectId, LiveObject>();
/**
* Deletes objects from the pool for which object ids are not found in the provided array of ids.
*/
deleteExtraObjectIds(objectIds: string[]): void {
const poolObjectIds = [...this._pool.keys()];
const extraObjectIds = this._client.Utils.arrSubtract(poolObjectIds, objectIds);

extraObjectIds.forEach((x) => this._pool.delete(x));
}

set(objectId: string, liveObject: LiveObject): void {
this._pool.set(objectId, liveObject);
}

reset(): void {
this._pool = this._getInitialPool();
}

private _getInitialPool(): Map<string, LiveObject> {
const pool = new Map<string, LiveObject>();
const root = new LiveMap(this._liveObjects, null, ROOT_OBJECT_ID);
pool.set(root.getObjectId(), root);
return pool;
Expand Down
Loading

0 comments on commit 42dc15f

Please sign in to comment.