Skip to content
This repository has been archived by the owner on Mar 14, 2024. It is now read-only.

Commit

Permalink
fix: update activeReports before checking changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sleidig committed Feb 28, 2024
1 parent 9732920 commit 1a0c4db
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 67 deletions.
141 changes: 98 additions & 43 deletions src/report-changes/core/report-changes.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,69 +1,124 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ReportChangesService } from './report-changes.service';
import { BehaviorSubject, map, of, Subject } from 'rxjs';
import { BehaviorSubject, of, Subject } from 'rxjs';
import { NotificationService } from '../../notification/core/notification.service';
import { Reference } from '../../domain/reference';
import { ReportingStorage } from '../../report/storage/reporting-storage.service';
import { CouchDbChangesService } from '../storage/couch-db-changes.service';
import { CreateReportCalculationUseCase } from '../../report/core/use-cases/create-report-calculation-use-case.service';
import { DatabaseChangeResult } from '../storage/database-changes.service';
import {
CreateReportCalculationSuccess,
CreateReportCalculationUseCase,
} from '../../report/core/use-cases/create-report-calculation-use-case.service';
import {
DatabaseChangeResult,
DocChangeDetails,
} from '../storage/database-changes.service';
import { ReportSchemaGenerator } from '../../report/core/report-schema-generator';
import { ReportChangeDetector } from './report-change-detector';
import {
ReportCalculation,
ReportCalculationStatus,
} from '../../domain/report-calculation';

