From 0ee932d33c09bfe71358c7e37db7dfed88621bf0 Mon Sep 17 00:00:00 2001 From: Steven Lindsay Date: Thu, 17 Oct 2024 11:58:23 +0100 Subject: [PATCH] materialisation: Add new message attributes and actions handling - Added new message attributes, including `action`, `serial`, `refSerial`, `refType`, `updatedAt`, `deletedAt`, and `operation`. Additionally, create functions to map message actions between string and number representations. This update also changes the `fromValues` function to handle action transformations. - Added support to `Publish` so it now correctly sets the `message.action` flag to `message_create` --- ably.d.ts | 94 +++++++++++++++++++++++- src/common/lib/client/realtimechannel.ts | 9 ++- src/common/lib/types/defaultmessage.ts | 11 +-- src/common/lib/types/message.ts | 90 ++++++++++++++++++++++- 4 files changed, 191 insertions(+), 13 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index b8e85c6a4..922b0a7de 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2335,12 +2335,104 @@ export interface Message { * Timestamp of when the message was received by Ably, as milliseconds since the Unix epoch. */ timestamp?: number; + /** + * The action type of the message, one of the {@link MessageAction} enum values. + */ + action?: MessageAction; + /** + * This message's unique serial. + */ + serial?: string; + /** + * The serial of the message that this message is a reference to. + */ + refSerial?: string; + /** + * The type of reference this message is, in relation to the message it references. + */ + refType?: string; + /** + * If an `update` operation was applied to this message, this will be the timestamp the update occurred. + */ + updatedAt?: number; + /** + * If a `deletion` operation was applied to this message, this will be the timestamp the deletion occurred. + */ + deletedAt?: number; + /** + * If this message resulted from an operation, this will contain the operation details. + */ + operation?: Operation; } +/** + * Contains the details of an operation, such as update of deletion, supplied by the actioning client. + */ +export interface Operation { + /** + * The client ID of the client that initiated the operation. + */ + clientId?: string; + /** + * The description provided by the client that initiated the operation. + */ + description?: string; + /** + * A JSON object of string key-value pairs that may contain metadata associated with the operation. + */ + metadata?: Record; +} + +/** + * The namespace containing the different types of message actions. + */ +declare namespace MessageActions { + /** + * Message action has not been set. + */ + type MESSAGE_UNSET = 'message_unset'; + /** + * Message action for a newly created message. + */ + type MESSAGE_CREATE = 'message_create'; + /** + * Message action for an updated message. + */ + type MESSAGE_UPDATE = 'message_update'; + /** + * Message action for a deleted message. + */ + type MESSAGE_DELETE = 'message_delete'; + /** + * Message action for a newly created annotation. + */ + type MESSAGE_ANNOTATION_CREATE = 'message_annotation_create'; + /** + * Message action for a deleted annotation. + */ + type MESSAGE_ANNOTATION_DELETE = 'message_annotation_delete'; + /** + * Message action for a meta-message that contains channel occupancy information. + */ + type MESSAGE_META_OCCUPANCY = 'message_meta_occupancy'; +} + +/** + * Describes the possible action types used on an {@link Message}. + */ +export type MessageAction = + | MessageActions.MESSAGE_UNSET + | MessageActions.MESSAGE_CREATE + | MessageActions.MESSAGE_UPDATE + | MessageActions.MESSAGE_DELETE + | MessageActions.MESSAGE_ANNOTATION_CREATE + | MessageActions.MESSAGE_ANNOTATION_DELETE + | MessageActions.MESSAGE_META_OCCUPANCY; + /** * A message received from Ably. */ -export type InboundMessage = Message & Required>; +export type InboundMessage = Message & Required>; /** * Static utilities related to messages. diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 24ffe62cf..847f50ab1 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -15,6 +15,8 @@ import Message, { getMessagesSize, CipherOptions, EncodingDecodingContext, + messageFromValuesArrayWithAction, + messageFromValuesWithAction, } from '../types/message'; import ChannelStateChange from './channelstatechange'; import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo'; @@ -234,8 +236,9 @@ class RealtimeChannel extends EventEmitter { throw this.connectionManager.getError(); } if (argCount == 1) { - if (Utils.isObject(messages)) messages = [messageFromValues(messages)]; - else if (Array.isArray(messages)) messages = messagesFromValuesArray(messages); + // setting the action to `message_create` because this is a standard publish + if (Utils.isObject(messages)) messages = [messageFromValuesWithAction(messages, 'message_create')]; + else if (Array.isArray(messages)) messages = messageFromValuesArrayWithAction(messages, 'message_create'); else throw new ErrorInfo( 'The single-argument form of publish() expects a message object or an array of message objects', @@ -243,7 +246,7 @@ class RealtimeChannel extends EventEmitter { 400, ); } else { - messages = [messageFromValues({ name: args[0], data: args[1] })]; + messages = [messageFromValuesWithAction({ name: args[0], data: args[1] }, 'message_create')]; } const maxMessageSize = this.client.options.maxMessageSize; await encodeMessagesArray(messages, this.channelOptions as CipherOptions); diff --git a/src/common/lib/types/defaultmessage.ts b/src/common/lib/types/defaultmessage.ts index dfc4a02b1..33246b30e 100644 --- a/src/common/lib/types/defaultmessage.ts +++ b/src/common/lib/types/defaultmessage.ts @@ -1,10 +1,11 @@ import Message, { CipherOptions, - fromEncoded, - fromEncodedArray, - encode, decode, + encode, EncodingDecodingContext, + fromEncoded, + fromEncodedArray, + fromValues, } from './message'; import * as API from '../../../../ably'; import Platform from 'common/platform'; @@ -25,8 +26,8 @@ export class DefaultMessage extends Message { } // Used by tests - static fromValues(values: unknown): Message { - return Object.assign(new Message(), values); + static fromValues(values: Message | Record, stringifyAction?: boolean): Message { + return fromValues(values, stringifyAction); } // Used by tests diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 7cc8b80ac..2adf803f0 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -9,6 +9,33 @@ import * as API from '../../../../ably'; import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; import { MsgPack } from 'common/types/msgpack'; +const MessageActionArray: API.MessageAction[] = [ + 'message_unset', + 'message_create', + 'message_update', + 'message_delete', + 'message_annotation_create', + 'message_annotation_delete', + 'message_meta_occupancy', +]; + +function toMessageActionString(actionNumber: number): API.MessageAction { + if (actionNumber in MessageActionArray) { + return MessageActionArray[actionNumber]; + } else { + throw new ErrorInfo(`Unsupported action number: ${actionNumber}`, 40000, 400); + } +} + +function toMessageActionNumber(messageAction: API.MessageAction): number { + for (const [index, value] of MessageActionArray.entries()) { + if (value === messageAction) { + return index; + } + } + throw new ErrorInfo(`Unsupported action string: ${messageAction}`, 40000, 400); +} + export type CipherOptions = { channelCipher: { encrypt: Function; @@ -82,7 +109,7 @@ export async function fromEncoded( encoded: unknown, inputOptions?: API.ChannelOptions, ): Promise { - const msg = fromValues(encoded); + const msg = fromValues(encoded as Message | Record, true); const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); /* if decoding fails at any point, catch and return the message decoded to * the fullest extent possible */ @@ -260,7 +287,7 @@ export async function fromResponseBody( } for (let i = 0; i < body.length; i++) { - const msg = (body[i] = fromValues(body[i])); + const msg = (body[i] = fromValues(body[i], true)); try { await decode(msg, options); } catch (e) { @@ -270,14 +297,46 @@ export async function fromResponseBody( return body; } -export function fromValues(values: unknown): Message { +/** + * This is used to return a new message with a given action type set. + * @param values - This is a message-like object, with the values to be set on the new message object. + * @param action - This is the action type that will be applied to the message + * @returns {Message} - This is a new message, with the provided action type set. + * @throws {ErrorInfo} - If the action type is not supported. + */ +export function messageFromValuesWithAction( + values: Message | Record, + action: API.MessageAction, +): Message { + values.action = toMessageActionNumber(action); + return fromValues(values); +} + +/** + * This is used to return an array of new messages, each set with the provided action type. + * It will apply the same action type to ALL messages. + * @param values - This is the array of message-like objects, with the values to be set on the new messages. + * @param action - This is the action type, applied to each message in the array. + * @returns {Message[]} - This is an array of new messages, each with the provided action type set. + */ +export function messageFromValuesArrayWithAction(values: unknown[], action: API.MessageAction): Message[] { + const count = values.length, + result = new Array(count); + for (let i = 0; i < count; i++) result[i] = messageFromValuesWithAction(values[i] as Record, action); + return result; +} + +export function fromValues(values: Message | Record, stringifyAction?: boolean): Message { + if (stringifyAction) { + values.action = toMessageActionString(values.action as number); + } return Object.assign(new Message(), values); } export function fromValuesArray(values: unknown[]): Message[] { const count = values.length, result = new Array(count); - for (let i = 0; i < count; i++) result[i] = fromValues(values[i]); + for (let i = 0; i < count; i++) result[i] = fromValues(values[i] as Record, true); return result; } @@ -304,6 +363,13 @@ class Message { encoding?: string | null; extras?: any; size?: number; + action?: API.MessageAction | number; + serial?: string; + refSerial?: string; + refType?: string; + updatedAt?: number; + deletedAt?: number; + operation?: API.Operation; /** * Overload toJSON() to intercept JSON.stringify() @@ -334,6 +400,14 @@ class Message { connectionId: this.connectionId, connectionKey: this.connectionKey, extras: this.extras, + serial: this.serial, + // If `action` has not been set, it will be set once received by realtime + action: this.action ? toMessageActionNumber(this.action as API.MessageAction) : 0, + refSerial: this.refSerial, + refType: this.refType, + updatedAt: this.updatedAt, + deletedAt: this.deletedAt, + operation: this.operation, encoding, data, }; @@ -355,6 +429,14 @@ class Message { else result += '; data (json)=' + JSON.stringify(this.data); } if (this.extras) result += '; extras=' + JSON.stringify(this.extras); + + if (this.action) result += '; action=' + this.action; + if (this.serial) result += '; serial=' + this.serial; + if (this.refSerial) result += '; refSerial=' + this.refSerial; + if (this.refType) result += '; refType=' + this.refType; + if (this.updatedAt) result += '; updatedAt=' + this.updatedAt; + if (this.deletedAt) result += '; deletedAt=' + this.deletedAt; + if (this.operation) result += '; operation=' + JSON.stringify(this.operation); result += ']'; return result; }