diff --git a/src/report-changes/core/report-changes.service.spec.ts b/src/report-changes/core/report-changes.service.spec.ts index ff0ba5a..06fa355 100644 --- a/src/report-changes/core/report-changes.service.spec.ts +++ b/src/report-changes/core/report-changes.service.spec.ts @@ -1,69 +1,124 @@ -import { Test, TestingModule } from '@nestjs/testing'; import { ReportChangesService } from './report-changes.service'; -import { BehaviorSubject, map, of, Subject } from 'rxjs'; +import { BehaviorSubject, of, Subject } from 'rxjs'; import { NotificationService } from '../../notification/core/notification.service'; -import { Reference } from '../../domain/reference'; import { ReportingStorage } from '../../report/storage/reporting-storage.service'; import { CouchDbChangesService } from '../storage/couch-db-changes.service'; -import { CreateReportCalculationUseCase } from '../../report/core/use-cases/create-report-calculation-use-case.service'; -import { DatabaseChangeResult } from '../storage/database-changes.service'; +import { + CreateReportCalculationSuccess, + CreateReportCalculationUseCase, +} from '../../report/core/use-cases/create-report-calculation-use-case.service'; +import { + DatabaseChangeResult, + DocChangeDetails, +} from '../storage/database-changes.service'; +import { ReportSchemaGenerator } from '../../report/core/report-schema-generator'; +import { ReportChangeDetector } from './report-change-detector'; +import { + ReportCalculation, + ReportCalculationStatus, +} from '../../domain/report-calculation'; describe('ReportChangesService', () => { // eslint-disable-next-line @typescript-eslint/no-unused-vars let service: ReportChangesService; let mockNotificationService: Partial; + let mockCreateReportCalculationUseCase: Partial; + let mockReportStorage: Partial; let activeReports: BehaviorSubject; - let mockedChangesStream: Subject; + let mockedChangesStream: Subject; beforeEach(async () => { - mockedChangesStream = new Subject(); - activeReports = new BehaviorSubject([]); + mockedChangesStream = new Subject(); mockNotificationService = { - activeReports: () => - activeReports - .asObservable() - .pipe(map((reportIds) => reportIds.map((id) => new Reference(id)))), + activeReports: () => of([]), triggerNotification: jest.fn(), }; + mockCreateReportCalculationUseCase = { startReportCalculation: jest.fn() }; + mockReportStorage = { + fetchReport: jest.fn(), + }; - const module: TestingModule = await Test.createTestingModule({ - providers: [ - ReportChangesService, - { provide: NotificationService, useValue: mockNotificationService }, - { - provide: ReportingStorage, - useValue: { fetchReport: () => of() }, - }, - { - provide: CouchDbChangesService, - useValue: { subscribeToAllNewChanges: () => mockedChangesStream }, - }, - { - provide: CreateReportCalculationUseCase, - useValue: null, - }, - ], - }).compile(); - - service = module.get(ReportChangesService); - }); + service = new ReportChangesService( + mockNotificationService as NotificationService, + mockReportStorage as ReportingStorage, + { + subscribeToAllNewChangesWithDocs: () => mockedChangesStream, + } as Partial as CouchDbChangesService, + mockCreateReportCalculationUseCase as CreateReportCalculationUseCase, + new ReportSchemaGenerator(), + ); - xit('should trigger core after adding active report through NotificationService', (done) => { - const testReportId = 'report1'; - activeReports.next([testReportId]); + jest.useFakeTimers(); + }); - // TODO mock a couchDbService.changes event + afterEach(() => { + jest.runOnlyPendingTimers(); + jest.useRealTimers(); + }); - ( - mockNotificationService.triggerNotification as jest.Mock - ).mockImplementation((reportId: string) => { - expect(reportId).toBe(testReportId); - done(); + function simulateDocChange( + change: Partial = { id: 'Person:1', doc: {} }, + ) { + mockedChangesStream.next({ + change: change as DatabaseChangeResult, + previousDoc: undefined, + newDoc: change.doc, }); + } + + it('should not check changes if NotificationService has active reports', () => { + mockNotificationService.activeReports = jest.fn().mockReturnValue(of([])); + + simulateDocChange(); + jest.runOnlyPendingTimers(); + + expect( + mockCreateReportCalculationUseCase.startReportCalculation, + ).not.toHaveBeenCalled(); + }); + + it('should check changes if NotificationService has active reports', (done) => { + mockNotificationService.triggerNotification = jest + .fn() + .mockImplementation((event) => { + console.log('triggerNotification', event); + done(); + }); + + const report = { + id: 'ReportConfig:1', + queries: ['SELECT _id FROM Person'], + }; + + jest + .spyOn(ReportChangeDetector.prototype, 'affectsReport') + .mockImplementation(() => true); + + mockNotificationService.activeReports = jest + .fn() + .mockReturnValue(of([{ id: report.id }])); + mockReportStorage.fetchReport = jest.fn().mockReturnValue(of(report)); + + const calculationResult: ReportCalculation = new ReportCalculation( + '1', + report, + ) + .setStatus(ReportCalculationStatus.FINISHED_SUCCESS) + .setOutcome({ result_hash: '123' }); + mockCreateReportCalculationUseCase.startReportCalculation = jest + .fn() + .mockReturnValue( + of(new CreateReportCalculationSuccess(calculationResult)), + ); + mockCreateReportCalculationUseCase.getCompletedReportCalculation = jest + .fn() + .mockReturnValue(of(calculationResult)); + + simulateDocChange({ id: 'Person:1', doc: {} }); }); - xit('should trigger core after adding active report through NotificationService', async () => { + xit('should trigger even after adding active report through NotificationService', async () => { activeReports.next(['report1']); activeReports.next(['report2' /* removed report1 */]); diff --git a/src/report-changes/core/report-changes.service.ts b/src/report-changes/core/report-changes.service.ts index acaa87b..7c0f82f 100644 --- a/src/report-changes/core/report-changes.service.ts +++ b/src/report-changes/core/report-changes.service.ts @@ -6,7 +6,16 @@ import { ReportCalculationOutcomeSuccess } from '../../domain/report-calculation import { Report } from '../../domain/report'; import { CouchDbChangesService } from '../storage/couch-db-changes.service'; import { ReportingStorage } from '../../report/storage/reporting-storage.service'; -import { filter, map, Observable, switchMap, tap, zip } from 'rxjs'; +import { + filter, + firstValueFrom, + from, + map, + Observable, + switchMap, + tap, + zip, +} from 'rxjs'; import { CreateReportCalculationFailed, CreateReportCalculationUseCase, @@ -27,40 +36,50 @@ export class ReportChangesService { private createReportCalculation: CreateReportCalculationUseCase, private reportSchemaGenerator: IReportSchemaGenerator, ) { - this.notificationService - .activeReports() - .subscribe((reports: Reference[]) => { - reports.forEach((r) => this.registerReportMonitoring(r)); - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for (const [id, monitor] of this.reportMonitors.entries()) { - if (!reports.some((r) => r.id === id)) { - this.reportMonitors.delete(id); - } - } - }); - this.monitorCouchDbChanges(); } - async registerReportMonitoring(report: Reference) { - if (!this.reportMonitors.has(report.id)) { - this.setReportMonitor(report); + private async updateReportMonitors() { + const reports: Reference[] = await firstValueFrom( + this.notificationService.activeReports(), + ); + + // delete reports that are no longer active + for (const existingMonitorId of this.reportMonitors.keys()) { + if (!reports.some((r) => r.id === existingMonitorId)) { + this.reportMonitors.delete(existingMonitorId); + } + } + + for (const currentReport of reports) { + if (!this.reportMonitors.has(currentReport.id)) { + await firstValueFrom(this.setReportMonitor(currentReport)); + } } + + return this.reportMonitors; } - private setReportMonitor(report: Reference) { - this.reportStorage - .fetchReport(report) - .subscribe((report: Report | undefined) => { + private setReportMonitor( + report: Reference, + ): Observable { + return this.reportStorage.fetchReport(report).pipe( + map((report: Report | undefined) => { if (!report) { return; } - this.reportMonitors.set( - report.id, - new ReportChangeDetector(report, this.reportSchemaGenerator), + const changeDetector = new ReportChangeDetector( + report, + this.reportSchemaGenerator, ); - }); + if (changeDetector) { + this.reportMonitors.set(report.id, changeDetector); + } + + return changeDetector; + }), + ); } private checkReportConfigUpdate(change: DatabaseChangeResult) { @@ -88,6 +107,10 @@ export class ReportChangesService { tap((change: DocChangeDetails) => this.checkReportConfigUpdate(change.change), ), + switchMap((change: DocChangeDetails) => + // await an update of monitors for currently active reports + from(this.updateReportMonitors()).pipe(map(() => change)), + ), switchMap((change: DocChangeDetails) => this.changeIsAffectingReport(change), ),