Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-950] Handle initial state sync sequence #1887

Merged
10 changes: 10 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,14 @@
* The client can receive presence messages.
*/
type PRESENCE_SUBSCRIBE = 'PRESENCE_SUBSCRIBE';
/**
* The client can publish LiveObjects state messages.
*/
type STATE_PUBLISH = 'STATE_PUBLISH';
/**
* The client can receive LiveObjects state messages.
*/
type STATE_SUBSCRIBE = 'STATE_SUBSCRIBE';
/**
* The client is resuming an existing connection.
*/
Expand All @@ -885,6 +893,8 @@
| ChannelModes.SUBSCRIBE
| ChannelModes.PRESENCE
| ChannelModes.PRESENCE_SUBSCRIBE
| ChannelModes.STATE_PUBLISH
| ChannelModes.STATE_SUBSCRIBE
| ChannelModes.ATTACH_RESUME;

/**
Expand Down Expand Up @@ -2295,7 +2305,7 @@
* and {@link ChannelOptions}, or returns the existing channel object.
*
* @experimental This is a preview feature and may change in a future non-major release.
* This experimental method allows you to create custom realtime data feeds by selectively subscribing

Check warning on line 2308 in ably.d.ts

View workflow job for this annotation

GitHub Actions / lint

Expected no lines between tags
* to receive only part of the data from the channel.
* See the [announcement post](https://pages.ably.com/subscription-filters-preview) for more information.
*
Expand Down
4 changes: 3 additions & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { gzip } from 'zlib';
import Table from 'cli-table';

// The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel)
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 99, gzip: 30 };
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 };

const baseClientNames = ['BaseRest', 'BaseRealtime'];

Expand Down Expand Up @@ -314,6 +314,8 @@ async function checkLiveObjectsPluginFiles() {
'src/plugins/liveobjects/liveobject.ts',
'src/plugins/liveobjects/liveobjects.ts',
'src/plugins/liveobjects/liveobjectspool.ts',
'src/plugins/liveobjects/statemessage.ts',
'src/plugins/liveobjects/syncliveobjectsdatapool.ts',
]);

return checkBundleFiles(pluginBundleInfo, allowedFiles, 100);
Expand Down
3 changes: 3 additions & 0 deletions src/common/lib/client/baserealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import { ModularPlugins, RealtimePresencePlugin } from './modularplugins';
import { TransportNames } from 'common/constants/TransportName';
import { TransportImplementations } from 'common/platform';
import Defaults from '../util/defaults';
import type * as LiveObjectsPlugin from 'plugins/liveobjects';

/**
`BaseRealtime` is an export of the tree-shakable version of the SDK, and acts as the base class for the `DefaultRealtime` class exported by the non tree-shakable version.
*/
class BaseRealtime extends BaseClient {
readonly _RealtimePresence: RealtimePresencePlugin | null;
readonly _LiveObjectsPlugin: typeof LiveObjectsPlugin | null;
// Extra transport implementations available to this client, in addition to those in Platform.Transports.bundledImplementations
readonly _additionalTransportImplementations: TransportImplementations;
_channels: any;
Expand Down Expand Up @@ -58,6 +60,7 @@ class BaseRealtime extends BaseClient {

this._additionalTransportImplementations = BaseRealtime.transportImplementationsFromPlugins(this.options.plugins);
this._RealtimePresence = this.options.plugins?.RealtimePresence ?? null;
this._LiveObjectsPlugin = this.options.plugins?.LiveObjects ?? null;
this.connection = new Connection(this, this.options);
this._channels = new Channels(this);
if (this.options.autoConnect !== false) this.connect();
Expand Down
49 changes: 47 additions & 2 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Message, {
fromValuesArray as messagesFromValuesArray,
encodeArray as encodeMessagesArray,
decode as decodeMessage,
decodeData,
getMessagesSize,
CipherOptions,
EncodingDecodingContext,
Expand Down Expand Up @@ -533,12 +534,18 @@ class RealtimeChannel extends EventEmitter {
const resumed = message.hasFlag('RESUMED');
const hasPresence = message.hasFlag('HAS_PRESENCE');
const hasBacklog = message.hasFlag('HAS_BACKLOG');
const hasState = message.hasFlag('HAS_STATE');
if (this.state === 'attached') {
if (!resumed) {
/* On a loss of continuity, the presence set needs to be re-synced */
// we have lost continuity.
// the presence set needs to be re-synced
if (this._presence) {
this._presence.onAttached(hasPresence);
}
// the Live Objects state needs to be re-synced
if (this._liveObjects) {
this._liveObjects.onAttached(hasState);
}
}
const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error);
this._allChannelChanges.emit('update', change);
Expand All @@ -549,7 +556,7 @@ class RealtimeChannel extends EventEmitter {
/* RTL5i: re-send DETACH and remain in the 'detaching' state */
this.checkPendingState();
} else {
this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog);
this.notifyState('attached', message.error, resumed, hasPresence, hasBacklog, hasState);
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
}
break;
}
Expand Down Expand Up @@ -613,6 +620,40 @@ class RealtimeChannel extends EventEmitter {
}
break;
}

case actions.STATE_SYNC: {
if (!this._liveObjects) {
return;
}

const { id, connectionId, timestamp } = message;
const options = this.channelOptions;

const stateMessages = message.state ?? [];
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);
VeskeR marked this conversation as resolved.
Show resolved Hide resolved

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}
VeskeR marked this conversation as resolved.
Show resolved Hide resolved

