Skip to content

Commit

Permalink
materialisation: Add new message attributes and actions handling
Browse files Browse the repository at this point in the history
- Added new message attributes, including `action`, `serial`, `refSerial`, `refType`, `updatedAt`, `deletedAt`, and `operation`.
Additionally, create functions to map message actions between string and number representations.
This update also changes the `fromValues` function to handle action transformations.
- Added support to `Publish` so it now correctly sets the `message.action` flag to `message_create`
  • Loading branch information
splindsay-92 committed Oct 21, 2024
1 parent 0bd7aa5 commit 5825ad7
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 21 deletions.
94 changes: 93 additions & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2335,12 +2335,104 @@ export interface Message {
* Timestamp of when the message was received by Ably, as milliseconds since the Unix epoch.
*/
timestamp?: number;
/**
* The action type of the message, one of the {@link MessageAction} enum values.
*/
action?: MessageAction;
/**
* This message's unique serial.
*/
serial?: string;
/**
* The serial of the message that this message is a reference to.
*/
refSerial?: string;
/**
* The type of reference this message is, in relation to the message it references.
*/
refType?: string;
/**
* If an `update` operation was applied to this message, this will be the timestamp the update occurred.
*/
updatedAt?: number;
/**
* If a `deletion` operation was applied to this message, this will be the timestamp the deletion occurred.
*/
deletedAt?: number;
/**
* If this message resulted from an operation, this will contain the operation details.
*/
operation?: Operation;
}

/**
* Contains the details of an operation, such as update of deletion, supplied by the actioning client.
*/
export interface Operation {
/**
* The client ID of the client that initiated the operation.
*/
clientId?: string;
/**
* The description provided by the client that initiated the operation.
*/
description?: string;
/**
* A JSON object of string key-value pairs that may contain metadata associated with the operation.
*/
metadata?: Record<string, string>;
}

/**
* The namespace containing the different types of message actions.
*/
declare namespace MessageActions {
/**
* Message action has not been set.
*/
type MESSAGE_UNSET = 'message_unset';
/**
* Message action for a newly created message.
*/
type MESSAGE_CREATE = 'message_create';
/**
* Message action for an updated message.
*/
type MESSAGE_UPDATE = 'message_update';
/**
* Message action for a deleted message.
*/
type MESSAGE_DELETE = 'message_delete';
/**
* Message action for a newly created annotation.
*/
type MESSAGE_ANNOTATION_CREATE = 'message_annotation_create';
/**
* Message action for a deleted annotation.
*/
type MESSAGE_ANNOTATION_DELETE = 'message_annotation_delete';
/**
* Message action for a meta-message that contains channel occupancy information.
*/
type MESSAGE_META_OCCUPANCY = 'message_meta_occupancy';
}

/**
* Describes the possible action types used on an {@link Message}.
*/
export type MessageAction =
| MessageActions.MESSAGE_UNSET
| MessageActions.MESSAGE_CREATE
| MessageActions.MESSAGE_UPDATE
| MessageActions.MESSAGE_DELETE
| MessageActions.MESSAGE_ANNOTATION_CREATE
| MessageActions.MESSAGE_ANNOTATION_DELETE
| MessageActions.MESSAGE_META_OCCUPANCY;

/**
* A message received from Ably.
*/
export type InboundMessage = Message & Required<Pick<Message, 'id' | 'timestamp'>>;
export type InboundMessage = Message & Required<Pick<Message, 'id' | 'timestamp' | 'serial' | 'action'>>;

