Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channelgroup: emit channel assigned/active events #1725

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ export type ChannelEvent =
| ChannelEvents.DETACHING
| ChannelEvents.UPDATE;

/**
* Describes the events emitted by a {@link ChannelGroup} object.
*/
export type ChannelGroupEvent = 'active.updated' | 'assigned.updated';

/**
* The `ConnectionStates` namespace describes the possible values of the {@link ConnectionState} type.
*/
Expand Down Expand Up @@ -1489,6 +1494,12 @@ export type channelAndMessageCallback<T> = (channel: string, message: T) => void
* @param changeStateChange - The state change that occurred.
*/
export type channelEventCallback = (changeStateChange: ChannelStateChange) => void;
/**
* The callback used for the events emitted by {@link ChannelGroup}.
*
* @param channels - The set of channels.
*/
export type channelGroupCallback = (channels: string[]) => void;
/**
* The callback used for the events emitted by {@link Connection}.
*
Expand Down Expand Up @@ -2020,7 +2031,7 @@ export declare interface Channel {
*
* Enables messages to be subscribed to on a group of channels.
*/
export declare interface ChannelGroup {
export declare interface ChannelGroup extends EventEmitter<channelGroupCallback, string[], ChannelGroupEvent> {
/**
* Registers a listener for messages on this channel group. The caller supplies a listener function, which is called each time one or more messages arrives on the channels in the group.
*
Expand Down
6 changes: 5 additions & 1 deletion src/common/lib/client/baserealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class ConsumerGroup extends EventEmitter {
}
}

class ChannelGroup {
class ChannelGroup extends EventEmitter {
activeChannels: string[] = [];
assignedChannels: string[] = [];
active: RealtimeChannel;
Expand All @@ -246,6 +246,7 @@ class ChannelGroup {
consumerGroup: ConsumerGroup;

constructor(readonly channels: Channels, readonly filter: string, readonly options?: API.ChannelGroupOptions) {
super();
this.subscriptions = new EventEmitter();
this.active = channels.get(this.safeChannelName(options?.activeChannel || '$ably:active'));
this.consumerGroup = new ConsumerGroup(channels, options?.consumerGroup?.name);
Expand All @@ -266,6 +267,7 @@ class ChannelGroup {
await this.active.setOptions({ params: { rewind: '1' } });
await this.active.subscribe((msg: any) => {
this.activeChannels = msg.data.active;
this.emit('active.updated', this.activeChannels);
this.updateAssignedChannels();
});
}
Expand All @@ -275,6 +277,7 @@ class ChannelGroup {
await this.active.detach();
await this.consumerGroup.leave();
this.assignedChannels = [];
this.emit('assigned.updated', this.assignedChannels);
this.removeSubscriptions(Object.keys(this.subscribedChannels));
}

Expand Down Expand Up @@ -310,6 +313,7 @@ class ChannelGroup {
'ChannelGroups.updateAssignedChannels',
'assignedChannels=' + this.assignedChannels + ' consumerId=' + this.consumerGroup.consumerId,
);
this.emit('assigned.updated', this.assignedChannels);
}

private unsubscribeTimeout(channel: string) {
Expand Down
Loading