From b1073a8a5f0c5fd12d21ec23c9f09aae81d4960a Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Mon, 16 Oct 2023 11:57:48 +0100 Subject: [PATCH] test: move stream tests out of registry tests --- src/Stream.test.ts | 227 +++++++++++++++++++++++++++++++++ src/StreamFactory.test.ts | 197 +--------------------------- src/utilities/test/promises.ts | 2 +- 3 files changed, 229 insertions(+), 197 deletions(-) create mode 100644 src/Stream.test.ts diff --git a/src/Stream.test.ts b/src/Stream.test.ts new file mode 100644 index 00000000..0a6a715e --- /dev/null +++ b/src/Stream.test.ts @@ -0,0 +1,227 @@ +import { Realtime, Types } from 'ably/promises'; +import pino, { type Logger } from 'pino'; +import { Subject } from 'rxjs'; +import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest'; + +import Stream, { StreamOptions, StreamState } from './Stream.js'; +import { createMessage } from './utilities/test/messages.js'; +import { statePromise } from './utilities/test/promises.js'; + +vi.mock('ably/promises'); + +interface StreamTestContext extends StreamOptions { + channel: string; + ably: Types.RealtimePromise; + logger: Logger; +} + +describe('Stream', () => { + beforeEach((context) => { + const ably = new Realtime({}); + ably.connection.whenState = vi.fn<[Types.ConnectionState], Promise>(async () => { + return { + current: 'connected', + previous: 'initialized', + }; + }); + + const channelName = 'foobar'; + const channel = ably.channels.get(channelName); + channel.on = vi.fn(); + channel.attach = vi.fn(); + channel.detach = vi.fn(); + channel.subscribe = vi.fn(); + + context.ably = ably; + context.logger = pino({ level: 'silent' }); + context.channel = channelName; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('enters ready state when successfully attached to the channel', async ({ + ably, + logger, + channel, + }) => { + // the promise returned by the subscribe method resolves when we have successfully attached to the channel + let attach: (...args: any[]) => void = () => { + throw new Error('attach not defined'); + }; + const attachment = new Promise((resolve) => (attach = resolve)); + ably.channels.get(channel).subscribe = vi.fn().mockImplementation(async () => { + await attachment; + }); + + const stream = new Stream({ ably, logger, channel: 'foobar' }); + + await statePromise(stream, StreamState.PREPARING); + attach(); + await statePromise(stream, StreamState.READY); + expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); + }); + + it('subscribes to messages', async ({ ably, logger, channel }) => { + let messages = new Subject(); + ably.channels.get(channel).subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream({ ably, logger, channel }); + await statePromise(stream, StreamState.READY); + + const subscriptionSpy = vi.fn(); + stream.subscribe(subscriptionSpy); + + for (let i = 0; i < 10; i++) { + messages.next(createMessage(i)); + } + + expect(subscriptionSpy).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); + }); + + it('subscribes with multiple listeners', async ({ ably, logger, channel }) => { + let messages = new Subject(); + ably.channels.get(channel).subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream({ ably, logger, channel }); + await statePromise(stream, StreamState.READY); + + const subscriptionSpy1 = vi.fn(); + stream.subscribe(subscriptionSpy1); + + const subscriptionSpy2 = vi.fn(); + stream.subscribe(subscriptionSpy2); + + for (let i = 0; i < 10; i++) { + messages.next(createMessage(i)); + } + + expect(subscriptionSpy1).toHaveBeenCalledTimes(10); + expect(subscriptionSpy2).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); + }); + + it('unsubscribes to messages', async ({ ably, logger, channel }) => { + let messages = new Subject(); + ably.channels.get(channel).subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream({ ably, logger, channel }); + await statePromise(stream, StreamState.READY); + + const subscriptionSpy = vi.fn(); + stream.subscribe(subscriptionSpy); + + for (let i = 0; i < 10; i++) { + if (i == 5) { + stream.unsubscribe(subscriptionSpy); + } + messages.next(createMessage(i)); + } + + expect(subscriptionSpy).toHaveBeenCalledTimes(5); + for (let i = 0; i < 5; i++) { + expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); + }); + + it('unsubscribes one of two listeners', async ({ ably, logger, channel }) => { + let messages = new Subject(); + ably.channels.get(channel).subscribe = vi.fn((callback) => { + messages.subscribe((message) => callback(message)); + }); + + const stream = new Stream({ ably, logger, channel }); + await statePromise(stream, StreamState.READY); + + const subscriptionSpy1 = vi.fn(); + stream.subscribe(subscriptionSpy1); + + const subscriptionSpy2 = vi.fn(); + stream.subscribe(subscriptionSpy2); + + for (let i = 0; i < 10; i++) { + if (i == 5) { + stream.unsubscribe(subscriptionSpy1); + } + messages.next(createMessage(i)); + } + + expect(subscriptionSpy1).toHaveBeenCalledTimes(5); + expect(subscriptionSpy2).toHaveBeenCalledTimes(10); + for (let i = 0; i < 10; i++) { + if (i < 5) { + expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); + } + + expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); + }); + + it('pauses and resumes the stream', async ({ ably, logger, channel }) => { + const stream = new Stream({ ably, logger, channel }); + + await statePromise(stream, StreamState.READY); + expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); + + stream.pause(); + await statePromise(stream, StreamState.PAUSED); + expect(ably.channels.get(channel).detach).toHaveBeenCalledOnce(); + + stream.resume(); + await statePromise(stream, StreamState.READY); + expect(ably.channels.get(channel).attach).toHaveBeenCalledOnce(); + }); + + it('disposes of the stream', async ({ ably, logger, channel }) => { + ably.channels.release = vi.fn(); + const stream = new Stream({ ably, logger, channel }); + + await statePromise(stream, StreamState.READY); + expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); + + stream.dispose(); + await statePromise(stream, StreamState.DISPOSED); + expect(ably.channels.release).toHaveBeenCalledOnce(); + }); + + it('disposes of the stream on channel failed', async ({ ably, logger, channel }) => { + ably.channels.release = vi.fn(); + let fail: (...args: any[]) => void = () => { + throw new Error('fail not defined'); + }; + ably.channels.get(channel).on = vi.fn(async (name: string, callback) => { + if (name === 'failed') { + fail = callback; + } + }); + + const stream = new Stream({ ably, logger, channel }); + + await statePromise(stream, StreamState.READY); + expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); + + fail({ reason: 'test' }); + await statePromise(stream, StreamState.DISPOSED); + expect(ably.channels.release).toHaveBeenCalledOnce(); + }); +}); diff --git a/src/StreamFactory.test.ts b/src/StreamFactory.test.ts index cd1dbb7d..086944ed 100644 --- a/src/StreamFactory.test.ts +++ b/src/StreamFactory.test.ts @@ -1,11 +1,9 @@ import { Realtime, Types } from 'ably/promises'; import pino from 'pino'; -import { Subject } from 'rxjs'; import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest'; -import Stream, { StreamOptions, StreamState } from './Stream.js'; +import { type StreamOptions } from './Stream.js'; import StreamFactory from './StreamFactory.js'; -import { createMessage } from './utilities/test/messages.js'; vi.mock('ably/promises'); @@ -13,9 +11,6 @@ interface StreamTestContext extends StreamOptions { ablyChannel: Types.RealtimeChannelPromise; } -const streamStatePromise = (stream: Stream, state: StreamState) => - new Promise((resolve) => stream.whenState(state, stream.state, resolve)); - describe('Stream', () => { beforeEach((context) => { const ably = new Realtime({}); @@ -38,196 +33,6 @@ describe('Stream', () => { vi.restoreAllMocks(); }); - it('enters ready state when successfully attached to the channel', async ({ - ably, - logger, - ablyChannel, - }) => { - // the promise returned by the subscribe method resolves when we have successfully attached to the channel - let attach: (...args: any[]) => void = () => { - throw new Error('attach not defined'); - }; - const attachment = new Promise((resolve) => (attach = resolve)); - ablyChannel.subscribe = vi.fn().mockImplementation(async () => { - await attachment; - }); - - const stream = new Stream({ ably, logger, channel: 'foobar' }); - - await streamStatePromise(stream, StreamState.PREPARING); - attach(); - await streamStatePromise(stream, StreamState.READY); - expect(ablyChannel.subscribe).toHaveBeenCalledOnce(); - }); - - it('pauses and resumes the stream', async ({ ably, logger, ablyChannel }) => { - ablyChannel.subscribe = vi.fn(); - ablyChannel.detach = vi.fn(); - ablyChannel.attach = vi.fn(); - - const stream = new Stream({ ably, logger, channel: ablyChannel.name }); - - await streamStatePromise(stream, StreamState.READY); - expect(ablyChannel.subscribe).toHaveBeenCalledOnce(); - - stream.pause(); - await streamStatePromise(stream, StreamState.PAUSED); - expect(ablyChannel.detach).toHaveBeenCalledOnce(); - - stream.resume(); - await streamStatePromise(stream, StreamState.READY); - expect(ablyChannel.attach).toHaveBeenCalledOnce(); - }); - - it('disposes of the stream', async ({ ably, logger, ablyChannel }) => { - ably.channels.release = vi.fn(); - - const stream = new Stream({ ably, logger, channel: ablyChannel.name }); - - await streamStatePromise(stream, StreamState.READY); - expect(ablyChannel.subscribe).toHaveBeenCalledOnce(); - - stream.dispose(); - await streamStatePromise(stream, StreamState.DISPOSED); - expect(ably.channels.release).toHaveBeenCalledOnce(); - }); - - it('disposes of the stream on channel failed', async ({ ably, logger, ablyChannel }) => { - let fail: (...args: any[]) => void = () => { - throw new Error('fail not defined'); - }; - ablyChannel.on = vi.fn(async (name: string, callback) => { - if (name === 'failed') { - fail = callback; - } - }); - - ably.channels.release = vi.fn(); - - const stream = new Stream({ ably, logger, channel: ablyChannel.name }); - - await streamStatePromise(stream, StreamState.READY); - expect(ablyChannel.subscribe).toHaveBeenCalledOnce(); - - fail({ reason: 'test' }); - await streamStatePromise(stream, StreamState.DISPOSED); - expect(ably.channels.release).toHaveBeenCalledOnce(); - }); - - it('subscribes to messages', async ({ ably, logger, ablyChannel }) => { - let messages = new Subject(); - ablyChannel.subscribe = vi.fn((callback) => { - messages.subscribe((message) => callback(message)); - }); - - const stream = new Stream({ ably, logger, channel: ablyChannel.name }); - await streamStatePromise(stream, StreamState.READY); - - const subscriptionSpy = vi.fn(); - stream.subscribe(subscriptionSpy); - - for (let i = 0; i < 10; i++) { - messages.next(createMessage(i)); - } - - expect(subscriptionSpy).toHaveBeenCalledTimes(10); - for (let i = 0; i < 10; i++) { - expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); - } - - expect(ablyChannel.subscribe).toHaveBeenCalledOnce(); - }); - - it('subscribes with multiple listeners', async ({ ably, logger, ablyChannel }) => { - let messages = new Subject(); - ablyChannel.subscribe = vi.fn((callback) => { - messages.subscribe((message) => callback(message)); - }); - - const stream = new Stream({ ably, logger, channel: ablyChannel.name }); - await streamStatePromise(stream, StreamState.READY); - - const subscriptionSpy1 = vi.fn(); - stream.subscribe(subscriptionSpy1); - - const subscriptionSpy2 = vi.fn(); - stream.subscribe(subscriptionSpy2); - - for (let i = 0; i < 10; i++) { - messages.next(createMessage(i)); - } - - expect(subscriptionSpy1).toHaveBeenCalledTimes(10); - expect(subscriptionSpy2).toHaveBeenCalledTimes(10); - for (let i = 0; i < 10; i++) { - expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); - expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); - } - - expect(ablyChannel.subscribe).toHaveBeenCalledOnce(); - }); - - it('unsubscribes to messages', async ({ ably, logger, ablyChannel }) => { - let messages = new Subject(); - ablyChannel.subscribe = vi.fn((callback) => { - messages.subscribe((message) => callback(message)); - }); - - const stream = new Stream({ ably, logger, channel: ablyChannel.name }); - await streamStatePromise(stream, StreamState.READY); - - const subscriptionSpy = vi.fn(); - stream.subscribe(subscriptionSpy); - - for (let i = 0; i < 10; i++) { - if (i == 5) { - stream.unsubscribe(subscriptionSpy); - } - messages.next(createMessage(i)); - } - - expect(subscriptionSpy).toHaveBeenCalledTimes(5); - for (let i = 0; i < 5; i++) { - expect(subscriptionSpy).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); - } - - expect(ablyChannel.subscribe).toHaveBeenCalledOnce(); - }); - - it('unsubscribes one of two listeners', async ({ ably, logger, ablyChannel }) => { - let messages = new Subject(); - ablyChannel.subscribe = vi.fn((callback) => { - messages.subscribe((message) => callback(message)); - }); - - const stream = new Stream({ ably, logger, channel: ablyChannel.name }); - await streamStatePromise(stream, StreamState.READY); - - const subscriptionSpy1 = vi.fn(); - stream.subscribe(subscriptionSpy1); - - const subscriptionSpy2 = vi.fn(); - stream.subscribe(subscriptionSpy2); - - for (let i = 0; i < 10; i++) { - if (i == 5) { - stream.unsubscribe(subscriptionSpy1); - } - messages.next(createMessage(i)); - } - - expect(subscriptionSpy1).toHaveBeenCalledTimes(5); - expect(subscriptionSpy2).toHaveBeenCalledTimes(10); - for (let i = 0; i < 10; i++) { - if (i < 5) { - expect(subscriptionSpy1).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); - } - expect(subscriptionSpy2).toHaveBeenNthCalledWith(i + 1, null, createMessage(i)); - } - - expect(ablyChannel.subscribe).toHaveBeenCalledOnce(); - }); - it('succeeds with gte zero event buffer ms', async ({ ably, logger }) => { new StreamFactory({ eventBufferOptions: { bufferMs: 0 }, ably, logger }); new StreamFactory({ eventBufferOptions: { bufferMs: 1 }, ably, logger }); diff --git a/src/utilities/test/promises.ts b/src/utilities/test/promises.ts index d6fdd9bd..98d33012 100644 --- a/src/utilities/test/promises.ts +++ b/src/utilities/test/promises.ts @@ -14,7 +14,7 @@ export const getEventPromises = (subject: Subject, n: number) => { interface StateListener { state: S; - whenState(targetState: S, currentState: S, listener: EventListener, ...listenerArgs: unknown[]); + whenState(targetState: S, currentState: S, listener: EventListener, ...listenerArgs: unknown[]): void; } export const statePromise = (object: StateListener, state: S) =>