Skip to content

Commit

Permalink
Add channel groups
Browse files Browse the repository at this point in the history
Add a client side channel group that listens for active channels and
subscribes/unsubscribes as the set of active changes.

Add a consumer group based on presence set

Add typing information for ChannelGroups

Fix tests for v2

ChannelGroups is a default realtime module

types: add channel group typings

consumergroup: use modulo-based hashing scheme

The `hashring` package is node-only as it depends on the native `crypto`
package. Replaced with a simple modulo hash scheme for now.

Fixes the case where the channel is already attached and the channel is
obtained with new rewind options that require a re-attach.

Updates the consumer group partitioning test to more robustly assert
that channels are partitioned across consumers.

channelgroup: make get sync, subscribe async

This matches the pattern used by Channels, which is sync to obtain the
channel and async on subscribe in order to await attachment. In the
channel group case, awaiting the subscribe awaits the joining of the
consumer group.

consumergroup: make consumerId required field

consumergroup: make hashring a required field

format: apply prettier formatting rules

jsdoc: add consumer group docs

channelgroups: add active channel name option

channelgroup: add explicit join method

Exposes the join method on the channel group, which is analogous to
attach on the channel. Uses this in tests for more robust assertions.

Additionally avoids re-attaching to the consumer group channel if
already attached, and obtains presence membership synchronously in the
join.

consumergroup: use subscribe over on

The `on` method was not reliable across clients, despite being
documented in
https://ably.com/docs/presence-occupancy/presence?lang=java#synced, so
use subscribe instead.

channelgroup: fix assigned channel processing

We need to keep the total set of active channels around as updating the
assigned channel set when the membership changes requires computing the
new assignments from the complete channel set, not the previously set of
assigned channels.

consumergroup: include consumerId in logs

consumergroups: add test for consumer group resize

test: remove explicit join from test

lint: apply formatting and cleanup

test: replace var with let

test: remove unnecessary outer try-catch

test: add prefix to channels

Avoid channel name collisions causing tests to fail from concurrent test
runs in CI.

channelgroup: detach from channel on un-assignment

channelgroup: add unsubscribe listener method

test: fix rebalance test waits for consumers

channelgroup: add leave method

test: remove dangling console logs

test: test consumer group scale down event

test: prefix consumer group channel

Similar to the active channel, we need to avoid conflicts.

consumergroup: fix current member tracking

We store the current active set of members in the hashring.

test: fix done condition w/ at-least-once delivery

Messages can be delivered more than once during a consumer group
rescaling event, so deduplicate the results when checking the end
condition.

channelgroups: use Utils.inspectError in logs

channelgroups: do not share channel object

The Channels object used by the ChannelGroup for internal channel
operations no longer shares the same object exposed on the client via
the .channels() method. This is to ensure that independent usage of an
individual channel that happens to be included in a channel group is not
impacted by its usage in the channel group.

test: tidy up leave test

Now that we can correctly handle a channel group and channel being used
independently from the same client, this tidies up the leave test to
remove the additional client previously needed.

test: rename waitForConsumers for clarity

test: rename waitForConsumers for clarity

channelgroups: do not share channel object

The Channels object used by the ChannelGroup for internal channel
operations no longer shares the same object exposed on the client via
the .channels() method. This is to ensure that independent usage of an
individual channel that happens to be included in a channel group is not
impacted by its usage in the channel group.

channelgroups: add module integration

modules: update ChannelGroups module definitions

channelgroup: add temp rewind channel group option

channelgroup: unsubscribe channel after timeout

In order to avoid keeping the channel alive, we add a configurable
timeout after which the channel will be unsubscribed if no messages are
received. This is to avoid keeping the channel active. This can lead to
missed messages if the a message is published after the client
unsubscribes and before the channel becomes inactive. This is an
acceptable edge case for the client-side simulation, especially with the
default 1h timeout.

deps: remove unused hashring types pkg

utils: remove arrIndexOf polyfill

consumergroup: rename hashring to locator

test: use async style tests for channel groups

Replaces the use of the `done()` callback with an async function style
test.

This allows us to await channel publish results and more easily handle
race conditions in tests.

channelgroup: use qualifier options