describe('ReportChangesService', () => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
let service: ReportChangesService;
let mockNotificationService: Partial<NotificationService>;
let mockCreateReportCalculationUseCase: Partial<CreateReportCalculationUseCase>;
let mockReportStorage: Partial<ReportingStorage>;

let activeReports: BehaviorSubject<string[]>;
let mockedChangesStream: Subject<DatabaseChangeResult[]>;
let mockedChangesStream: Subject<DocChangeDetails>;

beforeEach(async () => {
mockedChangesStream = new Subject<DatabaseChangeResult[]>();
activeReports = new BehaviorSubject<string[]>([]);
mockedChangesStream = new Subject<DocChangeDetails>();
mockNotificationService = {
activeReports: () =>
activeReports
.asObservable()
.pipe(map((reportIds) => reportIds.map((id) => new Reference(id)))),
activeReports: () => of([]),
triggerNotification: jest.fn(),
};
mockCreateReportCalculationUseCase = { startReportCalculation: jest.fn() };
mockReportStorage = {
fetchReport: jest.fn(),
};

const module: TestingModule = await Test.createTestingModule({
providers: [
ReportChangesService,
{ provide: NotificationService, useValue: mockNotificationService },
{
provide: ReportingStorage,
useValue: { fetchReport: () => of() },
},
{
provide: CouchDbChangesService,
useValue: { subscribeToAllNewChanges: () => mockedChangesStream },
},
{
provide: CreateReportCalculationUseCase,
useValue: null,
},
],
}).compile();

service = module.get<ReportChangesService>(ReportChangesService);
});
service = new ReportChangesService(
mockNotificationService as NotificationService,
mockReportStorage as ReportingStorage,
{
subscribeToAllNewChangesWithDocs: () => mockedChangesStream,
} as Partial<CouchDbChangesService> as CouchDbChangesService,
mockCreateReportCalculationUseCase as CreateReportCalculationUseCase,
new ReportSchemaGenerator(),
);

xit('should trigger core after adding active report through NotificationService', (done) => {
const testReportId = 'report1';
activeReports.next([testReportId]);
jest.useFakeTimers();
});

// TODO mock a couchDbService.changes event
afterEach(() => {
jest.runOnlyPendingTimers();
jest.useRealTimers();
});

(
mockNotificationService.triggerNotification as jest.Mock
).mockImplementation((reportId: string) => {
expect(reportId).toBe(testReportId);
done();
function simulateDocChange(
change: Partial<DatabaseChangeResult> = { id: 'Person:1', doc: {} },
) {
mockedChangesStream.next({
change: change as DatabaseChangeResult,
previousDoc: undefined,
newDoc: change.doc,
});
}

it('should not check changes if NotificationService has active reports', () => {
mockNotificationService.activeReports = jest.fn().mockReturnValue(of([]));

simulateDocChange();
jest.runOnlyPendingTimers();

expect(
mockCreateReportCalculationUseCase.startReportCalculation,
).not.toHaveBeenCalled();
});

it('should check changes if NotificationService has active reports', (done) => {
mockNotificationService.triggerNotification = jest
.fn()
.mockImplementation((event) => {
console.log('triggerNotification', event);
done();
});

const report = {
id: 'ReportConfig:1',
queries: ['SELECT _id FROM Person'],
};

jest
.spyOn(ReportChangeDetector.prototype, 'affectsReport')
.mockImplementation(() => true);

mockNotificationService.activeReports = jest
.fn()
.mockReturnValue(of([{ id: report.id }]));
mockReportStorage.fetchReport = jest.fn().mockReturnValue(of(report));

const calculationResult: ReportCalculation = new ReportCalculation(
'1',
report,
)
.setStatus(ReportCalculationStatus.FINISHED_SUCCESS)
.setOutcome({ result_hash: '123' });
mockCreateReportCalculationUseCase.startReportCalculation = jest
.fn()
.mockReturnValue(
of(new CreateReportCalculationSuccess(calculationResult)),
);
mockCreateReportCalculationUseCase.getCompletedReportCalculation = jest
.fn()
.mockReturnValue(of(calculationResult));

simulateDocChange({ id: 'Person:1', doc: {} });
});

xit('should trigger core after adding active report through NotificationService', async () => {
xit('should trigger even after adding active report through NotificationService', async () => {
activeReports.next(['report1']);
activeReports.next(['report2' /* removed report1 */]);

Expand Down
71 changes: 47 additions & 24 deletions src/report-changes/core/report-changes.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ import { ReportCalculationOutcomeSuccess } from '../../domain/report-calculation
import { Report } from '../../domain/report';
import { CouchDbChangesService } from '../storage/couch-db-changes.service';
import { ReportingStorage } from '../../report/storage/reporting-storage.service';
import { filter, map, Observable, switchMap, tap, zip } from 'rxjs';
import {
filter,
firstValueFrom,
from,
map,
Observable,
switchMap,
tap,
zip,
} from 'rxjs';
import {
CreateReportCalculationFailed,
CreateReportCalculationUseCase,
Expand All @@ -27,40 +36,50 @@ export class ReportChangesService {
private createReportCalculation: CreateReportCalculationUseCase,
private reportSchemaGenerator: IReportSchemaGenerator,
) {
this.notificationService
.activeReports()
.subscribe((reports: Reference[]) => {
reports.forEach((r) => this.registerReportMonitoring(r));
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for (const [id, monitor] of this.reportMonitors.entries()) {
if (!reports.some((r) => r.id === id)) {
this.reportMonitors.delete(id);
}
}
});

this.monitorCouchDbChanges();
}

async registerReportMonitoring(report: Reference) {
if (!this.reportMonitors.has(report.id)) {
this.setReportMonitor(report);
private async updateReportMonitors() {
const reports: Reference[] = await firstValueFrom(
this.notificationService.activeReports(),
);

// delete reports that are no longer active
for (const existingMonitorId of this.reportMonitors.keys()) {
if (!reports.some((r) => r.id === existingMonitorId)) {
this.reportMonitors.delete(existingMonitorId);
}
}

for (const currentReport of reports) {
if (!this.reportMonitors.has(currentReport.id)) {
await firstValueFrom(this.setReportMonitor(currentReport));
}
}

return this.reportMonitors;
}

private setReportMonitor(report: Reference) {
this.reportStorage
.fetchReport(report)
.subscribe((report: Report | undefined) => {
private setReportMonitor(
report: Reference,
): Observable<ReportChangeDetector | undefined> {
return this.reportStorage.fetchReport(report).pipe(
map((report: Report | undefined) => {
if (!report) {
return;
}

this.reportMonitors.set(
report.id,
new ReportChangeDetector(report, this.reportSchemaGenerator),
const changeDetector = new ReportChangeDetector(
report,
this.reportSchemaGenerator,
);
});
if (changeDetector) {
this.reportMonitors.set(report.id, changeDetector);
}

return changeDetector;
}),
);
}

private checkReportConfigUpdate(change: DatabaseChangeResult) {
Expand Down Expand Up @@ -88,6 +107,10 @@ export class ReportChangesService {
tap((change: DocChangeDetails) =>
this.checkReportConfigUpdate(change.change),
),
switchMap((change: DocChangeDetails) =>
// await an update of monitors for currently active reports
from(this.updateReportMonitors()).pipe(map(() => change)),
),
switchMap((change: DocChangeDetails) =>
this.changeIsAffectingReport(change),
),
Expand Down

0 comments on commit 1a0c4db

Please sign in to comment.