From 819876c3b8d33ee8d240290db93ee2f928b09d28 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Thu, 19 Oct 2023 17:06:26 +0100 Subject: [PATCH] middleware: update comments and docstrings --- src/stream/Middleware.ts | 21 +++++++++++++++------ src/stream/Stream.ts | 11 ++++++++++- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/stream/Middleware.ts b/src/stream/Middleware.ts index 3cd9b686..da4d340a 100644 --- a/src/stream/Middleware.ts +++ b/src/stream/Middleware.ts @@ -91,7 +91,16 @@ export class SlidingWindow extends MiddlewareBase { } } -// TODO handle case when cannot paginate far back enough before finding sequence +/** + * Middleware which emits messages from a position in the stream determined by a sequenceID. + * The caller paginates back through history and passes in each page which the middleware + * uses to seek for the specified position. Concurrently, incoming live messages can be buffered. + * Messages are re-ordered according to the sequence ID. For historical messages, the entire history + * is re-ordered; for live messages a sliding window is applied. + * When the position is reached, the middleware emits all messages in-order from the resume position + * forwards, which includes the historical plus the live messages. + * Subsequently live messages can continue to be added, re-ordered within a sliding window, and emitted. + */ export class OrderedHistoryResumer extends MiddlewareBase { private currentState: 'seeking' | 'ready' = 'seeking'; private historicalMessages: AblyTypes.Message[] = []; @@ -108,10 +117,6 @@ export class OrderedHistoryResumer extends MiddlewareBase { this.slidingWindow.subscribe(this.onMessage.bind(this)); } - public get state() { - return this.currentState; - } - private onMessage(err: Error | null, message: AblyTypes.Message | null) { if (err) { super.error(err); @@ -124,6 +129,10 @@ export class OrderedHistoryResumer extends MiddlewareBase { return this.eventOrderer(a, b) * -1; } + public get state() { + return this.currentState; + } + public addHistoricalMessages(messages: AblyTypes.Message[]): boolean { if (this.currentState !== 'seeking') { throw new Error('can only add historical messages while in seeking state'); @@ -138,7 +147,7 @@ export class OrderedHistoryResumer extends MiddlewareBase { // It is not optimal to sort the entire thing with each page as out-of-orderiness // is localised within a two minute window, but being more clever about this requires // tracking message timestamps and complicates the logic. - // Given the number of messages is likely to be reasonably small, this approach is okay for now. + // Given the number of messages is likely to be reasonably small, this approach is fine. // // Note that because of potential out-of-orderiness by sequenceID due to possible CGO order, // it's possible this function discovers the boundary in the stream but a more recent message appears diff --git a/src/stream/Stream.ts b/src/stream/Stream.ts index 43ab88c2..7d3e94cb 100644 --- a/src/stream/Stream.ts +++ b/src/stream/Stream.ts @@ -205,6 +205,12 @@ export default class Stream extends EventEmitter