Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(concurrentConcatAll): Add custom operator for managing multiple concurrent inner subscriptions but maintaining the output order #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ Simple API for using [web workers](https://developer.mozilla.org/en-US/docs/Web/
- [Worker Pool strategy](#worker-pool-strategy) - maximise the throughput of units of work by utilising all cores on the host machine

## Demo

https://cloudnc.github.io/observable-webworker

## Tutorial

https://dev.to/zakhenry/observable-webworkers-with-angular-8-4k6

## Install
Expand Down Expand Up @@ -89,9 +91,10 @@ export class HelloWorker implements DoWork<string, string> {
```

##### Important Note
You **must** export your worker class (`export class ...`) from the file if you're using a minifier. If you don't, your
class will be removed from the bundle, causing your worker to do nothing!


You **must** export your worker class (`export class ...`) from the file if you're using a minifier. If you don't, your
class will be removed from the bundle, causing your worker to do nothing!

You'll probably need to export the class anyway as you are unit testing it right?!

##### Don't like decorators? Don't use 'em!
Expand Down Expand Up @@ -157,28 +160,32 @@ worker thread, as the return type `string` is not `Transferable`.

## Worker Pool Strategy

If you have a large amount of work that needs to be done, you can use the `fromWorkerPool` function to automatically
If you have a large amount of work that needs to be done, you can use the `fromWorkerPool` function to automatically
manage a pool of workers to allow true concurrency of work, distributed evenly across all available cores.

The worker pool strategy has the following features
* Work can be provided as either `Observable`, `Array`, or `Iterable`
* Concurrency is limited to `navigation.hardwareConcurrency - 1` to keep the main core free.
* This is a configurable option if you know you already have other workers running
* Workers are only created when there is need for them (work is available)
* Workers are terminated when there is no more work, freeing up threads for other processes
* for `Observable`, work is considered remaining while the observable is not completed
* for `Array`, work remains while there are items in the array
* for `Iterable`, work remains while the iterator is not `result.done`
* Workers are kept running while work remains, preventing unnecessary downloading of the worker script
* Custom observable flattening operator can be passed, allowing for custom behaviour such as correlating the output
order with input order
* default operator is `mergeAll()`, which means the output from the webworker(s) is output as soon as available



- Work can be provided as either `Observable`, `Array`, or `Iterable`
- Concurrency is limited to `navigation.hardwareConcurrency - 1` to keep the main core free.
- This is a configurable option if you know you already have other workers running
- Workers are only created when there is need for them (work is available)
- Workers are terminated when there is no more work, freeing up threads for other processes
- for `Observable`, work is considered remaining while the observable is not completed
- for `Array`, work remains while there are items in the array
- for `Iterable`, work remains while the iterator is not `result.done`
- Workers are kept running while work remains, preventing unnecessary downloading of the worker script
- Custom observable flattening operator can be passed, allowing for custom behaviour such as correlating the output
order with input order
- default operator is `mergeAll()`, which means the output from the webworker(s) is output as soon as available
- observable worker exports a custom operator `concurrentConcatAll()` which manages buffers internally to run the
workers in parallel, but output the results in the same order as the input went in. See [example](#custom-operator)
- Avoid using the built in RxJS operator `concatAll()` as it does not allow for concurrency, meaning while your
output would be in the expected order, only one worker would be used.

### Example

In this simple example, we have a function that receives an array of files and returns an observable of the SHA-256 hex
hashes of those files. For simplicity we're passing the primitives back and forth, however in reality you are likely to
hashes of those files. For simplicity we're passing the primitives back and forth, however in reality you are likely to
want to construct your own interface to define the messages being passed to and from the worker.

#### Main Thread
Expand Down Expand Up @@ -244,5 +251,24 @@ export class WorkerPoolHashWorker implements DoWorkUnit<File, string> {
Note here that the worker class `implements DoWorkUnit<File, string>`. This is different to before where we implemented
`DoWork` which had the slightly more complex signature of inputting an observable and outputting one.

If using the `fromWorkerPool` strategy, you must only implement `DoWorkUnit` as it relies on the completion of the
If using the `fromWorkerPool` strategy, you must only implement `DoWorkUnit` as it relies on the completion of the
returned observable to indicate that the unit of work is finished processing.

### Custom Operator Example

If you want the output from `fromWorkerPool` to be in the same order of the input stream/array/iterator you may pass the
custom operator `concurrentConcatAll()` to the options of `fromWorkerPool`:

```ts
// src/readme/concurrent-concat-all.main.ts

import { Observable } from 'rxjs';
import { fromWorkerPool, concurrentConcatAll } from 'observable-webworker';

export function computeHashes(files: File[]): Observable<string> {
return fromWorkerPool<File, string>(() => new Worker('./transferable.worker', { type: 'module' }), files, {
flattenOperator: concurrentConcatAll(), // <-- add this
});
}

```
118 changes: 118 additions & 0 deletions projects/observable-webworker/src/lib/concurrent-concat-all.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { mergeAll } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { concurrentConcatAll } from './concurrent-concat-all';

const createScheduler = () => {
return new TestScheduler((actual, expected) => {
// asserting the two objects are equal
// e.g. using chai.
expect(actual).toEqual(expected);
});
};

// Don't reformat the carefully spaced marble diagrams
// prettier-ignore
fdescribe('concurrentConcatAll', () => {
it('generate the stream correctly', () => {
const testScheduler = createScheduler();
testScheduler.run(helpers => {
const { cold, hot, expectObservable } = helpers;
const x = cold( '--a---b--c---d--| ');
const y = cold( '----e---f--g---|');
const e1 = hot( '--x------y-------| ', { x, y });
const expected = '----a---b--c-e-d-f--g---|';

expectObservable(e1.pipe(mergeAll())).toBe(expected);
});
});

it('concatenates output of inner observables', () => {

const testScheduler = createScheduler();

testScheduler.run(helpers => {
const { cold, hot, expectObservable } = helpers;
const x = cold( '--a---b--c---d------|');
const y = cold( '----e---f--g---|');
const e1 = hot( '--x---------------y--------------|', { x, y });
const expected = '----a---b--c---d------e---f--g---|';

expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected);
});
});

it('appends inner observables that completed first but sequenced after', () => {

const testScheduler = createScheduler();

testScheduler.run(helpers => {
const { cold, hot, expectObservable } = helpers;
const x = cold( '--a---b--c---d--|');
const y = cold( '----e---f--g---|');
const e1 = hot( '--xy--------------------|', { x, y });
const expected = '----a---b--c---d--(efg)-|';

expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected);
});
});

it('appends inner observables overlap other observables', () => {

const testScheduler = createScheduler();

testScheduler.run(helpers => {
const { cold, hot, expectObservable } = helpers;
const x = cold( '--a---b--c---d--|');
const y = cold( '----e---f--------g---|');
const e1 = hot( '--x------y--------------------|', { x, y });
const expected = '----a---b--c---d--(ef)----g---|';

expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected);
});
});

it('buffers completed inner observables', () => {

const testScheduler = createScheduler();

testScheduler.run(helpers => {
const { cold, hot, expectObservable } = helpers;
const x = cold( '----------a---b--c---d--|');
const y = cold( '-e--f--g-|');
const e1 = hot( '--xy----------------------------|', { x, y });
const expected = '------------a---b--c---d--(efg)-|';

expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected);
});
});

it('passes on errors asap from inner observables', () => {

const testScheduler = createScheduler();

testScheduler.run(helpers => {
const { cold, hot, expectObservable } = helpers;
const x = cold( '------------a---b--c---d--|');
const y = cold( '-e--f--g-#');
const e1 = hot( '--xy----------------------------|', { x, y });
const expected = '------------#';

expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected);
});
});

it('passes on errors asap from outer observables', () => {

const testScheduler = createScheduler();

testScheduler.run(helpers => {
const { cold, hot, expectObservable } = helpers;
const x = cold( '------------a---b--c---d--|');
const y = cold( '-e--f--g-#');
const e1 = hot( '--xy----#------------------------|', { x, y });
const expected = '--------#';

expectObservable(e1.pipe(concurrentConcatAll())).toBe(expected);
});
});
});
58 changes: 58 additions & 0 deletions projects/observable-webworker/src/lib/concurrent-concat-all.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { NEVER, Observable } from 'rxjs';
import { catchError, finalize, map, mergeMap } from 'rxjs/operators';

function concurrentConcatAllOperator<T>(input$: Observable<Observable<T>>): Observable<T> {
const buffer: Map<number, T[]> = new Map();
let completedBuffers: number[] = [];
let currentIndex = 0;

return new Observable<T>(observer => {
const sub = input$
.pipe(
mergeMap((inner$: Observable<T>, index: number) => {
return inner$.pipe(
map(v => {
if (currentIndex === index) {
observer.next(v);
} else {
if (!buffer.has(index)) {
buffer.set(index, []);
}
buffer.get(index).push(v);
}
}),
finalize(() => {
if (currentIndex === index) {
completedBuffers.sort().forEach(bufferIndex => {
buffer.get(bufferIndex).forEach(v => observer.next(v));
buffer.delete(bufferIndex);
});

currentIndex = completedBuffers.length ? completedBuffers.pop() + 1 : currentIndex + 1;
if (buffer.has(currentIndex)) {
buffer.get(currentIndex).forEach(v => observer.next(v));
}
completedBuffers = [];
} else {
completedBuffers.push(index);
}
}),
catchError(e => {
observer.error(e);
return NEVER;
}),
);
}),
)
.subscribe({
error: e => observer.error(e),
complete: () => observer.complete(),
});

return () => sub.unsubscribe();
});
}

export function concurrentConcatAll<T>(): (input$: Observable<Observable<T>>) => Observable<T> {
return concurrentConcatAllOperator;
}
2 changes: 2 additions & 0 deletions projects/observable-webworker/src/public-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export * from './lib/observable-worker.decorator';
export * from './lib/run-worker';
export * from './lib/from-worker';
export * from './lib/from-worker-pool';
// @todo move down to a 'observable-worker/helper' sub module to ensure it is not always bundled
export * from './lib/concurrent-concat-all';
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to happen before merging

8 changes: 8 additions & 0 deletions src/readme/concurrent-concat-all.main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Observable } from 'rxjs';
import { fromWorkerPool, concurrentConcatAll } from 'observable-webworker';

export function computeHashes(files: File[]): Observable<string> {
return fromWorkerPool<File, string>(() => new Worker('./transferable.worker', { type: 'module' }), files, {
flattenOperator: concurrentConcatAll(), // <-- add this
});
}