, Application: EffectfulComponent\)
+
+
+
+This is nearly identical to the `run` found inside of `@motorcycle/run`. The
+only difference is that it makes use of the test scheduler to create the
+application's event loop. An additional property is returned with the `tick`
+that allows you to control how time progresses.
+
+
+
+
+
+ See an example
+
+```typescript
+import { run } from '@motorcycle/test'
+import { makeDomComponent, div, button, h2, query, clickEvent } from '@motorcycle/dom'
+
+function UI(sources) {
+ const { dom } = sources
+
+ const click$ = clickEvent(query('button', dom))
+
+ const count$ = scan(x => x + 1, click$)
+
+ const view$ = map(view, count$)
+
+ return { view$ }
+}
+
+function view(count: number) {
+ return div([
+ h2(`Clicked ${count} times`),
+ button('Click Me'),
+ ])
+}
+
+const Dom = fakeDomComponent({
+ 'button': {
+ click: now(fakeEvent())
+ }
+})
+
+const { tick, dispose } = run(UI, Dom)
+
+tick(500).then(dispose)
+```
+
+
+
+
+ See the code
+
+```typescript
+
+export function run<
+ Sources extends Readonly>,
+ Sinks extends Readonly>>
+>(UI: Component, Application: EffectfulComponent) {
+ const { stream: endSignal } = createProxy()
+
+ const sinkProxies = {} as Record>
+ const proxySinks: Sinks = createProxySinks(sinkProxies, endSignal)
+ const sources: Sources = Application(proxySinks)
+ const sinks: Sinks = createDisposableSinks(UI(sources), endSignal)
+
+ const { disposable, tick } = replicateSinks(sinks, sinkProxies)
+
+ function dispose() {
+ endSignal.event(scheduler.now(), void 0)
+ disposable.dispose()
+ disposeSources(sources)
+ }
+
+ return { sinks, sources, dispose, tick }
+}
+
+```
+
+
+
+
diff --git a/packages/test/package.json b/packages/test/package.json
new file mode 100644
index 00000000..c1ba7567
--- /dev/null
+++ b/packages/test/package.json
@@ -0,0 +1,43 @@
+{
+ "name": "@motorcycle/test",
+ "version": "0.0.0",
+ "description": "Testing functions for Motorcycle.ts",
+ "main": "lib/index.js",
+ "module": "lib.es2015/index.js",
+ "jsnext:main": "lib.es2015/index.js",
+ "typings": "lib/index.d.ts",
+ "types": "lib/index.d.ts",
+ "scripts": {
+ "build": "cd ../../ && node ./tools/build.js --only test",
+ "test": "yarn test:lint && yarn test:unit",
+ "test:unit": "../../node_modules/.bin/typed-test 'src/*.test.ts' 'src/**/*.test.ts'",
+ "test:lint": "../../node_modules/.bin/prettier --write --print-width 100 --tab-width 2 --no-semi --single-quote --trailing-comma es5 --parser typescript src/**/*.ts"
+ },
+ "repository": {
+ "type": "git",
+ "url": "git+https://github.com/motorcyclets/motorcycle.git"
+ },
+ "keywords": [
+ "test",
+ "typescript",
+ "motorcycle",
+ "most",
+ "functional",
+ "reactive",
+ "streams"
+ ],
+ "author": "Tylor Steinberger ",
+ "license": "MIT",
+ "bugs": {
+ "url": "https://github.com/motorcyclets/motorcycle/issues"
+ },
+ "homepage": "https://github.com/motorcyclets/motorcycle#readme",
+ "dependencies": {
+ "@most/scheduler": "0.11.0",
+ "@motorcycle/stream": "1.1.0",
+ "@motorcycle/types": "1.1.0"
+ },
+ "devDependencies": {
+ "@most/core": "0.11.2"
+ }
+}
diff --git a/packages/test/src/collectEventsFor/collectEventsFor.test.ts b/packages/test/src/collectEventsFor/collectEventsFor.test.ts
new file mode 100644
index 00000000..7d7a263a
--- /dev/null
+++ b/packages/test/src/collectEventsFor/collectEventsFor.test.ts
@@ -0,0 +1,14 @@
+import { Test, describe, given, it } from '@typed/test'
+import { periodic, scan, skip } from '@motorcycle/stream'
+
+import { collectEventsFor } from './collectEventsFor'
+
+export const test: Test = describe(`collectEvents`, [
+ given(`a delay and a Stream`, [
+ it(`returns Promise of Streams events up to that delay time`, ({ equal }) => {
+ const stream = scan(x => x + 1, 0, skip(1, periodic(10)))
+
+ return collectEventsFor(30, stream).then(equal([0, 1, 2, 3]))
+ }),
+ ]),
+])
diff --git a/packages/test/src/collectEventsFor/collectEventsFor.ts b/packages/test/src/collectEventsFor/collectEventsFor.ts
new file mode 100644
index 00000000..583eaed4
--- /dev/null
+++ b/packages/test/src/collectEventsFor/collectEventsFor.ts
@@ -0,0 +1,35 @@
+import { Delay, Stream } from '@motorcycle/types'
+import { runEffects, tap } from '@motorcycle/stream'
+
+import { createTestScheduler } from '../createTestScheduler'
+import { curry2 } from '@most/prelude'
+
+/**
+ * Collects events for a given amount of time.
+ * @name collectEventsFor(delay: Delay, stream: Stream): Promise>
+ * @example
+ * // Mocha style tests
+ * it('increasing value by one', () => {
+ * const stream = scan(x => x + 1, skip(1, periodic(10)))
+ *
+ * return collectEventsFor(30, stream).then(events => assert.deepEqual(events, [0, 1, 2, 3]))
+ * })
+ */
+export const collectEventsFor: CollectEventsFor = curry2(function collectEventsFor(
+ delay: Delay,
+ stream: Stream
+) {
+ const { tick, scheduler } = createTestScheduler()
+
+ const eventList: Array = []
+
+ runEffects(tap(a => eventList.push(a), stream), scheduler)
+
+ return tick(delay).then(() => eventList.slice())
+})
+
+export interface CollectEventsFor {
+ (delay: Delay, stream: Stream): Promise>
+ (delay: Delay): (stream: Stream) => Promise>
+ (delay: Delay): (stream: Stream) => Promise>
+}
diff --git a/packages/test/src/collectEventsFor/index.ts b/packages/test/src/collectEventsFor/index.ts
new file mode 100644
index 00000000..47ad416f
--- /dev/null
+++ b/packages/test/src/collectEventsFor/index.ts
@@ -0,0 +1 @@
+export * from './collectEventsFor'
diff --git a/packages/test/src/createTestScheduler/VirtualTimer.test.ts b/packages/test/src/createTestScheduler/VirtualTimer.test.ts
new file mode 100644
index 00000000..55bb4000
--- /dev/null
+++ b/packages/test/src/createTestScheduler/VirtualTimer.test.ts
@@ -0,0 +1,58 @@
+import { Test, describe, given, it } from '@typed/test'
+
+import { VirtualTimer } from './VirtualTimer'
+
+export const test: Test = describe(`VirtualTimer`, [
+ it(`is a Timer`, ({ equal }) => {
+ const timer = new VirtualTimer()
+
+ equal('function', typeof timer.now)
+ equal('number', typeof timer.now())
+ equal('function', typeof timer.setTimer)
+ equal('function', typeof timer.clearTimer)
+ }),
+
+ describe(`tick`, [
+ given(`a delay time`, [
+ it(`ticks time forward by delay after running tasks`, ({ equal }) => {
+ const timer = new VirtualTimer()
+ const delay = 100
+
+ timer.setTimer(() => void 0, delay)
+
+ return timer.tick(delay).then(() => {
+ equal(delay, timer.now())
+ })
+ }),
+ ]),
+ ]),
+
+ describe(`setTimer`, [
+ given(`a function and a delay`, [
+ it(`returns a handle`, ({ notEqual }) => {
+ const timer = new VirtualTimer()
+
+ const handle = timer.setTimer(() => {}, 0)
+
+ notEqual(void 0, handle)
+ }),
+ ]),
+ ]),
+
+ describe(`clearTimer`, [
+ given(`a handle`, [
+ it(`cancels the task`, ({ ok }, done) => {
+ const timer = new VirtualTimer()
+
+ const handle = timer.setTimer(() => done(new Error('Foo')), 0)
+
+ timer.clearTimer(handle)
+
+ timer.tick(100).then(() => {
+ ok(true)
+ done()
+ })
+ }),
+ ]),
+ ]),
+])
diff --git a/packages/test/src/createTestScheduler/VirtualTimer.ts b/packages/test/src/createTestScheduler/VirtualTimer.ts
new file mode 100644
index 00000000..60a5059c
--- /dev/null
+++ b/packages/test/src/createTestScheduler/VirtualTimer.ts
@@ -0,0 +1,95 @@
+import { Delay, Handle, Time, Timer } from '@motorcycle/types'
+
+/**
+ * A Timer instance with control over how time progresses.
+ *
+ * @name VirtualTimer
+ * @example
+ * import { VirtualTimer } from '@motorcycle/test'
+ *
+ * const timer = new VirtualTimer()
+ *
+ * timer.setTimer(() => console.log('Hello'), 100)
+ *
+ * timer.tick(100)
+ */
+export class VirtualTimer implements Timer {
+ protected time: Time = 0
+ protected targetTime: Time = 0
+ protected currentTime: Time = Infinity
+ protected task: (() => any) | void = void 0
+ protected timer: Handle
+ protected active: boolean = false
+ protected running: boolean = false
+ protected key: Handle = {}
+ protected promise: Promise = Promise.resolve()
+
+ constructor() {}
+
+ public now(): Time {
+ return this.time
+ }
+
+ public setTimer(fn: () => any, delay: Delay): Handle {
+ if (this.task !== void 0) throw new Error('Virtualtimer: Only supports one in-flight task')
+
+ this.task = fn
+ this.currentTime = this.time + Math.max(0, delay)
+ if (this.active) this.run()
+
+ return this.key
+ }
+
+ public clearTimer(handle: Handle) {
+ if (handle !== this.key) return
+
+ clearTimeout(this.timer)
+ this.timer = void 0
+
+ this.currentTime = Infinity
+ this.task = void 0
+ }
+
+ public tick(delay: Delay) {
+ if (delay <= 0) return this.promise
+
+ this.targetTime = this.targetTime + delay
+
+ return this.run()
+ }
+
+ protected run() {
+ if (this.running) return this.promise
+
+ this.running = true
+ this.active = true
+
+ return new Promise((resolve, reject) => {
+ this.timer = setTimeout(() => {
+ this.step().then(() => resolve()).catch(reject)
+ }, 0)
+ })
+ }
+
+ protected step() {
+ return new Promise((resolve, reject) => {
+ if (this.time >= this.targetTime) {
+ this.time = this.targetTime
+ this.currentTime = Infinity
+ this.running = false
+ return resolve()
+ }
+
+ const task = this.task
+
+ this.task = void 0
+
+ this.time = this.currentTime
+ this.currentTime = Infinity
+
+ if (typeof task === 'function') task()
+
+ this.timer = setTimeout(() => this.step().then(() => resolve()).catch(reject), 0)
+ })
+ }
+}
diff --git a/packages/test/src/createTestScheduler/createTestScheduler.test.ts b/packages/test/src/createTestScheduler/createTestScheduler.test.ts
new file mode 100644
index 00000000..f3a22fe0
--- /dev/null
+++ b/packages/test/src/createTestScheduler/createTestScheduler.test.ts
@@ -0,0 +1,40 @@
+import { Test, describe, it } from '@typed/test'
+
+import { Sink } from '@motorcycle/types'
+import { createTestScheduler } from './createTestScheduler'
+import { propagateEventTask } from '@most/core'
+
+export const test: Test = describe(`createTestScheduler`, [
+ describe(`TestScheduler`, [
+ it(`returns time starting from 0`, ({ equal }) => {
+ const { scheduler } = createTestScheduler()
+
+ equal(0, scheduler.now())
+ }),
+
+ describe(`tick`, [
+ it(`increases time after running tasks`, ({ equal }, done) => {
+ const { tick, scheduler } = createTestScheduler()
+ const delay = 100
+ const expectedValue = 500
+
+ const sink: Sink = {
+ event(time, value) {
+ equal(delay, time)
+ equal(expectedValue, value)
+ equal(delay, scheduler.now())
+ done()
+ },
+ error(_, err) {
+ done(err)
+ },
+ end() {},
+ }
+
+ scheduler.delay(delay, propagateEventTask(expectedValue, sink))
+
+ tick(delay)
+ }),
+ ]),
+ ]),
+])
diff --git a/packages/test/src/createTestScheduler/createTestScheduler.ts b/packages/test/src/createTestScheduler/createTestScheduler.ts
new file mode 100644
index 00000000..9b8652c7
--- /dev/null
+++ b/packages/test/src/createTestScheduler/createTestScheduler.ts
@@ -0,0 +1,48 @@
+import { Delay, Scheduler, Timeline } from '@motorcycle/types'
+import { newScheduler, newTimeline } from '@most/scheduler'
+
+import { VirtualTimer } from './VirtualTimer'
+
+/**
+ * Creates a test scheduler. Using the test scheduler you are the master of time.
+ *
+ * @name createTestScheduler(timeline?: Timeline): TestScheduler
+ * @example
+ * import { createTestScheduler } from '@motorcycle/test'
+ * import { now, runEffects } from '@motorcycle/stream'
+ *
+ * const { tick, scheduler } createTestScheduler()
+ *
+ * const stream = now(100)
+ *
+ * runEffects(stream, scheduler).then(() => console.log('done!'))
+ *
+ * // manually tick forward in time
+ * // tick returns a Promise that resolves when all scheduled tasks have been run.
+ * tick(100)
+ */
+export function createTestScheduler(timeline: Timeline = newTimeline()): TestScheduler {
+ const timer = new VirtualTimer()
+
+ const tick = (delay: Delay) => timer.tick(delay)
+
+ const scheduler: Scheduler = newScheduler(timer, timeline)
+
+ return { tick, scheduler }
+}
+
+/**
+ * TestScheduler
+ *
+ * @name TestScheduler
+ * @example
+ * export type TestScheduler = {
+ * readonly tick: (delay: Delay) => Promise
+ * readonly scheduler: Scheduler
+ * }
+ * @type
+ */
+export type TestScheduler = {
+ readonly tick: (delay: Delay) => Promise
+ readonly scheduler: Scheduler
+}
diff --git a/packages/test/src/createTestScheduler/index.ts b/packages/test/src/createTestScheduler/index.ts
new file mode 100644
index 00000000..500dce35
--- /dev/null
+++ b/packages/test/src/createTestScheduler/index.ts
@@ -0,0 +1,2 @@
+export * from './createTestScheduler'
+export * from './VirtualTimer'
diff --git a/packages/test/src/index.ts b/packages/test/src/index.ts
new file mode 100644
index 00000000..cb0b0dfa
--- /dev/null
+++ b/packages/test/src/index.ts
@@ -0,0 +1,3 @@
+export * from './collectEventsFor'
+export * from './createTestScheduler'
+export * from './run'
diff --git a/packages/test/src/run/disposeSources.test.ts b/packages/test/src/run/disposeSources.test.ts
new file mode 100644
index 00000000..ee37695c
--- /dev/null
+++ b/packages/test/src/run/disposeSources.test.ts
@@ -0,0 +1,23 @@
+import { Test, describe, given, it } from '@typed/test'
+
+import { disposeSources } from './disposeSources'
+
+export const test: Test = describe(`disposeSources`, [
+ given(`given Sources with a dispose method`, [
+ it(`calls the Source's dispose method`, ({ equal }) => {
+ let called = 0
+
+ const source = {
+ dispose() {
+ ++called
+ },
+ }
+
+ const sources = { source }
+
+ disposeSources(sources)
+
+ equal(1, called)
+ }),
+ ]),
+])
diff --git a/packages/test/src/run/disposeSources.ts b/packages/test/src/run/disposeSources.ts
new file mode 100644
index 00000000..bfba3727
--- /dev/null
+++ b/packages/test/src/run/disposeSources.ts
@@ -0,0 +1,13 @@
+import { Disposable } from '@motorcycle/types'
+
+export function disposeSources(sources: Sources): void {
+ Object.keys(sources).forEach(function(key: keyof Sources) {
+ const source = sources[key]
+
+ if (isDisposable(source)) source.dispose()
+ })
+}
+
+function isDisposable(x: any): x is Disposable {
+ return x && typeof x.dispose === 'function'
+}
diff --git a/packages/test/src/run/index.ts b/packages/test/src/run/index.ts
new file mode 100644
index 00000000..b8492c1b
--- /dev/null
+++ b/packages/test/src/run/index.ts
@@ -0,0 +1 @@
+export * from './run'
diff --git a/packages/test/src/run/run.test.ts b/packages/test/src/run/run.test.ts
new file mode 100644
index 00000000..9f62fcd2
--- /dev/null
+++ b/packages/test/src/run/run.test.ts
@@ -0,0 +1,129 @@
+import { Test, describe, given, it } from '@typed/test'
+import { createProxy, observe, periodic, scan, skip, take, tap, until } from '@motorcycle/stream'
+
+import { Stream } from '@motorcycle/types'
+import { run } from './run'
+
+export const test: Test = describe(`run`, [
+ given(`a Component and EffectfulComponent`, [
+ it(`returns { sinks: Sinks, sources: Sources, dispose: () => void, tick: (delay: Delay) => Promise }`, ({
+ equal,
+ ok,
+ }) => {
+ type Sources = {
+ foo: Stream
+ }
+
+ type Sinks = {
+ bar: Stream
+ }
+
+ function UI(): Sinks {
+ const bar = scan(x => x + 1, 0, skip(1, periodic(10)))
+
+ return { bar }
+ }
+
+ function Application(): Sources {
+ const foo = scan(x => x + 1, 0, skip(1, periodic(10)))
+
+ return { foo }
+ }
+
+ const { sinks, sources, dispose, tick } = run(UI, Application)
+
+ equal('object', typeof sinks)
+ ok(isStream(sinks.bar))
+ equal('object', typeof sources)
+ ok(isStream(sources.foo))
+ equal('function', typeof dispose)
+
+ return tick(500).then(dispose)
+ }),
+
+ it(`replicates events from Sinks to Sources`, ({ equal }, done) => {
+ type Sources = {
+ foo: Stream
+ }
+
+ type Sinks = {
+ bar: Stream
+ }
+
+ function UI(sources: Sources): Sinks {
+ const { foo } = sources
+
+ collectEvents(take(3, foo)).then(equal([0, 1, 2])).then(() => done()).catch(done)
+
+ const bar = scan(x => x + 5, 0, skip(1, periodic(10)))
+
+ return { bar }
+ }
+
+ function Application(): Sources {
+ const foo = scan(x => x + 1, 0, skip(1, periodic(1)))
+
+ return { foo }
+ }
+
+ run(UI, Application)
+ }),
+
+ describe(`dispose`, [
+ it(`stops sinks from emitting`, ({ equal }) => {
+ const test = scan(x => x + 1, 0, skip(1, periodic(100)))
+
+ const UI = () => ({ test })
+
+ const Application = () => ({})
+
+ const { sinks, dispose } = run(UI, Application)
+
+ setTimeout(() => dispose(), 250)
+
+ return collectEvents(sinks.test).then(equal([0, 1, 2]))
+ }),
+
+ it(`stops Sources with .dispose method from emitting`, ({ equal }) => {
+ const test = scan(x => x + 1, 0, skip(1, periodic(100)))
+
+ type Sources = {
+ readonly test: {
+ readonly foo: Stream
+ dispose(): void
+ }
+ }
+
+ const UI = () => ({ test })
+
+ const { stream: endSignal } = createProxy()
+ const foo = until(endSignal, test)
+
+ const Application = () => ({
+ test: {
+ foo,
+ dispose: () => endSignal.event(0, void 0),
+ },
+ })
+
+ const { sources, dispose } = run(UI, Application)
+
+ return collectEvents(
+ tap(n => {
+ if (n === 3) dispose()
+ }, sources.test.foo)
+ ).then(equal([0, 1, 2, 3]))
+ }),
+ ]),
+ ]),
+])
+
+function collectEvents(stream: Stream): Promise> {
+ const events: Array = []
+
+ return observe(x => events.push(x), stream).then(() => events)
+}
+
+function isStream(x: any): x is Stream {
+ return x && typeof x.run === 'function'
+}
diff --git a/packages/test/src/run/run.ts b/packages/test/src/run/run.ts
new file mode 100644
index 00000000..e8a4106c
--- /dev/null
+++ b/packages/test/src/run/run.ts
@@ -0,0 +1,67 @@
+import { Component, EffectfulComponent, Stream } from '@motorcycle/types'
+import { ProxyStream, createProxy, scheduler } from '@motorcycle/stream'
+import { createDisposableSinks, createProxySinks, replicateSinks } from './sinks'
+
+import { disposeSources } from './disposeSources'
+
+/**
+ * This is nearly identical to the `run` found inside of `@motorcycle/run`. The
+ * only difference is that it makes use of the test scheduler to create the
+ * application's event loop. An additional property is returned with the `tick`
+ * that allows you to control how time progresses.
+ *
+ * @name run(UI: Component, Application: EffectfulComponent)
+ * @example
+ * import { run } from '@motorcycle/test'
+ * import { makeDomComponent, div, button, h2, query, clickEvent } from '@motorcycle/dom'
+ *
+ * function UI(sources) {
+ * const { dom } = sources
+ *
+ * const click$ = clickEvent(query('button', dom))
+ *
+ * const count$ = scan(x => x + 1, click$)
+ *
+ * const view$ = map(view, count$)
+ *
+ * return { view$ }
+ * }
+ *
+ * function view(count: number) {
+ * return div([
+ * h2(`Clicked ${count} times`),
+ * button('Click Me'),
+ * ])
+ * }
+ *
+ * const Dom = fakeDomComponent({
+ * 'button': {
+ * click: now(fakeEvent())
+ * }
+ * })
+ *
+ * const { tick, dispose } = run(UI, Dom)
+ *
+ * tick(500).then(dispose)
+ */
+export function run<
+ Sources extends Readonly>,
+ Sinks extends Readonly>>
+>(UI: Component, Application: EffectfulComponent) {
+ const { stream: endSignal } = createProxy()
+
+ const sinkProxies = {} as Record>
+ const proxySinks: Sinks = createProxySinks(sinkProxies, endSignal)
+ const sources: Sources = Application(proxySinks)
+ const sinks: Sinks = createDisposableSinks(UI(sources), endSignal)
+
+ const { disposable, tick } = replicateSinks(sinks, sinkProxies)
+
+ function dispose() {
+ endSignal.event(scheduler.now(), void 0)
+ disposable.dispose()
+ disposeSources(sources)
+ }
+
+ return { sinks, sources, dispose, tick }
+}
diff --git a/packages/test/src/run/sinks/createDisposableSinks.test.ts b/packages/test/src/run/sinks/createDisposableSinks.test.ts
new file mode 100644
index 00000000..e780f0fa
--- /dev/null
+++ b/packages/test/src/run/sinks/createDisposableSinks.test.ts
@@ -0,0 +1,20 @@
+import { Test, describe, given, it } from '@typed/test'
+import { at, drain, periodic } from '@motorcycle/stream'
+
+import { createDisposableSinks } from './createDisposableSinks'
+
+export const test: Test = describe(``, [
+ given(`Sinks and EndSignal`, [
+ it(`disposes Sinks when EndSignal emits`, ({ ok }) => {
+ const sinks = {
+ test: periodic(100),
+ }
+
+ const endSignal = at(10, void 0)
+
+ const { test } = createDisposableSinks(sinks, endSignal)
+
+ return drain(test).then(() => ok(true))
+ }),
+ ]),
+])
diff --git a/packages/test/src/run/sinks/createDisposableSinks.ts b/packages/test/src/run/sinks/createDisposableSinks.ts
new file mode 100644
index 00000000..e287e56f
--- /dev/null
+++ b/packages/test/src/run/sinks/createDisposableSinks.ts
@@ -0,0 +1,18 @@
+import { Stream } from '@motorcycle/types'
+import { until } from '@motorcycle/stream'
+
+export function createDisposableSinks>>>(
+ sinks: Sinks,
+ endSignal: Stream
+): Sinks {
+ return Object.keys(sinks).reduce(function createDisposableSink(
+ disposableSinks: Sinks,
+ sinkName: keyof Sinks
+ ): Sinks {
+ const disposableSink = until(endSignal, sinks[sinkName])
+
+ disposableSinks[sinkName] = disposableSink
+
+ return disposableSinks
+ }, {} as Sinks)
+}
diff --git a/packages/test/src/run/sinks/createProxySinks.test.ts b/packages/test/src/run/sinks/createProxySinks.test.ts
new file mode 100644
index 00000000..ae598db6
--- /dev/null
+++ b/packages/test/src/run/sinks/createProxySinks.test.ts
@@ -0,0 +1,33 @@
+import { Test, describe, given, it } from '@typed/test'
+import { createProxy, drain } from '@motorcycle/stream'
+
+import { Stream } from '@motorcycle/types'
+import { createProxySinks } from './createProxySinks'
+
+export const test: Test = describe(`createProxySinks`, [
+ it(`is a function`, ({ equal }) => {
+ equal('function', typeof createProxySinks)
+ }),
+
+ given(`Sinks and ProxyStream`, [
+ it(`returns an object`, ({ equal }) => {
+ equal('object', typeof createProxySinks({}, createProxy().stream))
+ }),
+
+ it(`returns a collection of streams`, ({ equal }) => {
+ const sinks = createProxySinks<{ dom: Stream }>({} as any, createProxy().stream)
+
+ equal('function', typeof sinks.dom.run)
+ }),
+
+ it(`ends the returned streams when ProxyStream emits`, ({ ok }) => {
+ const { stream: endSignal } = createProxy()
+
+ const sinks = createProxySinks({}, endSignal)
+
+ setTimeout(() => endSignal.event(Date.now(), void 0))
+
+ return drain(sinks.foo).then(() => ok(true))
+ }),
+ ]),
+])
diff --git a/packages/test/src/run/sinks/createProxySinks.ts b/packages/test/src/run/sinks/createProxySinks.ts
new file mode 100644
index 00000000..f273029a
--- /dev/null
+++ b/packages/test/src/run/sinks/createProxySinks.ts
@@ -0,0 +1,20 @@
+import { ProxyStream, createProxy, until } from '@motorcycle/stream'
+
+import { Stream } from '@motorcycle/types'
+
+export function createProxySinks }>(
+ sinks: Record>,
+ endSignal: Stream
+): Sinks {
+ return new Proxy(sinks, {
+ get(target: Sinks, property: keyof Sinks) {
+ if (!target[property]) {
+ const { stream } = createProxy()
+
+ target[property] = stream
+ }
+
+ return until(endSignal, target[property])
+ },
+ })
+}
diff --git a/packages/test/src/run/sinks/index.ts b/packages/test/src/run/sinks/index.ts
new file mode 100644
index 00000000..bd700ec7
--- /dev/null
+++ b/packages/test/src/run/sinks/index.ts
@@ -0,0 +1,3 @@
+export * from './createDisposableSinks'
+export * from './createProxySinks'
+export * from './replicateSinks'
diff --git a/packages/test/src/run/sinks/replicateSinks.test.ts b/packages/test/src/run/sinks/replicateSinks.test.ts
new file mode 100644
index 00000000..38d29d69
--- /dev/null
+++ b/packages/test/src/run/sinks/replicateSinks.test.ts
@@ -0,0 +1,38 @@
+import { Test, describe, given, it } from '@typed/test'
+import { createProxy, now, observe } from '@motorcycle/stream'
+
+import { replicateSinks } from './replicateSinks'
+
+export const test: Test = describe(`replicateSinks`, [
+ given(`Sink and Proxy Sinks`, [
+ it(`replicates events from Sinks to ProxySinks`, ({ equal }) => {
+ const sinks = {
+ other: now(2),
+ }
+
+ const sinkProxies = {
+ other: createProxy().stream,
+ }
+
+ const { tick } = replicateSinks(sinks, sinkProxies)
+
+ tick(500)
+
+ return observe(equal(2), sinkProxies.other)
+ }),
+
+ it(`returns a disposable`, ({ equal }) => {
+ const sinks = {
+ other: now(2),
+ }
+
+ const sinkProxies = {
+ other: createProxy().stream,
+ }
+
+ const { disposable } = replicateSinks(sinks, sinkProxies)
+
+ equal('function', typeof disposable.dispose)
+ }),
+ ]),
+])
diff --git a/packages/test/src/run/sinks/replicateSinks.ts b/packages/test/src/run/sinks/replicateSinks.ts
new file mode 100644
index 00000000..e5570400
--- /dev/null
+++ b/packages/test/src/run/sinks/replicateSinks.ts
@@ -0,0 +1,33 @@
+import { Disposable, Stream } from '@motorcycle/types'
+
+import { ProxyStream } from '@motorcycle/stream'
+import { createTestScheduler } from '../../createTestScheduler'
+import { disposeAll } from '@most/disposable'
+
+export function replicateSinks>>>(
+ sinks: Sinks,
+ sinkProxies: Record>
+) {
+ const sinkNames = Object.keys(sinks).filter(name => !!sinkProxies[name])
+
+ const { tick, scheduler } = createTestScheduler()
+
+ function replicateSink(name: keyof Sinks): Disposable {
+ return sinks[name].run(sinkProxies[name], scheduler)
+ }
+
+ function disposeSinkProxy(name: keyof Sinks) {
+ sinkProxies[name].end(scheduler.now())
+ }
+
+ const disposables = sinkNames.map(replicateSink)
+
+ function dispose() {
+ disposeAll(disposables).dispose()
+ Object.keys(sinkProxies).forEach(disposeSinkProxy)
+ }
+
+ const disposable = { dispose }
+
+ return { disposable, tick }
+}
diff --git a/packages/test/yarn.lock b/packages/test/yarn.lock
new file mode 100644
index 00000000..43887aea
--- /dev/null
+++ b/packages/test/yarn.lock
@@ -0,0 +1,54 @@
+# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY.
+# yarn lockfile v1
+
+
+"167@0.36.0":
+ version "0.36.0"
+ resolved "https://registry.yarnpkg.com/167/-/167-0.36.0.tgz#040850b3b60fda4fe1b6305676bd42dc0ffe3dae"
+
+"@most/core@0.11.2":
+ version "0.11.2"
+ resolved "https://registry.yarnpkg.com/@most/core/-/core-0.11.2.tgz#6e3615fb4fc5a6a32cd35acb10a513a0e56df150"
+ dependencies:
+ "@most/disposable" "^0.11.0"
+ "@most/prelude" "^1.6.2"
+ "@most/scheduler" "^0.11.0"
+ "@most/types" "^0.10.0"
+
+"@most/disposable@0.11.0", "@most/disposable@^0.11.0":
+ version "0.11.0"
+ resolved "https://registry.yarnpkg.com/@most/disposable/-/disposable-0.11.0.tgz#ad9d0d160d932a6881785f363083a44d8400cef5"
+ dependencies:
+ "@most/prelude" "^1.6.2"
+ "@most/types" "^0.10.0"
+
+"@most/prelude@^1.6.2":
+ version "1.6.2"
+ resolved "https://registry.yarnpkg.com/@most/prelude/-/prelude-1.6.2.tgz#e022db0a3522ea45a427f739570b99f6a6b49162"
+
+"@most/scheduler@0.11.0", "@most/scheduler@^0.11.0":
+ version "0.11.0"
+ resolved "https://registry.yarnpkg.com/@most/scheduler/-/scheduler-0.11.0.tgz#0f67a0f097c9358e4308931e206c03a30f87faf4"
+ dependencies:
+ "@most/prelude" "^1.6.2"
+ "@most/types" "^0.10.0"
+
+"@most/types@^0.10.0":
+ version "0.10.0"
+ resolved "https://registry.yarnpkg.com/@most/types/-/types-0.10.0.tgz#5a60b581915349b210a13fed1563e9d06cba287f"
+
+"@motorcycle/stream@1.1.0":
+ version "1.1.0"
+ resolved "https://registry.yarnpkg.com/@motorcycle/stream/-/stream-1.1.0.tgz#ff2535f76b8c57716bc5c2c6d7d857210b0dfd78"
+ dependencies:
+ "167" "0.36.0"
+ "@most/core" "0.11.2"
+ "@most/disposable" "0.11.0"
+ "@most/scheduler" "0.11.0"
+ "@motorcycle/types" "1.1.0"
+
+"@motorcycle/types@1.1.0":
+ version "1.1.0"
+ resolved "https://registry.yarnpkg.com/@motorcycle/types/-/types-1.1.0.tgz#fa69b8876276b26f3adbe88793e5ff942e0aed2f"
+ dependencies:
+ "@most/types" "^0.10.0"