Skip to content

Commit

Permalink
Preparation for applying incoming state operations
Browse files Browse the repository at this point in the history
- add static LiveMap.liveMapDataFromMapEntries() method with logic
previously used in SyncLiveObjectsDataPool. This method will be used
in other places to create LiveMapData
- minor refactoring and log improvements
  • Loading branch information
VeskeR committed Oct 25, 2024
1 parent 9b83e94 commit 9f98ef6
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter {
}
}

this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial);
this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial);

break;
}
Expand Down
30 changes: 30 additions & 0 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type BaseClient from 'common/lib/client/baseclient';
import { LiveObject, LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import {
MapSemantics,
StateMap,
StateMapEntry,
StateMapOp,
StateMessage,
StateOperation,
Expand Down Expand Up @@ -48,6 +50,34 @@ export class LiveMap extends LiveObject<LiveMapData> {
super(liveObjects, initialData, objectId);
}

static liveMapDataFromMapEntries(client: BaseClient, entries: Record<string, StateMapEntry>): LiveMapData {
const liveMapData: LiveMapData = {
data: new Map<string, MapEntry>(),
};

// need to iterate over entries manually to work around optional parameters from state object entries type
Object.entries(entries ?? {}).forEach(([key, entry]) => {
let liveData: StateData;
if (typeof entry.data.objectId !== 'undefined') {
liveData = { objectId: entry.data.objectId } as ObjectIdStateData;
} else {
liveData = { encoding: entry.data.encoding, value: entry.data.value } as ValueStateData;
}

const liveDataEntry: MapEntry = {
...entry,
timeserial: DefaultTimeserial.calculateTimeserial(client, entry.timeserial),
// true only if we received explicit true. otherwise always false
tombstone: entry.tombstone === true,
data: liveData,
};

liveMapData.data.set(key, liveDataEntry);
});

return liveMapData;
}

/**
* Returns the value associated with the specified key in the underlying Map object.
* If no element is associated with the specified key, undefined is returned.
Expand Down
8 changes: 4 additions & 4 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ export class LiveObjects {
/**
* @internal
*/
handleStateSyncMessage(stateMessages: StateMessage[], syncChannelSerial: string | null | undefined): void {
handleStateSyncMessages(stateMessages: StateMessage[], syncChannelSerial: string | null | undefined): void {
const { syncId, syncCursor } = this._parseSyncChannelSerial(syncChannelSerial);
if (this._currentSyncId !== syncId) {
this._startNewSync(syncId, syncCursor);
}

this._syncLiveObjectsDataPool.applyStateMessages(stateMessages);
this._syncLiveObjectsDataPool.applyStateSyncMessages(stateMessages);

// if this is the last (or only) message in a sequence of sync updates, end the sync
if (!syncCursor) {
Expand All @@ -93,7 +93,7 @@ export class LiveObjects {
);

if (hasState) {
this._startNewSync(undefined);
this._startNewSync();
} else {
// no HAS_STATE flag received on attach, can end SYNC sequence immediately
// and treat it as no state on a channel
Expand Down Expand Up @@ -190,7 +190,7 @@ export class LiveObjects {
break;

default:
throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 40000, 400);
throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 50000, 500);
}
newObject.setRegionalTimeserial(entry.regionalTimeserial);

Expand Down
2 changes: 1 addition & 1 deletion src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export class StateMessage {
operation?: StateOperation;
/** Describes the instantaneous state of an object. */
object?: StateObject;
/** Timeserial format */
/** Timeserial format. Contains the origin timeserial for this state message. */
serial?: string;

constructor(private _platform: typeof Platform) {}
Expand Down
41 changes: 9 additions & 32 deletions src/plugins/liveobjects/syncliveobjectsdatapool.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type BaseClient from 'common/lib/client/baseclient';
import RealtimeChannel from 'common/lib/client/realtimechannel';
import type RealtimeChannel from 'common/lib/client/realtimechannel';
import { LiveCounterData } from './livecounter';
import { LiveMapData, MapEntry, ObjectIdStateData, StateData, ValueStateData } from './livemap';
import { LiveMap } from './livemap';
import { LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { MapSemantics, StateMessage, StateObject } from './statemessage';
import { DefaultTimeserial } from './timeserial';

export interface LiveObjectDataEntry {
objectData: LiveObjectData;
Expand Down Expand Up @@ -55,14 +54,14 @@ export class SyncLiveObjectsDataPool {
this._pool.clear();
}

applyStateMessages(stateMessages: StateMessage[]): void {
applyStateSyncMessages(stateMessages: StateMessage[]): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.object) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MAJOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`state message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
'LiveObjects.SyncLiveObjectsDataPool.applyStateSyncMessages()',
`state object message is received during SYNC without 'object' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}
Expand All @@ -76,9 +75,9 @@ export class SyncLiveObjectsDataPool {
} else {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateMessages()',
`received unsupported state object message during SYNC, expected 'counter' or 'map' to be present; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
this._client.Logger.LOG_MAJOR,
'LiveObjects.SyncLiveObjectsDataPool.applyStateSyncMessages()',
`received unsupported state object message during SYNC, expected 'counter' or 'map' to be present, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
}
}
Expand All @@ -102,29 +101,7 @@ export class SyncLiveObjectsDataPool {

private _createLiveMapDataEntry(stateObject: StateObject): LiveMapDataEntry {
const map = stateObject.map!;

const objectData: LiveMapData = {
data: new Map<string, MapEntry>(),
};
// need to iterate over entries manually to work around optional parameters from state object entries type
Object.entries(map.entries ?? {}).forEach(([key, entryFromMessage]) => {
let liveData: StateData;
if (typeof entryFromMessage.data.objectId !== 'undefined') {
liveData = { objectId: entryFromMessage.data.objectId } as ObjectIdStateData;
} else {
liveData = { encoding: entryFromMessage.data.encoding, value: entryFromMessage.data.value } as ValueStateData;
}

const liveDataEntry: MapEntry = {
...entryFromMessage,
timeserial: DefaultTimeserial.calculateTimeserial(this._client, entryFromMessage.timeserial),
// true only if we received explicit true. otherwise always false
tombstone: entryFromMessage.tombstone === true,
data: liveData,
};

objectData.data.set(key, liveDataEntry);
});
const objectData = LiveMap.liveMapDataFromMapEntries(this._client, map.entries ?? {});

const newEntry: LiveMapDataEntry = {
objectData,
Expand Down

0 comments on commit 9f98ef6

Please sign in to comment.