Skip to content

Commit

Permalink
stream: rename sync to replay
Browse files Browse the repository at this point in the history
  • Loading branch information
mschristensen committed Oct 20, 2023
1 parent 4730b20 commit 322e652
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
12 changes: 6 additions & 6 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ vi.mock('./stream/StreamFactory', () => {
async subscribe() {}
unsubscribe(): void {}
async dispose() {}
async sync() {}
async replay() {}
}

const streams: { [key: string]: IStream } = {};
Expand Down Expand Up @@ -157,7 +157,7 @@ describe('Model', () => {

it<ModelTestContext>('rewinds to the correct point in the stream', async ({ channelName, ably, logger, streams }) => {
const stream = streams.newStream({ channelName });
stream.sync = vi.fn();
stream.replay = vi.fn();

let i = 0;
const sync = vi.fn(async () => {
Expand All @@ -182,13 +182,13 @@ describe('Model', () => {
});

expect(sync).toHaveBeenCalledOnce();
expect(stream.sync).toHaveBeenCalledOnce();
expect(stream.sync).toHaveBeenNthCalledWith(1, '123');
expect(stream.replay).toHaveBeenCalledOnce();
expect(stream.replay).toHaveBeenNthCalledWith(1, '123');

await model.$sync();
expect(sync).toHaveBeenCalledTimes(2);
expect(stream.sync).toHaveBeenCalledTimes(2);
expect(stream.sync).toHaveBeenNthCalledWith(2, '456');
expect(stream.replay).toHaveBeenCalledTimes(2);
expect(stream.replay).toHaveBeenNthCalledWith(2, '456');
});

it<ModelTestContext>('pauses and resumes the model', async ({ channelName, ably, logger, streams }) => {
Expand Down
2 changes: 1 addition & 1 deletion src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ export default class Model<T> extends EventEmitter<Record<ModelState, ModelState
}

private async addStream(sequenceID: string) {
await this.stream.sync(sequenceID);
await this.stream.replay(sequenceID);
const callback = this.onStreamMessage.bind(this);
this.stream.subscribe(callback);
this.streamSubscriptionsMap.set(this.stream, callback);
Expand Down
42 changes: 21 additions & 21 deletions src/stream/Stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ describe('Stream', () => {
);

const stream = new Stream({ ably, logger, channelName: 'foobar' });
const synced = stream.sync('0');
const replayPromise = stream.replay('0');

await statePromise(stream, StreamState.PREPARING);
await expect(synced).resolves.toBeUndefined();
await expect(replayPromise).resolves.toBeUndefined();
expect(stream.state).toBe(StreamState.READY);

expect(channel.subscribe).toHaveBeenCalledOnce();
Expand All @@ -82,10 +82,10 @@ describe('Stream', () => {
);

const stream = new Stream({ ably, logger, channelName: 'foobar' });
const synced = stream.sync('0');
const replayPromise = stream.replay('0');

await statePromise(stream, StreamState.PREPARING);
await expect(synced).rejects.toThrow(/the channel was already attached when calling subscribe()/);
await expect(replayPromise).rejects.toThrow(/the channel was already attached when calling subscribe()/);
expect(stream.state).toBe(StreamState.ERRORED);

expect(channel.subscribe).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -123,10 +123,10 @@ describe('Stream', () => {
});

const stream = new Stream({ ably, logger, channelName: 'foobar' });
let synced = stream.sync('1');
let replayPromise = stream.replay('1');

await statePromise(stream, StreamState.PREPARING);
await expect(synced).rejects.toThrow(/insufficient history to seek to sequenceID 1 in stream/);
await expect(replayPromise).rejects.toThrow(/insufficient history to seek to sequenceID 1 in stream/);
expect(stream.state).toBe(StreamState.ERRORED);

expect(channel.subscribe).toHaveBeenCalledOnce();
Expand All @@ -135,10 +135,10 @@ describe('Stream', () => {
expect(channel.history).toHaveBeenNthCalledWith(2, { untilAttach: true, limit: HISTORY_PAGE_SIZE });

i = 0;
synced = stream.sync('2');
replayPromise = stream.replay('2');

await statePromise(stream, StreamState.PREPARING);
await expect(synced).resolves.toBeUndefined();
await expect(replayPromise).resolves.toBeUndefined();
expect(stream.state).toBe(StreamState.READY);
expect(ably.channels.release).toHaveBeenCalledOnce();

Expand Down Expand Up @@ -184,10 +184,10 @@ describe('Stream', () => {
});

const stream = new Stream({ ably, logger, channelName: 'foobar' });
let synced = stream.sync('1');
let replayPromise = stream.replay('1');

await statePromise(stream, StreamState.PREPARING);
await expect(synced).rejects.toThrow(/insufficient history to seek to sequenceID 1 in stream/);
await expect(replayPromise).rejects.toThrow(/insufficient history to seek to sequenceID 1 in stream/);
expect(stream.state).toBe(StreamState.ERRORED);

expect(channel.subscribe).toHaveBeenCalledOnce();
Expand All @@ -197,10 +197,10 @@ describe('Stream', () => {
expect(channel.history).toHaveBeenNthCalledWith(3, { untilAttach: true, limit: HISTORY_PAGE_SIZE });

i = 0;
synced = stream.sync('2');
replayPromise = stream.replay('2');

await statePromise(stream, StreamState.PREPARING);
await expect(synced).resolves.toBeUndefined();
await expect(replayPromise).resolves.toBeUndefined();
expect(stream.state).toBe(StreamState.READY);
expect(ably.channels.release).toHaveBeenCalledOnce();

Expand Down Expand Up @@ -230,7 +230,7 @@ describe('Stream', () => {
});

const stream = new Stream({ ably, logger, channelName });
await stream.sync('0');
await stream.replay('0');
await statePromise(stream, StreamState.READY);

const subscriptionSpy = vi.fn<any, any>();
Expand Down Expand Up @@ -273,7 +273,7 @@ describe('Stream', () => {
const subscriptionSpy = vi.fn<any, any>();
stream.subscribe(subscriptionSpy);

await stream.sync('3');
await stream.replay('3');
await statePromise(stream, StreamState.READY);

// live messages
Expand Down Expand Up @@ -322,7 +322,7 @@ describe('Stream', () => {
const subscriptionSpy = vi.fn<any, any>();
stream.subscribe(subscriptionSpy);

await stream.sync('1');
await stream.replay('1');
await statePromise(stream, StreamState.READY);

// live messages
Expand Down Expand Up @@ -359,7 +359,7 @@ describe('Stream', () => {
);

const stream = new Stream({ ably, logger, channelName });
await stream.sync('0');
await stream.replay('0');
await statePromise(stream, StreamState.READY);

const subscriptionSpy1 = vi.fn();
Expand Down Expand Up @@ -402,7 +402,7 @@ describe('Stream', () => {
);

const stream = new Stream({ ably, logger, channelName });
await stream.sync('0');
await stream.replay('0');
await statePromise(stream, StreamState.READY);

const subscriptionSpy = vi.fn();
Expand Down Expand Up @@ -443,7 +443,7 @@ describe('Stream', () => {
);

const stream = new Stream({ ably, logger, channelName });
await stream.sync('0');
await stream.replay('0');
await statePromise(stream, StreamState.READY);

const subscriptionSpy1 = vi.fn();
Expand Down Expand Up @@ -488,7 +488,7 @@ describe('Stream', () => {
);

const stream = new Stream({ ably, logger, channelName });
await stream.sync('0');
await stream.replay('0');

await statePromise(stream, StreamState.READY);
expect(channel.subscribe).toHaveBeenCalledOnce();
Expand Down Expand Up @@ -519,7 +519,7 @@ describe('Stream', () => {
ably.channels.release = vi.fn();

const stream = new Stream({ ably, logger, channelName });
await stream.sync('0');
await stream.replay('0');

await statePromise(stream, StreamState.READY);
expect(channel.subscribe).toHaveBeenCalledOnce();
Expand Down Expand Up @@ -555,7 +555,7 @@ describe('Stream', () => {
});

const stream = new Stream({ ably, logger, channelName });
await stream.sync('0');
await stream.replay('0');

await statePromise(stream, StreamState.READY);
expect(channel.subscribe).toHaveBeenCalledOnce();
Expand Down
6 changes: 3 additions & 3 deletions src/stream/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export interface IStream {
get channelName(): string;
pause(): Promise<void>;
resume(): Promise<void>;
sync(sequenceID: string): Promise<void>;
replay(sequenceID: string): Promise<void>;
subscribe(callback: StandardCallback<AblyTypes.Message>): void;
unsubscribe(callback: StandardCallback<AblyTypes.Message>): void;
dispose(reason?: AblyTypes.ErrorInfo | string): Promise<void>;
Expand Down Expand Up @@ -170,8 +170,8 @@ export default class Stream extends EventEmitter<Record<StreamState, StreamState
this.subscriptionMap = new WeakMap();
}

public async sync(sequenceID: string) {
this.logger.trace({ ...this.baseLogContext, action: 'sync()' });
public async replay(sequenceID: string) {
this.logger.trace({ ...this.baseLogContext, action: 'replay()' });
this.setState(StreamState.PREPARING);
try {
await this.reset();
Expand Down

0 comments on commit 322e652

Please sign in to comment.