diff --git a/src/Model.test.ts b/src/Model.test.ts index 3ca235da..27b02d5e 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -21,12 +21,12 @@ vi.mock('ably/promises'); // in these tests. vi.mock('./stream/StreamFactory', () => { class MockStream implements IStream { - constructor(readonly options: Pick) {} + constructor(readonly options: Pick) {} get state() { return StreamState.READY; } - get channel() { - return this.options.channel; + get channelName() { + return this.options.channelName; } async pause() {} async resume() {} @@ -40,11 +40,11 @@ vi.mock('./stream/StreamFactory', () => { return { default: class implements IStreamFactory { - newStream(options: Pick) { - if (!streams[options.channel]) { - streams[options.channel] = new MockStream(options); + newStream(options: Pick) { + if (!streams[options.channelName]) { + streams[options.channelName] = new MockStream(options); } - return streams[options.channel]; + return streams[options.channelName]; } }, }; @@ -157,7 +157,7 @@ describe('Model', () => { }); it('pauses and resumes the model', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); s1.pause = vi.fn(); s1.resume = vi.fn(); @@ -183,7 +183,7 @@ describe('Model', () => { }); it('disposes of the model', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); s1.unsubscribe = vi.fn(); const sync = vi.fn(async () => simpleTestData); @@ -209,7 +209,7 @@ describe('Model', () => { channelEvents: new Subject(), }; - streams.newStream({ channel: channelName }).subscribe = vi.fn((callback) => + streams.newStream({ channelName }).subscribe = vi.fn((callback) => events.channelEvents.subscribe((message) => callback(null, message)), ); @@ -290,7 +290,7 @@ describe('Model', () => { }); it('executes a registered mutation', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const model = new Model Promise }>('test', { ably, @@ -310,7 +310,7 @@ describe('Model', () => { }); it('fails to register twice', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const model = new Model Promise }>('test', { ably, channelName, logger }); @@ -339,7 +339,7 @@ describe('Model', () => { } } - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const model = new ModelWithSetState Promise }>('test', { ably, channelName, logger }); @@ -359,7 +359,7 @@ describe('Model', () => { logger, streams, }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const model = new Model Promise }>('test', { ably, channelName, logger }); @@ -374,7 +374,7 @@ describe('Model', () => { }); it('updates model state with optimistic event', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const model = new Model Promise }>('test', { ably, channelName, logger }); @@ -416,7 +416,7 @@ describe('Model', () => { }); it('confirms an optimistic event', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -470,7 +470,7 @@ describe('Model', () => { }); it('confirms an optimistic event by uuid', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -529,7 +529,7 @@ describe('Model', () => { }); it('mutation can access the optimistic events', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -568,7 +568,7 @@ describe('Model', () => { }); it('explicitly rejects an optimistic event', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -633,7 +633,7 @@ describe('Model', () => { logger, streams, }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -696,7 +696,7 @@ describe('Model', () => { }); it('confirms optimistic events out of order', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -779,7 +779,7 @@ describe('Model', () => { logger, streams, }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -865,7 +865,7 @@ describe('Model', () => { }); it('revert optimistic events if mutate fails', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const model = new Model< @@ -910,7 +910,7 @@ describe('Model', () => { // If applying a received stream update throws, the model reverts to the PREPARING state and re-syncs. it('resync if stream apply update fails', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); // event subjects used to invoke the stream subscription callbacks @@ -978,7 +978,7 @@ describe('Model', () => { logger, streams, }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const model = new Model Promise }>('test', { ably, channelName, logger }); @@ -1018,7 +1018,7 @@ describe('Model', () => { // Tests if the mutation throws, the the optimistic events are reverted. it('revert optimistic events if mutation fails', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: 's1' }); + const s1 = streams.newStream({ channelName: 's1' }); s1.subscribe = vi.fn(); const model = new Model Promise; mutation2: () => Promise }>('test', { @@ -1065,7 +1065,7 @@ describe('Model', () => { logger, streams, }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const model = new Model Promise }>('test', { ably, channelName, logger }); @@ -1103,7 +1103,7 @@ describe('Model', () => { logger, streams, }) => { - const s1 = streams.newStream({ channel: channelName }); + const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -1144,7 +1144,7 @@ describe('Model', () => { }); it('optimistic event confirmation timeout', async ({ channelName, ably, logger, streams }) => { - const s1 = streams.newStream({ channel: 's1' }); + const s1 = streams.newStream({ channelName: 's1' }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; diff --git a/src/Model.ts b/src/Model.ts index 461c170e..14f72a3a 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -78,7 +78,7 @@ export default class Model extends EventEmitter( { @@ -264,7 +264,7 @@ export default class Model extends EventEmitter extends EventEmitter { context.ably = ably; context.logger = pino({ level: 'silent' }); - context.channel = channelName; + context.channelName = channelName; }); afterEach(() => { @@ -44,7 +44,7 @@ describe('Stream', () => { it('enters ready state when successfully attached to the channel', async ({ ably, logger, - channel, + channelName: channel, }) => { // the promise returned by the subscribe method resolves when we have successfully attached to the channel let attach: (...args: any[]) => void = () => { @@ -55,7 +55,7 @@ describe('Stream', () => { await attachment; }); - const stream = new Stream({ ably, logger, channel: 'foobar' }); + const stream = new Stream({ ably, logger, channelName: 'foobar' }); await statePromise(stream, StreamState.PREPARING); attach(); @@ -63,13 +63,13 @@ describe('Stream', () => { expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); }); - it('subscribes to messages', async ({ ably, logger, channel }) => { + it('subscribes to messages', async ({ ably, logger, channelName: channel }) => { let messages = new Subject(); ably.channels.get(channel).subscribe = vi.fn((callback) => { messages.subscribe((message) => callback(message)); }); - const stream = new Stream({ ably, logger, channel }); + const stream = new Stream({ ably, logger, channelName: channel }); await statePromise(stream, StreamState.READY); const subscriptionSpy = vi.fn(); @@ -87,13 +87,13 @@ describe('Stream', () => { expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); }); - it('subscribes with multiple listeners', async ({ ably, logger, channel }) => { + it('subscribes with multiple listeners', async ({ ably, logger, channelName: channel }) => { let messages = new Subject(); ably.channels.get(channel).subscribe = vi.fn((callback) => { messages.subscribe((message) => callback(message)); }); - const stream = new Stream({ ably, logger, channel }); + const stream = new Stream({ ably, logger, channelName: channel }); await statePromise(stream, StreamState.READY); const subscriptionSpy1 = vi.fn(); @@ -116,13 +116,13 @@ describe('Stream', () => { expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); }); - it('unsubscribes to messages', async ({ ably, logger, channel }) => { + it('unsubscribes to messages', async ({ ably, logger, channelName: channel }) => { let messages = new Subject(); ably.channels.get(channel).subscribe = vi.fn((callback) => { messages.subscribe((message) => callback(message)); }); - const stream = new Stream({ ably, logger, channel }); + const stream = new Stream({ ably, logger, channelName: channel }); await statePromise(stream, StreamState.READY); const subscriptionSpy = vi.fn(); @@ -143,13 +143,13 @@ describe('Stream', () => { expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); }); - it('unsubscribes one of two listeners', async ({ ably, logger, channel }) => { + it('unsubscribes one of two listeners', async ({ ably, logger, channelName: channel }) => { let messages = new Subject(); ably.channels.get(channel).subscribe = vi.fn((callback) => { messages.subscribe((message) => callback(message)); }); - const stream = new Stream({ ably, logger, channel }); + const stream = new Stream({ ably, logger, channelName: channel }); await statePromise(stream, StreamState.READY); const subscriptionSpy1 = vi.fn(); @@ -177,8 +177,8 @@ describe('Stream', () => { expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); }); - it('pauses and resumes the stream', async ({ ably, logger, channel }) => { - const stream = new Stream({ ably, logger, channel }); + it('pauses and resumes the stream', async ({ ably, logger, channelName: channel }) => { + const stream = new Stream({ ably, logger, channelName: channel }); await statePromise(stream, StreamState.READY); expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); @@ -192,9 +192,9 @@ describe('Stream', () => { expect(ably.channels.get(channel).attach).toHaveBeenCalledOnce(); }); - it('disposes of the stream', async ({ ably, logger, channel }) => { + it('disposes of the stream', async ({ ably, logger, channelName: channel }) => { ably.channels.release = vi.fn(); - const stream = new Stream({ ably, logger, channel }); + const stream = new Stream({ ably, logger, channelName: channel }); await statePromise(stream, StreamState.READY); expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); @@ -204,7 +204,7 @@ describe('Stream', () => { expect(ably.channels.release).toHaveBeenCalledOnce(); }); - it('disposes of the stream on channel failed', async ({ ably, logger, channel }) => { + it('disposes of the stream on channel failed', async ({ ably, logger, channelName: channel }) => { ably.channels.release = vi.fn(); let fail: (...args: any[]) => void = () => { throw new Error('fail not defined'); @@ -215,7 +215,7 @@ describe('Stream', () => { } }); - const stream = new Stream({ ably, logger, channel }); + const stream = new Stream({ ably, logger, channelName: channel }); await statePromise(stream, StreamState.READY); expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce(); diff --git a/src/stream/Stream.ts b/src/stream/Stream.ts index 621e5c9a..5b3cf986 100644 --- a/src/stream/Stream.ts +++ b/src/stream/Stream.ts @@ -40,7 +40,7 @@ export enum StreamState { * Options used to configure a stream instance. */ export type StreamOptions = { - channel: string; + channelName: string; ably: AblyTypes.RealtimePromise; logger: Logger; eventBufferOptions?: EventBufferOptions; @@ -73,7 +73,7 @@ export type StreamStateChange = { export interface IStream { get state(): StreamState; - get channel(): string; + get channelName(): string; pause(): Promise; resume(): Promise; reset(): void; @@ -100,8 +100,8 @@ export default class Stream extends EventEmitter this.subscriptions.next(message), @@ -114,8 +114,8 @@ export default class Stream extends EventEmitter): IStream; + newStream(options: Pick): IStream; } /** @@ -26,7 +26,7 @@ export default class StreamFactory implements IStreamFactory { * @param {Pick} options - The options used in conjunction with the default options when instantiating a stream * @returns {IStream} The newly created stream instance. */ - newStream(options: Pick) { + newStream(options: Pick) { return new Stream(Object.assign(this.options, options)); } }