From a21ae41948023e6c7e5d2dbe4e87daf806193a82 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Mon, 16 Oct 2023 14:30:09 +0100 Subject: [PATCH] stream: move stream logic to separate folder --- src/Model.test.ts | 20 +++++++-------- src/Model.ts | 4 +-- src/StreamRegistry.ts | 34 -------------------------- src/{ => stream}/SlidingWindow.test.ts | 4 +-- src/{ => stream}/SlidingWindow.ts | 2 +- src/{ => stream}/Stream.test.ts | 4 +-- src/{ => stream}/Stream.ts | 6 ++--- src/{ => stream}/StreamFactory.test.ts | 0 src/{ => stream}/StreamFactory.ts | 0 src/types/model.ts | 2 +- 10 files changed, 21 insertions(+), 55 deletions(-) delete mode 100644 src/StreamRegistry.ts rename src/{ => stream}/SlidingWindow.test.ts (96%) rename src/{ => stream}/SlidingWindow.ts (95%) rename src/{ => stream}/Stream.test.ts (98%) rename src/{ => stream}/Stream.ts (97%) rename src/{ => stream}/StreamFactory.test.ts (100%) rename src/{ => stream}/StreamFactory.ts (100%) diff --git a/src/Model.test.ts b/src/Model.test.ts index 8b30ecd4..3ca235da 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -4,8 +4,8 @@ import { Subject } from 'rxjs'; import { it, describe, expect, afterEach, vi, beforeEach } from 'vitest'; import Model from './Model.js'; -import { StreamOptions, IStream, StreamState } from './Stream.js'; -import { IStreamFactory } from './StreamFactory.js'; +import { StreamOptions, IStream, StreamState } from './stream/Stream.js'; +import { IStreamFactory } from './stream/StreamFactory.js'; import type { ModelState, ModelStateChange, ModelOptions, Event } from './types/model.d.ts'; import type { MutationMethods, EventComparator, MutationContext } from './types/mutations.d.ts'; import { createMessage, customMessage } from './utilities/test/messages.js'; @@ -19,7 +19,7 @@ vi.mock('ably/promises'); // same cache of Stream instances so that the StreamFactory instantiated in the // model returns the same Stream instances as the StreamFactory instantiated // in these tests. -vi.mock('./StreamFactory', () => { +vi.mock('./stream/StreamFactory', () => { class MockStream implements IStream { constructor(readonly options: Pick) {} get state() { @@ -74,7 +74,7 @@ describe('Model', () => { const logger = pino({ level: 'silent' }); context.ably = ably; context.logger = logger; - const { default: provider } = await import('./StreamFactory.js'); + const { default: provider } = await import('./stream/StreamFactory.js'); context.streams = new provider({ ably, logger }); context.channelName = 'models:myModelTest:events'; }); @@ -126,11 +126,11 @@ describe('Model', () => { logger, }); const ready = model.$register({ $sync: sync }); - await modelStatePromise(model, 'preparing'); + await statePromise(model, 'preparing'); completeSync(); await ready; - await modelStatePromise(model, 'ready'); + await statePromise(model, 'ready'); await expect(ready).resolves.toEqual({ current: 'ready', previous: 'preparing', reason: undefined }); expect(sync).toHaveBeenCalledOnce(); @@ -145,10 +145,10 @@ describe('Model', () => { }); const resynced = model.$sync(); - await modelStatePromise(model, 'preparing'); + await statePromise(model, 'preparing'); completeSync(); await resynced; - await modelStatePromise(model, 'ready'); + await statePromise(model, 'ready'); expect(sync).toHaveBeenCalledTimes(2); const want = { ...simpleTestData, bar: { baz: 2 } }; @@ -331,8 +331,8 @@ describe('Model', () => { it('fails to register after initialization', async ({ channelName, ably, logger, streams }) => { // extend the Model class to get access to protected member setState class ModelWithSetState extends Model { - constructor(readonly name: string, channelName: string, options: ModelOptions) { - super(name, channelName, options); + constructor(readonly name: string, options: ModelOptions) { + super(name, options); } setState(state: ModelState) { super.setState(state); diff --git a/src/Model.ts b/src/Model.ts index 9b32456d..461c170e 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -5,8 +5,8 @@ import { Subject, Subscription } from 'rxjs'; import { toError } from './Errors.js'; import MutationsRegistry from './MutationsRegistry.js'; import PendingConfirmationRegistry from './PendingConfirmationRegistry.js'; -import { IStream } from './Stream.js'; -import StreamFactory, { IStreamFactory } from './StreamFactory.js'; +import { IStream } from './stream/Stream.js'; +import StreamFactory, { IStreamFactory as IStreamFactory } from './stream/StreamFactory.js'; import type { StandardCallback } from './types/callbacks'; import { MergeFunc } from './types/merge.js'; import type { diff --git a/src/StreamRegistry.ts b/src/StreamRegistry.ts deleted file mode 100644 index 9b2ee5cb..00000000 --- a/src/StreamRegistry.ts +++ /dev/null @@ -1,34 +0,0 @@ -import Stream, { IStream, StreamOptions } from './Stream.js'; - -export interface IStreamRegistry { - newStream(options: Pick): IStream; - //get streams(): { [key: string]: IStream }; -} - -/** - * The StreamRegistry class encapsulates a set of names stream instances that are - * used to deliver change events to a model. - */ -export default class StreamRegistry implements IStreamRegistry { - /** - * @param {Pick} options - The default options used when instantiating a stream. - */ - constructor(private readonly options: Pick) { - if (options.eventBufferOptions) { - const bufferMs = options.eventBufferOptions?.bufferMs || 0; - if (bufferMs < 0) { - throw new Error(`EventBufferOptions bufferMs cannot be less than zero: ${bufferMs}`); - } - } - } - - /** - * Retrieve an existing stream instance for the given channel or create a new one if it doesn't yet exist. - * @param {Pick} options - The options used in conjunction with the default options when instantiating a stream - * @returns {IStream} The pre-existing or newly created stream instance. - */ - // TODO: should this cache the streams? - newStream(options: Pick) { - return new Stream(Object.assign(this.options, options)); - } -} diff --git a/src/SlidingWindow.test.ts b/src/stream/SlidingWindow.test.ts similarity index 96% rename from src/SlidingWindow.test.ts rename to src/stream/SlidingWindow.test.ts index c1e7506b..a6633bed 100644 --- a/src/SlidingWindow.test.ts +++ b/src/stream/SlidingWindow.test.ts @@ -1,8 +1,8 @@ import { it, describe, expect, vi } from 'vitest'; import SlidingWindow from './SlidingWindow.js'; -import { createMessage } from './utilities/test/messages.js'; -import { timeout } from './utilities/test/promises.js'; +import { createMessage } from '../utilities/test/messages.js'; +import { timeout } from '../utilities/test/promises.js'; describe('SlidingWindow', () => { it('emits events immediately with no timeout', async () => { diff --git a/src/SlidingWindow.ts b/src/stream/SlidingWindow.ts similarity index 95% rename from src/SlidingWindow.ts rename to src/stream/SlidingWindow.ts index c922ebdd..abf2e294 100644 --- a/src/SlidingWindow.ts +++ b/src/stream/SlidingWindow.ts @@ -1,6 +1,6 @@ import { Types as AblyTypes } from 'ably'; -import type { EventOrderer } from './types/mutations'; +import type { EventOrderer } from '../types/mutations'; export default class SlidingWindow { private messages: AblyTypes.Message[] = []; diff --git a/src/Stream.test.ts b/src/stream/Stream.test.ts similarity index 98% rename from src/Stream.test.ts rename to src/stream/Stream.test.ts index 0a6a715e..f725bf60 100644 --- a/src/Stream.test.ts +++ b/src/stream/Stream.test.ts @@ -4,8 +4,8 @@ import { Subject } from 'rxjs'; import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest'; import Stream, { StreamOptions, StreamState } from './Stream.js'; -import { createMessage } from './utilities/test/messages.js'; -import { statePromise } from './utilities/test/promises.js'; +import { createMessage } from '../utilities/test/messages.js'; +import { statePromise } from '../utilities/test/promises.js'; vi.mock('ably/promises'); diff --git a/src/Stream.ts b/src/stream/Stream.ts similarity index 97% rename from src/Stream.ts rename to src/stream/Stream.ts index bae590b7..621e5c9a 100644 --- a/src/Stream.ts +++ b/src/stream/Stream.ts @@ -3,9 +3,9 @@ import { Logger } from 'pino'; import { Subject, Subscription } from 'rxjs'; import SlidingWindow from './SlidingWindow.js'; -import type { StandardCallback } from './types/callbacks'; -import type { EventOrderer } from './types/mutations.js'; -import EventEmitter from './utilities/EventEmitter.js'; +import type { StandardCallback } from '../types/callbacks'; +import type { EventOrderer } from '../types/mutations.js'; +import EventEmitter from '../utilities/EventEmitter.js'; /** * StreamState represents the possible lifecycle states of a stream. diff --git a/src/StreamFactory.test.ts b/src/stream/StreamFactory.test.ts similarity index 100% rename from src/StreamFactory.test.ts rename to src/stream/StreamFactory.test.ts diff --git a/src/StreamFactory.ts b/src/stream/StreamFactory.ts similarity index 100% rename from src/StreamFactory.ts rename to src/stream/StreamFactory.ts diff --git a/src/types/model.ts b/src/types/model.ts index 2394ae52..53c27a90 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -3,7 +3,7 @@ import type { Logger, LevelWithSilent } from 'pino'; import type { MergeFunc } from './merge'; import type { EventComparator, MutationMethods, MutationOptions, MutationRegistration } from './mutations'; -import type { EventBufferOptions } from '../Stream'; +import type { EventBufferOptions } from '../stream/Stream'; /** * Options used to configure all model instances.