Skip to content

Commit

Permalink
Implement LiveObjects pool init from state SYNC sequence
Browse files Browse the repository at this point in the history
Resolves DTP-949
  • Loading branch information
VeskeR committed Oct 15, 2024
1 parent 6bb5000 commit 66414d3
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
9 changes: 8 additions & 1 deletion src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export class LiveObjects {
return this._liveObjectsPool;
}

/**
* @internal
*/
getChannel(): RealtimeChannel {
return this._channel;
}

/**
* @internal
*/
Expand All @@ -54,7 +61,7 @@ export class LiveObjects {
this._startNewSync(syncId, syncCursor);
}

// TODO: delegate state messages to _syncLiveObjectsDataPool and create new live and data objects
this._syncLiveObjectsDataPool.applyStateMessages(stateMessages);

// if this is the last (or only) message in a sequence of sync updates, end the sync
if (!syncCursor) {
Expand Down
1 change: 1 addition & 0 deletions src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export interface StateMap {
semantics: MapSemantics;
// The map entries, indexed by key.
entries: Record<string, StateMapEntry>;
// TODO: should here also be 'created' field like for StateCounter?
}

/** A Counter object represents an incrementable and decrementable value */
Expand Down
88 changes: 87 additions & 1 deletion src/plugins/liveobjects/syncliveobjectsdatapool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import type BaseClient from 'common/lib/client/baseclient';
import RealtimeChannel from 'common/lib/client/realtimechannel';
import { LiveCounterData } from './livecounter';
import { LiveMapData, MapEntry, ObjectIdStateData, StateData, ValueStateData } from './livemap';
import { LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { MapSemantics } from './statemessage';
import { MapSemantics, StateMessage, StateObject } from './statemessage';

export interface LiveObjectDataEntry {
objectData: LiveObjectData;
Expand All @@ -24,9 +28,13 @@ export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry;
* @internal
*/
export class SyncLiveObjectsDataPool {
private _client: BaseClient;
private _channel: RealtimeChannel;
private _pool: Map<string, AnyDataEntry>;

constructor(private _liveObjects: LiveObjects) {
this._client = this._liveObjects.getClient();
this._channel = this._liveObjects.getChannel();
this._pool = new Map<string, AnyDataEntry>();
}

Expand All @@ -45,4 +53,82 @@ export class SyncLiveObjectsDataPool {
reset(): void {
this._pool = new Map<string, AnyDataEntry>();
}

applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.object) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`state message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateObject = stateMessage.object;

if (stateObject.counter) {
this._pool.set(stateObject.objectId, this._createLiveCounterDataEntry(stateObject));
} else if (stateObject.map) {
this._pool.set(stateObject.objectId, this._createLiveMapDataEntry(stateObject));
} else {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`received unsupported state object message during SYNC, expected 'counter' or 'map' to be present; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}

private _createLiveCounterDataEntry(stateObject: StateObject): LiveCounterDataEntry {
const counter = stateObject.counter!;

const objectData: LiveCounterData = {
data: counter.count,
};
const newEntry: LiveCounterDataEntry = {
created: counter.created,
objectData,
objectType: 'LiveCounter',
regionalTimeserial: stateObject.regionalTimeserial,
};

return newEntry;
}

private _createLiveMapDataEntry(stateObject: StateObject): LiveMapDataEntry {
const map = stateObject.map!;

const objectData: LiveMapData = {
data: new Map<string, MapEntry>(),
};
// need to iterate over entries manually to work around optional parameters from state object entries type
Object.entries(map.entries).forEach(([key, entryFromMessage]) => {
let liveData: StateData;
if (typeof entryFromMessage.data.objectId !== 'undefined') {
liveData = { objectId: entryFromMessage.data.objectId } as ObjectIdStateData;
} else {
liveData = { encoding: entryFromMessage.data.encoding, value: entryFromMessage.data.value } as ValueStateData;
}

const liveDataEntry: MapEntry = {
...entryFromMessage,
data: liveData,
};

objectData.data.set(key, liveDataEntry);
});

const newEntry: LiveMapDataEntry = {
objectData,
objectType: 'LiveMap',
regionalTimeserial: stateObject.regionalTimeserial,
semantics: map.semantics,
};

return newEntry;
}
}

0 comments on commit 66414d3

Please sign in to comment.