Previously we relied on a new BaseRealtime instance with it's own
Channels object to separate usage of channels in the ChannelGroup from
independent external usage of those channels from the regular
client.channels.get() method. This led to various problems with shared
Auth state such as nonces in token requests which caused connections to
terminate and tests to fail.

A simpler solution is to avoid creating a new client instance and
instead share the Channel pool, but force the library to treat channels
used from the ChannelGroup independently (with their own attachment) by
setting dummy options in the qualifier, which is used as the key in the
channel map.

This implementation does not support channels in the channel group which
already have a qualifier. This is acceptable for the experimental
client-side simulation of the feature.
  • Loading branch information
zknill authored and mschristensen committed Mar 6, 2024
1 parent 46dcda7 commit bcf965f
Show file tree
Hide file tree
Showing 13 changed files with 1,288 additions and 6 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ ably-js.iml
node_modules
npm-debug.log
.tool-versions
*.swp
*.swo
build/
react/
typedoc/generated/
Expand Down
115 changes: 114 additions & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,56 @@ export interface ChannelOptions {
modes?: ChannelMode[];
}

/**
* Describes the consumer group. Consumers in the same group will partition the channels of that group.
*/
interface ConsumerGroupOptions {
/**
* The name of the consumer group.
* Channels will be partitioned across consumers in the same group identified by this name.
*/
name: string;
}

/**
* Allows specifying properties of a {@link ChannelGroup}
*/
interface ChannelGroupOptions {
/**
* Options for a consumer group used to partition the channels in the channel group across consumers in the consumer group.
*/
consumerGroup?: ConsumerGroupOptions;
/**
* The name of the channel that receives the set of active channels in the group.
* For correct behaviour, this channel should have persist-last enabled.
* Note that this is a temporary option only required for the client-side simulation of channel groups.
*
* @defaultValue $ably:active
*/
activeChannel?: string;
/**
* The rewind interval to use when attaching to the matched channels in the group.
* This faciltates at-least-once delivery in the event of a consumer group scaling event.
* Note that this is a temporary option only required for the client-side simulation of channel groups.
*
* @defaultValue 5s
*/
rewind?: string;

/**
* The time for which the client will remain subscribed to a given channel after the last message is received.
* In the client-side simulation of channel groups, subscribing to the channel has the effect of keeping
* the channel active for the duration of the subscription. This timeout is used to determine how long to remain
* subscribed to a channel while no messages are received on it. It is possible that a message is sent after the
* client unsubscribes and before the channel becomes inactive, which means the client may not re-subscribe to the
* channel and could miss a message.
* Note that this is a temporary option only required for the client-side simulation of channel groups.
*
* @defaultValue 60 * 60 * 1000 (1 hour)
*/
subscriptionTimeout?: number;
}

