Skip to content

Commit

Permalink
middleware: update comments and docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
mschristensen committed Oct 19, 2023
1 parent 0731766 commit 819876c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
21 changes: 15 additions & 6 deletions src/stream/Middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,16 @@ export class SlidingWindow extends MiddlewareBase {
}
}

// TODO handle case when cannot paginate far back enough before finding sequence
/**
* Middleware which emits messages from a position in the stream determined by a sequenceID.
* The caller paginates back through history and passes in each page which the middleware
* uses to seek for the specified position. Concurrently, incoming live messages can be buffered.
* Messages are re-ordered according to the sequence ID. For historical messages, the entire history
* is re-ordered; for live messages a sliding window is applied.
* When the position is reached, the middleware emits all messages in-order from the resume position
* forwards, which includes the historical plus the live messages.
* Subsequently live messages can continue to be added, re-ordered within a sliding window, and emitted.
*/
export class OrderedHistoryResumer extends MiddlewareBase {
private currentState: 'seeking' | 'ready' = 'seeking';
private historicalMessages: AblyTypes.Message[] = [];
Expand All @@ -108,10 +117,6 @@ export class OrderedHistoryResumer extends MiddlewareBase {
this.slidingWindow.subscribe(this.onMessage.bind(this));
}

public get state() {
return this.currentState;
}

private onMessage(err: Error | null, message: AblyTypes.Message | null) {
if (err) {
super.error(err);
Expand All @@ -124,6 +129,10 @@ export class OrderedHistoryResumer extends MiddlewareBase {
return this.eventOrderer(a, b) * -1;
}

public get state() {
return this.currentState;
}

public addHistoricalMessages(messages: AblyTypes.Message[]): boolean {
if (this.currentState !== 'seeking') {
throw new Error('can only add historical messages while in seeking state');
Expand All @@ -138,7 +147,7 @@ export class OrderedHistoryResumer extends MiddlewareBase {
// It is not optimal to sort the entire thing with each page as out-of-orderiness
// is localised within a two minute window, but being more clever about this requires
// tracking message timestamps and complicates the logic.
// Given the number of messages is likely to be reasonably small, this approach is okay for now.
// Given the number of messages is likely to be reasonably small, this approach is fine.
//
// Note that because of potential out-of-orderiness by sequenceID due to possible CGO order,
// it's possible this function discovers the boundary in the stream but a more recent message appears
Expand Down
11 changes: 10 additions & 1 deletion src/stream/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
}
}

/**
* Resubscribe to the channel and emit messages from the position in the stream specified by the sequenceID.
* This is achieved by attaching to the channel and paginating back through history until the boundary determined by
* the specified sequenceID is reached.
* @param sequenceID The identifier that specifies the position in the message stream (by message ID) from which to resume.
*/
private async init(sequenceID: string) {
this.logger.trace({ ...this.baseLogContext, action: 'init()' });

Expand All @@ -222,11 +228,14 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
});
await this.ably.connection.whenState('connected');

// The attach callback is called synchronously upon receipt of an attach message from realtime
// and adding a channel subscription is also synchronous, so registering the subscription immediately
// after the attach guarantees that nothing else should execute between, so no messages will be missed.
const attachResult = await this.ablyChannel.attach();
if (!attachResult) {
throw new Error('the channel was already attached when calling attach()');
}
const subscribeResult = await this.ablyChannel.subscribe(this.onChannelMessage.bind(this));
const subscribeResult = await this.ablyChannel.subscribe(this.onChannelMessage.bind(this)); // live messages
if (subscribeResult) {
throw new Error('the channel was not attached when calling subscribe()');
}
Expand Down

0 comments on commit 819876c

Please sign in to comment.