Skip to content

Commit

Permalink
Implement LiveObjects pool init from state SYNC sequence
Browse files Browse the repository at this point in the history
Resolves DTP-949
  • Loading branch information
VeskeR committed Oct 16, 2024
1 parent e770822 commit c4d098b
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 2 deletions.
9 changes: 8 additions & 1 deletion src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export class LiveObjects {
return this._liveObjectsPool;
}

/**
* @internal
*/
getChannel(): RealtimeChannel {
return this._channel;
}

/**
* @internal
*/
Expand All @@ -53,7 +60,7 @@ export class LiveObjects {
this._startNewSync(syncId, syncCursor);
}

// TODO: delegate state messages to _syncLiveObjectsDataPool and create new live and data objects
this._syncLiveObjectsDataPool.applyStateMessages(stateMessages);

// if this is the last (or only) message in a sequence of sync updates, end the sync
if (!syncCursor) {
Expand Down
90 changes: 89 additions & 1 deletion src/plugins/liveobjects/syncliveobjectsdatapool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import type BaseClient from 'common/lib/client/baseclient';
import RealtimeChannel from 'common/lib/client/realtimechannel';
import { LiveCounterData } from './livecounter';
import { LiveMapData, MapEntry, ObjectIdStateData, StateData, ValueStateData } from './livemap';
import { LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { MapSemantics } from './statemessage';
import { MapSemantics, StateMessage, StateObject } from './statemessage';

export interface LiveObjectDataEntry {
objectData: LiveObjectData;
Expand All @@ -24,9 +28,13 @@ export type AnyDataEntry = LiveCounterDataEntry | LiveMapDataEntry;
* @internal
*/
export class SyncLiveObjectsDataPool {
private _client: BaseClient;
private _channel: RealtimeChannel;
private _pool: Map<string, AnyDataEntry>;

constructor(private _liveObjects: LiveObjects) {
this._client = this._liveObjects.getClient();
this._channel = this._liveObjects.getChannel();
this._pool = new Map<string, AnyDataEntry>();
}

Expand All @@ -45,4 +53,84 @@ export class SyncLiveObjectsDataPool {
reset(): void {
this._pool = new Map<string, AnyDataEntry>();
}

applyStateMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.object) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`state message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}

const stateObject = stateMessage.object;

if (stateObject.counter) {
this._pool.set(stateObject.objectId, this._createLiveCounterDataEntry(stateObject));
} else if (stateObject.map) {
this._pool.set(stateObject.objectId, this._createLiveMapDataEntry(stateObject));
} else {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`received unsupported state object message during SYNC, expected 'counter' or 'map' to be present; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
}

private _createLiveCounterDataEntry(stateObject: StateObject): LiveCounterDataEntry {
const counter = stateObject.counter!;

const objectData: LiveCounterData = {
data: counter.count ?? 0,
};
const newEntry: LiveCounterDataEntry = {
created: counter.created,
objectData,
objectType: 'LiveCounter',
regionalTimeserial: stateObject.regionalTimeserial,
};

return newEntry;
}

private _createLiveMapDataEntry(stateObject: StateObject): LiveMapDataEntry {
const map = stateObject.map!;

const objectData: LiveMapData = {
data: new Map<string, MapEntry>(),
};
// need to iterate over entries manually to work around optional parameters from state object entries type
Object.entries(map.entries ?? {}).forEach(([key, entryFromMessage]) => {
let liveData: StateData;
if (typeof entryFromMessage.data.objectId !== 'undefined') {
liveData = { objectId: entryFromMessage.data.objectId } as ObjectIdStateData;
} else {
liveData = { encoding: entryFromMessage.data.encoding, value: entryFromMessage.data.value } as ValueStateData;
}

const liveDataEntry: MapEntry = {
...entryFromMessage,
// true only if we received explicit true. otherwise always false
tombstone: entryFromMessage.tombstone === true,
data: liveData,
};

objectData.data.set(key, liveDataEntry);
});

const newEntry: LiveMapDataEntry = {
objectData,
objectType: 'LiveMap',
regionalTimeserial: stateObject.regionalTimeserial,
semantics: map.semantics ?? MapSemantics.LWW,
};

return newEntry;
}
}

0 comments on commit c4d098b

Please sign in to comment.