VeskeR marked this conversation as resolved.
Show resolved Hide resolved
this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial);

break;
}

case actions.MESSAGE: {
//RTL17
if (this.state !== 'attached') {
Expand Down Expand Up @@ -743,6 +784,7 @@ class RealtimeChannel extends EventEmitter {
resumed?: boolean,
hasPresence?: boolean,
hasBacklog?: boolean,
hasState?: boolean,
): void {
Logger.logAction(
this.logger,
Expand All @@ -763,6 +805,9 @@ class RealtimeChannel extends EventEmitter {
if (this._presence) {
this._presence.actOnChannelState(state, hasPresence, reason);
}
if (this._liveObjects) {
this._liveObjects.actOnChannelState(state, hasState);
}
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
if (state === 'suspended' && this.connectionManager.state.sendEvents) {
this.startRetryTimer();
} else {
Expand Down
6 changes: 5 additions & 1 deletion src/common/lib/transport/comettransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,11 @@ abstract class CometTransport extends Transport {
if (items && items.length)
for (let i = 0; i < items.length; i++)
this.onProtocolMessage(
protocolMessageFromDeserialized(items[i], this.connectionManager.realtime._RealtimePresence),
protocolMessageFromDeserialized(
items[i],
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._LiveObjectsPlugin,
),
);
} catch (e) {
Logger.logAction(
Expand Down
3 changes: 2 additions & 1 deletion src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1805,7 +1805,8 @@ class ConnectionManager extends EventEmitter {

Logger.LOG_MICRO,
'ConnectionManager.send()',
'queueing msg; ' + stringifyProtocolMessage(msg, this.realtime._RealtimePresence),
'queueing msg; ' +
stringifyProtocolMessage(msg, this.realtime._RealtimePresence, this.realtime._LiveObjectsPlugin),
);
}
this.queue(msg, callback);
Expand Down
6 changes: 5 additions & 1 deletion src/common/lib/transport/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ class Protocol extends EventEmitter {
Logger.LOG_MICRO,
'Protocol.send()',
'sending msg; ' +
stringifyProtocolMessage(pendingMessage.message, this.transport.connectionManager.realtime._RealtimePresence),
stringifyProtocolMessage(
pendingMessage.message,
this.transport.connectionManager.realtime._RealtimePresence,
this.transport.connectionManager.realtime._LiveObjectsPlugin,
),
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
);
}
pendingMessage.sendAttempted = true;
Expand Down
6 changes: 5 additions & 1 deletion src/common/lib/transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ abstract class Transport extends EventEmitter {
'received on ' +
this.shortName +
': ' +
stringifyProtocolMessage(message, this.connectionManager.realtime._RealtimePresence) +
stringifyProtocolMessage(
message,
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._LiveObjectsPlugin,
) +
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
'; connectionId = ' +
this.connectionManager.connectionId,
);
Expand Down
1 change: 1 addition & 0 deletions src/common/lib/transport/websockettransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class WebSocketTransport extends Transport {
data,
this.connectionManager.realtime._MsgPack,
this.connectionManager.realtime._RealtimePresence,
this.connectionManager.realtime._LiveObjectsPlugin,
this.format,
),
);
Expand Down
69 changes: 51 additions & 18 deletions src/common/lib/types/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,36 @@ export async function decode(
message: Message | PresenceMessage,
inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions,
): Promise<void> {
const { data, encoding, error } = await decodeData(message.data, message.encoding, inputContext);
message.data = data;
message.encoding = encoding;

if (error) {
throw error;
}
}
VeskeR marked this conversation as resolved.
Show resolved Hide resolved

export async function decodeData(
data: any,
encoding: string | null | undefined,
inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions,
): Promise<{
error?: ErrorInfo;
data: any;
encoding: string | null | undefined;
}> {
const context = normaliseContext(inputContext);
let lastPayload = data;
let decodedData = data;
let finalEncoding: string | null | undefined = encoding;
let decodingError: ErrorInfo | undefined = undefined;

let lastPayload = message.data;
const encoding = message.encoding;
if (encoding) {
const xforms = encoding.split('/');
let lastProcessedEncodingIndex,
encodingsToProcess = xforms.length,
data = message.data;

let lastProcessedEncodingIndex;
let encodingsToProcess = xforms.length;
let xform = '';

try {
while ((lastProcessedEncodingIndex = encodingsToProcess) > 0) {
// eslint-disable-next-line security/detect-unsafe-regex
Expand All @@ -173,16 +192,16 @@ export async function decode(
xform = match[1];
switch (xform) {
case 'base64':
data = Platform.BufferUtils.base64Decode(String(data));
decodedData = Platform.BufferUtils.base64Decode(String(decodedData));
if (lastProcessedEncodingIndex == xforms.length) {
lastPayload = data;
lastPayload = decodedData;
}
continue;
case 'utf-8':
data = Platform.BufferUtils.utf8Decode(data);
decodedData = Platform.BufferUtils.utf8Decode(decodedData);
continue;
case 'json':
data = JSON.parse(data);
decodedData = JSON.parse(decodedData);
continue;
case 'cipher':
if (
Expand All @@ -196,7 +215,7 @@ export async function decode(
if (xformAlgorithm != cipher.algorithm) {
throw new Error('Unable to decrypt message with given cipher; incompatible cipher params');
}
data = await cipher.decrypt(data);
decodedData = await cipher.decrypt(decodedData);
continue;
} else {
throw new Error('Unable to decrypt message; not an encrypted channel');
Expand All @@ -220,10 +239,12 @@ export async function decode(

// vcdiff expects Uint8Arrays, can't copy with ArrayBuffers.
const deltaBaseBuffer = Platform.BufferUtils.toBuffer(deltaBase as Buffer);
data = Platform.BufferUtils.toBuffer(data);
decodedData = Platform.BufferUtils.toBuffer(decodedData);

data = Platform.BufferUtils.arrayBufferViewToBuffer(context.plugins.vcdiff.decode(data, deltaBaseBuffer));
lastPayload = data;
decodedData = Platform.BufferUtils.arrayBufferViewToBuffer(
context.plugins.vcdiff.decode(decodedData, deltaBaseBuffer),
);
lastPayload = decodedData;
} catch (e) {
throw new ErrorInfo('Vcdiff delta decode failed with ' + e, 40018, 400);
}
Expand All @@ -234,18 +255,30 @@ export async function decode(
}
} catch (e) {
const err = e as ErrorInfo;
throw new ErrorInfo(
'Error processing the ' + xform + ' encoding, decoder returned ‘' + err.message + '’',
decodingError = new ErrorInfo(
`Error processing the ${xform} encoding, decoder returned ‘${err.message}’`,
err.code || 40013,
400,
);
} finally {
message.encoding =
finalEncoding =
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
(lastProcessedEncodingIndex as number) <= 0 ? null : xforms.slice(0, lastProcessedEncodingIndex).join('/');
message.data = data;
}
}

if (decodingError) {
return {
error: decodingError,
data: decodedData,
encoding: finalEncoding,
};
}
VeskeR marked this conversation as resolved.
Show resolved Hide resolved

context.baseEncodedPreviousPayload = lastPayload;
return {
data: decodedData,
encoding: finalEncoding,
};
}

export async function fromResponseBody(
Expand Down
Loading
Loading