Skip to content

Commit

Permalink
model: integrate sync func with stream rewind
Browse files Browse the repository at this point in the history
The SyncFunc now returns the sequenceID and stateTimestamp along with
the model state snapshot. This is used by the model to rewind to the
correct point in the stream.
  • Loading branch information
mschristensen committed Oct 17, 2023
1 parent e09b579 commit 73c671e
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 57 deletions.
6 changes: 5 additions & 1 deletion src/Model.discontinuity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ describe('Model', () => {

let counter = 0;

const sync = vi.fn(async () => `${counter++}`);
const sync = vi.fn(async () => ({
data: `${counter++}`,
sequenceID: '0',
stateTimestamp: new Date(),
}));
const model = new Model<string>('test', { ably, channelName, logger });
const mergeFn = vi.fn(async (_, event) => {
return event.data;
Expand Down
139 changes: 103 additions & 36 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ vi.mock('./stream/StreamFactory', () => {
}
async pause() {}
async resume() {}
subscribe(): void {}
async subscribe() {}
unsubscribe(): void {}
async dispose() {}
async reset() {}
async sync() {}
}

const streams: { [key: string]: IStream } = {};
Expand Down Expand Up @@ -90,7 +90,7 @@ describe('Model', () => {
const synchronised = new Promise((resolve) => (completeSync = resolve));
const sync = vi.fn(async () => {
await synchronised;
return simpleTestData;
return { data: simpleTestData, sequenceID: '0', stateTimestamp: new Date() };
});
const model = new Model<TestData>('test', {
ably,
Expand All @@ -117,7 +117,7 @@ describe('Model', () => {
const sync = vi.fn(async () => {
await synchronised;

return { ...simpleTestData, bar: { baz: ++counter } };
return { data: { ...simpleTestData, bar: { baz: ++counter } }, sequenceID: '0', stateTimestamp: new Date() };
});
const model = new Model<TestData>('test', {
ably,
Expand All @@ -128,7 +128,6 @@ describe('Model', () => {
await statePromise(model, 'preparing');
completeSync();

await ready;
await statePromise(model, 'ready');
await expect(ready).resolves.toEqual({ current: 'ready', previous: 'preparing', reason: undefined });

Expand Down Expand Up @@ -160,7 +159,11 @@ describe('Model', () => {
s1.subscribe = vi.fn();
s1.pause = vi.fn();
s1.resume = vi.fn();
const sync = vi.fn(async () => simpleTestData);
const sync = vi.fn(async () => ({
data: simpleTestData,
sequenceID: '0',
stateTimestamp: new Date(),
}));

const model = new Model<TestData>('test', { ably, channelName, logger });

Expand All @@ -185,7 +188,11 @@ describe('Model', () => {
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();
s1.unsubscribe = vi.fn();
const sync = vi.fn(async () => simpleTestData);
const sync = vi.fn(async () => ({
data: simpleTestData,
sequenceID: '0',
stateTimestamp: new Date(),
}));

const model = new Model<TestData>('test', { ably, channelName, logger });

Expand All @@ -208,11 +215,15 @@ describe('Model', () => {
channelEvents: new Subject<Types.Message>(),
};

streams.newStream({ channelName }).subscribe = vi.fn((callback) =>
events.channelEvents.subscribe((message) => callback(null, message)),
);
streams.newStream({ channelName }).subscribe = vi.fn(async (callback) => {
events.channelEvents.subscribe((message) => callback(null, message));
});

const sync = vi.fn(async () => 'data_0'); // defines initial version of model
const sync = vi.fn(async () => ({
data: 'data_0',
sequenceID: '0',
stateTimestamp: new Date(),
})); // defines initial version of model
const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (_, event) => event.data);
Expand Down Expand Up @@ -263,7 +274,11 @@ describe('Model', () => {
});

it<ModelTestContext>('subscribes after initialisation', async ({ channelName, ably, logger }) => {
const sync = vi.fn(async () => 'data_0'); // defines initial version of model
const sync = vi.fn(async () => ({
data: 'data_0',
sequenceID: '0',
stateTimestamp: new Date(),
})); // defines initial version of model
const model = new Model<string>('test', { ably, channelName, logger });

await model.$register({ $sync: sync });
Expand Down Expand Up @@ -293,15 +308,19 @@ describe('Model', () => {
s1.subscribe = vi.fn();
const model = new Model<string>('test', { ably, channelName, logger });

const sync = async () => 'foobar';
const sync = async () => ({
data: 'foobar',
sequenceID: '0',
stateTimestamp: new Date(),
});
model.$register({
$sync: sync,
});
expect(() =>
await expect(
model.$register({
$sync: sync,
}),
).toThrow('$register was already called');
).rejects.toThrow('$register was already called');
});

it<ModelTestContext>('fails to register after initialization', async ({ channelName, ably, logger, streams }) => {
Expand All @@ -319,11 +338,15 @@ describe('Model', () => {
s1.subscribe = vi.fn();
const model = new ModelWithSetState<string>('test', { ably, channelName, logger });

const sync = async () => 'foobar';
const sync = async () => ({
data: 'foobar',
sequenceID: '0',
stateTimestamp: new Date(),
});

model.setState('ready');

expect(() => model.$register({ $sync: sync })).toThrow(
await expect(model.$register({ $sync: sync })).rejects.toThrow(
`$register can only be called when the model is in the initialized state`,
);
});
Expand All @@ -335,7 +358,11 @@ describe('Model', () => {

const mergeFn = vi.fn(async (_, event) => event.data);
await model.$register({
$sync: async () => 'data_0',
$sync: async () => ({
data: 'data_0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down Expand Up @@ -373,15 +400,19 @@ describe('Model', () => {
s1.subscribe = vi.fn();

const events = new Subject<Types.Message>();
s1.subscribe = vi.fn((callback) => {
s1.subscribe = vi.fn(async (callback) => {
events.subscribe((message) => callback(null, message));
});

const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (_, event) => event.data);
await model.$register({
$sync: async () => 'data_0',
$sync: async () => ({
data: 'data_0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down Expand Up @@ -426,15 +457,19 @@ describe('Model', () => {
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
s1.subscribe = vi.fn((callback) => {
s1.subscribe = vi.fn(async (callback) => {
events.e1.subscribe((message) => callback(null, message));
});

const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (_, event) => event.data);
await model.$register({
$sync: async () => 'data_0',
$sync: async () => ({
data: 'data_0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down Expand Up @@ -484,15 +519,19 @@ describe('Model', () => {
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
s1.subscribe = vi.fn((callback) => {
s1.subscribe = vi.fn(async (callback) => {
events.e1.subscribe((message) => callback(null, message));
});

const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (_, event) => event.data);
await model.$register({
$sync: async () => 'data_0',
$sync: async () => ({
data: 'data_0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down Expand Up @@ -541,15 +580,19 @@ describe('Model', () => {
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
s1.subscribe = vi.fn((callback) => {
s1.subscribe = vi.fn(async (callback) => {
events.e1.subscribe((message) => callback(null, message));
});

const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (state, event) => state + event.data);
await model.$register({
$sync: async () => '0',
$sync: async () => ({
data: '0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down Expand Up @@ -622,15 +665,19 @@ describe('Model', () => {
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
s1.subscribe = vi.fn((callback) => {
s1.subscribe = vi.fn(async (callback) => {
events.e1.subscribe((message) => callback(null, message));
});

const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (state, event) => state + event.data);
await model.$register({
$sync: async () => '0',
$sync: async () => ({
data: '0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down Expand Up @@ -710,7 +757,11 @@ describe('Model', () => {
const mergeFn = vi.fn(async (state, event) => state + event.data);

await model.$register({
$sync: async () => '0',
$sync: async () => ({
data: '0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down Expand Up @@ -738,14 +789,18 @@ describe('Model', () => {
const events = {
channelEvents: new Subject<Types.Message>(),
};
s1.subscribe = vi.fn((callback) => {
s1.subscribe = vi.fn(async (callback) => {
events.channelEvents.subscribe((message) => callback(null, message));
});
s1.unsubscribe = vi.fn();

let counter = 0;

const sync = vi.fn(async () => `${counter}`);
const sync = vi.fn(async () => ({
data: `${counter}`,
sequenceID: '0',
stateTimestamp: new Date(),
}));
const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (_, event) => {
Expand Down Expand Up @@ -810,7 +865,11 @@ describe('Model', () => {
return state + event.data;
});
await model.$register({
$sync: async () => '0',
$sync: async () => ({
data: '0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down Expand Up @@ -846,15 +905,19 @@ describe('Model', () => {
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
s1.subscribe = vi.fn((callback) => {
s1.subscribe = vi.fn(async (callback) => {
events.e1.subscribe((message) => callback(null, message));
});

const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (state, event) => state + event.data);
await model.$register({
$sync: async () => '0',
$sync: async () => ({
data: '0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand All @@ -880,15 +943,19 @@ describe('Model', () => {
s1.subscribe = vi.fn();

const events = new Subject<Types.Message>();
s1.subscribe = vi.fn((callback) => {
s1.subscribe = vi.fn(async (callback) => {
events.subscribe((message) => callback(null, message));
});

const model = new Model<string>('test', { ably, channelName, logger });

const mergeFn = vi.fn(async (state, event) => state + event.data);
await model.$register({
$sync: async () => '0',
$sync: async () => ({
data: '0',
sequenceID: '0',
stateTimestamp: new Date(),
}),
$merge: mergeFn,
});

Expand Down
Loading

0 comments on commit 73c671e

Please sign in to comment.