diff --git a/src/Model.discontinuity.test.ts b/src/Model.discontinuity.test.ts index b67068f9..889e57a7 100644 --- a/src/Model.discontinuity.test.ts +++ b/src/Model.discontinuity.test.ts @@ -46,7 +46,11 @@ describe('Model', () => { let counter = 0; - const sync = vi.fn(async () => `${counter++}`); + const sync = vi.fn(async () => ({ + data: `${counter++}`, + sequenceID: '0', + stateTimestamp: new Date(), + })); const model = new Model('test', { ably, channelName, logger }); const mergeFn = vi.fn(async (_, event) => { return event.data; diff --git a/src/Model.test.ts b/src/Model.test.ts index b9acdc7a..afd92f89 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -29,10 +29,10 @@ vi.mock('./stream/StreamFactory', () => { } async pause() {} async resume() {} - subscribe(): void {} + async subscribe() {} unsubscribe(): void {} async dispose() {} - async reset() {} + async sync() {} } const streams: { [key: string]: IStream } = {}; @@ -90,7 +90,7 @@ describe('Model', () => { const synchronised = new Promise((resolve) => (completeSync = resolve)); const sync = vi.fn(async () => { await synchronised; - return simpleTestData; + return { data: simpleTestData, sequenceID: '0', stateTimestamp: new Date() }; }); const model = new Model('test', { ably, @@ -117,7 +117,7 @@ describe('Model', () => { const sync = vi.fn(async () => { await synchronised; - return { ...simpleTestData, bar: { baz: ++counter } }; + return { data: { ...simpleTestData, bar: { baz: ++counter } }, sequenceID: '0', stateTimestamp: new Date() }; }); const model = new Model('test', { ably, @@ -128,7 +128,6 @@ describe('Model', () => { await statePromise(model, 'preparing'); completeSync(); - await ready; await statePromise(model, 'ready'); await expect(ready).resolves.toEqual({ current: 'ready', previous: 'preparing', reason: undefined }); @@ -160,7 +159,11 @@ describe('Model', () => { s1.subscribe = vi.fn(); s1.pause = vi.fn(); s1.resume = vi.fn(); - const sync = vi.fn(async () => simpleTestData); + const sync = vi.fn(async () => ({ + data: simpleTestData, + sequenceID: '0', + stateTimestamp: new Date(), + })); const model = new Model('test', { ably, channelName, logger }); @@ -185,7 +188,11 @@ describe('Model', () => { const s1 = streams.newStream({ channelName }); s1.subscribe = vi.fn(); s1.unsubscribe = vi.fn(); - const sync = vi.fn(async () => simpleTestData); + const sync = vi.fn(async () => ({ + data: simpleTestData, + sequenceID: '0', + stateTimestamp: new Date(), + })); const model = new Model('test', { ably, channelName, logger }); @@ -208,11 +215,15 @@ describe('Model', () => { channelEvents: new Subject(), }; - streams.newStream({ channelName }).subscribe = vi.fn((callback) => - events.channelEvents.subscribe((message) => callback(null, message)), - ); + streams.newStream({ channelName }).subscribe = vi.fn(async (callback) => { + events.channelEvents.subscribe((message) => callback(null, message)); + }); - const sync = vi.fn(async () => 'data_0'); // defines initial version of model + const sync = vi.fn(async () => ({ + data: 'data_0', + sequenceID: '0', + stateTimestamp: new Date(), + })); // defines initial version of model const model = new Model('test', { ably, channelName, logger }); const mergeFn = vi.fn(async (_, event) => event.data); @@ -263,7 +274,11 @@ describe('Model', () => { }); it('subscribes after initialisation', async ({ channelName, ably, logger }) => { - const sync = vi.fn(async () => 'data_0'); // defines initial version of model + const sync = vi.fn(async () => ({ + data: 'data_0', + sequenceID: '0', + stateTimestamp: new Date(), + })); // defines initial version of model const model = new Model('test', { ably, channelName, logger }); await model.$register({ $sync: sync }); @@ -293,15 +308,19 @@ describe('Model', () => { s1.subscribe = vi.fn(); const model = new Model('test', { ably, channelName, logger }); - const sync = async () => 'foobar'; + const sync = async () => ({ + data: 'foobar', + sequenceID: '0', + stateTimestamp: new Date(), + }); model.$register({ $sync: sync, }); - expect(() => + await expect( model.$register({ $sync: sync, }), - ).toThrow('$register was already called'); + ).rejects.toThrow('$register was already called'); }); it('fails to register after initialization', async ({ channelName, ably, logger, streams }) => { @@ -319,11 +338,15 @@ describe('Model', () => { s1.subscribe = vi.fn(); const model = new ModelWithSetState('test', { ably, channelName, logger }); - const sync = async () => 'foobar'; + const sync = async () => ({ + data: 'foobar', + sequenceID: '0', + stateTimestamp: new Date(), + }); model.setState('ready'); - expect(() => model.$register({ $sync: sync })).toThrow( + await expect(model.$register({ $sync: sync })).rejects.toThrow( `$register can only be called when the model is in the initialized state`, ); }); @@ -335,7 +358,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (_, event) => event.data); await model.$register({ - $sync: async () => 'data_0', + $sync: async () => ({ + data: 'data_0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -373,7 +400,7 @@ describe('Model', () => { s1.subscribe = vi.fn(); const events = new Subject(); - s1.subscribe = vi.fn((callback) => { + s1.subscribe = vi.fn(async (callback) => { events.subscribe((message) => callback(null, message)); }); @@ -381,7 +408,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (_, event) => event.data); await model.$register({ - $sync: async () => 'data_0', + $sync: async () => ({ + data: 'data_0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -426,7 +457,7 @@ describe('Model', () => { s1.subscribe = vi.fn(); const events = { e1: new Subject() }; - s1.subscribe = vi.fn((callback) => { + s1.subscribe = vi.fn(async (callback) => { events.e1.subscribe((message) => callback(null, message)); }); @@ -434,7 +465,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (_, event) => event.data); await model.$register({ - $sync: async () => 'data_0', + $sync: async () => ({ + data: 'data_0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -484,7 +519,7 @@ describe('Model', () => { s1.subscribe = vi.fn(); const events = { e1: new Subject() }; - s1.subscribe = vi.fn((callback) => { + s1.subscribe = vi.fn(async (callback) => { events.e1.subscribe((message) => callback(null, message)); }); @@ -492,7 +527,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (_, event) => event.data); await model.$register({ - $sync: async () => 'data_0', + $sync: async () => ({ + data: 'data_0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -541,7 +580,7 @@ describe('Model', () => { s1.subscribe = vi.fn(); const events = { e1: new Subject() }; - s1.subscribe = vi.fn((callback) => { + s1.subscribe = vi.fn(async (callback) => { events.e1.subscribe((message) => callback(null, message)); }); @@ -549,7 +588,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (state, event) => state + event.data); await model.$register({ - $sync: async () => '0', + $sync: async () => ({ + data: '0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -622,7 +665,7 @@ describe('Model', () => { s1.subscribe = vi.fn(); const events = { e1: new Subject() }; - s1.subscribe = vi.fn((callback) => { + s1.subscribe = vi.fn(async (callback) => { events.e1.subscribe((message) => callback(null, message)); }); @@ -630,7 +673,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (state, event) => state + event.data); await model.$register({ - $sync: async () => '0', + $sync: async () => ({ + data: '0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -710,7 +757,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (state, event) => state + event.data); await model.$register({ - $sync: async () => '0', + $sync: async () => ({ + data: '0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -738,14 +789,18 @@ describe('Model', () => { const events = { channelEvents: new Subject(), }; - s1.subscribe = vi.fn((callback) => { + s1.subscribe = vi.fn(async (callback) => { events.channelEvents.subscribe((message) => callback(null, message)); }); s1.unsubscribe = vi.fn(); let counter = 0; - const sync = vi.fn(async () => `${counter}`); + const sync = vi.fn(async () => ({ + data: `${counter}`, + sequenceID: '0', + stateTimestamp: new Date(), + })); const model = new Model('test', { ably, channelName, logger }); const mergeFn = vi.fn(async (_, event) => { @@ -810,7 +865,11 @@ describe('Model', () => { return state + event.data; }); await model.$register({ - $sync: async () => '0', + $sync: async () => ({ + data: '0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -846,7 +905,7 @@ describe('Model', () => { s1.subscribe = vi.fn(); const events = { e1: new Subject() }; - s1.subscribe = vi.fn((callback) => { + s1.subscribe = vi.fn(async (callback) => { events.e1.subscribe((message) => callback(null, message)); }); @@ -854,7 +913,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (state, event) => state + event.data); await model.$register({ - $sync: async () => '0', + $sync: async () => ({ + data: '0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); @@ -880,7 +943,7 @@ describe('Model', () => { s1.subscribe = vi.fn(); const events = new Subject(); - s1.subscribe = vi.fn((callback) => { + s1.subscribe = vi.fn(async (callback) => { events.subscribe((message) => callback(null, message)); }); @@ -888,7 +951,11 @@ describe('Model', () => { const mergeFn = vi.fn(async (state, event) => state + event.data); await model.$register({ - $sync: async () => '0', + $sync: async () => ({ + data: '0', + sequenceID: '0', + stateTimestamp: new Date(), + }), $merge: mergeFn, }); diff --git a/src/Model.ts b/src/Model.ts index 79fca61d..369723f4 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -23,6 +23,8 @@ import type { import type { OptimisticInvocationParams } from './types/optimistic.js'; import EventEmitter from './utilities/EventEmitter.js'; +const REWIND_INTERVAL_MARGIN_SECONDS = 5; + /** * A Model encapsulates an observable, collaborative data model backed by a transactional database in your backend. * @@ -128,11 +130,9 @@ export default class Model extends EventEmitter { - this.init(); - return new Promise((resolve) => this.whenState('ready', this.state, resolve)); - }; + public async $sync() { + await this.init(); + return new Promise((resolve) => this.whenState('ready', this.state, resolve)); } /** @@ -183,7 +183,7 @@ export default class Model extends EventEmitter} registration - The set of methods to register. * @returns A promise that resolves when the model has completed the registrtion and is ready to start emitting updates. */ - public $register(registration: Registration) { + public async $register(registration: Registration) { if (this.wasRegistered) { throw new Error('$register was already called'); } @@ -199,8 +199,9 @@ export default class Model extends EventEmitter this.whenState('ready', this.state, resolve)); + const result = new Promise((resolve) => this.whenState('ready', this.state, resolve)); + await this.init(); + return result; } /** @@ -294,27 +295,30 @@ export default class Model extends EventEmitter 2 * 60) { + throw new Error( + `rewind interval ${interval}s from state timestamp ${stateTimestamp.toString()} is greater than 2 minutes`, + ); + } const callback: StandardCallback = async (err: Error | null, event?: AblyTypes.Message) => { try { if (err) { @@ -330,9 +334,10 @@ export default class Model extends EventEmitter; resume(): Promise; - sync(rewind: string, sequenceID: string): void; - subscribe(callback: StandardCallback): void; + sync(rewind: string, sequenceID: string): Promise; + subscribe(callback: StandardCallback): Promise; unsubscribe(callback: StandardCallback): void; dispose(reason?: AblyTypes.ErrorInfo | string): Promise; } diff --git a/src/types/model.ts b/src/types/model.ts index 8bce2cd6..b5839583 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -107,7 +107,7 @@ export type OptimisticEventWithParams = OptimisticEvent & { * Defines a function which the library will use to pull the latest state of the model from the backend. * Invoked on initialisation and whenever some discontinuity occurs that requires a re-sync. */ -export type SyncFunc = () => Promise; +export type SyncFunc = () => Promise<{ data: T; sequenceID: string; stateTimestamp: Date }>; /** * A state transition emitted as an event from the model describing a change to the model's lifecycle.