Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow $sync to be called manually #52

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Model.discontinuity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { MutationMethods } from './types/mutations.js';

vi.mock('ably/promises');

interface ModelTestContext extends ModelOptions {}
type ModelTestContext = { channelName: string } & ModelOptions;
zknill marked this conversation as resolved.
Show resolved Hide resolved

const getNthEventPromise = <T>(subject: Subject<T>, n: number) => lastValueFrom(subject.pipe(take(n)));

Expand Down
55 changes: 51 additions & 4 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type TestData = {
};
};

const simpleTestData: TestData = {
let simpleTestData: TestData = {
foo: 'foobar',
bar: {
baz: 1,
Expand All @@ -66,7 +66,6 @@ const simpleTestData: TestData = {

interface ModelTestContext extends ModelOptions {
streams: IStreamFactory;
channelName: string;
}

const modelStatePromise = <T, M extends MutationMethods>(model: Model<T, M>, state: ModelState) =>
Expand Down Expand Up @@ -122,6 +121,54 @@ describe('Model', () => {
expect(model.confirmed).toEqual(simpleTestData);
});

it<ModelTestContext>('allows sync to be called manually', async ({ channelName, ably, logger }) => {
let completeSync: (...args: any[]) => void = () => {
throw new Error('completeSync not defined');
};
let synchronised = new Promise((resolve) => (completeSync = resolve));
let counter = 0;

const sync = vi.fn(async () => {
await synchronised;

return { ...simpleTestData, bar: { baz: ++counter } };
});
const model = new Model<TestData, { foo: (_: MutationContext, val: string) => Promise<number> }>('test', {
ably,
channelName,
logger,
});
const ready = model.$register({ $sync: sync });
await modelStatePromise(model, 'preparing');
completeSync();

await ready;
await modelStatePromise(model, 'ready');
await expect(ready).resolves.toEqual({ current: 'ready', previous: 'preparing', reason: undefined });

expect(sync).toHaveBeenCalledOnce();
expect(model.optimistic).toEqual(simpleTestData);
mschristensen marked this conversation as resolved.
Show resolved Hide resolved
expect(model.confirmed).toEqual(simpleTestData);

completeSync = () => {
throw new Error('completeSync should have been replaced again');
};
synchronised = new Promise((resolve) => {
completeSync = resolve;
});

const resynced = model.$sync();
await modelStatePromise(model, 'preparing');
completeSync();
await resynced;
await modelStatePromise(model, 'ready');
expect(sync).toHaveBeenCalledTimes(2);

zknill marked this conversation as resolved.
Show resolved Hide resolved
const want = { ...simpleTestData, bar: { baz: 2 } };
expect(model.optimistic).toEqual(want);
expect(model.confirmed).toEqual(want);
});

it<ModelTestContext>('pauses and resumes the model', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
s1.subscribe = vi.fn();
Expand Down Expand Up @@ -297,8 +344,8 @@ describe('Model', () => {
it<ModelTestContext>('fails to register after initialization', async ({ channelName, ably, logger, streams }) => {
// extend the Model class to get access to protected member setState
class ModelWithSetState<T, M extends MutationMethods> extends Model<T, M> {
constructor(readonly name: string, options: ModelOptions) {
super(name, options);
constructor(readonly name: string, channelName: string, options: ModelOptions) {
super(name, channelName, options);
}
setState(state: ModelState) {
super.setState(state);
Expand Down
19 changes: 15 additions & 4 deletions src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { toError } from './Errors.js';
import MutationsRegistry from './MutationsRegistry.js';
import PendingConfirmationRegistry from './PendingConfirmationRegistry.js';
import { IStream } from './Stream.js';
import StreamFactory, { IStreamFactory as IStreamFactory } from './StreamFactory.js';
import StreamFactory, { IStreamFactory } from './StreamFactory.js';
import type { StandardCallback } from './types/callbacks';
import { MergeFunc } from './types/merge.js';
import type {
Expand Down Expand Up @@ -45,7 +45,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
private optimisticData!: T;
private confirmedData!: T;

private sync: SyncFunc<T> = async () => {
private syncFunc: SyncFunc<T> = async () => {
throw new Error('sync func not registered');
};
private merge?: MergeFunc<T>;
Expand Down Expand Up @@ -117,6 +117,17 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
return this.mutationsRegistry.handler;
}

/**
* The sync function that allows the model to be manually resynced
* @returns A promise that resolves when the model has successfully re-synchronised its state and is ready to start emitting updates.
*/
public get $sync() {
return () => {
this.init();
return new Promise((resolve) => this.whenState('ready', this.state, resolve));
};
}

/**
* Pauses the current model by detaching from the underlying channels and pausing processing of updates.
* @returns A promise that resolves when the model has been paused.
Expand Down Expand Up @@ -175,7 +186,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
);
}
this.wasRegistered = true;
this.sync = registration.$sync;
this.syncFunc = registration.$sync;

if (registration.$merge) {
this.merge = registration.$merge;
Expand Down Expand Up @@ -289,7 +300,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
this.removeStream();
this.addStream(this.stream.channel);

const data = await this.sync();
const data = await this.syncFunc();
this.setOptimisticData(data);
this.setConfirmedData(data);
this.setState('ready');
Expand Down
34 changes: 34 additions & 0 deletions src/StreamRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import Stream, { IStream, StreamOptions } from './Stream.js';

export interface IStreamRegistry {
newStream(options: Pick<StreamOptions, 'channel'>): IStream;
//get streams(): { [key: string]: IStream };
}

/**
* The StreamRegistry class encapsulates a set of names stream instances that are
* used to deliver change events to a model.
*/
export default class StreamRegistry implements IStreamRegistry {
/**
* @param {Pick<StreamOptions, 'ably' | 'logger'>} options - The default options used when instantiating a stream.
*/
constructor(private readonly options: Pick<StreamOptions, 'ably' | 'logger' | 'eventBufferOptions'>) {
if (options.eventBufferOptions) {
const bufferMs = options.eventBufferOptions?.bufferMs || 0;
if (bufferMs < 0) {
throw new Error(`EventBufferOptions bufferMs cannot be less than zero: ${bufferMs}`);
}
}
}

/**
* Retrieve an existing stream instance for the given channel or create a new one if it doesn't yet exist.
* @param {Pick<StreamOptions, 'channel'>} options - The options used in conjunction with the default options when instantiating a stream
* @returns {IStream} The pre-existing or newly created stream instance.
*/
// TODO: should this cache the streams?
newStream(options: Pick<StreamOptions, 'channel'>) {
return new Stream(Object.assign(this.options, options));
}
}
Loading