From 10e13aa59dd6f4ec23780f0aaa8187b04fb4d378 Mon Sep 17 00:00:00 2001 From: Steven Lindsay Date: Thu, 17 Oct 2024 11:58:23 +0100 Subject: [PATCH] materialisation: Add new message attributes and actions handling - Added new message attributes, including `action`, `serial`, `refSerial`, `refType`, `updatedAt`, `deletedAt`, and `operation`. Additionally, create functions to map message actions between string and number representations. This update also changes the `fromValues` function to handle action transformations. - Added support to `Publish` so it now correctly sets the `message.action` flag to `message_create` --- ably.d.ts | 94 ++++++++++++++++++++++- scripts/moduleReport.ts | 2 +- src/common/lib/client/realtimechannel.ts | 11 +-- src/common/lib/client/restchannel.ts | 10 +-- src/common/lib/types/defaultmessage.ts | 11 +-- src/common/lib/types/message.ts | 98 +++++++++++++++++++++++- src/common/lib/types/protocolmessage.ts | 3 +- test/realtime/crypto.test.js | 2 + 8 files changed, 209 insertions(+), 22 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index b8e85c6a4..c02d62fa4 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2335,12 +2335,104 @@ export interface Message { * Timestamp of when the message was received by Ably, as milliseconds since the Unix epoch. */ timestamp?: number; + /** + * The action type of the message, one of the {@link MessageAction} enum values. + */ + action?: MessageAction; + /** + * This message's unique serial. + */ + serial?: string; + /** + * The serial of the message that this message is a reference to. + */ + refSerial?: string; + /** + * The type of reference this message is, in relation to the message it references. + */ + refType?: string; + /** + * If an `update` operation was applied to this message, this will be the timestamp the update occurred. + */ + updatedAt?: number; + /** + * If a `deletion` operation was applied to this message, this will be the timestamp the deletion occurred. + */ + deletedAt?: number; + /** + * If this message resulted from an operation, this will contain the operation details. + */ + operation?: Operation; } +/** + * Contains the details of an operation, such as update or deletion, supplied by the actioning client. + */ +export interface Operation { + /** + * The client ID of the client that initiated the operation. + */ + clientId?: string; + /** + * The description provided by the client that initiated the operation. + */ + description?: string; + /** + * A JSON object of string key-value pairs that may contain metadata associated with the operation. + */ + metadata?: Record; +} + +/** + * The namespace containing the different types of message actions. + */ +declare namespace MessageActions { + /** + * Message action has not been set. + */ + type MESSAGE_UNSET = 'message_unset'; + /** + * Message action for a newly created message. + */ + type MESSAGE_CREATE = 'message_create'; + /** + * Message action for an updated message. + */ + type MESSAGE_UPDATE = 'message_update'; + /** + * Message action for a deleted message. + */ + type MESSAGE_DELETE = 'message_delete'; + /** + * Message action for a newly created annotation. + */ + type MESSAGE_ANNOTATION_CREATE = 'message_annotation_create'; + /** + * Message action for a deleted annotation. + */ + type MESSAGE_ANNOTATION_DELETE = 'message_annotation_delete'; + /** + * Message action for a meta-message that contains channel occupancy information. + */ + type MESSAGE_META_OCCUPANCY = 'message_meta_occupancy'; +} + +/** + * Describes the possible action types used on an {@link Message}. + */ +export type MessageAction = + | MessageActions.MESSAGE_UNSET + | MessageActions.MESSAGE_CREATE + | MessageActions.MESSAGE_UPDATE + | MessageActions.MESSAGE_DELETE + | MessageActions.MESSAGE_ANNOTATION_CREATE + | MessageActions.MESSAGE_ANNOTATION_DELETE + | MessageActions.MESSAGE_META_OCCUPANCY; + /** * A message received from Ably. */ -export type InboundMessage = Message & Required>; +export type InboundMessage = Message & Required>; /** * Static utilities related to messages. diff --git a/scripts/moduleReport.ts b/scripts/moduleReport.ts index 25daba894..aacec5a00 100644 --- a/scripts/moduleReport.ts +++ b/scripts/moduleReport.ts @@ -6,7 +6,7 @@ import { gzip } from 'zlib'; import Table from 'cli-table'; // The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel) -const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 98, gzip: 30 }; +const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 }; const baseClientNames = ['BaseRest', 'BaseRealtime']; diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 24ffe62cf..f3cd2bd86 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -8,13 +8,13 @@ import * as Utils from '../util/utils'; import Logger from '../util/logger'; import RealtimePresence from './realtimepresence'; import Message, { - fromValues as messageFromValues, - fromValuesArray as messagesFromValuesArray, encodeArray as encodeMessagesArray, decode as decodeMessage, getMessagesSize, CipherOptions, EncodingDecodingContext, + messageFromValuesArrayWithAction, + messageFromValuesWithAction, } from '../types/message'; import ChannelStateChange from './channelstatechange'; import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo'; @@ -234,8 +234,9 @@ class RealtimeChannel extends EventEmitter { throw this.connectionManager.getError(); } if (argCount == 1) { - if (Utils.isObject(messages)) messages = [messageFromValues(messages)]; - else if (Array.isArray(messages)) messages = messagesFromValuesArray(messages); + // setting the action to `message_create` because this is a standard publish + if (Utils.isObject(messages)) messages = [messageFromValuesWithAction(messages, 'message_create')]; + else if (Array.isArray(messages)) messages = messageFromValuesArrayWithAction(messages, 'message_create'); else throw new ErrorInfo( 'The single-argument form of publish() expects a message object or an array of message objects', @@ -243,7 +244,7 @@ class RealtimeChannel extends EventEmitter { 400, ); } else { - messages = [messageFromValues({ name: args[0], data: args[1] })]; + messages = [messageFromValuesWithAction({ name: args[0], data: args[1] }, 'message_create')]; } const maxMessageSize = this.client.options.maxMessageSize; await encodeMessagesArray(messages, this.channelOptions as CipherOptions); diff --git a/src/common/lib/client/restchannel.ts b/src/common/lib/client/restchannel.ts index 3138e8611..31f45d1ee 100644 --- a/src/common/lib/client/restchannel.ts +++ b/src/common/lib/client/restchannel.ts @@ -2,12 +2,12 @@ import * as Utils from '../util/utils'; import Logger from '../util/logger'; import RestPresence from './restpresence'; import Message, { - fromValues as messageFromValues, - fromValuesArray as messagesFromValuesArray, encodeArray as encodeMessagesArray, serialize as serializeMessage, getMessagesSize, CipherOptions, + messageFromValuesWithAction, + messageFromValuesArrayWithAction, } from '../types/message'; import ErrorInfo from '../types/errorinfo'; import { PaginatedResult } from './paginatedresource'; @@ -74,13 +74,13 @@ class RestChannel { if (typeof first === 'string' || first === null) { /* (name, data, ...) */ - messages = [messageFromValues({ name: first, data: second })]; + messages = [messageFromValuesWithAction({ name: first, data: second }, 'message_create')]; params = args[2]; } else if (Utils.isObject(first)) { - messages = [messageFromValues(first)]; + messages = [messageFromValuesWithAction(first, 'message_create')]; params = args[1]; } else if (Array.isArray(first)) { - messages = messagesFromValuesArray(first); + messages = messageFromValuesArrayWithAction(first, 'message_create'); params = args[1]; } else { throw new ErrorInfo( diff --git a/src/common/lib/types/defaultmessage.ts b/src/common/lib/types/defaultmessage.ts index dfc4a02b1..79fffccf5 100644 --- a/src/common/lib/types/defaultmessage.ts +++ b/src/common/lib/types/defaultmessage.ts @@ -1,10 +1,11 @@ import Message, { CipherOptions, - fromEncoded, - fromEncodedArray, - encode, decode, + encode, EncodingDecodingContext, + fromEncoded, + fromEncodedArray, + fromValues, } from './message'; import * as API from '../../../../ably'; import Platform from 'common/platform'; @@ -25,8 +26,8 @@ export class DefaultMessage extends Message { } // Used by tests - static fromValues(values: unknown): Message { - return Object.assign(new Message(), values); + static fromValues(values: Message | Record, options?: { stringifyAction?: boolean }): Message { + return fromValues(values, options); } // Used by tests diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 7cc8b80ac..4864cfb3b 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -9,6 +9,40 @@ import * as API from '../../../../ably'; import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; import { MsgPack } from 'common/types/msgpack'; +const MessageActionArray: API.MessageAction[] = [ + 'message_unset', + 'message_create', + 'message_update', + 'message_delete', + 'message_annotation_create', + 'message_annotation_delete', + 'message_meta_occupancy', +]; + +function toMessageActionString(actionNumber?: number): API.MessageAction { + if (actionNumber === undefined) { + // Allow for the case where the action is not set + return 'message_unset'; + } + if (actionNumber in MessageActionArray) { + return MessageActionArray[actionNumber]; + } + throw new ErrorInfo('Unknown message action number: ' + actionNumber, 40000, 400); +} + +function toMessageActionNumber(messageAction?: API.MessageAction): number { + if (messageAction === undefined) { + // Allow for the case where the action is not set + return 0; + } + for (const [index, value] of MessageActionArray.entries()) { + if (value === messageAction) { + return index; + } + } + throw new ErrorInfo('Unknown message action: ' + messageAction, 40000, 400); +} + export type CipherOptions = { channelCipher: { encrypt: Function; @@ -82,7 +116,7 @@ export async function fromEncoded( encoded: unknown, inputOptions?: API.ChannelOptions, ): Promise { - const msg = fromValues(encoded); + const msg = fromValues(encoded as Message | Record, { stringifyAction: true }); const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); /* if decoding fails at any point, catch and return the message decoded to * the fullest extent possible */ @@ -260,7 +294,7 @@ export async function fromResponseBody( } for (let i = 0; i < body.length; i++) { - const msg = (body[i] = fromValues(body[i])); + const msg = (body[i] = fromValues(body[i], { stringifyAction: true })); try { await decode(msg, options); } catch (e) { @@ -270,14 +304,48 @@ export async function fromResponseBody( return body; } -export function fromValues(values: unknown): Message { +/** + * This is used to return a new message with a given action type set. + * @param values - This is a message-like object, with the values to be set on the new message object. + * @param action - This is the action type that will be applied to the message + * @returns {Message} - This is a new message, with the provided action type set. + */ +export function messageFromValuesWithAction( + values: Message | Record, + action: API.MessageAction, +): Message { + return fromValues({ ...values, action: action }); +} + +/** + * This is used to return an array of new messages, each set with the provided action type. + * It will apply the same action type to ALL messages. + * @param values - This is the array of message-like objects, with the values to be set on the new messages. + * @param action - This is the action type, applied to each message in the array. + * @returns {Message[]} - This is an array of new messages, each with the provided action type set. + */ +export function messageFromValuesArrayWithAction(values: unknown[], action: API.MessageAction): Message[] { + const count = values.length, + result = new Array(count); + for (let i = 0; i < count; i++) result[i] = messageFromValuesWithAction(values[i] as Record, action); + return result; +} + +export function fromValues( + values: Message | Record, + options?: { stringifyAction?: boolean }, +): Message { + const stringifyAction = options?.stringifyAction; + if (stringifyAction) { + return Object.assign(new Message(), { ...values, action: toMessageActionString(values.action as number) }); + } return Object.assign(new Message(), values); } export function fromValuesArray(values: unknown[]): Message[] { const count = values.length, result = new Array(count); - for (let i = 0; i < count; i++) result[i] = fromValues(values[i]); + for (let i = 0; i < count; i++) result[i] = fromValues(values[i] as Record); return result; } @@ -304,6 +372,13 @@ class Message { encoding?: string | null; extras?: any; size?: number; + action?: API.MessageAction | number | undefined; + serial?: string; + refSerial?: string; + refType?: string; + updatedAt?: number; + deletedAt?: number; + operation?: API.Operation; /** * Overload toJSON() to intercept JSON.stringify() @@ -334,6 +409,13 @@ class Message { connectionId: this.connectionId, connectionKey: this.connectionKey, extras: this.extras, + serial: this.serial, + action: toMessageActionNumber(this.action as API.MessageAction), + refSerial: this.refSerial, + refType: this.refType, + updatedAt: this.updatedAt, + deletedAt: this.deletedAt, + operation: this.operation, encoding, data, }; @@ -355,6 +437,14 @@ class Message { else result += '; data (json)=' + JSON.stringify(this.data); } if (this.extras) result += '; extras=' + JSON.stringify(this.extras); + + if (this.action) result += '; action=' + this.action; + if (this.serial) result += '; serial=' + this.serial; + if (this.refSerial) result += '; refSerial=' + this.refSerial; + if (this.refType) result += '; refType=' + this.refType; + if (this.updatedAt) result += '; updatedAt=' + this.updatedAt; + if (this.deletedAt) result += '; deletedAt=' + this.deletedAt; + if (this.operation) result += '; operation=' + JSON.stringify(this.operation); result += ']'; return result; } diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index eaa622a8d..b78977f89 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -83,7 +83,8 @@ export function fromDeserialized( const error = deserialized.error; if (error) deserialized.error = ErrorInfo.fromValues(error as ErrorInfo); const messages = deserialized.messages as Message[]; - if (messages) for (let i = 0; i < messages.length; i++) messages[i] = messageFromValues(messages[i]); + if (messages) + for (let i = 0; i < messages.length; i++) messages[i] = messageFromValues(messages[i], { stringifyAction: true }); const presence = presenceMessagePlugin ? (deserialized.presence as PresenceMessage[]) : undefined; if (presenceMessagePlugin) { diff --git a/test/realtime/crypto.test.js b/test/realtime/crypto.test.js index 14cb65330..18df2f11e 100644 --- a/test/realtime/crypto.test.js +++ b/test/realtime/crypto.test.js @@ -395,6 +395,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('call.msgpack.decode'); var messageFromMsgpack = Message.fromValues( msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)), + { stringifyAction: true }, ); try { @@ -439,6 +440,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('call.msgpack.decode'); var messageFromMsgpack = Message.fromValues( msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)), + { stringifyAction: true }, ); try {