Skip to content

Commit

Permalink
stream: rename channel to channelName
Browse files Browse the repository at this point in the history
  • Loading branch information
mschristensen committed Oct 17, 2023
1 parent 77f867a commit e959fbc
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 58 deletions.
58 changes: 29 additions & 29 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ vi.mock('ably/promises');
// in these tests.
vi.mock('./stream/StreamFactory', () => {
class MockStream implements IStream {
constructor(readonly options: Pick<StreamOptions, 'channel'>) {}
constructor(readonly options: Pick<StreamOptions, 'channelName'>) {}
get state() {
return StreamState.READY;
}
get channel() {
return this.options.channel;
get channelName() {
return this.options.channelName;
}
async pause() {}
async resume() {}
Expand All @@ -40,11 +40,11 @@ vi.mock('./stream/StreamFactory', () => {

return {
default: class implements IStreamFactory {
newStream(options: Pick<StreamOptions, 'channel'>) {
if (!streams[options.channel]) {
streams[options.channel] = new MockStream(options);
newStream(options: Pick<StreamOptions, 'channelName'>) {
if (!streams[options.channelName]) {
streams[options.channelName] = new MockStream(options);
}
return streams[options.channel];
return streams[options.channelName];
}
},
};
Expand Down Expand Up @@ -157,7 +157,7 @@ describe('Model', () => {
});

it<ModelTestContext>('pauses and resumes the model', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();
s1.pause = vi.fn();
s1.resume = vi.fn();
Expand All @@ -183,7 +183,7 @@ describe('Model', () => {
});

it<ModelTestContext>('disposes of the model', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();
s1.unsubscribe = vi.fn();
const sync = vi.fn(async () => simpleTestData);
Expand All @@ -209,7 +209,7 @@ describe('Model', () => {
channelEvents: new Subject<Types.Message>(),
};

streams.newStream({ channel: channelName }).subscribe = vi.fn((callback) =>
streams.newStream({ channelName }).subscribe = vi.fn((callback) =>
events.channelEvents.subscribe((message) => callback(null, message)),
);

Expand Down Expand Up @@ -290,7 +290,7 @@ describe('Model', () => {
});

it<ModelTestContext>('executes a registered mutation', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();
const model = new Model<string, { foo: (_: MutationContext, a: string, b: number) => Promise<string> }>('test', {
ably,
Expand All @@ -310,7 +310,7 @@ describe('Model', () => {
});

it<ModelTestContext>('fails to register twice', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();
const model = new Model<string, { foo: () => Promise<void> }>('test', { ably, channelName, logger });

Expand Down Expand Up @@ -339,7 +339,7 @@ describe('Model', () => {
}
}

const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();
const model = new ModelWithSetState<string, { foo: () => Promise<void> }>('test', { ably, channelName, logger });

Expand All @@ -359,7 +359,7 @@ describe('Model', () => {
logger,
streams,
}) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();
const model = new Model<string, { foo: () => Promise<string> }>('test', { ably, channelName, logger });

Expand All @@ -374,7 +374,7 @@ describe('Model', () => {
});

it<ModelTestContext>('updates model state with optimistic event', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();
const model = new Model<string, { foo: () => Promise<string> }>('test', { ably, channelName, logger });

Expand Down Expand Up @@ -416,7 +416,7 @@ describe('Model', () => {
});

it<ModelTestContext>('confirms an optimistic event', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down Expand Up @@ -470,7 +470,7 @@ describe('Model', () => {
});

it<ModelTestContext>('confirms an optimistic event by uuid', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down Expand Up @@ -529,7 +529,7 @@ describe('Model', () => {
});

it<ModelTestContext>('mutation can access the optimistic events', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down Expand Up @@ -568,7 +568,7 @@ describe('Model', () => {
});

it<ModelTestContext>('explicitly rejects an optimistic event', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down Expand Up @@ -633,7 +633,7 @@ describe('Model', () => {
logger,
streams,
}) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down Expand Up @@ -696,7 +696,7 @@ describe('Model', () => {
});

it<ModelTestContext>('confirms optimistic events out of order', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down Expand Up @@ -779,7 +779,7 @@ describe('Model', () => {
logger,
streams,
}) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down Expand Up @@ -865,7 +865,7 @@ describe('Model', () => {
});

it<ModelTestContext>('revert optimistic events if mutate fails', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const model = new Model<
Expand Down Expand Up @@ -910,7 +910,7 @@ describe('Model', () => {

// If applying a received stream update throws, the model reverts to the PREPARING state and re-syncs.
it<ModelTestContext>('resync if stream apply update fails', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

// event subjects used to invoke the stream subscription callbacks
Expand Down Expand Up @@ -978,7 +978,7 @@ describe('Model', () => {
logger,
streams,
}) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const model = new Model<string, { mutation: () => Promise<string> }>('test', { ably, channelName, logger });
Expand Down Expand Up @@ -1018,7 +1018,7 @@ describe('Model', () => {

// Tests if the mutation throws, the the optimistic events are reverted.
it<ModelTestContext>('revert optimistic events if mutation fails', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: 's1' });
const s1 = streams.newStream({ channelName: 's1' });
s1.subscribe = vi.fn();

const model = new Model<string, { mutation1: () => Promise<string>; mutation2: () => Promise<string> }>('test', {
Expand Down Expand Up @@ -1065,7 +1065,7 @@ describe('Model', () => {
logger,
streams,
}) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const model = new Model<string, { mutation: () => Promise<string> }>('test', { ably, channelName, logger });
Expand Down Expand Up @@ -1103,7 +1103,7 @@ describe('Model', () => {
logger,
streams,
}) => {
const s1 = streams.newStream({ channel: channelName });
const s1 = streams.newStream({ channelName });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down Expand Up @@ -1144,7 +1144,7 @@ describe('Model', () => {
});

it<ModelTestContext>('optimistic event confirmation timeout', async ({ channelName, ably, logger, streams }) => {
const s1 = streams.newStream({ channel: 's1' });
const s1 = streams.newStream({ channelName: 's1' });
s1.subscribe = vi.fn();

const events = { e1: new Subject<Types.Message>() };
Expand Down
6 changes: 3 additions & 3 deletions src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
logger: options.logger,
eventBufferOptions: options.eventBufferOptions,
});
this.stream = this.streamFactory.newStream({ channel: options.channelName });
this.stream = this.streamFactory.newStream({ channelName: options.channelName });

this.mutationsRegistry = new MutationsRegistry<M>(
{
Expand Down Expand Up @@ -264,7 +264,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
}

for (const event of events) {
if (event.channel !== this.stream.channel) {
if (event.channel !== this.stream.channelName) {
throw new Error(`stream with name '${event.channel}' not registered on model '${this.name}'`);
}
}
Expand Down Expand Up @@ -298,7 +298,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
this.setState('preparing', reason);

this.removeStream();
this.addStream(this.stream.channel);
this.addStream(this.stream.channelName);

const data = await this.syncFunc();
this.setOptimisticData(data);
Expand Down
36 changes: 18 additions & 18 deletions src/stream/Stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { statePromise } from '../utilities/test/promises.js';
vi.mock('ably/promises');

interface StreamTestContext extends StreamOptions {
channel: string;
channelName: string;
ably: Types.RealtimePromise;
logger: Logger;
}
Expand All @@ -34,7 +34,7 @@ describe('Stream', () => {

context.ably = ably;
context.logger = pino({ level: 'silent' });
context.channel = channelName;
context.channelName = channelName;
});

afterEach(() => {
Expand All @@ -44,7 +44,7 @@ describe('Stream', () => {
it<StreamTestContext>('enters ready state when successfully attached to the channel', async ({
ably,
logger,
channel,
channelName: channel,
}) => {
// the promise returned by the subscribe method resolves when we have successfully attached to the channel
let attach: (...args: any[]) => void = () => {
Expand All @@ -55,21 +55,21 @@ describe('Stream', () => {
await attachment;
});

const stream = new Stream({ ably, logger, channel: 'foobar' });
const stream = new Stream({ ably, logger, channelName: 'foobar' });

await statePromise(stream, StreamState.PREPARING);
attach();
await statePromise(stream, StreamState.READY);
expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce();
});

it<StreamTestContext>('subscribes to messages', async ({ ably, logger, channel }) => {
it<StreamTestContext>('subscribes to messages', async ({ ably, logger, channelName: channel }) => {
let messages = new Subject<Types.Message>();
ably.channels.get(channel).subscribe = vi.fn<any, any>((callback) => {
messages.subscribe((message) => callback(message));
});

const stream = new Stream({ ably, logger, channel });
const stream = new Stream({ ably, logger, channelName: channel });
await statePromise(stream, StreamState.READY);

const subscriptionSpy = vi.fn();
Expand All @@ -87,13 +87,13 @@ describe('Stream', () => {
expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce();
});

it<StreamTestContext>('subscribes with multiple listeners', async ({ ably, logger, channel }) => {
it<StreamTestContext>('subscribes with multiple listeners', async ({ ably, logger, channelName: channel }) => {
let messages = new Subject<Types.Message>();
ably.channels.get(channel).subscribe = vi.fn<any, any>((callback) => {
messages.subscribe((message) => callback(message));
});

const stream = new Stream({ ably, logger, channel });
const stream = new Stream({ ably, logger, channelName: channel });
await statePromise(stream, StreamState.READY);

const subscriptionSpy1 = vi.fn();
Expand All @@ -116,13 +116,13 @@ describe('Stream', () => {
expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce();
});

it<StreamTestContext>('unsubscribes to messages', async ({ ably, logger, channel }) => {
it<StreamTestContext>('unsubscribes to messages', async ({ ably, logger, channelName: channel }) => {
let messages = new Subject<Types.Message>();
ably.channels.get(channel).subscribe = vi.fn<any, any>((callback) => {
messages.subscribe((message) => callback(message));
});

const stream = new Stream({ ably, logger, channel });
const stream = new Stream({ ably, logger, channelName: channel });
await statePromise(stream, StreamState.READY);

const subscriptionSpy = vi.fn();
Expand All @@ -143,13 +143,13 @@ describe('Stream', () => {
expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce();
});

it<StreamTestContext>('unsubscribes one of two listeners', async ({ ably, logger, channel }) => {
it<StreamTestContext>('unsubscribes one of two listeners', async ({ ably, logger, channelName: channel }) => {
let messages = new Subject<Types.Message>();
ably.channels.get(channel).subscribe = vi.fn<any, any>((callback) => {
messages.subscribe((message) => callback(message));
});

const stream = new Stream({ ably, logger, channel });
const stream = new Stream({ ably, logger, channelName: channel });
await statePromise(stream, StreamState.READY);

const subscriptionSpy1 = vi.fn();
Expand Down Expand Up @@ -177,8 +177,8 @@ describe('Stream', () => {
expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce();
});

it<StreamTestContext>('pauses and resumes the stream', async ({ ably, logger, channel }) => {
const stream = new Stream({ ably, logger, channel });
it<StreamTestContext>('pauses and resumes the stream', async ({ ably, logger, channelName: channel }) => {
const stream = new Stream({ ably, logger, channelName: channel });

await statePromise(stream, StreamState.READY);
expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce();
Expand All @@ -192,9 +192,9 @@ describe('Stream', () => {
expect(ably.channels.get(channel).attach).toHaveBeenCalledOnce();
});

it<StreamTestContext>('disposes of the stream', async ({ ably, logger, channel }) => {
it<StreamTestContext>('disposes of the stream', async ({ ably, logger, channelName: channel }) => {
ably.channels.release = vi.fn();
const stream = new Stream({ ably, logger, channel });
const stream = new Stream({ ably, logger, channelName: channel });

await statePromise(stream, StreamState.READY);
expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce();
Expand All @@ -204,7 +204,7 @@ describe('Stream', () => {
expect(ably.channels.release).toHaveBeenCalledOnce();
});

it<StreamTestContext>('disposes of the stream on channel failed', async ({ ably, logger, channel }) => {
it<StreamTestContext>('disposes of the stream on channel failed', async ({ ably, logger, channelName: channel }) => {
ably.channels.release = vi.fn();
let fail: (...args: any[]) => void = () => {
throw new Error('fail not defined');
Expand All @@ -215,7 +215,7 @@ describe('Stream', () => {
}
});

const stream = new Stream({ ably, logger, channel });
const stream = new Stream({ ably, logger, channelName: channel });

await statePromise(stream, StreamState.READY);
expect(ably.channels.get(channel).subscribe).toHaveBeenCalledOnce();
Expand Down
Loading

0 comments on commit e959fbc

Please sign in to comment.