/**
* Static utilities related to messages.
Expand Down
11 changes: 6 additions & 5 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import * as Utils from '../util/utils';
import Logger from '../util/logger';
import RealtimePresence from './realtimepresence';
import Message, {
fromValues as messageFromValues,
fromValuesArray as messagesFromValuesArray,
encodeArray as encodeMessagesArray,
decode as decodeMessage,
getMessagesSize,
CipherOptions,
EncodingDecodingContext,
messageFromValuesArrayWithAction,
messageFromValuesWithAction,
} from '../types/message';
import ChannelStateChange from './channelstatechange';
import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo';
Expand Down Expand Up @@ -234,16 +234,17 @@ class RealtimeChannel extends EventEmitter {
throw this.connectionManager.getError();
}
if (argCount == 1) {
if (Utils.isObject(messages)) messages = [messageFromValues(messages)];
else if (Array.isArray(messages)) messages = messagesFromValuesArray(messages);
// setting the action to `message_create` because this is a standard publish
if (Utils.isObject(messages)) messages = [messageFromValuesWithAction(messages, 'message_create')];
else if (Array.isArray(messages)) messages = messageFromValuesArrayWithAction(messages, 'message_create');
else
throw new ErrorInfo(
'The single-argument form of publish() expects a message object or an array of message objects',
40013,
400,
);
} else {
messages = [messageFromValues({ name: args[0], data: args[1] })];
messages = [messageFromValuesWithAction({ name: args[0], data: args[1] }, 'message_create')];
}
const maxMessageSize = this.client.options.maxMessageSize;
await encodeMessagesArray(messages, this.channelOptions as CipherOptions);
Expand Down
10 changes: 5 additions & 5 deletions src/common/lib/client/restchannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import * as Utils from '../util/utils';
import Logger from '../util/logger';
import RestPresence from './restpresence';
import Message, {
fromValues as messageFromValues,
fromValuesArray as messagesFromValuesArray,
encodeArray as encodeMessagesArray,
serialize as serializeMessage,
getMessagesSize,
CipherOptions,
messageFromValuesWithAction,
messageFromValuesArrayWithAction,
} from '../types/message';
import ErrorInfo from '../types/errorinfo';
import { PaginatedResult } from './paginatedresource';
Expand Down Expand Up @@ -74,13 +74,13 @@ class RestChannel {

if (typeof first === 'string' || first === null) {
/* (name, data, ...) */
messages = [messageFromValues({ name: first, data: second })];
messages = [messageFromValuesWithAction({ name: first, data: second }, 'message_create')];
params = args[2];
} else if (Utils.isObject(first)) {
messages = [messageFromValues(first)];
messages = [messageFromValuesWithAction(first, 'message_create')];
params = args[1];
} else if (Array.isArray(first)) {
messages = messagesFromValuesArray(first);
messages = messageFromValuesArrayWithAction(first, 'message_create');
params = args[1];
} else {
throw new ErrorInfo(
Expand Down
11 changes: 6 additions & 5 deletions src/common/lib/types/defaultmessage.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import Message, {
CipherOptions,
fromEncoded,
fromEncodedArray,
encode,
decode,
encode,
EncodingDecodingContext,
fromEncoded,
fromEncodedArray,
fromValues,
} from './message';
import * as API from '../../../../ably';
import Platform from 'common/platform';
Expand All @@ -25,8 +26,8 @@ export class DefaultMessage extends Message {
}

// Used by tests
static fromValues(values: unknown): Message {
return Object.assign(new Message(), values);
static fromValues(values: Message | Record<string, unknown>, stringifyAction?: boolean): Message {
return fromValues(values, stringifyAction);
}

// Used by tests
Expand Down
91 changes: 87 additions & 4 deletions src/common/lib/types/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,32 @@ import * as API from '../../../../ably';
import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic';
import { MsgPack } from 'common/types/msgpack';

const MessageActionArray: API.MessageAction[] = [
'message_unset',
'message_create',
'message_update',
'message_delete',
'message_annotation_create',
'message_annotation_delete',
'message_meta_occupancy',
];

function toMessageActionString(actionNumber: number): API.MessageAction {
if (actionNumber in MessageActionArray) {
return MessageActionArray[actionNumber];
}
return 'message_unset';
}

function toMessageActionNumber(messageAction: API.MessageAction): number {
for (const [index, value] of MessageActionArray.entries()) {
if (value === messageAction) {
return index;
}
}
return 0;
}

export type CipherOptions = {
channelCipher: {
encrypt: Function;
Expand Down Expand Up @@ -82,7 +108,7 @@ export async function fromEncoded(
encoded: unknown,
inputOptions?: API.ChannelOptions,
): Promise<Message> {
const msg = fromValues(encoded);
const msg = fromValues(encoded as Message | Record<string, unknown>, true);
const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null);
/* if decoding fails at any point, catch and return the message decoded to
* the fullest extent possible */
Expand Down Expand Up @@ -260,7 +286,7 @@ export async function fromResponseBody(
}

for (let i = 0; i < body.length; i++) {
const msg = (body[i] = fromValues(body[i]));
const msg = (body[i] = fromValues(body[i], true));
try {
await decode(msg, options);
} catch (e) {
Expand All @@ -270,14 +296,48 @@ export async function fromResponseBody(
return body;
}

export function fromValues(values: unknown): Message {
/**
* This is used to return a new message with a given action type set.
* @param values - This is a message-like object, with the values to be set on the new message object.
* @param action - This is the action type that will be applied to the message
* @returns {Message} - This is a new message, with the provided action type set.
* @throws {ErrorInfo} - If the action type is not supported.
*/
export function messageFromValuesWithAction(
values: Message | Record<string, unknown>,
action: API.MessageAction,
): Message {
// Ensure the action is valid
toMessageActionNumber(action);
values.action = action;
return fromValues(values);
}

/**
* This is used to return an array of new messages, each set with the provided action type.
* It will apply the same action type to ALL messages.
* @param values - This is the array of message-like objects, with the values to be set on the new messages.
* @param action - This is the action type, applied to each message in the array.
* @returns {Message[]} - This is an array of new messages, each with the provided action type set.
*/
export function messageFromValuesArrayWithAction(values: unknown[], action: API.MessageAction): Message[] {
const count = values.length,
result = new Array(count);
for (let i = 0; i < count; i++) result[i] = messageFromValuesWithAction(values[i] as Record<string, unknown>, action);
return result;
}

export function fromValues(values: Message | Record<string, unknown>, stringifyAction?: boolean): Message {
if (stringifyAction) {
values.action = toMessageActionString(values.action as number);
}
return Object.assign(new Message(), values);
}

export function fromValuesArray(values: unknown[]): Message[] {
const count = values.length,
result = new Array(count);
for (let i = 0; i < count; i++) result[i] = fromValues(values[i]);
for (let i = 0; i < count; i++) result[i] = fromValues(values[i] as Record<string, unknown>);
return result;
}

Expand All @@ -304,6 +364,13 @@ class Message {
encoding?: string | null;
extras?: any;
size?: number;
action?: string | number | undefined;
serial?: string;
refSerial?: string;
refType?: string;
updatedAt?: number;
deletedAt?: number;
operation?: API.Operation;

/**
* Overload toJSON() to intercept JSON.stringify()
Expand Down Expand Up @@ -334,6 +401,14 @@ class Message {
connectionId: this.connectionId,
connectionKey: this.connectionKey,
extras: this.extras,
serial: this.serial,
// If `action` has not been set, it will be set once received by realtime
action: this.action ? toMessageActionNumber(this.action as API.MessageAction) : undefined,
refSerial: this.refSerial,
refType: this.refType,
updatedAt: this.updatedAt,
deletedAt: this.deletedAt,
operation: this.operation,
encoding,
data,
};
Expand All @@ -355,6 +430,14 @@ class Message {
else result += '; data (json)=' + JSON.stringify(this.data);
}
if (this.extras) result += '; extras=' + JSON.stringify(this.extras);

if (this.action) result += '; action=' + this.action;
if (this.serial) result += '; serial=' + this.serial;
if (this.refSerial) result += '; refSerial=' + this.refSerial;
if (this.refType) result += '; refType=' + this.refType;
if (this.updatedAt) result += '; updatedAt=' + this.updatedAt;
if (this.deletedAt) result += '; deletedAt=' + this.deletedAt;
if (this.operation) result += '; operation=' + JSON.stringify(this.operation);
result += ']';
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/lib/types/protocolmessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export function fromDeserialized(
const error = deserialized.error;
if (error) deserialized.error = ErrorInfo.fromValues(error as ErrorInfo);
const messages = deserialized.messages as Message[];
if (messages) for (let i = 0; i < messages.length; i++) messages[i] = messageFromValues(messages[i]);
if (messages) for (let i = 0; i < messages.length; i++) messages[i] = messageFromValues(messages[i], true);

const presence = presenceMessagePlugin ? (deserialized.presence as PresenceMessage[]) : undefined;
if (presenceMessagePlugin) {
Expand Down
2 changes: 2 additions & 0 deletions test/realtime/crypto.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
helper.recordPrivateApi('call.msgpack.decode');
var messageFromMsgpack = Message.fromValues(
msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)),
true,
);

try {
Expand Down Expand Up @@ -439,6 +440,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
helper.recordPrivateApi('call.msgpack.decode');
var messageFromMsgpack = Message.fromValues(
msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)),
true,
);

try {
Expand Down

0 comments on commit 5825ad7

Please sign in to comment.