Skip to content

Commit

Permalink
Rename StreamRegistry to StreamFactory
Browse files Browse the repository at this point in the history
The StreamFactory is now only responsible for creating streams given the
channel name and the set of default options. It doesn't hold on to the
references to stream after creating them.
  • Loading branch information
zknill committed Oct 13, 2023
1 parent dba28d4 commit 99e6914
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 25 deletions.
18 changes: 9 additions & 9 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import { it, describe, expect, afterEach, vi, beforeEach } from 'vitest';

import Model from './Model.js';
import { StreamOptions, IStream, StreamState } from './Stream.js';
import { IStreamRegistry } from './StreamRegistry.js';
import { IStreamFactory } from './StreamFactory.js';
import type { ModelState, ModelStateChange, ModelOptions, Event } from './types/model.d.ts';
import type { MutationMethods, EventComparator, MutationContext } from './types/mutations.d.ts';
import { createMessage, customMessage } from './utilities/test/messages.js';

vi.mock('ably/promises');

// Mocks the StreamRegistry import so that we can modify the Stream instances
// Mocks the StreamFactory import so that we can modify the Stream instances
// used by the model to spy on their methods.
// This implementation ensures that all instances of StreamRegistry use the
// same cache of Stream instances so that the StreamRegistry instantiated in the
// model returns the same Stream instances as the StreamRegistry instantiated
// This implementation ensures that all instances of StreamFactory use the
// same cache of Stream instances so that the StreamFactory instantiated in the
// model returns the same Stream instances as the StreamFactory instantiated
// in these tests.
vi.mock('./StreamRegistry', () => {
vi.mock('./StreamFactory', () => {
class MockStream implements IStream {
constructor(readonly options: Pick<StreamOptions, 'channel'>) {}
get state() {
Expand All @@ -39,7 +39,7 @@ vi.mock('./StreamRegistry', () => {
const streams: { [key: string]: IStream } = {};

return {
default: class implements IStreamRegistry {
default: class implements IStreamFactory {
newStream(options: Pick<StreamOptions, 'channel'>) {
if (!streams[options.channel]) {
streams[options.channel] = new MockStream(options);
Expand All @@ -65,7 +65,7 @@ const simpleTestData: TestData = {
};

interface ModelTestContext extends ModelOptions {
streams: IStreamRegistry;
streams: IStreamFactory;
channelName: string;
}

Expand All @@ -88,7 +88,7 @@ describe('Model', () => {
const logger = pino({ level: 'silent' });
context.ably = ably;
context.logger = logger;
const { default: provider } = await import('./StreamRegistry.js');
const { default: provider } = await import('./StreamFactory.js');
context.streams = new provider({ ably, logger });
context.channelName = 'models:myModelTest:events';
});
Expand Down
8 changes: 3 additions & 5 deletions src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { toError } from './Errors.js';
import MutationsRegistry from './MutationsRegistry.js';
import PendingConfirmationRegistry from './PendingConfirmationRegistry.js';
import { IStream } from './Stream.js';
import StreamRegistry, { IStreamRegistry } from './StreamRegistry.js';
import StreamFactory, { IStreamFactory as IStreamFactory } from './StreamFactory.js';
import type { StandardCallback } from './types/callbacks';
import { MergeFunc } from './types/merge.js';
import type {
Expand Down Expand Up @@ -51,9 +51,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
private merge?: MergeFunc<T>;

private readonly stream: IStream;

// TODO: make rename to stream factory
private readonly streamFactory: IStreamRegistry;
private readonly streamFactory: IStreamFactory;
private readonly mutationsRegistry: MutationsRegistry<M>;

private optimisticEvents: OptimisticEventWithParams[] = [];
Expand All @@ -76,7 +74,7 @@ export default class Model<T, M extends MutationMethods> extends EventEmitter<Re
this.logger = options.logger;
this.baseLogContext = { scope: `Model:${name}` };

this.streamFactory = new StreamRegistry({
this.streamFactory = new StreamFactory({
ably: options.ably,
logger: options.logger,
eventBufferOptions: options.eventBufferOptions,
Expand Down
9 changes: 4 additions & 5 deletions src/StreamRegistry.test.ts → src/StreamFactory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Subject } from 'rxjs';
import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest';

import Stream, { StreamOptions, StreamState } from './Stream.js';
import StreamRegistry from './StreamRegistry.js';
import StreamFactory from './StreamFactory.js';
import { createMessage } from './utilities/test/messages.js';

vi.mock('ably/promises');
Expand Down Expand Up @@ -229,19 +229,18 @@ describe('Stream', () => {
});

it<StreamTestContext>('succeeds with gte zero event buffer ms', async ({ ably, logger }) => {
new StreamRegistry({ eventBufferOptions: { bufferMs: 0 }, ably, logger });
new StreamRegistry({ eventBufferOptions: { bufferMs: 1 }, ably, logger });
new StreamFactory({ eventBufferOptions: { bufferMs: 0 }, ably, logger });
new StreamFactory({ eventBufferOptions: { bufferMs: 1 }, ably, logger });
});

it<StreamTestContext>('fails with lt zero event buffer ms', async ({ ably, logger }) => {
try {
new StreamRegistry({ eventBufferOptions: { bufferMs: -1 }, ably, logger });
new StreamFactory({ eventBufferOptions: { bufferMs: -1 }, ably, logger });
expect(true).toBe(false);
} catch (err) {
expect(err.toString(), 'Stream registry should have thrown an error').not.toContain('AssertionError');
}
});

// TODO discontinuity
// TODO reauth https://ably.com/docs/realtime/channels?lang=nodejs#fatal-errors
});
11 changes: 5 additions & 6 deletions src/StreamRegistry.ts → src/StreamFactory.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import Stream, { IStream, StreamOptions } from './Stream.js';

export interface IStreamRegistry {
export interface IStreamFactory {
newStream(options: Pick<StreamOptions, 'channel'>): IStream;
//get streams(): { [key: string]: IStream };
}

/**
* The StreamRegistry class encapsulates a set of names stream instances that are
* The StreamFactory class creates Stream instances that are
* used to deliver change events to a model.
*/
export default class StreamRegistry implements IStreamRegistry {
export default class StreamFactory implements IStreamFactory {
/**
* @param {Pick<StreamOptions, 'ably' | 'logger'>} options - The default options used when instantiating a stream.
*/
Expand All @@ -23,9 +22,9 @@ export default class StreamRegistry implements IStreamRegistry {
}

/**
* Retrieve an existing stream instance for the given channel or create a new one if it doesn't yet exist.
* Create a new Stream instance for the given channel.
* @param {Pick<StreamOptions, 'channel'>} options - The options used in conjunction with the default options when instantiating a stream
* @returns {IStream} The pre-existing or newly created stream instance.
* @returns {IStream} The newly created stream instance.
*/
// TODO: should this cache the streams?
newStream(options: Pick<StreamOptions, 'channel'>) {
Expand Down

0 comments on commit 99e6914

Please sign in to comment.