Skip to content

Commit

Permalink
Construct a new Subject observer after error
Browse files Browse the repository at this point in the history
Add a reset function to the Stream that will create a new Subject
observer. An observer can't be reused for anything after a call to
'error' or 'complete'. A new Subject must be created, this is
encapsulated in the stream's reset function.
  • Loading branch information
zknill committed Sep 28, 2023
1 parent 19d69cf commit 78ad26f
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 6 deletions.
89 changes: 89 additions & 0 deletions src/Model.discontinuity.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Realtime, Types } from 'ably/promises';
import pino from 'pino';
import { Subject, lastValueFrom, take } from 'rxjs';
import { it, describe, expect, vi, beforeEach } from 'vitest';

import Model from './Model.js';
import { ModelOptions, ModelState } from './types/model.js';
import { MutationMethods } from './types/mutations.js';

vi.mock('ably/promises');

interface ModelTestContext extends ModelOptions {}

const getNthEventPromise = <T>(subject: Subject<T>, n: number) => lastValueFrom(subject.pipe(take(n)));

const getEventPromises = <T>(subject: Subject<T>, n: number) => {
const promises: Promise<T>[] = [];
for (let i = 0; i < n; i++) {
promises.push(getNthEventPromise(subject, i + 1));
}
return promises;
};

const modelStatePromise = <T, M extends MutationMethods>(model: Model<T, M>, state: ModelState) =>
new Promise((resolve) => model.whenState(state, model.state, resolve));

describe('Model', () => {
beforeEach<ModelTestContext>(async (context) => {
const ably = new Realtime({});
ably.connection.whenState = vi.fn<[Types.ConnectionState], Promise<Types.ConnectionStateChange>>(async () => {
return {
current: 'connected',
previous: 'initialized',
};
});
const logger = pino({ level: 'silent' });
context.ably = ably;
context.logger = logger;
});
it<ModelTestContext>('handles discontinuity with resync', async ({ ably, logger }) => {
const channel = ably.channels.get('foo');
let suspendChannel: (...args: any[]) => void = () => {
throw new Error('suspended not defined');
};

channel.on = vi.fn<any, any>(async (name: string[], callback) => {
if (name.includes('suspended')) {
suspendChannel = () => {
callback();
};
}
});
channel.subscribe = vi.fn<any, any>();
channel.attach = vi.fn<any, any>();
channel.detach = vi.fn<any, any>();
ably.channels.release = vi.fn<any, any>();

let counter = 0;

const sync = vi.fn(async () => `${counter++}`);
const model = new Model<string, {}>('test', { ably, logger });
const update1 = vi.fn(async (state, event) => {
return event.data;
});
await model.$register({ $sync: sync, $update: { s1: { testEvent: update1 } } });

expect(sync).toHaveBeenCalledOnce();

let subscription = new Subject<void>();
const subscriptionCalls = getEventPromises(subscription, 2);
const subscriptionSpy = vi.fn<[Error | null, string?]>(() => {
subscription.next();
});
model.subscribe(subscriptionSpy);

await subscriptionCalls[0];
expect(subscriptionSpy).toHaveBeenNthCalledWith(1, null, '0');

expect(counter).toEqual(1);
await modelStatePromise(model, 'ready');

suspendChannel();
await modelStatePromise(model, 'ready');

await subscriptionCalls[1];
expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, '1');
expect(counter).toEqual(2);
});
});
1 change: 1 addition & 0 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ vi.mock('./StreamRegistry', () => {
subscribe(): void {}
unsubscribe(): void {}
async dispose() {}
async reset() {}
}
const streams: { [key: string]: IStream } = {};

Expand Down
1 change: 1 addition & 0 deletions src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
if (callback) {
stream.unsubscribe(callback);
}
stream.reset();
}
this.streamSubscriptionsMap = new WeakMap();
}
Expand Down
21 changes: 15 additions & 6 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface IStream {
get channel(): string;
pause(): Promise<void>;
resume(): Promise<void>;
reset(): void;
subscribe(callback: StandardCallback<AblyTypes.Message>): void;
unsubscribe(callback: StandardCallback<AblyTypes.Message>): void;
dispose(reason?: AblyTypes.ErrorInfo | string): Promise<void>;
Expand All @@ -69,7 +70,7 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
private readonly ably: AblyTypes.RealtimePromise;
private currentState: StreamState = StreamState.INITIALIZED;
private readonly ablyChannel: AblyTypes.RealtimeChannelPromise;
private readonly subscriptions = new Subject<AblyTypes.Message>();
private subscriptions = new Subject<AblyTypes.Message>();
private subscriptionMap: WeakMap<StandardCallback<AblyTypes.Message>, Subscription> = new WeakMap();

private readonly baseLogContext: Partial<{ scope: string; action: string }>;
Expand All @@ -80,10 +81,6 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
this.ably = options.ably;
this.logger = options.logger;
this.ablyChannel = this.ably.channels.get(this.options.channel);
this.ablyChannel.on('failed', (change) => this.dispose(change.reason));
this.ablyChannel.on(['suspended', 'update'], () =>
this.subscriptions.error(new Error('discontinuity in channel connection')),
);
this.baseLogContext = { scope: `Stream#${options.channel}` };
this.init();
}
Expand Down Expand Up @@ -115,7 +112,7 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
callback(null, message);
},
error: (err) => {
this.logger.trace({ ...this.baseLogContext, action: 'error()' });
this.logger.trace({ ...this.baseLogContext, action: 'error()', error: err.toString() });
callback(err);
},
complete: () => {
Expand Down Expand Up @@ -144,6 +141,14 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
this.ably.channels.release(this.ablyChannel.name);
}

public async reset() {
this.logger.trace({ ...this.baseLogContext, action: 'reset()' });
this.setState(StreamState.INITIALIZED, 'reset');
this.subscriptions.unsubscribe();
this.subscriptions = new Subject<AblyTypes.Message>();
this.init();
}

private setState(state: StreamState, reason?: AblyTypes.ErrorInfo | string) {
this.logger.trace({ ...this.baseLogContext, action: 'setState()', state, reason });
const previous = this.currentState;
Expand All @@ -157,6 +162,10 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState

private async init() {
this.logger.trace({ ...this.baseLogContext, action: 'init()' });
this.ablyChannel.on('failed', (change) => this.dispose(change.reason));
this.ablyChannel.on(['suspended', 'update'], () => {
this.subscriptions.error(new Error('discontinuity in channel connection'));
});
this.setState(StreamState.PREPARING);
await this.ably.connection.whenState('connected');
await this.ablyChannel.subscribe(this.onMessage.bind(this));
Expand Down

0 comments on commit 78ad26f

Please sign in to comment.