Skip to content

Commit

Permalink
Add dedeuplication and configurable ordering of buffer events
Browse files Browse the repository at this point in the history
  • Loading branch information
zknill committed Sep 28, 2023
1 parent 5c61944 commit c8ca109
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 22 deletions.
42 changes: 42 additions & 0 deletions src/SlidingWindow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,32 @@ describe('SlidingWindow', () => {
expect(onExpire).toHaveBeenNthCalledWith(2, msg2);
});

it('reorders events in the buffer with custom order', async () => {
const onExpire = vi.fn();
const sliding = new SlidingWindow(1, onExpire, (a, b) => {
if (a.id < b.id) {
return 1;
}

return -1;
});

const msg3 = createMessage(3);
const msg2 = createMessage(2);
const msg1 = createMessage(1);

sliding.addMessage(msg3);
sliding.addMessage(msg2);
sliding.addMessage(msg1);

await new Promise((resolve) => setTimeout(resolve, 1));

expect(onExpire).toHaveBeenCalledTimes(3);
expect(onExpire).toHaveBeenNthCalledWith(1, msg3);
expect(onExpire).toHaveBeenNthCalledWith(2, msg2);
expect(onExpire).toHaveBeenNthCalledWith(3, msg1);
});

it('ignores expired events when reordering', async () => {
const onExpire = vi.fn();
const sliding = new SlidingWindow(1, onExpire);
Expand All @@ -70,4 +96,20 @@ describe('SlidingWindow', () => {
expect(onExpire).toHaveBeenNthCalledWith(2, msg1);
expect(onExpire).toHaveBeenNthCalledWith(3, msg2);
});

it('deduplicates events in the buffer', async () => {
const onExpire = vi.fn();
const sliding = new SlidingWindow(1, onExpire);

const msg2a = createMessage(2);
const msg2b = createMessage(2);

sliding.addMessage(msg2a);
sliding.addMessage(msg2b);

await new Promise((resolve) => setTimeout(resolve, 1));

expect(onExpire).toHaveBeenCalledTimes(1);
expect(onExpire).toHaveBeenNthCalledWith(1, msg2a);
});
});
39 changes: 24 additions & 15 deletions src/SlidingWindow.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
import { Types as AblyTypes } from 'ably';

