diff --git a/src/Model.discontinuity.test.ts b/src/Model.discontinuity.test.ts new file mode 100644 index 00000000..5fe0628d --- /dev/null +++ b/src/Model.discontinuity.test.ts @@ -0,0 +1,89 @@ +import { Realtime, Types } from 'ably/promises'; +import pino from 'pino'; +import { Subject, lastValueFrom, take } from 'rxjs'; +import { it, describe, expect, vi, beforeEach } from 'vitest'; + +import Model from './Model.js'; +import { ModelOptions, ModelState } from './types/model.js'; +import { MutationMethods } from './types/mutations.js'; + +vi.mock('ably/promises'); + +interface ModelTestContext extends ModelOptions {} + +const getNthEventPromise = (subject: Subject, n: number) => lastValueFrom(subject.pipe(take(n))); + +const getEventPromises = (subject: Subject, n: number) => { + const promises: Promise[] = []; + for (let i = 0; i < n; i++) { + promises.push(getNthEventPromise(subject, i + 1)); + } + return promises; +}; + +const modelStatePromise = (model: Model, state: ModelState) => + new Promise((resolve) => model.whenState(state, model.state, resolve)); + +describe('Model', () => { + beforeEach(async (context) => { + const ably = new Realtime({}); + ably.connection.whenState = vi.fn<[Types.ConnectionState], Promise>(async () => { + return { + current: 'connected', + previous: 'initialized', + }; + }); + const logger = pino({ level: 'silent' }); + context.ably = ably; + context.logger = logger; + }); + it('handles discontinuity with resync', async ({ ably, logger }) => { + const channel = ably.channels.get('foo'); + let suspendChannel: (...args: any[]) => void = () => { + throw new Error('suspended not defined'); + }; + + channel.on = vi.fn(async (name: string[], callback) => { + if (name.includes('suspended')) { + suspendChannel = () => { + callback(); + }; + } + }); + channel.subscribe = vi.fn(); + channel.attach = vi.fn(); + channel.detach = vi.fn(); + ably.channels.release = vi.fn(); + + let counter = 0; + + const sync = vi.fn(async () => `${counter++}`); + const model = new Model('test', { ably, logger }); + const update1 = vi.fn(async (state, event) => { + return event.data; + }); + await model.$register({ $sync: sync, $update: { s1: { testEvent: update1 } } }); + + expect(sync).toHaveBeenCalledOnce(); + + let subscription = new Subject(); + const subscriptionCalls = getEventPromises(subscription, 2); + const subscriptionSpy = vi.fn<[Error | null, string?]>(() => { + subscription.next(); + }); + model.subscribe(subscriptionSpy); + + await subscriptionCalls[0]; + expect(subscriptionSpy).toHaveBeenNthCalledWith(1, null, '0'); + + expect(counter).toEqual(1); + await modelStatePromise(model, 'ready'); + + suspendChannel(); + await modelStatePromise(model, 'ready'); + + await subscriptionCalls[1]; + expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, '1'); + expect(counter).toEqual(2); + }); +}); diff --git a/src/Model.test.ts b/src/Model.test.ts index c03a72dd..8dc5c5e0 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -33,6 +33,7 @@ vi.mock('./StreamRegistry', () => { subscribe(): void {} unsubscribe(): void {} async dispose() {} + async reset() {} } const streams: { [key: string]: IStream } = {}; diff --git a/src/Model.ts b/src/Model.ts index 7ab064d5..b7e60a48 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -298,6 +298,7 @@ export default class Model extends EventEmitter; resume(): Promise; + reset(): void; subscribe(callback: StandardCallback): void; unsubscribe(callback: StandardCallback): void; dispose(reason?: AblyTypes.ErrorInfo | string): Promise; @@ -69,7 +70,7 @@ export default class Stream extends EventEmitter(); + private subscriptions = new Subject(); private subscriptionMap: WeakMap, Subscription> = new WeakMap(); private readonly baseLogContext: Partial<{ scope: string; action: string }>; @@ -80,10 +81,6 @@ export default class Stream extends EventEmitter this.dispose(change.reason)); - this.ablyChannel.on(['suspended', 'update'], () => - this.subscriptions.error(new Error('discontinuity in channel connection')), - ); this.baseLogContext = { scope: `Stream#${options.channel}` }; this.init(); } @@ -115,7 +112,7 @@ export default class Stream extends EventEmitter { - this.logger.trace({ ...this.baseLogContext, action: 'error()' }); + this.logger.trace({ ...this.baseLogContext, action: 'error()', error: err.toString() }); callback(err); }, complete: () => { @@ -144,6 +141,14 @@ export default class Stream extends EventEmitter(); + this.init(); + } + private setState(state: StreamState, reason?: AblyTypes.ErrorInfo | string) { this.logger.trace({ ...this.baseLogContext, action: 'setState()', state, reason }); const previous = this.currentState; @@ -157,6 +162,10 @@ export default class Stream extends EventEmitter this.dispose(change.reason)); + this.ablyChannel.on(['suspended', 'update'], () => { + this.subscriptions.error(new Error('discontinuity in channel connection')); + }); this.setState(StreamState.PREPARING); await this.ably.connection.whenState('connected'); await this.ablyChannel.subscribe(this.onMessage.bind(this));