From 641af4f835cfa75553aaa6fba4a20a9f9992c784 Mon Sep 17 00:00:00 2001 From: Zak Henry Date: Mon, 12 Aug 2019 01:22:58 +0100 Subject: [PATCH] feat(concurrentConcatAll): Add custom operator for managing multiple concurrent inner subscriptions but maintaining the output order --- README.md | 66 +++++++--- .../src/lib/concurrent-concat-all.spec.ts | 118 ++++++++++++++++++ .../src/lib/concurrent-concat-all.ts | 58 +++++++++ .../observable-webworker/src/public-api.ts | 2 + src/readme/concurrent-concat-all.main.ts | 8 ++ 5 files changed, 232 insertions(+), 20 deletions(-) create mode 100644 projects/observable-webworker/src/lib/concurrent-concat-all.spec.ts create mode 100644 projects/observable-webworker/src/lib/concurrent-concat-all.ts create mode 100644 src/readme/concurrent-concat-all.main.ts diff --git a/README.md b/README.md index eb767eb..1365fcd 100644 --- a/README.md +++ b/README.md @@ -29,9 +29,11 @@ Simple API for using [web workers](https://developer.mozilla.org/en-US/docs/Web/ - [Worker Pool strategy](#worker-pool-strategy) - maximise the throughput of units of work by utilising all cores on the host machine ## Demo + https://cloudnc.github.io/observable-webworker ## Tutorial + https://dev.to/zakhenry/observable-webworkers-with-angular-8-4k6 ## Install @@ -89,9 +91,10 @@ export class HelloWorker implements DoWork { ``` ##### Important Note -You **must** export your worker class (`export class ...`) from the file if you're using a minifier. If you don't, your -class will be removed from the bundle, causing your worker to do nothing! - + +You **must** export your worker class (`export class ...`) from the file if you're using a minifier. If you don't, your +class will be removed from the bundle, causing your worker to do nothing! + You'll probably need to export the class anyway as you are unit testing it right?! ##### Don't like decorators? Don't use 'em! @@ -157,28 +160,32 @@ worker thread, as the return type `string` is not `Transferable`. 
## Worker Pool Strategy -If you have a large amount of work that needs to be done, you can use the `fromWorkerPool` function to automatically +If you have a large amount of work that needs to be done, you can use the `fromWorkerPool` function to automatically manage a pool of workers to allow true concurrency of work, distributed evenly across all available cores. The worker pool strategy has the following features -* Work can be provided as either `Observable`, `Array`, or `Iterable` -* Concurrency is limited to `navigation.hardwareConcurrency - 1` to keep the main core free. - * This is a configurable option if you know you already have other workers running -* Workers are only created when there is need for them (work is available) -* Workers are terminated when there is no more work, freeing up threads for other processes - * for `Observable`, work is considered remaining while the observable is not completed - * for `Array`, work remains while there are items in the array - * for `Iterable`, work remains while the iterator is not `result.done` -* Workers are kept running while work remains, preventing unnecessary downloading of the worker script -* Custom observable flattening operator can be passed, allowing for custom behaviour such as correlating the output -order with input order - * default operator is `mergeAll()`, which means the output from the webworker(s) is output as soon as available - - + +- Work can be provided as either `Observable`, `Array`, or `Iterable` +- Concurrency is limited to `navigator.hardwareConcurrency - 1` to keep the main core free. 
+ - This is a configurable option if you know you already have other workers running +- Workers are only created when there is need for them (work is available) +- Workers are terminated when there is no more work, freeing up threads for other processes + - for `Observable`, work is considered remaining while the observable is not completed + - for `Array`, work remains while there are items in the array + - for `Iterable`, work remains while the iterator is not `result.done` +- Workers are kept running while work remains, preventing unnecessary downloading of the worker script +- Custom observable flattening operator can be passed, allowing for custom behaviour such as correlating the output + order with input order + - default operator is `mergeAll()`, which means the output from the webworker(s) is output as soon as available + - observable worker exports a custom operator `concurrentConcatAll()` which manages buffers internally to run the + workers in parallel, but output the results in the same order as the input went in. See [example](#custom-operator) + - Avoid using the built in RxJS operator `concatAll()` as it does not allow for concurrency, meaning while your + output would be in the expected order, only one worker would be used. + ### Example In this simple example, we have a function that receives an array of files and returns an observable of the SHA-256 hex -hashes of those files. For simplicity we're passing the primitives back and forth, however in reality you are likely to +hashes of those files. For simplicity we're passing the primitives back and forth, however in reality you are likely to want to construct your own interface to define the messages being passed to and from the worker. #### Main Thread @@ -244,5 +251,24 @@ export class WorkerPoolHashWorker implements DoWorkUnit { Note here that the worker class `implements DoWorkUnit`. 
This is different to before where we implemented `DoWork` which had the slightly more complex signature of inputting an observable and outputting one. -If using the `fromWorkerPool` strategy, you must only implement `DoWorkUnit` as it relies on the completion of the +If using the `fromWorkerPool` strategy, you must only implement `DoWorkUnit` as it relies on the completion of the returned observable to indicate that the unit of work is finished processing. + +### Custom Operator Example + +If you want the output from `fromWorkerPool` to be in the same order as the input stream/array/iterator you may pass the +custom operator `concurrentConcatAll()` to the options of `fromWorkerPool`: + +```ts +// src/readme/concurrent-concat-all.main.ts + +import { Observable } from 'rxjs'; +import { fromWorkerPool, concurrentConcatAll } from 'observable-webworker'; + +export function computeHashes(files: File[]): Observable { + return fromWorkerPool(() => new Worker('./transferable.worker', { type: 'module' }), files, { + flattenOperator: concurrentConcatAll(), // <-- add this + }); +} + +``` diff --git a/projects/observable-webworker/src/lib/concurrent-concat-all.spec.ts b/projects/observable-webworker/src/lib/concurrent-concat-all.spec.ts new file mode 100644 index 0000000..7f06591 --- /dev/null +++ b/projects/observable-webworker/src/lib/concurrent-concat-all.spec.ts @@ -0,0 +1,118 @@ +import { mergeAll } from 'rxjs/operators'; +import { TestScheduler } from 'rxjs/testing'; +import { concurrentConcatAll } from './concurrent-concat-all'; + +const createScheduler = () => { + return new TestScheduler((actual, expected) => { + // asserting the two objects are equal + // e.g. using chai. 
+ expect(actual).toEqual(expected); + }); +}; + +// Don't reformat the carefully spaced marble diagrams +// prettier-ignore +describe('concurrentConcatAll', () => { + it('generate the stream correctly', () => { + const testScheduler = createScheduler(); + testScheduler.run(helpers => { + const { cold, hot, expectObservable } = helpers; + const x = cold( '--a---b--c---d--| '); + const y = cold( '----e---f--g---|'); + const e1 = hot( '--x------y-------| ', { x, y }); + const expected = '----a---b--c-e-d-f--g---|'; + + expectObservable(e1.pipe(mergeAll())).toBe(expected); + }); + }); + + it('concatenates output of inner observables', () => { + + const testScheduler = createScheduler(); + + testScheduler.run(helpers => { + const { cold, hot, expectObservable } = helpers; + const x = cold( '--a---b--c---d------|'); + const y = cold( '----e---f--g---|'); + const e1 = hot( '--x---------------y--------------|', { x, y }); + const expected = '----a---b--c---d------e---f--g---|'; + + expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected); + }); + }); + + it('appends inner observables that completed first but sequenced after', () => { + + const testScheduler = createScheduler(); + + testScheduler.run(helpers => { + const { cold, hot, expectObservable } = helpers; + const x = cold( '--a---b--c---d--|'); + const y = cold( '----e---f--g---|'); + const e1 = hot( '--xy--------------------|', { x, y }); + const expected = '----a---b--c---d--(efg)-|'; + + expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected); + }); + }); + + it('appends inner observables overlap other observables', () => { + + const testScheduler = createScheduler(); + + testScheduler.run(helpers => { + const { cold, hot, expectObservable } = helpers; + const x = cold( '--a---b--c---d--|'); + const y = cold( '----e---f--------g---|'); + const e1 = hot( '--x------y--------------------|', { x, y }); + const expected = '----a---b--c---d--(ef)----g---|'; + + 
expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected); + }); + }); + + it('buffers completed inner observables', () => { + + const testScheduler = createScheduler(); + + testScheduler.run(helpers => { + const { cold, hot, expectObservable } = helpers; + const x = cold( '----------a---b--c---d--|'); + const y = cold( '-e--f--g-|'); + const e1 = hot( '--xy----------------------------|', { x, y }); + const expected = '------------a---b--c---d--(efg)-|'; + + expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected); + }); + }); + + it('passes on errors asap from inner observables', () => { + + const testScheduler = createScheduler(); + + testScheduler.run(helpers => { + const { cold, hot, expectObservable } = helpers; + const x = cold( '------------a---b--c---d--|'); + const y = cold( '-e--f--g-#'); + const e1 = hot( '--xy----------------------------|', { x, y }); + const expected = '------------#'; + + expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected); + }); + }); + + it('passes on errors asap from outer observables', () => { + + const testScheduler = createScheduler(); + + testScheduler.run(helpers => { + const { cold, hot, expectObservable } = helpers; + const x = cold( '------------a---b--c---d--|'); + const y = cold( '-e--f--g-#'); + const e1 = hot( '--xy----#------------------------|', { x, y }); + const expected = '--------#'; + + expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected); + }); + }); +}); diff --git a/projects/observable-webworker/src/lib/concurrent-concat-all.ts b/projects/observable-webworker/src/lib/concurrent-concat-all.ts new file mode 100644 index 0000000..55bec9d --- /dev/null +++ b/projects/observable-webworker/src/lib/concurrent-concat-all.ts @@ -0,0 +1,79 @@ +import { NEVER, Observable } from 'rxjs'; +import { catchError, mergeMap, tap } from 'rxjs/operators'; + +/** + * Flattens a higher-order observable by subscribing to every inner observable as soon as it arrives (as `mergeAll()` + * does), but emits the values in the order in which the inner observables arrived (as `concatAll()` does). Values + * from an inner observable that is not yet first in line are buffered until every earlier one has completed. + * + * @param input$ stream of inner observables to flatten + * @returns observable of the inner values, grouped by inner observable in arrival order + */ +function concurrentConcatAllOperator<T>(input$: Observable<Observable<T>>): Observable<T> { + return new Observable<T>(observer => { + // State lives inside the subscription so that every subscriber starts with empty buffers + const buffers = new Map<number, T[]>(); + const completed = new Set<number>(); + let currentIndex = 0; + + // Moves on to the next inner observable, flushing what it buffered; keeps going while that one already completed + const advance = () => { + do { + currentIndex++; + const buffered = buffers.get(currentIndex); + if (buffered) { + buffers.delete(currentIndex); + buffered.forEach(v => observer.next(v)); + } + } while (completed.delete(currentIndex)); + }; + + 
const sub = input$ + .pipe( + mergeMap((inner$: Observable<T>, index: number) => { + return inner$.pipe( + // tap (not finalize) so the flush happens before mergeMap is able to complete the output + tap({ + next: v => { + if (currentIndex === index) { + observer.next(v); + return; + } + const buffered = buffers.get(index); + if (buffered) { + buffered.push(v); + } else { + buffers.set(index, [v]); + } + }, + complete: () => { + if (currentIndex === index) { + advance(); + } else { + completed.add(index); + } + }, + }), + catchError(e => { + observer.error(e); + return NEVER; + }), + ); + }), + ) + .subscribe({ + error: e => observer.error(e), + complete: () => observer.complete(), + }); + + return () => sub.unsubscribe(); + }); +} + +/** + * Creates the order-preserving concurrent flattening operator, for use in `pipe()` or as the `flattenOperator` option + * of `fromWorkerPool`. + */ +export function concurrentConcatAll<T>(): (input$: Observable<Observable<T>>) => Observable<T> { + return concurrentConcatAllOperator; +} diff --git a/projects/observable-webworker/src/public-api.ts b/projects/observable-webworker/src/public-api.ts index 64e4304..e9b5b90 100644 --- a/projects/observable-webworker/src/public-api.ts +++ b/projects/observable-webworker/src/public-api.ts @@ -7,3 +7,5 @@ export * from './lib/observable-worker.decorator'; export * from './lib/run-worker'; export * from './lib/from-worker'; export * from './lib/from-worker-pool'; +// @todo move down to a 'observable-worker/helper' sub module to ensure it is not always bundled +export * from './lib/concurrent-concat-all'; diff --git a/src/readme/concurrent-concat-all.main.ts b/src/readme/concurrent-concat-all.main.ts new file mode 100644 index 0000000..eed9189 --- /dev/null +++ b/src/readme/concurrent-concat-all.main.ts @@ -0,0 +1,8 @@ +import { Observable } from 'rxjs'; +import { 
fromWorkerPool, concurrentConcatAll } from 'observable-webworker'; + +export function computeHashes(files: File[]): Observable { + return fromWorkerPool(() => new Worker('./transferable.worker', { type: 'module' }), files, { + flattenOperator: concurrentConcatAll(), // <-- add this + }); +}