Skip to content
This repository has been archived by the owner on Mar 14, 2024. It is now read-only.

fix: update activeReports before checking changes #39

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 98 additions & 43 deletions src/report-changes/core/report-changes.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,69 +1,124 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ReportChangesService } from './report-changes.service';
import { BehaviorSubject, map, of, Subject } from 'rxjs';
import { BehaviorSubject, of, Subject } from 'rxjs';
import { NotificationService } from '../../notification/core/notification.service';
import { Reference } from '../../domain/reference';
import { ReportingStorage } from '../../report/storage/reporting-storage.service';
import { CouchDbChangesService } from '../storage/couch-db-changes.service';
import { CreateReportCalculationUseCase } from '../../report/core/use-cases/create-report-calculation-use-case.service';
import { DatabaseChangeResult } from '../storage/database-changes.service';
import {
CreateReportCalculationSuccess,
CreateReportCalculationUseCase,
} from '../../report/core/use-cases/create-report-calculation-use-case.service';
import {
DatabaseChangeResult,
DocChangeDetails,
} from '../storage/database-changes.service';
import { ReportSchemaGenerator } from '../../report/core/report-schema-generator';
import { ReportChangeDetector } from './report-change-detector';
import {
ReportCalculation,
ReportCalculationStatus,
} from '../../domain/report-calculation';