/**
* Passes additional properties to a {@link RealtimeChannel} name to produce a new derived channel
*/
Expand Down Expand Up @@ -1426,6 +1476,13 @@ export interface TokenRevocationFailureResult {
* @param message - The message which triggered the callback.
*/
export type messageCallback<T> = (message: T) => void;
/**
* A callback which returns a channel and message argument, used for {@link ChannelGroup} subscriptions.
*
* @param channel - The channel name on which the message was received.
* @param message - The message which triggered the callback.
*/
export type channelAndMessageCallback<T> = (channel: string, message: T) => void;
/**
* The callback used for the events emitted by {@link RealtimeChannel}.
*
Expand Down Expand Up @@ -1958,6 +2015,41 @@ export declare interface Channel {
status(): Promise<ChannelDetails>;
}

/**
* This is a preview feature and may change in a future non-major release.
*
* Enables messages to be subscribed to on a group of channels.
*/
export declare interface ChannelGroup {
/**
* Registers a listener for messages on this channel group. The caller supplies a listener function, which is called each time one or more messages arrives on the channels in the group.
*
* @param callback - An event listener function.
*/
subscribe(callback: channelAndMessageCallback<InboundMessage>): Promise<void>;

/**
* Deregisters a listener for messages on this channel group.
*
* @param callback - An event listener function.
*/
unsubscribe(callback: channelAndMessageCallback<InboundMessage>): void;

/**
* Joins the consumer group if one was created for this channel group,
* and returns a promise that is resolved when the channel group is attached
* to the active channel.
*/
join(): Promise<void>;

/**
* Leaves the consumer group if one was created for this channel group,
* and returns a promise that is resolved when the channel group is detached
* from the active channel.
*/
leave(): Promise<void>;
}

/**
* Enables messages to be published and subscribed to. Also enables historic messages to be retrieved and provides access to the {@link RealtimePresence} object of a channel.
*/
Expand Down Expand Up @@ -2168,7 +2260,8 @@ export declare interface Channels<T> {
*/
get(name: string, channelOptions?: ChannelOptions): T;
/**
* @experimental This is a preview feature and may change in a future non-major release.
* This is a preview feature and may change in a future non-major release.
*
* This experimental method allows you to create custom realtime data feeds by selectively subscribing
* to receive only part of the data from the channel.
* See the [announcement post](https://pages.ably.com/subscription-filters-preview) for more information.
Expand All @@ -2190,6 +2283,22 @@ export declare interface Channels<T> {
release(name: string): void;
}

/**
* This is a preview feature and may change in a future non-major release.
*
* Creates and destroys {@link ChannelGroup} objects.
*/
export declare interface ChannelGroups {
/**
* Creates a new {@link ChannelGroup} object, with the specified {@link ChannelGroupOptions}, or returns the existing channel group object.
*
* @param filter - A regular expression defining the channel group. Only wildcards are supported.
* @param options - A {@link ChannelGroupOptions} object.
* @returns A {@link ChannelGroup} object.
*/
get(filter: string, options?: ChannelGroupOptions): ChannelGroup;
}

/**
* Contains an individual message that is sent to, or received from, Ably.
*/
Expand Down Expand Up @@ -2725,6 +2834,10 @@ export declare class Realtime implements RealtimeClient {
connect(): void;
auth: Auth;
channels: Channels<RealtimeChannel>;
/**
* This is a preview feature and may change in a future non-major release.
*/
channelGroups: ChannelGroups;
connection: Connection;
request<T = any>(
method: string,
Expand Down
29 changes: 29 additions & 0 deletions modular.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
RealtimeClient,
Auth,
Channels,
ChannelGroups as ChannelGroupsImpl,
Channel,
HttpPaginatedResponse,
StatsParams,
Expand Down Expand Up @@ -131,6 +132,25 @@ export declare const MsgPack: unknown;
*/
export declare const RealtimePresence: unknown;

/**
* The module is experimental and the API is subject to change.
*
* Provides a {@link BaseRealtime} instance with the ability to interact with the experiemental channel groups feature.
*
* To create a client that includes this module, include it in the `ModulesMap` that you pass to the {@link BaseRealtime.constructor}:
*
* ```javascript
* import { BaseRealtime, WebSocketTransport, FetchRequest, RealtimePresence, ChannelGroups } from 'ably/modules';
* const realtime = new BaseRealtime(options, { WebSocketTransport, FetchRequest, RealtimePresence, ChannelGroups });
* ```
*
* If you do not provide this module, then attempting to use the functionality of channel groups will cause a runtime error.
*
* Note that in the experimental client-side simulation of ChannelGroups, you must provide the RealtimePresence module
* which is required for internal coordination among consumers.
*/
export declare const ChannelGroups: unknown;

/**
* Provides a {@link BaseRealtime} instance with the ability to establish a connection with the Ably realtime service using a [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) connection.
*
Expand Down Expand Up @@ -240,6 +260,11 @@ export interface ModularPlugins {
*/
RealtimePresence?: typeof RealtimePresence;

/**
* See {@link ChannelGroups | documentation for the `ChannelGroups` module}.
*/
ChannelGroups?: typeof ChannelGroups;

/**
* See {@link WebSocketTransport | documentation for the `WebSocketTransport` plugin}.
*/
Expand Down Expand Up @@ -353,6 +378,10 @@ export declare class BaseRealtime implements RealtimeClient {
connect(): void;
auth: Auth;
channels: Channels<RealtimeChannel>;
/**
* This is a preview feature and may change in a future non-major release.
*/
channelGroups: ChannelGroupsImpl;
connection: Connection;
request<T = any>(
method: string,
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const pluginNames = [
'XHRRequest',
'FetchRequest',
'MessageInteractions',
'ChannelGroups',
];

// List of all free-standing functions exported by the library along with the
Expand Down
Loading

0 comments on commit bcf965f

Please sign in to comment.