Skip to content

Commit

Permalink
Feat/timeline demo (#24)
Browse files Browse the repository at this point in the history
Feat/timeline demo
  • Loading branch information
zakhenry authored Aug 11, 2019
2 parents 97a4f61 + 1f85b08 commit fb497e6
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 38 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ testem.log
# System Files
.DS_Store
Thumbs.db

test-files/*.txt
16 changes: 16 additions & 0 deletions generate-test-files.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

rm -rf test-files/*

COUNT=${1:-20}
MAXSIZE_MB=${2:-100}

for i in $(seq 1 $COUNT)
do
SIZE=$(( ( RANDOM % $MAXSIZE_MB ) + 1 ))

WORD1=$( shuf -n1 /usr/share/dict/words )
WORD2=$( shuf -n1 /usr/share/dict/words )

truncate -s "$SIZE"M "test-files/"$i"-$WORD1-$WORD2-$SIZE""MB.txt"
done
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"@angular/platform-browser": "~8.0.0",
"@angular/platform-browser-dynamic": "~8.0.0",
"@angular/router": "~8.0.0",
"google-charts": "2.0.0",
"rxjs": "~6.4.0",
"tslib": "^1.9.0",
"zone.js": "~0.9.1"
Expand All @@ -54,6 +55,7 @@
"@angular/cli": "~8.0.2",
"@angular/compiler-cli": "~8.0.0",
"@angular/language-service": "~8.0.0",
"@types/google.visualization": "0.0.48",
"@types/jasmine": "~3.3.8",
"@types/jasminewd2": "~2.0.3",
"@types/node": "~8.9.4",
Expand Down
4 changes: 0 additions & 4 deletions src/app/app.component.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import { Component } from '@angular/core';
import { Observable, of, Subject } from 'rxjs';
import { finalize, scan, switchMap, tap } from 'rxjs/operators';
import { fromWorker } from '../../projects/observable-webworker/src/lib/from-worker';
import { fromWorkerPool } from '../../projects/observable-webworker/src/lib/from-worker-pool';

@Component({
selector: 'app-root',
Expand Down
30 changes: 30 additions & 0 deletions src/app/google-charts.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/// <reference types="google.visualization" />
import { Injectable, Type } from '@angular/core';
import { Observable } from 'rxjs';
import { GoogleCharts } from 'google-charts';

export interface GoogleVis {
Timeline: Type<google.visualization.Timeline>;
DataTable: Type<google.visualization.DataTable>;
events: any;
}

@Injectable({
providedIn: 'root',
})
export class GoogleChartsService {
constructor() {}

public getVisualisation(...withPackages: string[]): Observable<GoogleVis> {
return new Observable(observer => {
// Load the charts library with a callback
GoogleCharts.load(() => {
GoogleCharts.api.charts.load('current', { packages: withPackages });
GoogleCharts.api.charts.setOnLoadCallback(() => {
observer.next(GoogleCharts.api.visualization);
observer.complete();
});
});
});
}
}
14 changes: 11 additions & 3 deletions src/app/multiple-worker-pool/multiple-worker-pool.component.html
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
<h2>Multiple Worker Pool</h2>
<h3 *ngIf="status$ | async as status">({{ status }})</h3>

Select multiple files to compute SHA-256 sum of, in pool of webworkers:
<p>Select multiple files of varying sizes to compute SHA-256 sum of, in pool of webworkers:</p>

<section>
<small>(No files are uploaded; they're kept entirely within your browser)</small>
</section>
<input type="file" multiple (change)="calculateSha256Multiple($event)" />

<h3 [attr.data]="workResult$ | async">Events:</h3>
<ol>
<h3 [attr.data]="chartObserver$ | async">Timeline</h3>
<div #timeline></div>

<h3>Events:</h3>
<ol [attr.data]="workResult$ | async">
<li *ngFor="let event of eventListPool$ | async">
<app-log-line *ngIf="filenames$ | async as filenames" [message]="event" [files]="filenames"></app-log-line>
</li>
Expand Down
189 changes: 175 additions & 14 deletions src/app/multiple-worker-pool/multiple-worker-pool.component.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,93 @@
import { Component } from '@angular/core';
import { Observable, ReplaySubject, Subject } from 'rxjs';
import { Component, ElementRef, ViewChild } from '@angular/core';
import {
concatAll,
animationFrameScheduler,
asyncScheduler,
combineLatest,
interval,
Observable,
ReplaySubject,
Subject,
} from 'rxjs';
import {
filter,
groupBy,
map,
mergeMap,
observeOn,
pairwise,
scan,
shareReplay,
startWith,
switchAll,
switchMap,
switchMapTo,
take,
takeUntil,
tap,
} from 'rxjs/operators';
import { fromWorkerPool } from '../../../projects/observable-webworker/src/lib/from-worker-pool';
import { ShaWorkerMessage, Thread } from '../sha-worker.types';
import { GoogleChartsService } from '../google-charts.service';
import { FileHashEvent, ShaWorkerMessage, Thread } from '../sha-worker.types';
import TimelineOptions = google.visualization.TimelineOptions;

@Component({
selector: 'app-multiple-worker-pool',
templateUrl: './multiple-worker-pool.component.html',
styleUrls: ['./multiple-worker-pool.component.scss'],
})
export class MultipleWorkerPoolComponent {
@ViewChild('timeline', { static: false, read: ElementRef }) private timelineComponent: ElementRef;

public multiFilesToHash: Subject<File[]> = new ReplaySubject(1);
public workResult$ = this.multiFilesToHash.pipe(switchMap(files => this.hashMultipleFiles(files)));
public workResult$ = this.multiFilesToHash.pipe(
observeOn(asyncScheduler),
switchMap(files => this.hashMultipleFiles(files)),
);

private filenames: string[];
public filenames$ = this.multiFilesToHash.pipe(
map(files => files.map(f => f.name)),
tap(names => (this.filenames = names)),
shareReplay(1),
);

public eventsPool$: Subject<ShaWorkerMessage> = new Subject();
public eventListPool$: Observable<ShaWorkerMessage[]> = this.eventsPool$.pipe(

public completedFiles$: Observable<string[]> = this.filenames$.pipe(
switchMap(() =>
this.eventsPool$.pipe(
groupBy(m => m.file),
mergeMap(fileMessage$ =>
fileMessage$.pipe(
filter(e => e.fileEventType === FileHashEvent.HASH_RECEIVED),
take(1),
),
),
map(message => message.file),
scan<string>((files, file) => [...files, file], []),
startWith([]),
),
),
);

public complete$: Observable<boolean> = combineLatest(this.filenames$, this.completedFiles$).pipe(
map(([files, completedFiles]) => files.length === completedFiles.length),
);

public status$: Observable<string> = this.complete$.pipe(
startWith(null),
map(isComplete => {
switch (isComplete) {
case null:
return 'Waiting for file selection';
case true:
return 'Completed';
case false:
return 'Processing files';
}
}),
);

public eventsTimedPool$: Observable<ShaWorkerMessage> = this.eventsPool$.pipe(
groupBy(m => m.file),
mergeMap(fileMessage$ => {
return fileMessage$.pipe(
Expand All @@ -45,40 +101,145 @@ export class MultipleWorkerPoolComponent {
}),
);
}),
);

public eventListPool$: Observable<ShaWorkerMessage[]> = this.eventsTimedPool$.pipe(
scan<ShaWorkerMessage>((list, event) => {
list.push(event);
return list;
}, []),
);

public chartObserver$ = combineLatest(this.filenames$, this.googleChartService.getVisualisation('timeline')).pipe(
switchMap(([filenames, visualization]) => {
const container = this.timelineComponent.nativeElement;
const chart = new visualization.Timeline(container);
const dataTable = new visualization.DataTable();

dataTable.addColumn({ type: 'string', id: 'file' });
dataTable.addColumn({ type: 'string', id: 'event' });
dataTable.addColumn({ type: 'date', id: 'Start' });
dataTable.addColumn({ type: 'date', id: 'End' });

const lastRow = new Map();

const chartOptions: TimelineOptions = {
height: 0,
};

const eventUpdates$ = this.eventsPool$.pipe(
tap(event => {
if (event.fileEventType === null) {
return;
}

if (lastRow.has(event.file)) {
dataTable.setCell(lastRow.get(event.file), 3, event.timestamp);
}

if (event.fileEventType === FileHashEvent.HASH_RECEIVED) {
lastRow.delete(event.file);
return;
}

let durationName: string;
switch (event.fileEventType) {
case FileHashEvent.SELECTED:
durationName = 'Queued, waiting for worker';
break;
case FileHashEvent.PICKED_UP:
durationName = 'Transferring file to worker';
if (this.filenames.indexOf(event.file) < navigator.hardwareConcurrency - 1) {
durationName = 'Starting worker, ' + durationName;
}
break;
case FileHashEvent.FILE_RECEIVED:
durationName = 'Reading file';
break;
case FileHashEvent.FILE_READ:
durationName = 'Computing hash';
break;
case FileHashEvent.HASH_COMPUTED:
durationName = 'Returning hash result to main thread';
break;
}

const row = dataTable.addRow([event.file, durationName, event.timestamp, event.timestamp]);
lastRow.set(event.file, row);

chartOptions.height = filenames.length * 41 + 50;

chart.draw(dataTable, chartOptions);
}),
);

const realtimeUpdater$ = interval(0, animationFrameScheduler).pipe(
tap(() => {
const rowsToUpdate = Array.from(lastRow.values());

for (const row of rowsToUpdate) {
dataTable.setCell(row, 3, new Date());
}

if (rowsToUpdate.length) {
chart.draw(dataTable, chartOptions);
}
}),
);

return eventUpdates$.pipe(
switchMapTo(realtimeUpdater$),
takeUntil(
this.complete$.pipe(
filter(c => c),
take(1),
),
),
);
}),
);

constructor(private googleChartService: GoogleChartsService) {}

private *workPool(files: File[]): IterableIterator<File> {
for (const file of files) {
yield file;
this.eventsPool$.next(this.logMessage(`file picked up for processing`, file.name));
this.eventsPool$.next(this.logMessage(FileHashEvent.PICKED_UP, `file picked up for processing`, file.name));
}
}

public hashMultipleFiles(files: File[]): Observable<ShaWorkerMessage> {
const queue: IterableIterator<File> = this.workPool(files);

return fromWorkerPool<Blob, ShaWorkerMessage>(index => {
const worker = new Worker('../secure-hash-algorithm.worker', { name: `sha-worker-${index}`, type: 'module' });
this.eventsPool$.next(this.logMessage(`worker ${index} created`));
this.eventsPool$.next(this.logMessage(null, `worker ${index} created`));
return worker;
}, this.workPool(files)).pipe(
}, queue).pipe(
tap(res => {
this.eventsPool$.next(res);
if (res.fileEventType === FileHashEvent.HASH_COMPUTED) {
this.eventsPool$.next({
...res,
fileEventType: FileHashEvent.HASH_RECEIVED,
timestamp: new Date(),
message: 'Hash received',
thread: Thread.MAIN,
});
}
}),
);
}

public calculateSha256Multiple($event): void {
const files: File[] = Array.from($event.target.files);
this.multiFilesToHash.next(files);
for (const file of files) {
this.eventsPool$.next(this.logMessage('file selected', file.name));
this.eventsPool$.next(this.logMessage(FileHashEvent.SELECTED, 'file selected', file.name));
}
this.multiFilesToHash.next(files);
}

private logMessage(message: string, file?: string): ShaWorkerMessage {
return { message, file, timestamp: new Date(), thread: Thread.MAIN };
private logMessage(eventType: FileHashEvent | null, message: string, file?: string): ShaWorkerMessage {
return { message, file, timestamp: new Date(), thread: Thread.MAIN, fileEventType: eventType };
}
}
Loading

0 comments on commit fb497e6

Please sign in to comment.