describe('ReportChangesService', () => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
let service: ReportChangesService;
let mockNotificationService: Partial<NotificationService>;
let mockCreateReportCalculationUseCase: Partial<CreateReportCalculationUseCase>;
let mockReportStorage: Partial<ReportingStorage>;

let activeReports: BehaviorSubject<string[]>;
let mockedChangesStream: Subject<DatabaseChangeResult[]>;
let mockedChangesStream: Subject<DocChangeDetails>;

beforeEach(async () => {
mockedChangesStream = new Subject<DatabaseChangeResult[]>();
activeReports = new BehaviorSubject<string[]>([]);
mockedChangesStream = new Subject<DocChangeDetails>();
mockNotificationService = {
activeReports: () =>
activeReports
.asObservable()
.pipe(map((reportIds) => reportIds.map((id) => new Reference(id)))),
activeReports: () => of([]),
triggerNotification: jest.fn(),
};
mockCreateReportCalculationUseCase = { startReportCalculation: jest.fn() };
mockReportStorage = {
fetchReport: jest.fn(),
};

const module: TestingModule = await Test.createTestingModule({
providers: [
ReportChangesService,
{ provide: NotificationService, useValue: mockNotificationService },
{
provide: ReportingStorage,
useValue: { fetchReport: () => of() },
},
{
provide: CouchDbChangesService,
useValue: { subscribeToAllNewChanges: () => mockedChangesStream },
},
{
provide: CreateReportCalculationUseCase,
useValue: null,
},
],
}).compile();

service = module.get<ReportChangesService>(ReportChangesService);
});
service = new ReportChangesService(
mockNotificationService as NotificationService,
mockReportStorage as ReportingStorage,
{
subscribeToAllNewChangesWithDocs: () => mockedChangesStream,
} as Partial<CouchDbChangesService> as CouchDbChangesService,
mockCreateReportCalculationUseCase as CreateReportCalculationUseCase,
new ReportSchemaGenerator(),
);

xit('should trigger core after adding active report through NotificationService', (done) => {
const testReportId = 'report1';
activeReports.next([testReportId]);
jest.useFakeTimers();
});

// TODO mock a couchDbService.changes event
afterEach(() => {
jest.runOnlyPendingTimers();
jest.useRealTimers();
});

(
mockNotificationService.triggerNotification as jest.Mock
).mockImplementation((reportId: string) => {
expect(reportId).toBe(testReportId);
done();
function simulateDocChange(
change: Partial<DatabaseChangeResult> = { id: 'Person:1', doc: {} },
) {
mockedChangesStream.next({
change: change as DatabaseChangeResult,
previousDoc: undefined,
newDoc: change.doc,
});
}

it('should not check changes if NotificationService has active reports', () => {
mockNotificationService.activeReports = jest.fn().mockReturnValue(of([]));

simulateDocChange();
jest.runOnlyPendingTimers();

expect(
mockCreateReportCalculationUseCase.startReportCalculation,
).not.toHaveBeenCalled();
});

it('should check changes if NotificationService has active reports', (done) => {
mockNotificationService.triggerNotification = jest
.fn()
.mockImplementation((event) => {
console.log('triggerNotification', event);
done();
});

const report = {
id: 'ReportConfig:1',
queries: ['SELECT _id FROM Person'],
};

jest
.spyOn(ReportChangeDetector.prototype, 'affectsReport')
.mockImplementation(() => true);

mockNotificationService.activeReports = jest
.fn()
.mockReturnValue(of([{ id: report.id }]));
mockReportStorage.fetchReport = jest.fn().mockReturnValue(of(report));

const calculationResult: ReportCalculation = new ReportCalculation(
'1',
report,
)
.setStatus(ReportCalculationStatus.FINISHED_SUCCESS)
.setOutcome({ result_hash: '123' });
mockCreateReportCalculationUseCase.startReportCalculation = jest
.fn()
.mockReturnValue(
of(new CreateReportCalculationSuccess(calculationResult)),
);
mockCreateReportCalculationUseCase.getCompletedReportCalculation = jest
.fn()
.mockReturnValue(of(calculationResult));

simulateDocChange({ id: 'Person:1', doc: {} });
});

xit('should trigger core after adding active report through NotificationService', async () => {
xit('should trigger even after adding active report through NotificationService', async () => {
activeReports.next(['report1']);
activeReports.next(['report2' /* removed report1 */]);

Expand Down
71 changes: 47 additions & 24 deletions src/report-changes/core/report-changes.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ import { ReportCalculationOutcomeSuccess } from '../../domain/report-calculation
import { Report } from '../../domain/report';
import { CouchDbChangesService } from '../storage/couch-db-changes.service';
import { ReportingStorage } from '../../report/storage/reporting-storage.service';
import { filter, map, Observable, switchMap, tap, zip } from 'rxjs';
import {
filter,
firstValueFrom,
from,
map,
Observable,
switchMap,
tap,
zip,
} from 'rxjs';
import {
CreateReportCalculationFailed,
CreateReportCalculationUseCase,
Expand All @@ -27,40 +36,50 @@ export class ReportChangesService {
private createReportCalculation: CreateReportCalculationUseCase,
private reportSchemaGenerator: IReportSchemaGenerator,
) {
this.notificationService
.activeReports()
.subscribe((reports: Reference[]) => {
reports.forEach((r) => this.registerReportMonitoring(r));
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for (const [id, monitor] of this.reportMonitors.entries()) {
if (!reports.some((r) => r.id === id)) {
this.reportMonitors.delete(id);
}
}
});

this.monitorCouchDbChanges();
}

async registerReportMonitoring(report: Reference) {
if (!this.reportMonitors.has(report.id)) {
this.setReportMonitor(report);
private async updateReportMonitors() {
const reports: Reference[] = await firstValueFrom(
this.notificationService.activeReports(),
);

// delete reports that are no longer active
for (const existingMonitorId of this.reportMonitors.keys()) {
if (!reports.some((r) => r.id === existingMonitorId)) {
this.reportMonitors.delete(existingMonitorId);
}
}

for (const currentReport of reports) {
if (!this.reportMonitors.has(currentReport.id)) {
await firstValueFrom(this.setReportMonitor(currentReport));
}
}

return this.reportMonitors;
}

private setReportMonitor(report: Reference) {
this.reportStorage
.fetchReport(report)
.subscribe((report: Report | undefined) => {
private setReportMonitor(
report: Reference,
): Observable<ReportChangeDetector | undefined> {
return this.reportStorage.fetchReport(report).pipe(
map((report: Report | undefined) => {
if (!report) {
return;
}

this.reportMonitors.set(
report.id,
new ReportChangeDetector(report, this.reportSchemaGenerator),
const changeDetector = new ReportChangeDetector(
report,
this.reportSchemaGenerator,
);
});
if (changeDetector) {
this.reportMonitors.set(report.id, changeDetector);
}

return changeDetector;
}),
);
}

private checkReportConfigUpdate(change: DatabaseChangeResult) {
Expand Down Expand Up @@ -88,6 +107,10 @@ export class ReportChangesService {
tap((change: DocChangeDetails) =>
this.checkReportConfigUpdate(change.change),
),
switchMap((change: DocChangeDetails) =>
// await an update of monitors for currently active reports
from(this.updateReportMonitors()).pipe(map(() => change)),
),
Comment on lines +110 to +113
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also do a deeper refactoring and remove the ChangeDetector instances completely. But the underlying bug should be fixed ASAP, so I kept that as is for now.

switchMap((change: DocChangeDetails) =>
this.changeIsAffectingReport(change),
),
Expand Down
Loading