From 5c619449026a69bb00d29c0e3943d0b25eecf47f Mon Sep 17 00:00:00 2001 From: zak Date: Thu, 21 Sep 2023 17:45:52 +0100 Subject: [PATCH] Add SlidingWindow to reorder incoming events Add a sliding window to the Stream that will buffer the events for config amount of time. It uses this buffer window to reorder events and pass them to the subscription them once the timeout expires. All events before the expiring event will be passed to the subscription. For example: - Receiving [e2, e1] - Will be reordered to [e1, e2] Assuming that e2 expires from the window before e1 (because it was added to the window before e1) then both e1 and e2 will be passed to the subscription. --- src/SlidingWindow.test.ts | 73 +++++++++++++++++++++++++++++++++++++++ src/SlidingWindow.ts | 44 +++++++++++++++++++++++ src/Stream.ts | 12 ++++++- 3 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 src/SlidingWindow.test.ts create mode 100644 src/SlidingWindow.ts diff --git a/src/SlidingWindow.test.ts b/src/SlidingWindow.test.ts new file mode 100644 index 00000000..f18376f2 --- /dev/null +++ b/src/SlidingWindow.test.ts @@ -0,0 +1,73 @@ +import { it, describe, expect, vi } from 'vitest'; + +import SlidingWindow from './SlidingWindow.js'; +import { createMessage } from './utilities/test/messages.js'; + +describe('SlidingWindow', () => { + it('emits events immediately with no timeout', async () => { + const onExpire = vi.fn(); + const sliding = new SlidingWindow(0, onExpire); + + const msg = createMessage(1); + sliding.addMessage(msg); + + expect(onExpire).toHaveBeenCalledTimes(1); + expect(onExpire).toHaveBeenCalledWith(msg); + }); + + it('emits events after timeout', async () => { + const onExpire = vi.fn(); + const sliding = new SlidingWindow(100, onExpire); + + const msg = createMessage(1); + sliding.addMessage(msg); + + expect(onExpire).toHaveBeenCalledTimes(0); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(onExpire).toHaveBeenCalledTimes(1); + expect(onExpire).toHaveBeenCalledWith(msg); + }); + + it('reorders events in the buffer', async () => { + const onExpire = vi.fn(); + const sliding = new SlidingWindow(1, onExpire); + + const msg2 = createMessage(2); + const msg1 = createMessage(1); + + sliding.addMessage(msg2); + sliding.addMessage(msg1); + + await new Promise((resolve) => setTimeout(resolve, 1)); + + expect(onExpire).toHaveBeenCalledTimes(2); + expect(onExpire).toHaveBeenNthCalledWith(1, msg1); + expect(onExpire).toHaveBeenNthCalledWith(2, msg2); + }); + + it('ignores expired events when reordering', async () => { + const onExpire = vi.fn(); + const sliding = new SlidingWindow(1, onExpire); + + const msg3 = createMessage(3); + const msg2 = createMessage(2); + const msg1 = createMessage(1); + + // message 3 added, and expired + sliding.addMessage(msg3); + await new Promise((resolve) => setTimeout(resolve, 1)); + + // then messages 1 and 2 added, reordered, and expired + sliding.addMessage(msg2); + sliding.addMessage(msg1); + + await new Promise((resolve) => setTimeout(resolve, 1)); + + expect(onExpire).toHaveBeenCalledTimes(3); + expect(onExpire).toHaveBeenNthCalledWith(1, msg3); + expect(onExpire).toHaveBeenNthCalledWith(2, msg1); + expect(onExpire).toHaveBeenNthCalledWith(3, msg2); + }); +}); diff --git a/src/SlidingWindow.ts b/src/SlidingWindow.ts new file mode 100644 index 00000000..9c4a68e3 --- /dev/null +++ b/src/SlidingWindow.ts @@ -0,0 +1,44 @@ +import { Types as AblyTypes } from 'ably'; + +export default class SlidingWindow { + // TODO: do I need to make this threadsafe somehow? + private messages: AblyTypes.Message[] = []; + + constructor(private readonly windowSizeMs: number, private onExpire: (message: AblyTypes.Message) => void) {} + + public addMessage(message: AblyTypes.Message) { + if (this.windowSizeMs == 0) { + this.onExpire(message); + return; + } + + this.messages.push(message); + this.messages.sort((a, b) => { + if (a.id < b.id) { + return -1; + } + + if (a.id == b.id) { + return 0; + } + + return 1; + }); + + setTimeout(() => { + this.expire(message); + }, this.windowSizeMs); + } + + private expire(message: AblyTypes.Message) { + const idx = this.messages.indexOf(message); + + if (idx == -1) { + return; + } + + this.messages.splice(0, idx + 1).forEach((msg) => { + this.onExpire(msg); + }); + } +} diff --git a/src/Stream.ts b/src/Stream.ts index 50bfaf8b..a3836d9d 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -2,6 +2,7 @@ import { Types as AblyTypes } from 'ably'; import { Logger } from 'pino'; import { Subject, Subscription } from 'rxjs'; +import SlidingWindow from './SlidingWindow.js'; import type { StandardCallback } from './types/callbacks'; import EventEmitter from './utilities/EventEmitter.js'; @@ -41,6 +42,11 @@ export type StreamOptions = { channel: string; ably: AblyTypes.RealtimePromise; logger: Logger; + /** + * reorderBufferMs is the ms length of the sliding window used to buffer messages for recordering, + * it defaults to zero, i.e. no buffering. + /*/ + reorderBufferMs?: number; }; /** @@ -72,6 +78,7 @@ export default class Stream extends EventEmitter(); private subscriptionMap: WeakMap, Subscription> = new WeakMap(); + private slidingWindow: SlidingWindow; private readonly baseLogContext: Partial<{ scope: string; action: string }>; private readonly logger: Logger; @@ -82,6 +89,9 @@ export default class Stream extends EventEmitter + this.subscriptions.next(message), + ); this.init(); } @@ -174,6 +184,6 @@ export default class Stream extends EventEmitter