export default class SlidingWindow {
// TODO: do I need to make this threadsafe somehow?
private messages: AblyTypes.Message[] = [];

constructor(private readonly windowSizeMs: number, private onExpire: (message: AblyTypes.Message) => void) {}
constructor(
private readonly windowSizeMs: number,
private onExpire: (message: AblyTypes.Message) => void,
private readonly eventOrderer: (a: AblyTypes.Message, b: AblyTypes.Message) => number = defaultOrderLexicoId,
) {}

public addMessage(message: AblyTypes.Message) {
if (this.windowSizeMs == 0) {
if (this.windowSizeMs === 0) {
this.onExpire(message);
return;
}

this.messages.push(message);
this.messages.sort((a, b) => {
if (a.id < b.id) {
return -1;
}

if (a.id == b.id) {
return 0;
}
if (this.messages.map((msg) => msg.id).includes(message.id)) {
return;
}

return 1;
});
this.messages.push(message);
this.messages.sort(this.eventOrderer);

setTimeout(() => {
this.expire(message);
Expand All @@ -33,7 +30,7 @@ export default class SlidingWindow {
private expire(message: AblyTypes.Message) {
const idx = this.messages.indexOf(message);

if (idx == -1) {
if (idx === -1) {
return;
}

Expand All @@ -42,3 +39,15 @@ export default class SlidingWindow {
});
}
}

function defaultOrderLexicoId(a: AblyTypes.Message, b: AblyTypes.Message): number {
if (a.id < b.id) {
return -1;
}

if (a.id === b.id) {
return 0;
}

return 1;
}
28 changes: 22 additions & 6 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,27 @@ export type StreamOptions = {
channel: string;
ably: AblyTypes.RealtimePromise;
logger: Logger;
eventBufferOptions?: EventBufferOptions;
};

export type EventBufferOptions = {
/**
* bufferms is the period of time events are held in a buffer
* for reordering and deduplicating. By default this is zero,
* which disables the buffer. Setting bufferMs to a non-zero
* value enables the buffer. The buffer is a sliding window.
*/
bufferMs?: number;
/**
* reorderBufferMs is the ms length of the sliding window used to buffer messages for recordering,
* it defaults to zero, i.e. no buffering.
/*/
reorderBufferMs?: number;
* eventOrderer defines the correct order of events. By default,
* when the buffer is enabled the event order is the lexicographical
* order of the message ids within the buffer.
*/
eventOrderer?: EventOrderer;
};

type EventOrderer = (a: AblyTypes.Message, b: AblyTypes.Message) => number;

/**
* A state transition emitted as an event from the stream describing a change to the stream's lifecycle.
*/
Expand Down Expand Up @@ -89,8 +103,10 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
this.logger = options.logger;
this.ablyChannel = this.ably.channels.get(this.options.channel);
this.baseLogContext = { scope: `Stream#${options.channel}` };
this.slidingWindow = new SlidingWindow(options.reorderBufferMs || 0, (message: AblyTypes.Message) =>
this.subscriptions.next(message),
this.slidingWindow = new SlidingWindow(
options.eventBufferOptions?.bufferMs || 0,
(message: AblyTypes.Message) => this.subscriptions.next(message),
options.eventBufferOptions?.eventOrderer,
);
this.init();
}
Expand Down
15 changes: 15 additions & 0 deletions src/StreamRegistry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Subject } from 'rxjs';
import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest';

import Stream, { StreamOptions, StreamState } from './Stream.js';
import StreamRegistry from './StreamRegistry.js';
import { createMessage } from './utilities/test/messages.js';

vi.mock('ably/promises');
Expand Down Expand Up @@ -227,6 +228,20 @@ describe('Stream', () => {
expect(ablyChannel.subscribe).toHaveBeenCalledOnce();
});

it<StreamTestContext>('succeeds with gte zero event buffer ms', async ({ ably, logger }) => {
new StreamRegistry({ eventBufferOptions: { bufferMs: 0 }, ably, logger });
new StreamRegistry({ eventBufferOptions: { bufferMs: 1 }, ably, logger });
});

it<StreamTestContext>('fails with lt zero event buffer ms', async ({ ably, logger }) => {
try {
new StreamRegistry({ eventBufferOptions: { bufferMs: -1 }, ably, logger });
expect(true).toBe(false);
} catch (err) {
expect(err.toString(), 'Stream registry should have thrown an error').not.toContain('AssertionError');
}
});

// TODO discontinuity
// TODO reauth https://ably.com/docs/realtime/channels?lang=nodejs#fatal-errors
});
9 changes: 8 additions & 1 deletion src/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ export default class StreamRegistry implements IStreamRegistry {
/**
* @param {Pick<StreamOptions, 'ably' | 'logger'>} options - The default options used when instantiating a stream.
*/
constructor(readonly options: Pick<StreamOptions, 'ably' | 'logger'>) {}
constructor(readonly options: Pick<StreamOptions, 'ably' | 'logger' | 'eventBufferOptions'>) {
if (options.eventBufferOptions !== null) {
const bufferMs = options.eventBufferOptions?.bufferMs || 0;
if (bufferMs < 0) {
throw new Error(`EventBufferOptions bufferMs cannot be less than zero: ${bufferMs}`);
}
}
}

/**
* Retrieve an existing stream instance for the given channel or create a new one if it doesn't yet exist.
Expand Down
2 changes: 2 additions & 0 deletions src/types/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Logger, LevelWithSilent } from 'pino';

import type { EventComparator, MutationMethods, MutationOptions, MutationRegistration } from './mutations';
import type { UpdateFunc } from './updates';
import type { EventBufferOptions } from '../Stream';

/**
* Options used to configure all model instances.
Expand All @@ -20,6 +21,7 @@ export type ModelOptions = {
ably: AblyTypes.RealtimePromise;
logger: Logger;
defaultMutationOptions?: Partial<MutationOptions>;
eventBufferOptions?: EventBufferOptions;
};

/**
Expand Down

0 comments on commit c8ca109

Please sign in to comment.