Skip to content

Commit

Permalink
Merge pull request #695 from janhq/fix/download-event
Browse files Browse the repository at this point in the history
fix: download event not updated in client
  • Loading branch information
namchuai authored Jun 14, 2024
2 parents 2474535 + 5147b41 commit 5b7e6dc
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 27 deletions.
4 changes: 2 additions & 2 deletions cortex-js/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import { SeedService } from './usecases/seed/seed.service';
import { FileManagerModule } from './infrastructure/services/file-manager/file-manager.module';
import { AppLoggerMiddleware } from './infrastructure/middlewares/app.logger.middleware';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { AppController } from './infrastructure/controllers/app.controller';
import { DownloadManagerModule } from './download-manager/download-manager.module';
import { EventsController } from './infrastructure/controllers/events.controller';

@Module({
imports: [
Expand All @@ -40,7 +40,7 @@ import { DownloadManagerModule } from './download-manager/download-manager.modul
ModelRepositoryModule,
DownloadManagerModule,
],
controllers: [AppController],
controllers: [EventsController],
providers: [SeedService],
})
export class AppModule implements NestModule {
Expand Down
15 changes: 9 additions & 6 deletions cortex-js/src/download-manager/download-manager.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ export class DownloadManagerService {
constructor(
private readonly httpService: HttpService,
private readonly eventEmitter: EventEmitter2,
) {
// start emitting download state each 500ms
// setInterval(() => {
// this.eventEmitter.emit('download.event', this.allDownloadStates);
// }, 500);
}
) {}

async abortDownload(downloadId: string) {
if (!this.abortControllers[downloadId]) {
Expand All @@ -37,6 +32,7 @@ export class DownloadManagerService {
this.allDownloadStates = this.allDownloadStates.filter(
(downloadState) => downloadState.id !== downloadId,
);
this.eventEmitter.emit('download.event.aborted', this.allDownloadStates);
}

async submitDownloadRequest(
Expand Down Expand Up @@ -160,6 +156,7 @@ export class DownloadManagerService {
(downloadState) => downloadState.id !== downloadId,
);
}
this.eventEmitter.emit('download.event', this.allDownloadStates);
});

writer.on('error', (error) => {
Expand All @@ -186,6 +183,7 @@ export class DownloadManagerService {
this.allDownloadStates = this.allDownloadStates.filter(
(downloadState) => downloadState.id !== downloadId,
);
this.eventEmitter.emit('download.event', this.allDownloadStates);
});

response.data.on('data', (chunk: any) => {
Expand All @@ -202,8 +200,13 @@ export class DownloadManagerService {
if (downloadItem) {
downloadItem.size.transferred = transferredBytes;
}
this.eventEmitter.emit('download.event', this.allDownloadStates);
});

response.data.pipe(writer);
}

getDownloadStates() {
return this.allDownloadStates;
}
}
19 changes: 0 additions & 19 deletions cortex-js/src/infrastructure/controllers/app.controller.ts

This file was deleted.

43 changes: 43 additions & 0 deletions cortex-js/src/infrastructure/controllers/events.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {
DownloadState,
DownloadStateEvent,
} from '@/domain/models/download.interface';
import { DownloadManagerService } from '@/download-manager/download-manager.service';
import { Controller, Sse } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Observable, fromEvent, map, merge, of, throttleTime } from 'rxjs';

@Controller('events')
export class EventsController {
constructor(
private readonly downloadManagerService: DownloadManagerService,
private readonly eventEmitter: EventEmitter2,
) {}

@Sse('download')
downloadEvent(): Observable<DownloadStateEvent> {
// Welcome message Observable
const latestDownloadState$: Observable<DownloadStateEvent> = of({
data: this.downloadManagerService.getDownloadStates(),
});

const downloadAbortEvent$ = fromEvent<DownloadState[]>(
this.eventEmitter,
'download.event.aborted',
).pipe(map((downloadState) => ({ data: downloadState })));

const downloadEvent$ = fromEvent<DownloadState[]>(
this.eventEmitter,
'download.event',
).pipe(
map((downloadState) => ({ data: downloadState })),
throttleTime(1000),
);

return merge(
latestDownloadState$,
downloadEvent$,
downloadAbortEvent$,
).pipe();
}
}

0 comments on commit 5b7e6dc

Please sign in to comment.