diff --git a/src/Errors.ts b/src/Errors.ts index 818b9fa4..27c16236 100644 --- a/src/Errors.ts +++ b/src/Errors.ts @@ -1,7 +1,5 @@ import toString from 'lodash/toString'; -import type { UpdateTargets } from './UpdatesRegistry.js'; - export function toError(err: any) { return err instanceof Error || err instanceof AggregateError ? err : new Error(toString(err)); } @@ -19,25 +17,3 @@ export class RegistrationError extends Error { this.name = 'RegistrationError'; } } - -/** - * Represents an error that occurs when a given update function specified by - * the target is not registered on the model. - * - * @internal - * @extends {RegistrationError} - */ -export class UpdateRegistrationError extends RegistrationError { - constructor(targets: Partial) { - let message = 'update not registered'; - if (targets.channel && !targets.event) { - message = `update for channel '${targets.channel}' not registered`; - } else if (!targets.channel && targets.event) { - message = `update for event '${targets.event}' not registered`; - } else if (targets.channel && targets.event) { - message = `update for event '${targets.event}' on channel '${targets.channel}' not registered`; - } - super(message); - this.name = 'UpdateRegistrationError'; - } -} diff --git a/src/Model.discontinuity.test.ts b/src/Model.discontinuity.test.ts index b6c7e8ae..7894391f 100644 --- a/src/Model.discontinuity.test.ts +++ b/src/Model.discontinuity.test.ts @@ -36,8 +36,9 @@ describe('Model', () => { const logger = pino({ level: 'silent' }); context.ably = ably; context.logger = logger; + context.channelName = 'models:myModel:events'; }); - it('handles discontinuity with resync', async ({ ably, logger }) => { + it('handles discontinuity with resync', async ({ channelName, ably, logger }) => { const channel = ably.channels.get('foo'); let suspendChannel: (...args: any[]) => void = () => { throw new Error('suspended not defined'); @@ -58,11 +59,11 @@ describe('Model', () => { let counter = 0; const sync = vi.fn(async () => `${counter++}`); - const model = new Model('test', { ably, logger }); - const update1 = vi.fn(async (state, event) => { + const model = new Model('test', { ably, channelName, logger }); + const mergeFn = vi.fn(async (state, event) => { return event.data; }); - await model.$register({ $sync: sync, $update: { s1: { testEvent: update1 } } }); + await model.$register({ $sync: sync, $merge: mergeFn }); expect(sync).toHaveBeenCalledOnce(); diff --git a/src/Model.test.ts b/src/Model.test.ts index 8dc5c5e0..24b1d580 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -6,20 +6,20 @@ import { it, describe, expect, afterEach, vi, beforeEach } from 'vitest'; import Model from './Model.js'; import { StreamOptions, IStream, StreamState } from './Stream.js'; -import { IStreamRegistry } from './StreamRegistry.js'; +import { IStreamFactory } from './StreamFactory.js'; import type { ModelState, ModelStateChange, ModelOptions, Event } from './types/model.d.ts'; import type { MutationMethods, EventComparator, MutationContext } from './types/mutations.d.ts'; import { createMessage, customMessage } from './utilities/test/messages.js'; vi.mock('ably/promises'); -// Mocks the StreamRegistry import so that we can modify the Stream instances +// Mocks the StreamFactory import so that we can modify the Stream instances // used by the model to spy on their methods. -// This implementation ensures that all instances of StreamRegistry use the -// same cache of Stream instances so that the StreamRegistry instantiated in the -// model returns the same Stream instances as the StreamRegistry instantiated +// This implementation ensures that all instances of StreamFactory use the +// same cache of Stream instances so that the StreamFactory instantiated in the +// model returns the same Stream instances as the StreamFactory instantiated // in these tests. -vi.mock('./StreamRegistry', () => { +vi.mock('./StreamFactory', () => { class MockStream implements IStream { constructor(readonly options: Pick) {} get state() { @@ -35,19 +35,17 @@ vi.mock('./StreamRegistry', () => { async dispose() {} async reset() {} } + const streams: { [key: string]: IStream } = {}; return { - default: class implements IStreamRegistry { - getOrCreate(options: Pick) { + default: class implements IStreamFactory { + newStream(options: Pick) { if (!streams[options.channel]) { streams[options.channel] = new MockStream(options); } return streams[options.channel]; } - get streams() { - return streams; - } }, }; }); @@ -67,7 +65,8 @@ const simpleTestData: TestData = { }; interface ModelTestContext extends ModelOptions { - streams: IStreamRegistry; + streams: IStreamFactory; + channelName: string; } const modelStatePromise = (model: Model, state: ModelState) => @@ -89,15 +88,16 @@ describe('Model', () => { const logger = pino({ level: 'silent' }); context.ably = ably; context.logger = logger; - const { default: provider } = await import('./StreamRegistry.js'); + const { default: provider } = await import('./StreamFactory.js'); context.streams = new provider({ ably, logger }); + context.channelName = 'models:myModelTest:events'; }); afterEach(() => { vi.restoreAllMocks(); }); - it('enters ready state after sync', async ({ ably, logger }) => { + it('enters ready state after sync', async ({ channelName, ably, logger }) => { // the promise returned by the subscribe method resolves when we have successfully attached to the channel let completeSync: (...args: any[]) => void = () => { throw new Error('completeSync not defined'); @@ -109,6 +109,7 @@ describe('Model', () => { }); const model = new Model Promise }>('test', { ably, + channelName, logger, }); const ready = model.$register({ $sync: sync }); @@ -121,99 +122,70 @@ describe('Model', () => { expect(model.confirmed).toEqual(simpleTestData); }); - it('pauses and resumes the model', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('pauses and resumes the model', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); s1.pause = vi.fn(); - s2.pause = vi.fn(); s1.resume = vi.fn(); - s2.resume = vi.fn(); const sync = vi.fn(async () => simpleTestData); - const model = new Model('test', { ably, logger }); + const model = new Model('test', { ably, channelName, logger }); // register update function so that streams get created await model.$register({ $sync: sync, - $update: { - s1: { event: async (state) => state }, - s2: { event: async (state) => state }, - }, + $merge: async (state) => state, }); expect(s1.subscribe).toHaveBeenCalledOnce(); - expect(s2.subscribe).toHaveBeenCalledOnce(); await model.$pause(); expect(model.state).toBe('paused'); expect(s1.pause).toHaveBeenCalledOnce(); - expect(s2.pause).toHaveBeenCalledOnce(); await model.$resume(); expect(model.state).toBe('ready'); expect(s1.resume).toHaveBeenCalledOnce(); - expect(s2.resume).toHaveBeenCalledOnce(); }); - it('disposes of the model', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('disposes of the model', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); s1.unsubscribe = vi.fn(); - s2.unsubscribe = vi.fn(); const sync = vi.fn(async () => simpleTestData); - const model = new Model('test', { ably, logger }); + const model = new Model('test', { ably, channelName, logger }); // register update function so that streams get created await model.$register({ $sync: sync, - $update: { - s1: { event: async (state) => state }, - s2: { event: async (state) => state }, - }, + $merge: async (state) => state, }); expect(sync).toHaveBeenCalledOnce(); expect(s1.subscribe).toHaveBeenCalledOnce(); - expect(s2.subscribe).toHaveBeenCalledOnce(); await model.$dispose(); expect(model.state).toBe('disposed'); expect(s1.unsubscribe).toHaveBeenCalledOnce(); - expect(s2.unsubscribe).toHaveBeenCalledOnce(); }); - it('subscribes to updates', async ({ ably, logger, streams }) => { - // event subjects used to invoke the stream subscription callbacks - // registered by the model, to simulate stream data + it('subscribes to updates', async ({ channelName, ably, logger, streams }) => { const events = { - e1: new Subject(), - e2: new Subject(), + channelEvents: new Subject(), }; - streams.getOrCreate({ channel: 's1' }).subscribe = vi.fn((callback) => - events.e1.subscribe((message) => callback(null, message)), - ); - streams.getOrCreate({ channel: 's2' }).subscribe = vi.fn((callback) => - events.e2.subscribe((message) => callback(null, message)), + streams.newStream({ channel: channelName }).subscribe = vi.fn((callback) => + events.channelEvents.subscribe((message) => callback(null, message)), ); const sync = vi.fn(async () => 'data_0'); // defines initial version of model - const model = new Model('test', { ably, logger }); + const model = new Model('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => event.data); - const update2 = vi.fn(async (state, event) => event.data); - const update3 = vi.fn(async (state, event) => event.data); + const mergeFn = vi.fn(async (state, event) => event.data); await model.$register({ $sync: sync, - $update: { - s1: { name_1: update1, name_3: update3 }, - s2: { name_2: update2, name_3: update3 }, - }, + $merge: mergeFn, }); expect(sync).toHaveBeenCalledOnce(); @@ -229,35 +201,27 @@ describe('Model', () => { expect(subscriptionSpy).toHaveBeenCalledTimes(1); expect(subscriptionSpy).toHaveBeenNthCalledWith(1, null, 'data_0'); - events.e1.next(createMessage(1)); + events.channelEvents.next(createMessage(1)); await subscriptionCalls[1]; - expect(update1).toHaveBeenCalledTimes(1); - expect(update2).toHaveBeenCalledTimes(0); - expect(update3).toHaveBeenCalledTimes(0); + expect(mergeFn).toHaveBeenCalledTimes(1); expect(subscriptionSpy).toHaveBeenCalledTimes(2); expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, 'data_1'); - events.e2.next(createMessage(2)); + events.channelEvents.next(createMessage(2)); await subscriptionCalls[2]; - expect(update1).toHaveBeenCalledTimes(1); - expect(update2).toHaveBeenCalledTimes(1); - expect(update3).toHaveBeenCalledTimes(0); + expect(mergeFn).toHaveBeenCalledTimes(2); expect(subscriptionSpy).toHaveBeenCalledTimes(3); expect(subscriptionSpy).toHaveBeenNthCalledWith(3, null, 'data_2'); - events.e1.next(createMessage(3)); + events.channelEvents.next(createMessage(3)); await subscriptionCalls[3]; - expect(update1).toHaveBeenCalledTimes(1); - expect(update2).toHaveBeenCalledTimes(1); - expect(update3).toHaveBeenCalledTimes(1); + expect(mergeFn).toHaveBeenCalledTimes(3); expect(subscriptionSpy).toHaveBeenCalledTimes(4); expect(subscriptionSpy).toHaveBeenNthCalledWith(4, null, 'data_3'); - events.e2.next(createMessage(3)); + events.channelEvents.next(createMessage(3)); await subscriptionCalls[4]; - expect(update1).toHaveBeenCalledTimes(1); - expect(update2).toHaveBeenCalledTimes(1); - expect(update3).toHaveBeenCalledTimes(2); + expect(mergeFn).toHaveBeenCalledTimes(4); expect(subscriptionSpy).toHaveBeenCalledTimes(5); expect(subscriptionSpy).toHaveBeenNthCalledWith(5, null, 'data_3'); @@ -265,9 +229,9 @@ describe('Model', () => { expect(model.confirmed).toEqual('data_3'); }); - it('subscribes after initialisation', async ({ ably, logger }) => { + it('subscribes after initialisation', async ({ channelName, ably, logger }) => { const sync = vi.fn(async () => 'data_0'); // defines initial version of model - const model = new Model('test', { ably, logger }); + const model = new Model('test', { ably, channelName, logger }); await model.$register({ $sync: sync }); @@ -291,13 +255,12 @@ describe('Model', () => { expect(model.confirmed).toEqual('data_0'); }); - it('executes a registered mutation', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('executes a registered mutation', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const model = new Model Promise }>('test', { ably, + channelName, logger, }); @@ -312,12 +275,10 @@ describe('Model', () => { expect(mutation).toHaveBeenCalledWith({ events: [] }, 'bar', 123); }); - it('fails to register twice', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('fails to register twice', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); const mutation = vi.fn(); const sync = async () => 'foobar'; @@ -333,7 +294,7 @@ describe('Model', () => { ).toThrow('$register was already called'); }); - it('fails to register after initialization', async ({ ably, logger, streams }) => { + it('fails to register after initialization', async ({ channelName, ably, logger, streams }) => { // extend the Model class to get access to protected member setState class ModelWithSetState extends Model { constructor(readonly name: string, options: ModelOptions) { @@ -344,11 +305,9 @@ describe('Model', () => { } } - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); - const model = new ModelWithSetState Promise }>('test', { ably, logger }); + const model = new ModelWithSetState Promise }>('test', { ably, channelName, logger }); const mutation = vi.fn(); const sync = async () => 'foobar'; @@ -360,12 +319,15 @@ describe('Model', () => { ); }); - it('fails to execute mutation with unregistered stream', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('fails to execute mutation with unregistered stream', async ({ + channelName, + ably, + logger, + streams, + }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); const mutation = vi.fn(async () => 'test'); await model.$register({ @@ -377,18 +339,16 @@ describe('Model', () => { ); }); - it('updates model state with optimistic event', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('updates model state with optimistic event', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => event.data); + const mergeFn = vi.fn(async (state, event) => event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => 'data_0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: mutation }, }); @@ -411,7 +371,7 @@ describe('Model', () => { expect(model.optimistic).toEqual('data_0'); expect(model.confirmed).toEqual('data_0'); - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: 'data_1' }] })(); + await model.mutations.foo.$expect({ events: [{ channel: channelName, name: 'testEvent', data: 'data_1' }] })(); await optimisticSubscriptionCalls[1]; expect(model.optimistic).toEqual('data_1'); @@ -421,24 +381,22 @@ describe('Model', () => { expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); }); - it('confirms an optimistic event', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('confirms an optimistic event', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const events = { e1: new Subject() }; s1.subscribe = vi.fn((callback) => { events.e1.subscribe((message) => callback(null, message)); }); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => event.data); + const mergeFn = vi.fn(async (state, event) => event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => 'data_0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: mutation }, }); @@ -461,7 +419,7 @@ describe('Model', () => { expect(model.optimistic).toEqual('data_0'); expect(model.confirmed).toEqual('data_0'); - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: 'data_1' }] })(); + await model.mutations.foo.$expect({ events: [{ channel: channelName, name: 'testEvent', data: 'data_1' }] })(); await optimisticSubscriptionCalls[1]; expect(model.optimistic).toEqual('data_1'); @@ -477,24 +435,22 @@ describe('Model', () => { expect(model.confirmed).toEqual('data_1'); }); - it('confirms an optimistic event by uuid', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('confirms an optimistic event by uuid', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const events = { e1: new Subject() }; s1.subscribe = vi.fn((callback) => { events.e1.subscribe((message) => callback(null, message)); }); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => event.data); + const mergeFn = vi.fn(async (state, event) => event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => 'data_0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: mutation }, }); @@ -518,7 +474,7 @@ describe('Model', () => { expect(model.confirmed).toEqual('data_0'); await model.mutations.foo.$expect({ - events: [{ uuid: 'some-custom-id', channel: 's1', name: 'testEvent', data: 'data_1' }], + events: [{ uuid: 'some-custom-id', channel: channelName, name: 'testEvent', data: 'data_1' }], })(); await optimisticSubscriptionCalls[1]; @@ -538,11 +494,9 @@ describe('Model', () => { expect(model.confirmed).toEqual('confirmed_data'); }); - it('mutation can access the optimistic events', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('mutation can access the optimistic events', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const events = { e1: new Subject() }; s1.subscribe = vi.fn((callback) => { @@ -552,9 +506,9 @@ describe('Model', () => { const model = new Model< string, { foo: (context: MutationContext, arg: string) => Promise<{ context?: string; arg: string }> } - >('test', { ably, logger }); + >('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => event.data); + const mergeFn = vi.fn(async (state, event) => event.data); const mutation = vi.fn(async function (context: MutationContext, arg: string) { if (!context.events.length) { return { arg }; @@ -566,7 +520,7 @@ describe('Model', () => { }); await model.$register({ $sync: async () => 'data_0', - $update: { s1: { context: update1 } }, + $merge: mergeFn, $mutate: { foo: mutation }, }); @@ -574,29 +528,27 @@ describe('Model', () => { expect(result1).toEqual({ arg: 'arg' }); const [result2] = await model.mutations.foo.$expect({ - events: [{ channel: 's1', name: 'context', data: 'data_1' }], + events: [{ channel: channelName, name: 'context', data: 'data_1' }], })('arg'); expect(result2).toEqual({ context: 'context', arg: 'arg' }); }); - it('explicitly rejects an optimistic event', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('explicitly rejects an optimistic event', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const events = { e1: new Subject() }; s1.subscribe = vi.fn((callback) => { events.e1.subscribe((message) => callback(null, message)); }); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => event.data); + const mergeFn = vi.fn(async (state, event) => event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => 'data_0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: mutation }, }); @@ -620,7 +572,7 @@ describe('Model', () => { expect(model.confirmed).toEqual('data_0'); const [result, confirmation] = await model.mutations.foo.$expect({ - events: [{ channel: 's1', name: 'testEvent', data: 'data_1' }], + events: [{ channel: channelName, name: 'testEvent', data: 'data_1' }], })(); expect(result).toEqual('test'); @@ -632,7 +584,7 @@ describe('Model', () => { expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); events.e1.next(customMessage('id_1', 'testEvent', 'data_1', { 'x-ably-models-reject': 'true' })); - await expect(confirmation).rejects.toThrow('events contain rejections: channel:s1 name:testEvent'); + await expect(confirmation).rejects.toThrow(`events contain rejections: channel:${channelName} name:testEvent`); await optimisticSubscriptionCalls[2]; expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(3); @@ -641,8 +593,13 @@ describe('Model', () => { expect(model.confirmed).toEqual('data_0'); }); - it('confirms an optimistic event with a custom comparator', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); + it('confirms an optimistic event with a custom comparator', async ({ + channelName, + ably, + logger, + streams, + }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); const events = { e1: new Subject() }; @@ -650,15 +607,15 @@ describe('Model', () => { events.e1.subscribe((message) => callback(null, message)); }); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); const nameOnlyComparator: EventComparator = (optimistic: Event, confirmed: Event) => optimistic.name === confirmed.name; - const update1 = vi.fn(async (state, event) => event.data); + const mergeFn = vi.fn(async (state, event) => event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => 'data_0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: { func: mutation, @@ -686,7 +643,7 @@ describe('Model', () => { expect(model.optimistic).toEqual('data_0'); expect(model.confirmed).toEqual('data_0'); - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: 'data_1' }] })(); + await model.mutations.foo.$expect({ events: [{ channel: channelName, name: 'testEvent', data: 'data_1' }] })(); await optimisticSubscriptionCalls[1]; expect(model.optimistic).toEqual('data_1'); @@ -704,24 +661,22 @@ describe('Model', () => { expect(model.optimistic).toEqual('confirmation'); }); - it('confirms optimistic events out of order', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('confirms optimistic events out of order', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const events = { e1: new Subject() }; s1.subscribe = vi.fn((callback) => { events.e1.subscribe((message) => callback(null, message)); }); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => state + event.data); + const mergeFn = vi.fn(async (state, event) => state + event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => '0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: mutation }, }); @@ -744,8 +699,8 @@ describe('Model', () => { expect(model.optimistic).toEqual('0'); expect(model.confirmed).toEqual('0'); - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: '1' }] })(); - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: '2' }] })(); + await model.mutations.foo.$expect({ events: [{ channel: channelName, name: 'testEvent', data: '1' }] })(); + await model.mutations.foo.$expect({ events: [{ channel: channelName, name: 'testEvent', data: '2' }] })(); // optimistic updates are applied in the order the mutations were called await optimisticSubscriptionCalls[1]; @@ -784,128 +739,27 @@ describe('Model', () => { expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(4); // unchanged }); - it('confirms optimistic events from multiple streams', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); - s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); - - const events = { - e1: new Subject(), - e2: new Subject(), - }; - s1.subscribe = vi.fn((callback) => { - events.e1.subscribe((message) => callback(null, message)); - }); - s2.subscribe = vi.fn((callback) => { - events.e2.subscribe((message) => callback(null, message)); - }); - - const model = new Model Promise }>('test', { ably, logger }); - - // Defines an update function which concatenates strings. - // This is a non-commutative operation which let's us inspect the order in - // in which updates are applied to the speculative vs confirmed states. - const update1 = vi.fn(async (state, event) => state + event.data); - const mutation = vi.fn(async () => 'test'); - await model.$register({ - $sync: async () => '0', - $update: { s1: { testEvent: update1 }, s2: { testEvent: update1 } }, - $mutate: { foo: mutation }, - }); - - let optimisticSubscription = new Subject(); - const optimisticSubscriptionCalls = getEventPromises(optimisticSubscription, 6); - const optimisticSubscriptionSpy = vi.fn<[Error | null, string?]>(() => optimisticSubscription.next()); - model.subscribe(optimisticSubscriptionSpy); - - let confirmedSubscription = new Subject(); - const confirmedSubscriptionCalls = getEventPromises(confirmedSubscription, 4); - const confirmedSubscriptionSpy = vi.fn<[Error | null, string?]>(() => confirmedSubscription.next()); - model.subscribe(confirmedSubscriptionSpy, { optimistic: false }); - - await optimisticSubscriptionCalls[0]; - await confirmedSubscriptionCalls[0]; - expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(1); - expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '0'); - expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); - expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(1, null, '0'); - expect(model.optimistic).toEqual('0'); - expect(model.confirmed).toEqual('0'); - - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: '1' }] })(); - await model.mutations.foo.$expect({ events: [{ channel: 's2', name: 'testEvent', data: '2' }] })(); - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: '3' }] })(); - - // optimistic updates are applied in the order the mutations were called - await optimisticSubscriptionCalls[1]; - await optimisticSubscriptionCalls[2]; - await optimisticSubscriptionCalls[3]; - expect(model.optimistic).toEqual('0123'); - expect(model.confirmed).toEqual('0'); - expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(4); - expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '01'); - expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '012'); - expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(4, null, '0123'); - expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(1); - - // Optimistic updates must be confirmed in-order only in the context of a single stream, - // so here we confirm s2 in a different order to the order the mutation were optimistically applied, - // and assert that the confirmed state is constructed in the correct order (which differs from the - // order in which the speculative state is constructed). - - // confirm the first expected event - events.e1.next(customMessage('id_1', 'testEvent', '1')); - await optimisticSubscriptionCalls[4]; - expect(model.optimistic).toEqual('0123'); - await confirmedSubscriptionCalls[1]; - expect(model.confirmed).toEqual('01'); - expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(2); - expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(2, null, '01'); - expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(5); - expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(5, null, '0123'); - - // confirm the third expected event (second event on the first stream) - events.e1.next(customMessage('id_2', 'testEvent', '3')); - await optimisticSubscriptionCalls[5]; - expect(model.optimistic).toEqual('0132'); - await confirmedSubscriptionCalls[2]; - expect(model.confirmed).toEqual('013'); - expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(3); - expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(3, null, '013'); - expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(6); - expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(6, null, '0132'); - - // confirm the second expected event (first event on the second stream) - events.e2.next(customMessage('id_1', 'testEvent', '2')); - await optimisticSubscriptionCalls[6]; - expect(model.optimistic).toEqual('0132'); - await confirmedSubscriptionCalls[3]; - expect(model.confirmed).toEqual('0132'); - expect(confirmedSubscriptionSpy).toHaveBeenCalledTimes(4); - expect(confirmedSubscriptionSpy).toHaveBeenNthCalledWith(4, null, '0132'); - expect(optimisticSubscriptionSpy).toHaveBeenCalledTimes(6); - expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(6, null, '0132'); - }); - - it('rebases optimistic events on top of confirmed state', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('rebases optimistic events on top of confirmed state', async ({ + channelName, + ably, + logger, + streams, + }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const events = { e1: new Subject() }; s1.subscribe = vi.fn((callback) => { events.e1.subscribe((message) => callback(null, message)); }); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => state + event.data); + const mergeFn = vi.fn(async (state, event) => state + event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => '0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: mutation }, }); @@ -928,8 +782,8 @@ describe('Model', () => { expect(model.optimistic).toEqual('0'); expect(model.confirmed).toEqual('0'); - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: '1' }] })(); - await model.mutations.foo.$expect({ events: [{ channel: 's1', name: 'testEvent', data: '2' }] })(); + await model.mutations.foo.$expect({ events: [{ channel: channelName, name: 'testEvent', data: '1' }] })(); + await model.mutations.foo.$expect({ events: [{ channel: channelName, name: 'testEvent', data: '2' }] })(); // optimistic updates are applied in the order the mutations were called await optimisticSubscriptionCalls[1]; @@ -976,11 +830,9 @@ describe('Model', () => { expect(optimisticSubscriptionSpy).toHaveBeenNthCalledWith(5, null, '0132'); }); - it('revert optimistic events if mutate fails', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('revert optimistic events if mutate fails', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const model = new Model< string, @@ -988,9 +840,9 @@ describe('Model', () => { mutation1: () => Promise; mutation2: () => Promise; } - >('test', { ably, logger }); + >('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => state + event.data); + const mergeFn = vi.fn(async (state, event) => state + event.data); const mutation1 = vi.fn(async () => 'test'); const mutation2 = vi.fn(async () => { throw new Error('mutation failed'); @@ -998,24 +850,24 @@ describe('Model', () => { await model.$register({ $sync: async () => '0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { mutation1, mutation2 }, }); const result1 = await model.mutations.mutation1.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '1' }, - { channel: 's1', name: 'testEvent', data: '2' }, - { channel: 's1', name: 'testEvent', data: '3' }, + { channel: channelName, name: 'testEvent', data: '1' }, + { channel: channelName, name: 'testEvent', data: '2' }, + { channel: channelName, name: 'testEvent', data: '3' }, ], })(); expect(result1[0]).toEqual('test'); await expect( model.mutations.mutation2.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '4' }, - { channel: 's1', name: 'testEvent', data: '5' }, - { channel: 's1', name: 'testEvent', data: '6' }, + { channel: channelName, name: 'testEvent', data: '4' }, + { channel: channelName, name: 'testEvent', data: '5' }, + { channel: channelName, name: 'testEvent', data: '6' }, ], })(), ).rejects.toThrow('mutation failed'); @@ -1023,39 +875,32 @@ describe('Model', () => { }); // If applying a received stream update throws, the model reverts to the PREPARING state and re-syncs. - it('resync if stream apply update fails', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('resync if stream apply update fails', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); // event subjects used to invoke the stream subscription callbacks // registered by the model, to simulate stream data const events = { - e1: new Subject(), - e2: new Subject(), + channelEvents: new Subject(), }; s1.subscribe = vi.fn((callback) => { - events.e1.subscribe((message) => callback(null, message)); + events.channelEvents.subscribe((message) => callback(null, message)); }); s1.unsubscribe = vi.fn(); - s2.subscribe = vi.fn((callback) => { - events.e2.subscribe((message) => callback(null, message)); - }); - s2.unsubscribe = vi.fn(); let counter = 0; const sync = vi.fn(async () => `${counter}`); - const model = new Model('test', { ably, logger }); + const model = new Model('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => { + const mergeFn = vi.fn(async (state, event) => { if (event.data === '3') { throw new Error('test'); } return event.data; }); - await model.$register({ $sync: sync, $update: { s1: { testEvent: update1 } } }); + await model.$register({ $sync: sync, $merge: mergeFn }); expect(sync).toHaveBeenCalledOnce(); @@ -1069,36 +914,42 @@ describe('Model', () => { await subscriptionCalls[0]; expect(subscriptionSpy).toHaveBeenNthCalledWith(1, null, '0'); - events.e1.next(customMessage('id_1', 'testEvent', String(++counter))); + events.channelEvents.next(customMessage('id_1', 'testEvent', String(++counter))); await subscriptionCalls[1]; expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, '1'); + expect(mergeFn).toHaveBeenCalledTimes(1); - events.e1.next(customMessage('id_2', 'testEvent', String(++counter))); + events.channelEvents.next(customMessage('id_2', 'testEvent', String(++counter))); await subscriptionCalls[2]; expect(subscriptionSpy).toHaveBeenNthCalledWith(3, null, '2'); + expect(mergeFn).toHaveBeenCalledTimes(2); // The 3rd event throws when applying the update, which should // trigger a resync and get the latest counter value. const preparingPromise = modelStatePromise(model, 'preparing'); - events.e1.next(customMessage('id_3', 'testEvent', String(++counter))); + events.channelEvents.next(customMessage('id_3', 'testEvent', String(++counter))); const { reason } = (await preparingPromise) as ModelStateChange; expect(reason).to.toBeDefined(); expect(reason!.message).toEqual('test'); await subscriptionCalls[3]; expect(subscriptionSpy).toHaveBeenNthCalledWith(4, null, '3'); + expect(mergeFn).toHaveBeenCalledTimes(3); expect(model.state).toEqual('ready'); }); // Tests if applying optimistic events throws, the the optimistic events are reverted. - it('revert optimistic events if apply update fails', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('revert optimistic events if apply update fails', async ({ + channelName, + ably, + logger, + streams, + }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const updateFn = vi.fn(async (state, event) => { + const mergeFn = vi.fn(async (state, event) => { if (event.data === '6') { throw new Error('update error'); } @@ -1107,23 +958,23 @@ describe('Model', () => { const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => '0', - $update: { s1: { testEvent: updateFn } }, + $merge: mergeFn, $mutate: { mutation }, }); const [result1] = await model.mutations.mutation.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '1' }, - { channel: 's1', name: 'testEvent', data: '2' }, - { channel: 's1', name: 'testEvent', data: '3' }, + { channel: channelName, name: 'testEvent', data: '1' }, + { channel: channelName, name: 'testEvent', data: '2' }, + { channel: channelName, name: 'testEvent', data: '3' }, ], })(); expect(result1).toEqual('test'); await expect( model.mutations.mutation.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '4' }, - { channel: 's1', name: 'testEvent', data: '5' }, - { channel: 's1', name: 'testEvent', data: '6' }, + { channel: channelName, name: 'testEvent', data: '4' }, + { channel: channelName, name: 'testEvent', data: '5' }, + { channel: channelName, name: 'testEvent', data: '6' }, ], })(), ).rejects.toThrow('update error'); @@ -1132,41 +983,40 @@ describe('Model', () => { }); // Tests if the mutation throws, the the optimistic events are reverted. - it('revert optimistic events if mutation fails', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('revert optimistic events if mutation fails', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: 's1' }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const model = new Model Promise; mutation2: () => Promise }>('test', { ably, + channelName, logger, }); - const updateFn = vi.fn(async (state, event) => state + event.data); + const mergeFn = vi.fn(async (state, event) => state + event.data); const mutation1 = vi.fn(async () => 'test'); const mutation2 = async () => { throw new Error('mutation failed'); }; await model.$register({ $sync: async () => '0', - $update: { s1: { testEvent: updateFn } }, + $merge: mergeFn, $mutate: { mutation1, mutation2 }, }); const [result1] = await model.mutations.mutation1.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '1' }, - { channel: 's1', name: 'testEvent', data: '2' }, - { channel: 's1', name: 'testEvent', data: '3' }, + { channel: channelName, name: 'testEvent', data: '1' }, + { channel: channelName, name: 'testEvent', data: '2' }, + { channel: channelName, name: 'testEvent', data: '3' }, ], })(); expect(result1).toEqual('test'); await expect( model.mutations.mutation2.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '4' }, - { channel: 's1', name: 'testEvent', data: '5' }, - { channel: 's1', name: 'testEvent', data: '6' }, + { channel: channelName, name: 'testEvent', data: '4' }, + { channel: channelName, name: 'testEvent', data: '5' }, + { channel: channelName, name: 'testEvent', data: '6' }, ], })(), ).rejects.toThrow('mutation failed'); @@ -1176,16 +1026,17 @@ describe('Model', () => { // Tests if applying optimistic events throws *and* the mutation throws, the the optimistic events are reverted. it('revert optimistic events if the mutation fails and apply update fails', async ({ + channelName, ably, logger, streams, }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = async (state, event) => { + const mergeFn = async (state, event) => { if (event.data === '3') { throw new Error('update error'); } @@ -1196,15 +1047,15 @@ describe('Model', () => { }; await model.$register({ $sync: async () => '0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { mutation }, }); await expect( model.mutations.mutation.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '1' }, - { channel: 's1', name: 'testEvent', data: '2' }, - { channel: 's1', name: 'testEvent', data: '3' }, + { channel: channelName, name: 'testEvent', data: '1' }, + { channel: channelName, name: 'testEvent', data: '2' }, + { channel: channelName, name: 'testEvent', data: '3' }, ], })(), ).rejects.toThrow(new Error('update error')); // mutation not invoked if optimistic update fails, so we only expect an update error @@ -1212,24 +1063,27 @@ describe('Model', () => { expect(model.optimistic).toEqual('0'); }); - it('optimistic event confirmation confirmed before timeout', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('optimistic event confirmation confirmed before timeout', async ({ + channelName, + ably, + logger, + streams, + }) => { + const s1 = streams.newStream({ channel: channelName }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const events = { e1: new Subject() }; s1.subscribe = vi.fn((callback) => { events.e1.subscribe((message) => callback(null, message)); }); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => state + event.data); + const mergeFn = vi.fn(async (state, event) => state + event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => '0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: { func: mutation, @@ -1239,9 +1093,9 @@ describe('Model', () => { }); const [result, confirmation] = await model.mutations.foo.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '1' }, - { channel: 's1', name: 'testEvent', data: '2' }, - { channel: 's1', name: 'testEvent', data: '3' }, + { channel: channelName, name: 'testEvent', data: '1' }, + { channel: channelName, name: 'testEvent', data: '2' }, + { channel: channelName, name: 'testEvent', data: '3' }, ], })(); expect(result).toEqual('test'); @@ -1255,24 +1109,22 @@ describe('Model', () => { expect(model.optimistic).toEqual('0123'); }); - it('optimistic event confirmation timeout', async ({ ably, logger, streams }) => { - const s1 = streams.getOrCreate({ channel: 's1' }); - const s2 = streams.getOrCreate({ channel: 's2' }); + it('optimistic event confirmation timeout', async ({ channelName, ably, logger, streams }) => { + const s1 = streams.newStream({ channel: 's1' }); s1.subscribe = vi.fn(); - s2.subscribe = vi.fn(); const events = { e1: new Subject() }; s1.subscribe = vi.fn((callback) => { events.e1.subscribe((message) => callback(null, message)); }); - const model = new Model Promise }>('test', { ably, logger }); + const model = new Model Promise }>('test', { ably, channelName, logger }); - const update1 = vi.fn(async (state, event) => state + event.data); + const mergeFn = vi.fn(async (state, event) => state + event.data); const mutation = vi.fn(async () => 'test'); await model.$register({ $sync: async () => '0', - $update: { s1: { testEvent: update1 } }, + $merge: mergeFn, $mutate: { foo: { func: mutation, @@ -1284,9 +1136,9 @@ describe('Model', () => { // Mutate and check the returned promise is rejected with a timeout. const [, confirmation] = await model.mutations.foo.$expect({ events: [ - { channel: 's1', name: 'testEvent', data: '1' }, - { channel: 's1', name: 'testEvent', data: '2' }, - { channel: 's1', name: 'testEvent', data: '3' }, + { channel: channelName, name: 'testEvent', data: '1' }, + { channel: channelName, name: 'testEvent', data: '2' }, + { channel: channelName, name: 'testEvent', data: '3' }, ], })(); expect(model.optimistic).toEqual('0123'); diff --git a/src/Model.ts b/src/Model.ts index b7e60a48..2cac5d09 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -2,12 +2,13 @@ import type { Types as AblyTypes } from 'ably/promises.js'; import type { Logger } from 'pino'; import { Subject, Subscription } from 'rxjs'; -import { toError, UpdateRegistrationError } from './Errors.js'; +import { toError } from './Errors.js'; import MutationsRegistry from './MutationsRegistry.js'; import PendingConfirmationRegistry from './PendingConfirmationRegistry.js'; import { IStream } from './Stream.js'; -import StreamRegistry from './StreamRegistry.js'; +import StreamFactory, { IStreamFactory as IStreamFactory } from './StreamFactory.js'; import type { StandardCallback } from './types/callbacks'; +import { MergeFunc } from './types/merge.js'; import type { OptimisticEventWithParams, ModelState, @@ -20,7 +21,6 @@ import type { ConfirmedEvent, } from './types/model.js'; import type { MutationMethods } from './types/mutations.js'; -import UpdatesRegistry from './UpdatesRegistry.js'; import EventEmitter from './utilities/EventEmitter.js'; /** @@ -48,8 +48,10 @@ export default class Model extends EventEmitter = async () => { throw new Error('sync func not registered'); }; - private readonly streamRegistry: StreamRegistry; - private readonly updatesRegistry: UpdatesRegistry = new UpdatesRegistry(); + private merge?: MergeFunc; + + private readonly stream: IStream; + private readonly streamFactory: IStreamFactory; private readonly mutationsRegistry: MutationsRegistry; private optimisticEvents: OptimisticEventWithParams[] = []; @@ -69,8 +71,15 @@ export default class Model extends EventEmitter( { apply: this.applyOptimisticEvents.bind(this), @@ -114,8 +123,8 @@ export default class Model extends EventEmitter stream.pause())); } /** @@ -124,7 +133,7 @@ export default class Model extends EventEmitter stream.resume())); + await this.stream.resume(); this.setState('ready'); } @@ -138,13 +147,12 @@ export default class Model extends EventEmitter extends EventEmitter} registration - The set of methods to register. * @returns A promise that resolves when the model has completed the registrtion and is ready to start emitting updates. @@ -168,14 +176,11 @@ export default class Model extends EventEmitter extends EventEmitter event.params.timeout === events[0].params.timeout)) { - throw new Error('expected every optimistic event in batch to have the same timeout'); - } + for (const event of events) { - if (!this.streamRegistry.streams[event.channel]) { + if (event.channel !== this.stream.channel) { throw new Error(`stream with name '${event.channel}' not registered on model '${this.name}'`); } } + + if (!events.every((event) => event.params.timeout === events[0].params.timeout)) { + throw new Error('expected every optimistic event in batch to have the same timeout'); + } const pendingConfirmation = await this.pendingConfirmationsRegistry.add(events); const optimistic = this.onStreamEvents(events); return [optimistic, pendingConfirmation.promise]; @@ -279,11 +286,8 @@ export default class Model extends EventEmitter extends EventEmitter = async (err: Error | null, event?: AblyTypes.Message) => { try { if (err) { @@ -323,8 +324,8 @@ export default class Model extends EventEmitter extends EventEmitter { - this.logger.trace({ ...this.baseLogContext, action: 'applyUpdates()', initialData, event }); - let data = initialData; - const updates = this.updatesRegistry.get({ channel: event.channel, event: event.name }); - for (const update of updates) { - data = await update.func(data, event); + private async applyUpdate(initialData: T, event: OptimisticEvent | ConfirmedEvent): Promise { + this.logger.trace({ ...this.baseLogContext, action: 'applyUpdate()', initialData, event }); + if (!this.merge) { + throw new Error('merge func not registered'); } + const data = await this.merge(initialData, event); return data; } - private async applyOptimisticUpdates(initialData: T, event: OptimisticEvent) { - const data = await this.applyUpdates(initialData, event); + private async applyOptimisticUpdate(initialData: T, event: OptimisticEvent) { + const data = await this.applyUpdate(initialData, event); this.setOptimisticData(data); } - private async applyConfirmedUpdates(initialData: T, event: ConfirmedEvent) { - const data = await this.applyUpdates(initialData, event); + private async applyConfirmedUpdate(initialData: T, event: ConfirmedEvent) { + const data = await this.applyUpdate(initialData, event); this.setConfirmedData(data); } @@ -380,22 +380,15 @@ export default class Model extends EventEmitter extends EventEmitter extends EventEmitter { beforeEach((context) => { context.ably = new Realtime({ key: 'abc:def' }); + context.channelName = 'models:myTestModel:updates'; + + // make sure the various channel and connection + // functions are mounted on the ably realtime mock + const ablyChannel = context.ably.channels.get(context.channelName); + ablyChannel.on = vi.fn(); + ablyChannel.subscribe = vi.fn(); + context.ably.connection.whenState = vi.fn(); }); it('expects the injected client to be of the type RealtimePromise', ({ ably }) => { @@ -19,11 +28,11 @@ describe('Models', () => { expectTypeOf(models.ably).toMatchTypeOf(); }); - it('getting a model with the same name returns the same instance', ({ ably }) => { + it('getting a model with the same name returns the same instance', ({ ably, channelName }) => { const models = new Models({ ably }); - const model1 = models.Model('test'); + const model1 = models.Model('test', channelName); expect(model1.name).toEqual('test'); - const model2 = models.Model('test'); + const model2 = models.Model('test', channelName); expect(model2.name).toEqual('test'); expect(model1).toEqual(model2); }); diff --git a/src/Models.ts b/src/Models.ts index 90128bb1..c4b04e85 100644 --- a/src/Models.ts +++ b/src/Models.ts @@ -8,7 +8,7 @@ import type { MutationMethods } from './types/mutations.js'; * Models captures the set of named Model instances used by your application. */ export default class Models { - private readonly options: ModelOptions; + private readonly options: Pick; private models: Record> = {}; readonly version = '0.0.1'; @@ -37,15 +37,19 @@ export default class Models { /** * Gets an existing or creates a new model instance with the given name. * @param {string} name - The unique name to identify this model instance in your application. + * @param {string} channel - The name of the channel the model will subscribe to update events on. */ - Model = (name: string) => { + Model = (name: string, channel: string) => { if (!name) { throw new Error('Model must have a non-empty name'); } if (this.models[name]) { return this.models[name] as Model; } - const model = new Model(name, this.options); + + const options: ModelOptions = { ...this.options, channelName: channel }; + + const model = new Model(name, options); this.models[name] = model; return model as Model; }; diff --git a/src/StreamRegistry.test.ts b/src/StreamFactory.test.ts similarity index 96% rename from src/StreamRegistry.test.ts rename to src/StreamFactory.test.ts index ab10196a..cd1dbb7d 100644 --- a/src/StreamRegistry.test.ts +++ b/src/StreamFactory.test.ts @@ -4,7 +4,7 @@ import { Subject } from 'rxjs'; import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest'; import Stream, { StreamOptions, StreamState } from './Stream.js'; -import StreamRegistry from './StreamRegistry.js'; +import StreamFactory from './StreamFactory.js'; import { createMessage } from './utilities/test/messages.js'; vi.mock('ably/promises'); @@ -229,19 +229,18 @@ describe('Stream', () => { }); it('succeeds with gte zero event buffer ms', async ({ ably, logger }) => { - new StreamRegistry({ eventBufferOptions: { bufferMs: 0 }, ably, logger }); - new StreamRegistry({ eventBufferOptions: { bufferMs: 1 }, ably, logger }); + new StreamFactory({ eventBufferOptions: { bufferMs: 0 }, ably, logger }); + new StreamFactory({ eventBufferOptions: { bufferMs: 1 }, ably, logger }); }); it('fails with lt zero event buffer ms', async ({ ably, logger }) => { try { - new StreamRegistry({ eventBufferOptions: { bufferMs: -1 }, ably, logger }); + new StreamFactory({ eventBufferOptions: { bufferMs: -1 }, ably, logger }); expect(true).toBe(false); } catch (err) { expect(err.toString(), 'Stream registry should have thrown an error').not.toContain('AssertionError'); } }); - // TODO discontinuity // TODO reauth https://ably.com/docs/realtime/channels?lang=nodejs#fatal-errors }); diff --git a/src/StreamFactory.ts b/src/StreamFactory.ts new file mode 100644 index 00000000..3d1e0e75 --- /dev/null +++ b/src/StreamFactory.ts @@ -0,0 +1,32 @@ +import Stream, { IStream, StreamOptions } from './Stream.js'; + +export interface IStreamFactory { + newStream(options: Pick): IStream; +} + +/** + * The StreamFactory class creates Stream instances that are + * used to deliver change events to a model. + */ +export default class StreamFactory implements IStreamFactory { + /** + * @param {Pick} options - The default options used when instantiating a stream. + */ + constructor(private readonly options: Pick) { + if (options.eventBufferOptions) { + const bufferMs = options.eventBufferOptions?.bufferMs || 0; + if (bufferMs < 0) { + throw new Error(`EventBufferOptions bufferMs cannot be less than zero: ${bufferMs}`); + } + } + } + + /** + * Create a new Stream instance for the given channel. + * @param {Pick} options - The options used in conjunction with the default options when instantiating a stream + * @returns {IStream} The newly created stream instance. + */ + newStream(options: Pick) { + return new Stream(Object.assign(this.options, options)); + } +} diff --git a/src/StreamRegistry.ts b/src/StreamRegistry.ts deleted file mode 100644 index 0b7e548c..00000000 --- a/src/StreamRegistry.ts +++ /dev/null @@ -1,42 +0,0 @@ -import Stream, { IStream, StreamOptions } from './Stream.js'; - -export interface IStreamRegistry { - getOrCreate(options: Pick): IStream; - get streams(): { [key: string]: IStream }; -} - -/** - * The StreamRegistry class encapsulates a set of names stream instances that are - * used to deliver change events to a model. - */ -export default class StreamRegistry implements IStreamRegistry { - private _streams: { [key: string]: IStream } = {}; - - /** - * @param {Pick} options - The default options used when instantiating a stream. - */ - constructor(readonly options: Pick) { - if (options.eventBufferOptions) { - const bufferMs = options.eventBufferOptions?.bufferMs || 0; - if (bufferMs < 0) { - throw new Error(`EventBufferOptions bufferMs cannot be less than zero: ${bufferMs}`); - } - } - } - - /** - * Retrieve an existing stream instance for the given channel or create a new one if it doesn't yet exist. - * @param {Pick} options - The options used in conjunction with the default options when instantiating a stream - * @returns {IStream} The pre-existing or newly created stream instance. - */ - getOrCreate(options: Pick) { - if (!this._streams[options.channel]) { - this._streams[options.channel] = new Stream(Object.assign(this.options, options)); - } - return this._streams[options.channel]; - } - - public get streams() { - return this._streams; - } -} diff --git a/src/UpdatesRegistry.test.ts b/src/UpdatesRegistry.test.ts deleted file mode 100644 index 716a5d6e..00000000 --- a/src/UpdatesRegistry.test.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { it, describe, expect, beforeEach } from 'vitest'; - -import { UpdateRegistrationError } from './Errors.js'; -import type { UpdateFunc } from './types/updates.js'; -import UpdatesRegistry from './UpdatesRegistry.js'; - -describe('UpdatesRegistry', () => { - let registry: UpdatesRegistry<{ value: number }>; - - const func1: UpdateFunc<{ value: number }> = async (state) => { - return { value: state.value + 1 }; - }; - - const func2: UpdateFunc<{ value: number }> = async (state) => { - return { value: state.value - 1 }; - }; - - beforeEach(() => { - registry = new UpdatesRegistry<{ value: number }>(); - }); - - it('registers an update function correctly', async () => { - registry.register(func1, { channel: 'channel1', event: 'event1' }); - const result = registry.get({ channel: 'channel1', event: 'event1' }); - expect(result.length).toBe(1); - expect(result[0].targets.channel).toBe('channel1'); - expect(result[0].targets.event).toBe('event1'); - expect(result[0].func).toBe(func1); - }); - - it('registers multiple update functions for the same channel and event', async () => { - registry.register(func1, { channel: 'channel1', event: 'event1' }); - registry.register(func2, { channel: 'channel1', event: 'event1' }); - const result = registry.get({ channel: 'channel1', event: 'event1' }); - expect(result.length).toBe(2); - expect(result[0].targets.channel).toBe('channel1'); - expect(result[0].targets.event).toBe('event1'); - expect(result[0].func).toBe(func1); - expect(result[1].targets.channel).toBe('channel1'); - expect(result[1].targets.event).toBe('event1'); - expect(result[1].func).toBe(func2); - }); - - it('registers and retrieves multiple update functions for multiple channels and events', async () => { - registry.register(func1, { channel: 'channel1', event: 'event1' }); - registry.register(func2, { channel: 'channel2', event: 'event2' }); - let result = registry.get({ channel: 'channel1', event: 'event1' }); - expect(result.length).toBe(1); - expect(result[0].targets.channel).toBe('channel1'); - expect(result[0].targets.event).toBe('event1'); - result = registry.get({ channel: 'channel2', event: 'event2' }); - expect(result.length).toBe(1); - expect(result[0].targets.channel).toBe('channel2'); - expect(result[0].targets.event).toBe('event2'); - }); - - it('returns all registered functions when no options are given', async () => { - registry.register(func1, { channel: 'channel1', event: 'event1' }); - registry.register(func2, { channel: 'channel2', event: 'event2' }); - const result = registry.get({}); - expect(result.length).toBe(2); - }); - - it('throws an error when attempting to get a non-existent channel', async () => { - expect(() => registry.get({ channel: 'channel1' })).toThrow(UpdateRegistrationError); - }); - - it('throws an error when attempting to get a non-existent channel, even if event is provided', async () => { - expect(() => registry.get({ channel: 'channel1', event: 'event1' })).toThrow(UpdateRegistrationError); - }); - - it('throws an error when attempting to get a non-existent event in a valid channel', async () => { - registry.register(func1, { channel: 'channel1', event: 'event1' }); - expect(() => registry.get({ channel: 'channel1', event: 'event2' })).toThrow(UpdateRegistrationError); - }); - - it('throws an error when attempting to get a non-existent event, even if channel is not provided', async () => { - registry.register(func1, { channel: 'channel1', event: 'event1' }); - expect(() => registry.get({ event: 'event2' })).toThrow(UpdateRegistrationError); - }); - - it('throws an error when attempting to get a non-existent event', async () => { - expect(() => registry.get({ event: 'event1' })).toThrow(UpdateRegistrationError); - }); - - it('throws an error when attempting to get a non-existent event, even if channel is provided', async () => { - expect(() => registry.get({ channel: 'channel1', event: 'event1' })).toThrow(UpdateRegistrationError); - }); - - it('returns all functions in a channel when no event is specified', async () => { - registry.register(func1, { channel: 'channel1', event: 'event1' }); - registry.register(func2, { channel: 'channel1', event: 'event2' }); - const result = registry.get({ channel: 'channel1' }); - expect(result.length).toBe(2); - }); - - it('returns all functions for an event across all channels when no channel is specified', async () => { - registry.register(func1, { channel: 'channel1', event: 'event1' }); - registry.register(func2, { channel: 'channel2', event: 'event1' }); - const result = registry.get({ event: 'event1' }); - expect(result.length).toBe(2); - }); -}); diff --git a/src/UpdatesRegistry.ts b/src/UpdatesRegistry.ts deleted file mode 100644 index 8c7cce53..00000000 --- a/src/UpdatesRegistry.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { UpdateRegistrationError } from './Errors.js'; -import type { UpdateFunc, UpdateFuncs } from './types/updates.js'; - -export type UpdateTargets = { - channel: string; - event: string; -}; - -export default class UpdatesRegistry { - private registry: UpdateFuncs = {}; - - constructor() {} - - register(update: UpdateFunc, { channel, event }: UpdateTargets) { - if (!this.registry[channel]) { - this.registry[channel] = {}; - } - if (!this.registry[channel][event]) { - this.registry[channel][event] = []; - } - this.registry[channel][event].push(update); - } - - public get(targets: Partial) { - const result: { targets: UpdateTargets; func: UpdateFunc }[] = []; - if (!!targets.channel && Object.keys(this.registry).length === 0) { - throw new UpdateRegistrationError({ channel: targets.channel }); - } - for (const channel in this.registry) { - if (!!targets.channel && targets.channel !== channel) { - continue; - } - if (!!targets.channel && !this.registry[channel]) { - throw new UpdateRegistrationError({ channel: targets.channel }); - } - for (const event in this.registry[channel]) { - if (!!targets.event && targets.event !== event) { - continue; - } - if (!!targets.event && !this.registry[channel][event]) { - throw new UpdateRegistrationError({ channel: targets.channel, event: targets.event }); - } - for (const func of this.registry[channel][event]) { - result.push({ targets: { channel, event }, func }); - } - } - } - if (!!targets.event && result.length === 0) { - throw new UpdateRegistrationError({ event: targets.event }); - } - return result; - } -} diff --git a/src/index.ts b/src/index.ts index 2b5a3fb0..81de6d6a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,6 +4,6 @@ import Models from './Models.js'; export type * from './types/callbacks.js'; export type * from './types/model.js'; export type * from './types/mutations.js'; -export type * from './types/updates.js'; +export type * from './types/merge.js'; export { Model }; export default Models; diff --git a/src/types/merge.ts b/src/types/merge.ts new file mode 100644 index 00000000..75a3bed6 --- /dev/null +++ b/src/types/merge.ts @@ -0,0 +1,7 @@ +import type { OptimisticEvent, ConfirmedEvent } from './model'; + +/** + * An function which is invoked with the latest model state (either confirmed or optimistic) + * and an event (either confirmed or optimistic) and returns the resultant model state. + */ +export type MergeFunc = (state: T, event: OptimisticEvent | ConfirmedEvent) => Promise; diff --git a/src/types/model.ts b/src/types/model.ts index 4e994c17..2394ae52 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -1,8 +1,8 @@ import type { Types as AblyTypes } from 'ably/promises'; import type { Logger, LevelWithSilent } from 'pino'; +import type { MergeFunc } from './merge'; import type { EventComparator, MutationMethods, MutationOptions, MutationRegistration } from './mutations'; -import type { UpdateFunc } from './updates'; import type { EventBufferOptions } from '../Stream'; /** @@ -20,6 +20,7 @@ export type ModelsOptions = { */ export type ModelOptions = { ably: AblyTypes.RealtimePromise; + channelName: string; logger: Logger; defaultMutationOptions?: Partial; eventBufferOptions?: EventBufferOptions; @@ -142,14 +143,9 @@ export type Registration = { */ $sync: SyncFunc; /** - * A mapping of channel name to event to an update function that is invoked when a message - * is received matching that channel and event name. + * The merge function that is invoked when a message is received. */ - $update?: { - [channel: string]: { - [event: string]: UpdateFunc; - }; - }; + $merge?: MergeFunc; /** * A mapping of method names to mutations describing the mutations that are available on the model that * can be invoked to mutate the underlying state of the model in the backend database. diff --git a/src/types/updates.ts b/src/types/updates.ts deleted file mode 100644 index ce15c409..00000000 --- a/src/types/updates.ts +++ /dev/null @@ -1,17 +0,0 @@ -import type { OptimisticEvent, ConfirmedEvent } from './model'; - -/** - * An function which is invoked with the latest model state (either confirmed or optimistic) - * and an event (either confirmed or optimistic) and returns the resultant model state. - */ -export type UpdateFunc = (state: T, event: OptimisticEvent | ConfirmedEvent) => Promise; - -/** - * A mapping of channel to event name to update function which determines the update function to - * invoke when an event with a given name is received on a particular channel. - */ -export type UpdateFuncs = { - [channel: string]: { - [event: string]: UpdateFunc[]; - }; -};