Skip to content

Commit

Permalink
WIP: integrate stream sync with model
Browse files Browse the repository at this point in the history
  • Loading branch information
mschristensen committed Oct 17, 2023
1 parent e09b579 commit f7b8a56
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
9 changes: 4 additions & 5 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ vi.mock('./stream/StreamFactory', () => {
subscribe(): void {}
unsubscribe(): void {}
async dispose() {}
async reset() {}
async sync() {}
}

const streams: { [key: string]: IStream } = {};
Expand Down Expand Up @@ -128,7 +128,6 @@ describe('Model', () => {
await statePromise(model, 'preparing');
completeSync();

await ready;
await statePromise(model, 'ready');
await expect(ready).resolves.toEqual({ current: 'ready', previous: 'preparing', reason: undefined });

Expand Down Expand Up @@ -297,11 +296,11 @@ describe('Model', () => {
model.$register({
$sync: sync,
});
expect(() =>
await expect(
model.$register({
$sync: sync,
}),
).toThrow('$register was already called');
).rejects.toThrow('$register was already called');
});

it<ModelTestContext>('fails to register after initialization', async ({ channelName, ably, logger, streams }) => {
Expand All @@ -323,7 +322,7 @@ describe('Model', () => {

model.setState('ready');

expect(() => model.$register({ $sync: sync })).toThrow(
await expect(model.$register({ $sync: sync })).rejects.toThrow(
`$register can only be called when the model is in the initialized state`,
);
});
Expand Down
27 changes: 13 additions & 14 deletions src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,9 @@ export default class Model<T> extends EventEmitter<Record<ModelState, ModelState
* The sync function that allows the model to be manually resynced
* @returns A promise that resolves when the model has successfully re-synchronised its state and is ready to start emitting updates.
*/
public get $sync() {
return () => {
this.init();
return new Promise((resolve) => this.whenState('ready', this.state, resolve));
};
public async $sync() {
await this.init();
return new Promise((resolve) => this.whenState('ready', this.state, resolve));
}

/**
Expand Down Expand Up @@ -183,7 +181,7 @@ export default class Model<T> extends EventEmitter<Record<ModelState, ModelState
* @param {Registration<T>} registration - The set of methods to register.
* @returns A promise that resolves when the model has completed the registrtion and is ready to start emitting updates.
*/
public $register(registration: Registration<T>) {
public async $register(registration: Registration<T>) {
if (this.wasRegistered) {
throw new Error('$register was already called');
}
Expand All @@ -199,8 +197,9 @@ export default class Model<T> extends EventEmitter<Record<ModelState, ModelState
this.merge = registration.$merge;
}

this.init();
return new Promise((resolve) => this.whenState('ready', this.state, resolve));
const result = new Promise((resolve) => this.whenState('ready', this.state, resolve));
await this.init();
return result;
}

/**
Expand Down Expand Up @@ -296,7 +295,7 @@ export default class Model<T> extends EventEmitter<Record<ModelState, ModelState
this.setState('preparing', reason);

this.removeStream();
this.addStream();
await this.addStream();

const data = await this.syncFunc();
this.setOptimisticData(data);
Expand All @@ -305,16 +304,15 @@ export default class Model<T> extends EventEmitter<Record<ModelState, ModelState
}

private removeStream() {
this.stream.reset();
const callback = this.streamSubscriptionsMap.get(this.stream);
if (callback) {
this.stream.unsubscribe(callback);
}
this.stream.reset();
this.streamSubscriptionsMap = new WeakMap();
this.stream.dispose();
this.streamSubscriptionsMap.delete(this.stream);
}

private addStream() {
private async addStream() {
const callback: StandardCallback<AblyTypes.Message> = async (err: Error | null, event?: AblyTypes.Message) => {
try {
if (err) {
Expand All @@ -330,9 +328,10 @@ export default class Model<T> extends EventEmitter<Record<ModelState, ModelState
}
await this.onStreamEvent({ ...event!, confirmed: true, rejected, ...(uuid && { uuid }) });
} catch (err) {
this.init(toError(err));
await this.init(toError(err));
}
};
await this.stream.sync('0s', '0');
this.stream.subscribe(callback);
this.streamSubscriptionsMap.set(this.stream, callback);
}
Expand Down

0 comments on commit f7b8a56

Please sign in to comment.