Skip to content

Commit

Permalink
stream: move stream logic to separate folder
Browse files Browse the repository at this point in the history
  • Loading branch information
mschristensen committed Oct 17, 2023
1 parent 07fc93c commit a21ae41
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 55 deletions.
20 changes: 10 additions & 10 deletions src/Model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { Subject } from 'rxjs';
import { it, describe, expect, afterEach, vi, beforeEach } from 'vitest';

import Model from './Model.js';
import { StreamOptions, IStream, StreamState } from './Stream.js';
import { IStreamFactory } from './StreamFactory.js';
import { StreamOptions, IStream, StreamState } from './stream/Stream.js';
import { IStreamFactory } from './stream/StreamFactory.js';
import type { ModelState, ModelStateChange, ModelOptions, Event } from './types/model.d.ts';
import type { MutationMethods, EventComparator, MutationContext } from './types/mutations.d.ts';
import { createMessage, customMessage } from './utilities/test/messages.js';
Expand All @@ -19,7 +19,7 @@ vi.mock('ably/promises');
// same cache of Stream instances so that the StreamFactory instantiated in the
// model returns the same Stream instances as the StreamFactory instantiated
// in these tests.
vi.mock('./StreamFactory', () => {
vi.mock('./stream/StreamFactory', () => {
class MockStream implements IStream {
constructor(readonly options: Pick<StreamOptions, 'channel'>) {}
get state() {
Expand Down Expand Up @@ -74,7 +74,7 @@ describe('Model', () => {
const logger = pino({ level: 'silent' });
context.ably = ably;
context.logger = logger;
const { default: provider } = await import('./StreamFactory.js');
const { default: provider } = await import('./stream/StreamFactory.js');
context.streams = new provider({ ably, logger });
context.channelName = 'models:myModelTest:events';
});
Expand Down Expand Up @@ -126,11 +126,11 @@ describe('Model', () => {
logger,
});
const ready = model.$register({ $sync: sync });
await modelStatePromise(model, 'preparing');
await statePromise(model, 'preparing');
completeSync();

await ready;
await modelStatePromise(model, 'ready');
await statePromise(model, 'ready');
await expect(ready).resolves.toEqual({ current: 'ready', previous: 'preparing', reason: undefined });

expect(sync).toHaveBeenCalledOnce();
Expand All @@ -145,10 +145,10 @@ describe('Model', () => {
});

const resynced = model.$sync();
await modelStatePromise(model, 'preparing');
await statePromise(model, 'preparing');
completeSync();
await resynced;
await modelStatePromise(model, 'ready');
await statePromise(model, 'ready');
expect(sync).toHaveBeenCalledTimes(2);

const want = { ...simpleTestData, bar: { baz: 2 } };
Expand Down Expand Up @@ -331,8 +331,8 @@ describe('Model', () => {
it<ModelTestContext>('fails to register after initialization', async ({ channelName, ably, logger, streams }) => {
// extend the Model class to get access to protected member setState
class ModelWithSetState<T, M extends MutationMethods> extends Model<T, M> {
constructor(readonly name: string, channelName: string, options: ModelOptions) {
super(name, channelName, options);
constructor(readonly name: string, options: ModelOptions) {
super(name, options);
}
setState(state: ModelState) {
super.setState(state);
Expand Down
4 changes: 2 additions & 2 deletions src/Model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { Subject, Subscription } from 'rxjs';
import { toError } from './Errors.js';
import MutationsRegistry from './MutationsRegistry.js';
import PendingConfirmationRegistry from './PendingConfirmationRegistry.js';
import { IStream } from './Stream.js';
import StreamFactory, { IStreamFactory } from './StreamFactory.js';
import { IStream } from './stream/Stream.js';
import StreamFactory, { IStreamFactory as IStreamFactory } from './stream/StreamFactory.js';
import type { StandardCallback } from './types/callbacks';
import { MergeFunc } from './types/merge.js';
import type {
Expand Down
34 changes: 0 additions & 34 deletions src/StreamRegistry.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { it, describe, expect, vi } from 'vitest';

import SlidingWindow from './SlidingWindow.js';
import { createMessage } from './utilities/test/messages.js';
import { timeout } from './utilities/test/promises.js';
import { createMessage } from '../utilities/test/messages.js';
import { timeout } from '../utilities/test/promises.js';

describe('SlidingWindow', () => {
it('emits events immediately with no timeout', async () => {
Expand Down
2 changes: 1 addition & 1 deletion src/SlidingWindow.ts → src/stream/SlidingWindow.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Types as AblyTypes } from 'ably';

import type { EventOrderer } from './types/mutations';
import type { EventOrderer } from '../types/mutations';

export default class SlidingWindow {
private messages: AblyTypes.Message[] = [];
Expand Down
4 changes: 2 additions & 2 deletions src/Stream.test.ts → src/stream/Stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { Subject } from 'rxjs';
import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest';

import Stream, { StreamOptions, StreamState } from './Stream.js';
import { createMessage } from './utilities/test/messages.js';
import { statePromise } from './utilities/test/promises.js';
import { createMessage } from '../utilities/test/messages.js';
import { statePromise } from '../utilities/test/promises.js';

vi.mock('ably/promises');

Expand Down
6 changes: 3 additions & 3 deletions src/Stream.ts → src/stream/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { Logger } from 'pino';
import { Subject, Subscription } from 'rxjs';

import SlidingWindow from './SlidingWindow.js';
import type { StandardCallback } from './types/callbacks';
import type { EventOrderer } from './types/mutations.js';
import EventEmitter from './utilities/EventEmitter.js';
import type { StandardCallback } from '../types/callbacks';
import type { EventOrderer } from '../types/mutations.js';
import EventEmitter from '../utilities/EventEmitter.js';

/**
* StreamState represents the possible lifecycle states of a stream.
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/types/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Logger, LevelWithSilent } from 'pino';

import type { MergeFunc } from './merge';
import type { EventComparator, MutationMethods, MutationOptions, MutationRegistration } from './mutations';
import type { EventBufferOptions } from '../Stream';
import type { EventBufferOptions } from '../stream/Stream';

/**
* Options used to configure all model instances.
Expand Down

0 comments on commit a21ae41

Please sign in to comment.