From 94f4d8bbd67c818e72e44adff2f1633d7f7e2b85 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Thu, 15 Feb 2024 20:37:00 +0100 Subject: [PATCH] feat: initial implementation of detect-changes module (#17) to subscribe database _changes feed, detect those affecting a report and triggering events see #16 --------- Co-authored-by: Tom Winter --- .env | 4 +- src/app.controller.spec.ts | 184 ------------------ src/app.controller.ts | 94 --------- src/app.module.ts | 4 + src/couchdb/couch-db-client.service.ts | 22 +++ src/couchdb/dtos.ts | 35 ++++ src/domain/report-data-change-event.ts | 13 ++ src/domain/report.ts | 15 +- .../core/notification.service.spec.ts | 18 ++ src/notification/core/notification.service.ts | 25 +++ src/notification/notification.module.ts | 8 + .../core/report-change-detector.spec.ts | 68 +++++++ .../core/report-change-detector.ts | 54 +++++ .../core/report-changes.service.spec.ts | 79 ++++++++ .../core/report-changes.service.ts | 170 ++++++++++++++++ src/report-changes/report-changes.module.ts | 20 ++ .../storage/couchdb-changes.service.spec.ts | 156 +++++++++++++++ .../storage/couchdb-changes.service.ts | 131 +++++++++++++ .../storage/database-changes.service.ts | 12 ++ src/report-changes/test-controller.ts | 20 ++ .../report-calculation.controller.spec.ts | 2 + .../report-calculation.controller.ts | 35 ++-- src/report/controller/report.controller.ts | 5 +- src/report/core/report-storage.ts | 6 +- ...eport-calculation-use-case.service.spec.ts | 24 +++ ...ate-report-calculation-use-case.service.ts | 74 +++++++ src/report/report.module.ts | 3 + .../repository/report-repository.service.ts | 9 +- src/report/storage/report-storage.service.ts | 28 ++- 29 files changed, 1011 insertions(+), 307 deletions(-) delete mode 100644 src/app.controller.spec.ts delete mode 100644 src/app.controller.ts create mode 100644 src/domain/report-data-change-event.ts create mode 100644 src/notification/core/notification.service.spec.ts create mode 100644 src/notification/core/notification.service.ts create mode 100644 src/notification/notification.module.ts create mode 100644 src/report-changes/core/report-change-detector.spec.ts create mode 100644 src/report-changes/core/report-change-detector.ts create mode 100644 src/report-changes/core/report-changes.service.spec.ts create mode 100644 src/report-changes/core/report-changes.service.ts create mode 100644 src/report-changes/report-changes.module.ts create mode 100644 src/report-changes/storage/couchdb-changes.service.spec.ts create mode 100644 src/report-changes/storage/couchdb-changes.service.ts create mode 100644 src/report-changes/storage/database-changes.service.ts create mode 100644 src/report-changes/test-controller.ts create mode 100644 src/report/core/use-cases/create-report-calculation-use-case.service.spec.ts create mode 100644 src/report/core/use-cases/create-report-calculation-use-case.service.ts diff --git a/.env b/.env index 462418b..5e64c44 100644 --- a/.env +++ b/.env @@ -1,7 +1,9 @@ SENTRY_DSN= PORT= DATABASE_URL=http://127.0.0.1:5984 -DATABASE_ADMIN=admin +DATABASE_USER=admin DATABASE_PASSWORD=admin QUERY_URL=http://127.0.0.1:4984 SCHEMA_CONFIG_ID=_design/sqlite:config +REPORT_DATABASE_URL=http://127.0.0.1:5984 +REPORT_DATABASE_NAME=app \ No newline at end of file diff --git a/src/app.controller.spec.ts b/src/app.controller.spec.ts deleted file mode 100644 index 4303657..0000000 --- a/src/app.controller.spec.ts +++ /dev/null @@ -1,184 +0,0 @@ -import { AppController } from './app.controller'; -import { Test, TestingModule } from '@nestjs/testing'; -import { firstValueFrom, of, throwError } from 'rxjs'; -import { HttpService } from '@nestjs/axios'; -import { BadRequestException, HttpException, HttpStatus } from '@nestjs/common'; -import { SqlReport } from './sql-report'; -import { ConfigService } from '@nestjs/config'; -import { QueryBody } from './query-body.dto'; - -describe('AppController', () => { - let controller: AppController; - let mockHttp: { post: jest.Mock; get: jest.Mock }; - const dbUrl = 'database:3000'; - const queryUrl = 'query:3000'; - const schemaConfigId = '_design/sqlite:config'; - const adminAuth = { username: 'admin', password: 'adminPW' }; - - beforeEach(async () => { - mockHttp = { - post: jest.fn().mockReturnValue(of({ data: undefined })), - get: jest.fn().mockReturnValue(of({ data: undefined })), - }; - const mockConfigService = { - getOrThrow: (key: any) => { - switch (key) { - case 'DATABASE_URL': - return dbUrl; - case 'QUERY_URL': - return queryUrl; - case 'SCHEMA_CONFIG_ID': - return schemaConfigId; - case 'DATABASE_ADMIN': - return adminAuth.username; - case 'DATABASE_PASSWORD': - return adminAuth.password; - default: - throw Error('missing mock value for ' + key); - } - }, - }; - const module: TestingModule = await Test.createTestingModule({ - providers: [ - AppController, - { provide: HttpService, useValue: mockHttp }, - { provide: ConfigService, useValue: mockConfigService }, - ], - }).compile(); - - controller = module.get(AppController); - }); - - it('should create', () => { - expect(controller).toBeDefined(); - }); - - it('should forward report query to SQS and return result', (done) => { - const report: SqlReport = { - mode: 'sql', - aggregationDefinitions: ['SELECT * FROM someTable'], - }; - mockHttp.get.mockReturnValue(of({ data: report })); - const queryResult = [{ some: 'data' }]; - mockHttp.post.mockReturnValue(of({ data: queryResult })); - - controller - .queryData('ReportConfig:some-id', 'app', 'valid token') - .subscribe((res) => { - expect(mockHttp.get).toHaveBeenCalledWith( - `${dbUrl}/app/ReportConfig:some-id`, - { headers: { Authorization: 'valid token' } }, - ); - expect(mockHttp.post).toHaveBeenCalledWith( - `${queryUrl}/app/${schemaConfigId}`, - { query: report.aggregationDefinitions[0] }, - { auth: adminAuth }, - ); - expect(res).toEqual(queryResult); - - done(); - }); - }); - - it('should add dates as args to query request if "?" is used', async () => { - const report: SqlReport = { - mode: 'sql', - aggregationDefinitions: [], - }; - mockHttp.get.mockReturnValue(of({ data: report })); - const body: QueryBody = { from: '2023-01-01', to: '2024-01-01' }; - - // No "?" in query - report.aggregationDefinitions = ['SELECT * FROM Note']; - await firstValueFrom( - controller.queryData('ReportConfig:some-id', 'app', 'valid token', body), - ); - expect(mockHttp.post).toHaveBeenCalledWith( - `${queryUrl}/app/${schemaConfigId}`, - { - query: report.aggregationDefinitions[0], - }, - { auth: adminAuth }, - ); - - // two "?" in query - report.aggregationDefinitions = [ - 'SELECT * FROM Note WHERE e.date BETWEEN ? AND ?', - ]; - await firstValueFrom( - controller.queryData('ReportConfig:some-id', 'app', 'valid token', body), - ); - expect(mockHttp.post).toHaveBeenCalledWith( - `${queryUrl}/app/${schemaConfigId}`, - { - query: report.aggregationDefinitions[0], - args: [body.from, body.to], - }, - { auth: adminAuth }, - ); - }); - - it('should concatenate the result of multiple SELECT queries', (done) => { - const firstResult = [{ value: 'first' }, { value: 'second' }]; - const secondResult = [{ value: 'third' }]; - const report: SqlReport = { - mode: 'sql', - aggregationDefinitions: ['SELECT * FROM Child', 'SELECT * FROM School'], - }; - mockHttp.get.mockReturnValue(of({ data: report })); - mockHttp.post - .mockReturnValueOnce(of({ data: firstResult })) - .mockReturnValueOnce(of({ data: secondResult })); - - controller - .queryData('ReportConfig:some-id', 'app', 'valid token') - .subscribe((res) => { - expect(mockHttp.post).toHaveBeenCalledWith( - `${queryUrl}/app/${schemaConfigId}`, - { query: report.aggregationDefinitions[0] }, - { auth: adminAuth }, - ); - expect(mockHttp.post).toHaveBeenCalledWith( - `${queryUrl}/app/${schemaConfigId}`, - { query: report.aggregationDefinitions[1] }, - { auth: adminAuth }, - ); - expect(res).toEqual([...firstResult, ...secondResult]); - - done(); - }); - }); - - it('should throw error if user is not permitted to request report', (done) => { - mockHttp.get.mockReturnValue( - throwError(() => ({ - response: { data: 'Unauthorized', status: 401 }, - })), - ); - controller - .queryData('ReportConfig:some-id', 'app', 'invalid token') - .subscribe({ - error: (err: HttpException) => { - expect(err.getStatus()).toBe(HttpStatus.UNAUTHORIZED); - done(); - }, - }); - }); - - it('should throw error trying to query a non-sql report', (done) => { - const report: SqlReport = { - mode: 'exporting' as any, - aggregationDefinitions: [], - }; - mockHttp.get.mockReturnValue(of({ data: report })); - - controller - .queryData('ReportConfig:some-id', 'app', 'valid token') - .subscribe({ - error: (err) => { - expect(err).toBeInstanceOf(BadRequestException); - done(); - }, - }); - }); -}); diff --git a/src/app.controller.ts b/src/app.controller.ts deleted file mode 100644 index d218116..0000000 --- a/src/app.controller.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { - BadRequestException, - Body, - Controller, - Headers, - HttpException, - Param, - Post, -} from '@nestjs/common'; -import { HttpService } from '@nestjs/axios'; -import { ConfigService } from '@nestjs/config'; -import { catchError, concat, map, mergeMap, Observable, toArray } from 'rxjs'; -import { SqlReport } from './sql-report'; -import { QueryBody } from './query-body.dto'; - -@Controller('report') -export class AppController { - private dbUrl = this.configService.getOrThrow('DATABASE_URL'); - private queryUrl = this.configService.getOrThrow('QUERY_URL'); - private schemaDocId = this.configService.getOrThrow('SCHEMA_CONFIG_ID'); - private couchAdmin = this.configService.getOrThrow('DATABASE_ADMIN'); - private couchPassword = this.configService.getOrThrow('DATABASE_PASSWORD'); - constructor( - private http: HttpService, - private configService: ConfigService, - ) {} - - @Post(':db/:id') - queryData( - @Param('id') reportId: string, - @Param('db') db: string, - @Headers('Authorization') token: string, - @Body() body?: QueryBody, - ) { - return this.http - .get(`${this.dbUrl}/${db}/${reportId}`, { - headers: { Authorization: token }, - }) - .pipe( - mergeMap(({ data }) => this.executeReport(data, db, body)), - catchError((err) => { - throw err.response?.data - ? new HttpException(err.response.data, err.response.status) - : err; - }), - ); - } - - private executeReport(report: SqlReport, db: string, args?: QueryBody) { - if (report.mode !== 'sql') { - throw new BadRequestException('Not an SQL report'); - } - if (!report.aggregationDefinitions) { - throw new BadRequestException('Report query not configured'); - } - - // execute all requests in sequence - return concat( - ...report.aggregationDefinitions.map((query) => - this.getQueryResult(query, args, db), - ), - ).pipe( - // combine results of each request - toArray(), - map((res) => res.flat()), - ); - } - - private getQueryResult( - query: string, - args: QueryBody | undefined, - db: string, - ): Observable { - const data: SqsRequest = { query: query }; - // There needs to be the same amount of "?" in the query as elements in "args" - if (args?.from && args?.to && query.match(/\?/g)?.length === 2) { - data.args = [args.from, args.to]; - } - return this.http - .post(`${this.queryUrl}/${db}/${this.schemaDocId}`, data, { - auth: { username: this.couchAdmin, password: this.couchPassword }, - }) - .pipe(map(({ data }) => data)); - } -} - -/** - * Request body as required by the SQS service. See SQS docs for more info. - * {@link https://neighbourhood.ie/products-and-services/structured-query-server} - */ -interface SqsRequest { - query: string; - args?: any[]; -} diff --git a/src/app.module.ts b/src/app.module.ts index 2f5b0d5..658cb5a 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -6,6 +6,8 @@ import { ConfigModule, ConfigService } from '@nestjs/config'; import { HttpModule } from '@nestjs/axios'; import { ReportModule } from './report/report.module'; import { ScheduleModule } from '@nestjs/schedule'; +import { ReportChangesModule } from './report-changes/report-changes.module'; +import { NotificationModule } from './notification/notification.module'; const lowSeverityLevels: SeverityLevel[] = ['log', 'info']; @@ -59,6 +61,8 @@ const lowSeverityLevels: SeverityLevel[] = ['log', 'info']; }, }), ReportModule, + ReportChangesModule, + NotificationModule, ], controllers: [], }) diff --git a/src/couchdb/couch-db-client.service.ts b/src/couchdb/couch-db-client.service.ts index ea3d252..4833ff8 100644 --- a/src/couchdb/couch-db-client.service.ts +++ b/src/couchdb/couch-db-client.service.ts @@ -2,6 +2,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { catchError, map, Observable, of, switchMap } from 'rxjs'; import { HttpService } from '@nestjs/axios'; import { AxiosHeaders } from 'axios'; +import { CouchDbChangesResponse } from './dtos'; @Injectable() export class CouchDbClient { @@ -118,4 +119,25 @@ export class CouchDbClient { private handleError(err: any) { this.logger.debug(err); } + + changes( + databaseUrl: string, + databaseName: string, + config?: any, + ): Observable { + return this.httpService + .get( + `${databaseUrl}/${databaseName}/_changes`, + config, + ) + .pipe( + map((response) => { + return response.data; + }), + catchError((err) => { + this.handleError(err); + throw err; + }), + ); + } } diff --git a/src/couchdb/dtos.ts b/src/couchdb/dtos.ts index dcacffe..0d282c1 100644 --- a/src/couchdb/dtos.ts +++ b/src/couchdb/dtos.ts @@ -26,3 +26,38 @@ export class FindResponse { docs: T[]; } + +/** + * Response from the CouchDB changes endpoint, listing database docs that have changed + * since the given last change (last_seq). + * + * see https://docs.couchdb.org/en/stable/api/database/changes.html + */ +export interface CouchDbChangesResponse { + /** Last change update sequence */ + last_seq: string; + + /** array of docs with changes */ + results: CouchDbChangeResult[]; + + /** Count of remaining items in the feed */ + pending: number; +} + +/** + * A single result entry from a CouchDB changes feed, + * indicating one doc has changed. + * + * see https://docs.couchdb.org/en/stable/api/database/changes.html + */ +export interface CouchDbChangeResult { + /** _id of a doc with changes */ + id: string; + + /** List of document’s leaves with single field rev. */ + changes: { rev: string }[]; + + seq: string; + + doc?: any; +} diff --git a/src/domain/report-data-change-event.ts b/src/domain/report-data-change-event.ts new file mode 100644 index 0000000..5581de5 --- /dev/null +++ b/src/domain/report-data-change-event.ts @@ -0,0 +1,13 @@ +import { Reference } from './reference'; +import { ReportCalculation } from './report-calculation'; + +/** + * Used as core that a report's calculated results have changed, due to updates in the underlying database. + */ +export interface ReportDataChangeEvent { + /** The report for which data has changed */ + report: Reference; + + /** The calculation containing the latest data after the change, ready to be fetched */ + calculation: ReportCalculation; +} diff --git a/src/domain/report.ts b/src/domain/report.ts index 1d4102d..d800e62 100644 --- a/src/domain/report.ts +++ b/src/domain/report.ts @@ -12,11 +12,14 @@ export interface ReportSchema { export class Report { id: string; name: string; + mode: string | undefined; schema: ReportSchema | undefined; + queries: string[]; - constructor(id: string, name: string) { + constructor(id: string, name: string, queries: string[]) { this.id = id; this.name = name; + this.queries = queries; } setId(id: string): Report { @@ -24,6 +27,11 @@ export class Report { return this; } + setMode(mode: string): Report { + this.mode = mode; + return this; + } + setName(name: string): Report { this.name = name; return this; @@ -33,4 +41,9 @@ export class Report { this.schema = schema; return this; } + + setQueries(queries: string[]): Report { + this.queries = queries; + return this; + } } diff --git a/src/notification/core/notification.service.spec.ts b/src/notification/core/notification.service.spec.ts new file mode 100644 index 0000000..65bd59d --- /dev/null +++ b/src/notification/core/notification.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { NotificationService } from './notification.service'; + +describe('NotificationService', () => { + let service: NotificationService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [NotificationService], + }).compile(); + + service = module.get(NotificationService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/notification/core/notification.service.ts b/src/notification/core/notification.service.ts new file mode 100644 index 0000000..d5b2a17 --- /dev/null +++ b/src/notification/core/notification.service.ts @@ -0,0 +1,25 @@ +import { Injectable } from '@nestjs/common'; +import { Observable, of } from 'rxjs'; +import { Reference } from '../../domain/reference'; +import { ReportDataChangeEvent } from '../../domain/report-data-change-event'; + +/** + * Manage core subscriptions and delivering events to subscribers. + */ +@Injectable() +export class NotificationService { + /** + * Get the list of reports for which notifications are subscribed by at least one client. + */ + activeReports(): Observable { + // TODO: is this emitting the whole list every time the subscriptions change, as the name suggests? + // or individual id when added (but then, how is unsubscribe tracked?) + // may be easier if I can just directly get the list of currently active reports + return of([]); + } + + /** + * Trigger a core event for the given report to any active subscribers. + */ + triggerNotification(event: ReportDataChangeEvent): void {} +} diff --git a/src/notification/notification.module.ts b/src/notification/notification.module.ts new file mode 100644 index 0000000..bafb43b --- /dev/null +++ b/src/notification/notification.module.ts @@ -0,0 +1,8 @@ +import { Module } from '@nestjs/common'; +import { NotificationService } from './core/notification.service'; + +@Module({ + providers: [NotificationService], + exports: [NotificationService], +}) +export class NotificationModule {} diff --git a/src/report-changes/core/report-change-detector.spec.ts b/src/report-changes/core/report-change-detector.spec.ts new file mode 100644 index 0000000..6a42b63 --- /dev/null +++ b/src/report-changes/core/report-change-detector.spec.ts @@ -0,0 +1,68 @@ +import { EntityDoc, ReportChangeDetector } from './report-change-detector'; +import { Report } from '../../domain/report'; +import { DocChangeDetails } from './report-changes.service'; + +describe('ReportChangeDetector', () => { + function testReportChangeDetection( + sqlStatement: string, + testCases: [EntityDoc, boolean][], + ) { + const report: Partial = { + id: 'test-report-id', + mode: 'sql', + queries: [sqlStatement], + }; + const service = new ReportChangeDetector(report as Report); + + for (const [newDoc, expectedResult] of testCases) { + const mockedDocChange: DocChangeDetails = { + change: { + id: newDoc._id, + changes: [], + seq: '', + }, + new: newDoc, + previous: newDoc, + }; + expect(service.affectsReport(mockedDocChange)).toBe(expectedResult); + } + } + + it('should detect doc change that triggers report change for basic "SELECT *" report', () => { + testReportChangeDetection('SELECT * FROM EventNote', [ + [{ _id: 'EventNote:1' }, true], + [{ _id: 'Event:1' }, false], + ]); + }); + + xit('should detect only docs used in SELECT clause are relevant', () => { + testReportChangeDetection('SELECT name FROM EventNote', [ + [{ _id: 'EventNote:1', name: 'foo' }, true], + [{ _id: 'EventNote:field-missing' }, false], // TODO: not implemented yet + [{ _id: 'Event:other-type', name: 'foo' }, false], + ]); + }); + + xit('should detect only docs with previous or new value of field matching WHERE clause are relevant', () => { + testReportChangeDetection( + "SELECT * FROM EventNote WHERE location='Berlin'", + [ + [{ _id: 'EventNote:1', location: 'Berlin' }, true], + [{ _id: 'EventNote:field-not-matching', location: 'New York' }, false], // TODO: not implemented yet + [{ _id: 'EventNote:field-missing' }, false], + [{ _id: 'Event:other-type', location: 'Berlin' }, false], + ], + ); + }); + + it('should detect fields in joins and complex json properties', () => { + testReportChangeDetection( + "SELECT c.name as Name, AttStatus as Status FROM Child c JOIN (SELECT json_extract(att.value, '$[0]') AS attendanceChildId, json_extract(att.value, '$[1].status') AS AttStatus FROM EventNote e, json_each(e.childrenAttendance) att) ON attendanceChildId=c._id", + [ + // TODO + [{ _id: 'EventNote:1' }, true], + [{ _id: 'Child:1' }, true], + ], + ); + }); +}); diff --git a/src/report-changes/core/report-change-detector.ts b/src/report-changes/core/report-change-detector.ts new file mode 100644 index 0000000..b3b38e2 --- /dev/null +++ b/src/report-changes/core/report-change-detector.ts @@ -0,0 +1,54 @@ +import { Report } from '../../domain/report'; +import { DocChangeDetails } from './report-changes.service'; + +/** + * Simple class encapsulating the logic to determine if a specific report is affected by a change in the database. + */ +export class ReportChangeDetector { + public report: Report; + public lastCalculationHash: string | undefined; + + private sqlTableNames: string[] = []; + + constructor(report: Report) { + this.report = report; + this.updateReportConfig(report); + } + + updateReportConfig(report: Report) { + this.report = report; + + this.sqlTableNames = this.getSqlTableNames(report); + } + + private getSqlTableNames(report: Report) { + const sqlFromTableRegex = /FROM\s+(\w+)/g; + + return report.queries + .map((sql: string) => + [...sql.matchAll(sqlFromTableRegex)].map( + (match) => match[1] /* matching regex group (table name) */, + ), + ) + .flat(); + } + + affectsReport(doc: DocChangeDetails): boolean { + const entityType = doc.change.id.split(':')[0]; + if (!this.sqlTableNames.includes(entityType)) { + return false; + } + + // TODO: better detection if doc affects report + return true; + } +} + +/** + * A doc in the database representing an entity managed in the frontend. + */ +export interface EntityDoc { + _id: string; + + [key: string]: any; +} diff --git a/src/report-changes/core/report-changes.service.spec.ts b/src/report-changes/core/report-changes.service.spec.ts new file mode 100644 index 0000000..ba18622 --- /dev/null +++ b/src/report-changes/core/report-changes.service.spec.ts @@ -0,0 +1,79 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ReportChangesService } from './report-changes.service'; +import { BehaviorSubject, map, of, Subject } from 'rxjs'; +import { NotificationService } from '../../notification/core/notification.service'; +import { Reference } from '../../domain/reference'; +import { DefaultReportStorage } from '../../report/storage/report-storage.service'; +import { CouchdbChangesService } from '../storage/couchdb-changes.service'; +import { CreateReportCalculationUseCase } from '../../report/core/use-cases/create-report-calculation-use-case.service'; +import { DatabaseChangeResult } from '../storage/database-changes.service'; + +describe('ReportChangesService', () => { + let service: ReportChangesService; + let mockNotificationService: Partial; + + let activeReports: BehaviorSubject; + let mockedChangesStream: Subject; + + beforeEach(async () => { + mockedChangesStream = new Subject(); + activeReports = new BehaviorSubject([]); + mockNotificationService = { + activeReports: () => + activeReports + .asObservable() + .pipe(map((reportIds) => reportIds.map((id) => new Reference(id)))), + triggerNotification: jest.fn(), + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + ReportChangesService, + { provide: NotificationService, useValue: mockNotificationService }, + { + provide: DefaultReportStorage, + useValue: { fetchReport: () => of() }, + }, + { + provide: CouchdbChangesService, + useValue: { subscribeToAllNewChanges: () => mockedChangesStream }, + }, + { + provide: CreateReportCalculationUseCase, + useValue: null, + }, + ], + }).compile(); + + service = module.get(ReportChangesService); + }); + + xit('should trigger core after adding active report through NotificationService', (done) => { + const testReportId = 'report1'; + activeReports.next([testReportId]); + + // TODO mock a couchDbService.changes event + + ( + mockNotificationService.triggerNotification as jest.Mock + ).mockImplementation((reportId: string) => { + expect(reportId).toBe(testReportId); + done(); + }); + }); + + xit('should trigger core after adding active report through NotificationService', async () => { + activeReports.next(['report1']); + activeReports.next(['report2' /* removed report1 */]); + + // TODO mock a couchDbService.changes event + + await new Promise(process.nextTick); // wait for any async operations to finish + expect( + mockNotificationService.triggerNotification, + ).not.toHaveBeenCalledWith('report1'); + expect(mockNotificationService.triggerNotification).toHaveBeenCalledWith( + 'report2', + ); + }); +}); diff --git a/src/report-changes/core/report-changes.service.ts b/src/report-changes/core/report-changes.service.ts new file mode 100644 index 0000000..11a10c7 --- /dev/null +++ b/src/report-changes/core/report-changes.service.ts @@ -0,0 +1,170 @@ +import { Injectable } from '@nestjs/common'; +import { EntityDoc, ReportChangeDetector } from './report-change-detector'; +import { NotificationService } from '../../notification/core/notification.service'; +import { Reference } from '../../domain/reference'; +import { ReportDataChangeEvent } from '../../domain/report-data-change-event'; +import { ReportCalculationOutcomeSuccess } from '../../domain/report-calculation'; +import { Report } from '../../domain/report'; +import { CouchdbChangesService } from '../storage/couchdb-changes.service'; +import { DefaultReportStorage } from '../../report/storage/report-storage.service'; +import { filter, map, mergeAll, Observable, switchMap, tap, zip } from 'rxjs'; +import { + CreateReportCalculationFailed, + CreateReportCalculationUseCase, +} from '../../report/core/use-cases/create-report-calculation-use-case.service'; +import { DatabaseChangeResult } from '../storage/database-changes.service'; + +@Injectable() +export class ReportChangesService { + private reportMonitors = new Map(); + + constructor( + private notificationService: NotificationService, + private reportStorage: DefaultReportStorage, + private couchdbChangesRepository: CouchdbChangesService, + private createReportCalculation: CreateReportCalculationUseCase, + ) { + this.notificationService + .activeReports() + .subscribe((reports: Reference[]) => { + reports.forEach((r) => this.registerReportMonitoring(r)); + for (const [id, monitor] of this.reportMonitors.entries()) { + if (!reports.some((r) => r.id === id)) { + this.reportMonitors.delete(id); + } + } + }); + + this.monitorCouchDbChanges(); + } + + async registerReportMonitoring(report: Reference) { + if (!this.reportMonitors.has(report.id)) { + this.setReportMonitor(report); + } + } + + private setReportMonitor(report: Reference) { + this.reportStorage + .fetchReport(report) + .subscribe((report: Report | undefined) => { + if (!report) { + return; + } + + this.reportMonitors.set(report.id, new ReportChangeDetector(report)); + }); + } + + private checkReportConfigUpdate(change: DatabaseChangeResult) { + if (this.reportMonitors.has(change.id)) { + this.setReportMonitor(new Reference(change.id)); + return; + } + + // TODO: reportId should in future be without prefix, probably? + // (then remove to fallback code above) + const id = change.id.split(':'); + if ( + id.length === 2 && + id[0] === 'ReportConfig' && + this.reportMonitors.has(id[1]) + ) { + this.setReportMonitor(new Reference(change.id)); + } + } + + monitorCouchDbChanges() { + this.couchdbChangesRepository + .subscribeToAllNewChanges() + .pipe( + mergeAll(), + tap((change: DatabaseChangeResult) => + this.checkReportConfigUpdate(change), + ), + map((c: DatabaseChangeResult) => this.getChangeDetails(c)), + switchMap((change: DocChangeDetails) => + this.changeIsAffectingReport(change), + ), + // TODO: collect a batch of changes for a while before checking? + ) + .subscribe((affectedReports: ReportDataChangeEvent[]) => { + affectedReports.forEach((event) => { + this.notificationService.triggerNotification(event); + console.log('Report change detected:', event); + }); + }); + } + + /** + * Load current and previous doc for advanced change detection across all reports. + * @param change + * @private + */ + private getChangeDetails(change: DatabaseChangeResult): DocChangeDetails { + // TODO: storage to get any doc from DB (for a _rev also!) + // until then, only the .change with the id can be used in ReportChangeDetector + // can also use ?include_docs=true in the changes request to get the latest doc + + return { + change: change, + previous: { _id: '' }, // cache this here to avoid requests? + new: { _id: '' }, + }; + } + + private changeIsAffectingReport( + docChange: DocChangeDetails, + ): Observable { + const affectedReports: Observable[] = []; + + for (const [reportId, changeDetector] of this.reportMonitors.entries()) { + if (!changeDetector.affectsReport(docChange)) { + continue; + } + + const reportChangeEventObservable = this.createReportCalculation + .startReportCalculation(changeDetector.report) + .pipe( + switchMap((outcome) => { + if (outcome instanceof CreateReportCalculationFailed) { + // TODO: what do we do here in case of failure? + throw new Error('Report calculation failed'); + } + + return this.createReportCalculation.getCompletedReportCalculation( + new Reference(outcome.result.id), + ); + }), + filter( + (calcUpdate) => + (calcUpdate.outcome as ReportCalculationOutcomeSuccess) + ?.result_hash !== changeDetector.lastCalculationHash, + ), + tap( + (calcUpdate) => + (changeDetector.lastCalculationHash = ( + calcUpdate.outcome as ReportCalculationOutcomeSuccess + )?.result_hash), + ), + map( + (result) => + ({ + report: result.report, + calculation: result, + } as ReportDataChangeEvent), + ), + ); + + affectedReports.push(reportChangeEventObservable); + } + + return zip(affectedReports); + } +} + +export interface DocChangeDetails { + change: DatabaseChangeResult; + previous: EntityDoc; + new: EntityDoc; +} diff --git a/src/report-changes/report-changes.module.ts b/src/report-changes/report-changes.module.ts new file mode 100644 index 0000000..cb8b0f9 --- /dev/null +++ b/src/report-changes/report-changes.module.ts @@ -0,0 +1,20 @@ +import { Module } from '@nestjs/common'; +import { ReportChangesService } from './core/report-changes.service'; +import { CouchdbChangesService } from './storage/couchdb-changes.service'; +import { NotificationModule } from '../notification/notification.module'; +import { ReportModule } from '../report/report.module'; +import { CouchDbClient } from '../couchdb/couch-db-client.service'; +import { HttpModule } from '@nestjs/axios'; +import { TestController } from './test-controller'; + +@Module({ + controllers: [TestController], + imports: [NotificationModule, ReportModule, HttpModule], + providers: [ + ReportChangesService, + CouchdbChangesService, + CouchDbClient, // TODO: pack this into a CouchDbModule together with HttpModule import etc. + ], + exports: [ReportChangesService], +}) +export class ReportChangesModule {} diff --git a/src/report-changes/storage/couchdb-changes.service.spec.ts b/src/report-changes/storage/couchdb-changes.service.spec.ts new file mode 100644 index 0000000..945ca46 --- /dev/null +++ b/src/report-changes/storage/couchdb-changes.service.spec.ts @@ -0,0 +1,156 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { CouchdbChangesService } from './couchdb-changes.service'; +import { HttpModule } from '@nestjs/axios'; +import { ConfigService } from '@nestjs/config'; +import { CouchDbClient } from '../../couchdb/couch-db-client.service'; +import { finalize, of } from 'rxjs'; +import { CouchDbChangesResponse } from '../../couchdb/dtos'; +import { DatabaseChangeResult } from './database-changes.service'; + +describe('CouchdbChangesService', () => { + let service: CouchdbChangesService; + + let mockCouchdbChanges: jest.Mock; + + let changesRequestCounter = 0; + let mockedLastSeq = 0; + + function simulateNextDbChange() { + mockedLastSeq++; + } + + beforeEach(async () => { + changesRequestCounter = 0; + mockCouchdbChanges = jest.fn().mockImplementation(() => { + changesRequestCounter++; + return of({ + last_seq: mockedLastSeq.toString(), + pending: 0, + results: [], + } as CouchDbChangesResponse); + }); + + const module: TestingModule = await Test.createTestingModule({ + imports: [HttpModule], + providers: [ + CouchdbChangesService, + { provide: CouchDbClient, useValue: { changes: mockCouchdbChanges } }, + { + provide: ConfigService, + useValue: { + getOrThrow: jest.fn(() => { + return 'foo'; + }), + }, + }, + ], + }).compile(); + + service = module.get(CouchdbChangesService); + }); + + beforeEach(() => { + jest.useFakeTimers(); + }); + afterEach(() => { + jest.runOnlyPendingTimers(); + jest.useRealTimers(); + }); + + it('should keep polling changes until client unsubscribes', () => { + const newChangesReceived: DatabaseChangeResult[][] = []; + + const changes$ = service + .subscribeToAllNewChanges() + .subscribe((r) => newChangesReceived.push(r)); + expect(newChangesReceived.length).toBe(1); + + simulateNextDbChange(); + jest.runOnlyPendingTimers(); // simulate next polling request after 60s + expect(newChangesReceived.length).toBe(2); + expect(changesRequestCounter).toBe(2); + + changes$.unsubscribe(); + + changesRequestCounter = 0; + simulateNextDbChange(); + jest.runOnlyPendingTimers(); + jest.runOnlyPendingTimers(); + // no new calls to have been made: + expect(changesRequestCounter).toBe(0); + }); + + it('should reuse existing polling for additional subscribers', () => { + let rec1: DatabaseChangeResult[][] = []; + let rec2: DatabaseChangeResult[][] = []; + + function resetCounters() { + changesRequestCounter = 0; + rec1 = []; + rec2 = []; + } + + const sub1 = service + .subscribeToAllNewChanges() + .subscribe((r) => rec1.push(r)); + expect(rec1.length).toBe(1); + + const sub2 = service + .subscribeToAllNewChanges() + .subscribe((r) => rec2.push(r)); + expect(rec2.length).toBe(1); + expect(rec1.length).toBe(1); + + resetCounters(); + simulateNextDbChange(); + jest.runOnlyPendingTimers(); + // only one request for both subscribers + expect(changesRequestCounter).toBe(1); + expect(rec1.length).toEqual(1); + expect(rec1).toEqual(rec2); + + sub1.unsubscribe(); + resetCounters(); + simulateNextDbChange(); + jest.runOnlyPendingTimers(); + expect(changesRequestCounter).toBe(1); + expect(rec2.length).toBe(1); + + sub2.unsubscribe(); + resetCounters(); + simulateNextDbChange(); + jest.runOnlyPendingTimers(); + jest.runOnlyPendingTimers(); + // no new calls to have been made: + expect(changesRequestCounter).toBe(0); + }); + + it('should fetch all pending', (done) => { + const received: CouchDbChangesResponse[] = []; + mockCouchdbChanges + .mockReturnValueOnce( + of({ + last_seq: '1', + pending: 1, + results: [], + } as CouchDbChangesResponse), + ) + .mockReturnValueOnce( + of({ + last_seq: '2', + pending: 0, + results: [], + } as CouchDbChangesResponse), + ); + + service + .fetchChanges('-1', true) + .pipe( + finalize(() => { + expect(received.length).toBe(2); + done(); + }), + ) + .subscribe((r) => received.push(r)); + }); +}); diff --git a/src/report-changes/storage/couchdb-changes.service.ts b/src/report-changes/storage/couchdb-changes.service.ts new file mode 100644 index 0000000..a959705 --- /dev/null +++ b/src/report-changes/storage/couchdb-changes.service.ts @@ -0,0 +1,131 @@ +import { + ForbiddenException, + Injectable, + NotFoundException, + UnauthorizedException, +} from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { + catchError, + EMPTY, + expand, + filter, + finalize, + map, + mergeMap, + Observable, + of, + repeat, + ReplaySubject, + Subscription, + tap, +} from 'rxjs'; +import { CouchDbClient } from '../../couchdb/couch-db-client.service'; +import { CouchDbChangesResponse } from '../../couchdb/dtos'; +import { + DatabaseChangeResult, + DatabaseChangesService, +} from './database-changes.service'; + +/** + * Access _changes from a CouchDB + */ +@Injectable() +export class CouchdbChangesService extends DatabaseChangesService { + // TODO: centralize this config by refactoring couchdbClient and providing configured clients through DI + // TODO: check if this is the correct db for our changes from app + private dbUrl: string = this.configService.getOrThrow('DATABASE_URL'); + private databaseName = 'app'; // TODO: move to config and clean up .env, clarifying different DBs there + private databaseUser: string = this.configService.getOrThrow('DATABASE_USER'); + private databasePassword: string = + this.configService.getOrThrow('DATABASE_PASSWORD'); + + private authHeaderValue: string; + + constructor( + private couchdbClient: CouchDbClient, + private configService: ConfigService, + ) { + super(); + const authHeader = Buffer.from( + `${this.databaseUser}:${this.databasePassword}`, + ).toString('base64'); + this.authHeaderValue = `Basic ${authHeader}`; + } + + private changesSubj = new ReplaySubject(1); + private changesSubscription: Subscription | undefined; + + subscribeToAllNewChanges(): Observable { + if (!this.changesSubscription) { + let lastSeq = 'now'; + const changesFeed = of({}).pipe( + mergeMap(() => this.fetchChanges(lastSeq, true)), + filter((res) => res.last_seq !== lastSeq), + tap((res) => (lastSeq = res.last_seq)), + // poll regularly to get latest changes + repeat({ delay: 10000 }), + tap((res) => console.log(res)), + ); + + this.changesSubscription = changesFeed + .pipe(map((res) => res.results)) + .subscribe(this.changesSubj); + } + + return this.changesSubj.asObservable().pipe( + finalize(() => { + if (!this.changesSubj.observed) { + // stop polling + this.changesSubscription?.unsubscribe(); + this.changesSubscription = undefined; + } + }), + ); + } + + /** + * Get the changes since the given sequence number + * + * @param since The sequence number to start from (optional, if not given start from now only) + * @param getAllPending Whether to trigger multiple requests and emit multiple values before completing, in case the first request has more pending changes + */ + fetchChanges( + since = 'now', + getAllPending = false, + ): Observable { + return this.couchdbClient + .changes(this.dbUrl, this.databaseName, { + params: { + since: since, + }, + headers: { + Authorization: this.authHeaderValue, + }, + }) + .pipe( + // get all changes, if necessary in multiple requests: + expand((res) => + getAllPending && res.pending > 0 + ? this.fetchChanges(res.last_seq) + : EMPTY, + ), + catchError((err, caught) => { + this.handleError(err); + throw caught; + }), + ); + } + + private handleError(err: any) { + if (err.response.status === 401) { + throw new UnauthorizedException(); + } + if (err.response.status === 403) { + throw new ForbiddenException(); + } + if (err.response.status === 404) { + throw new NotFoundException(); + } + } +} diff --git a/src/report-changes/storage/database-changes.service.ts b/src/report-changes/storage/database-changes.service.ts new file mode 100644 index 0000000..dab7679 --- /dev/null +++ b/src/report-changes/storage/database-changes.service.ts @@ -0,0 +1,12 @@ +import { Observable } from 'rxjs'; +import { CouchDbChangeResult } from '../../couchdb/dtos'; + +/** + * Provides access to a stream of document changes for a database. + */ +export abstract class DatabaseChangesService { + abstract subscribeToAllNewChanges(): Observable; +} + +// TODO: don't expose the CouchDb specific changes interface, map this to domain interfaces (maybe use DocChangeDetails)? +export type DatabaseChangeResult = CouchDbChangeResult; diff --git a/src/report-changes/test-controller.ts b/src/report-changes/test-controller.ts new file mode 100644 index 0000000..25dd530 --- /dev/null +++ b/src/report-changes/test-controller.ts @@ -0,0 +1,20 @@ +import { Controller, Get } from '@nestjs/common'; +import { ReportChangesService } from './core/report-changes.service'; +import { Reference } from '../domain/reference'; +import { NotificationService } from '../notification/core/notification.service'; + +// TODO: remove as soon as webhooks are implemented! +@Controller('/test') +export class TestController { + constructor( + private changeDetectionService: ReportChangesService, + private notificationService: NotificationService, + ) {} + + @Get('/register') + register() { + return this.changeDetectionService + .registerReportMonitoring(new Reference('ReportConfig:1')) + .catch((e) => console.log(e)); + } +} diff --git a/src/report/controller/report-calculation.controller.spec.ts b/src/report/controller/report-calculation.controller.spec.ts index 315363d..a9fefa1 100644 --- a/src/report/controller/report-calculation.controller.spec.ts +++ b/src/report/controller/report-calculation.controller.spec.ts @@ -7,6 +7,7 @@ import { ReportController } from './report.controller'; import { ReportCalculationRepository } from '../repository/report-calculation-repository.service'; import { ReportRepository } from '../repository/report-repository.service'; import { ConfigService } from '@nestjs/config'; +import { CreateReportCalculationUseCase } from '../core/use-cases/create-report-calculation-use-case.service'; describe('ReportCalculationController', () => { let controller: ReportCalculationController; @@ -21,6 +22,7 @@ describe('ReportCalculationController', () => { ReportController, ReportCalculationRepository, ReportRepository, + CreateReportCalculationUseCase, { provide: ConfigService, useValue: { diff --git a/src/report/controller/report-calculation.controller.ts b/src/report/controller/report-calculation.controller.ts index 5f72bf2..9b27412 100644 --- a/src/report/controller/report-calculation.controller.ts +++ b/src/report/controller/report-calculation.controller.ts @@ -2,6 +2,7 @@ import { Controller, Get, Headers, + InternalServerErrorException, NotFoundException, Param, Post, @@ -11,33 +12,39 @@ import { map, Observable, switchMap } from 'rxjs'; import { ReportCalculation } from '../../domain/report-calculation'; import { Reference } from '../../domain/reference'; import { ReportData } from '../../domain/report-data'; -import { v4 as uuidv4 } from 'uuid'; +import { + CreateReportCalculationFailed, + CreateReportCalculationUseCase, +} from '../core/use-cases/create-report-calculation-use-case.service'; @Controller('/api/v1/reporting') export class ReportCalculationController { - constructor(private reportStorage: DefaultReportStorage) {} + constructor( + private reportStorage: DefaultReportStorage, + private createReportCalculation: CreateReportCalculationUseCase, + ) {} @Post('/report-calculation/report/:reportId') startCalculation( @Headers('Authorization') token: string, @Param('reportId') reportId: string, ): Observable { - return this.reportStorage.fetchReport(token, new Reference(reportId)).pipe( + return this.reportStorage.fetchReport(new Reference(reportId), token).pipe( switchMap((value) => { if (!value) { throw new NotFoundException(); } - return this.reportStorage - .storeCalculation( - new ReportCalculation( - `ReportCalculation:${uuidv4()}`, - new Reference(reportId), - ), - ) - .pipe( - map((reportCalculation) => new Reference(reportCalculation.id)), - ); + return this.createReportCalculation.startReportCalculation(value).pipe( + map((outcome) => { + if (outcome instanceof CreateReportCalculationFailed) { + // TODO: other error codes? + throw new InternalServerErrorException(); + } + + return new Reference(outcome.result.id); + }), + ); }), ); } @@ -64,7 +71,7 @@ export class ReportCalculationController { } return this.reportStorage - .fetchReport(token, new Reference(calculation.report.id)) + .fetchReport(new Reference(calculation.report.id), token) .pipe( map((report) => { if (!report) { diff --git a/src/report/controller/report.controller.ts b/src/report/controller/report.controller.ts index 7f890c8..2f8a47d 100644 --- a/src/report/controller/report.controller.ts +++ b/src/report/controller/report.controller.ts @@ -33,9 +33,10 @@ export class ReportController { @Param('reportId') reportId: string, ): Observable { return this.reportStorage - .fetchReport(token, new Reference(reportId)) - .pipe(switchMap((report) => this.getReportDto(report))); + .fetchReport(new Reference(reportId), token) + .pipe(switchMap((report) => this.getReportDto(report as any))); // TODO: fix for undefined report } + private getReportDto(report: Report): Observable { return this.reportStorage .isCalculationOngoing(new Reference(report.id)) diff --git a/src/report/core/report-storage.ts b/src/report/core/report-storage.ts index b969162..a889c1c 100644 --- a/src/report/core/report-storage.ts +++ b/src/report/core/report-storage.ts @@ -1,6 +1,6 @@ import { Reference } from '../../domain/reference'; import { Report } from '../../domain/report'; -import { Observable } from 'rxjs'; +import { Observable, Subject } from 'rxjs'; import { ReportCalculation } from '../../domain/report-calculation'; import { ReportData } from '../../domain/report-data'; @@ -8,8 +8,8 @@ export interface ReportStorage { fetchAllReports(authToken: string, mode: string): Observable; fetchReport( - authToken: string, reportRef: Reference, + authToken?: string | undefined, ): Observable; fetchPendingCalculations(): Observable; @@ -27,4 +27,6 @@ export interface ReportStorage { fetchData(runRef: Reference): Observable; isCalculationOngoing(reportRef: Reference): Observable; + + reportCalculationUpdated: Subject; } diff --git a/src/report/core/use-cases/create-report-calculation-use-case.service.spec.ts b/src/report/core/use-cases/create-report-calculation-use-case.service.spec.ts new file mode 100644 index 0000000..ce801f2 --- /dev/null +++ b/src/report/core/use-cases/create-report-calculation-use-case.service.spec.ts @@ -0,0 +1,24 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { CreateReportCalculationUseCase } from './create-report-calculation-use-case.service'; +import { DefaultReportStorage } from '../../storage/report-storage.service'; + +describe('CreateReportCalculationUseCaseService', () => { + let service: CreateReportCalculationUseCase; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + CreateReportCalculationUseCase, + { provide: DefaultReportStorage, useValue: {} }, + ], + }).compile(); + + service = module.get( + CreateReportCalculationUseCase, + ); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/report/core/use-cases/create-report-calculation-use-case.service.ts b/src/report/core/use-cases/create-report-calculation-use-case.service.ts new file mode 100644 index 0000000..63c9124 --- /dev/null +++ b/src/report/core/use-cases/create-report-calculation-use-case.service.ts @@ -0,0 +1,74 @@ +import { Injectable } from '@nestjs/common'; +import { Reference } from '../../../domain/reference'; +import { filter, map, merge, Observable, take } from 'rxjs'; +import { + ReportCalculation, + ReportCalculationStatus, +} from '../../../domain/report-calculation'; +import { v4 as uuidv4 } from 'uuid'; +import { Report } from '../../../domain/report'; +import { DefaultReportStorage } from '../../storage/report-storage.service'; + +@Injectable() +export class CreateReportCalculationUseCase { + constructor(private reportStorage: DefaultReportStorage) {} + + startReportCalculation( + report: Report, + ): Observable { + const calculation = new ReportCalculation( + `ReportCalculation:${uuidv4()}`, + new Reference(report.id), + ); + return this.reportStorage + .storeCalculation(calculation) + .pipe( + map((calculation) => new CreateReportCalculationSuccess(calculation)), + ); + } + + getCompletedReportCalculation( + reportCalculation: Reference, + ): Observable { + return merge( + this.reportStorage.fetchCalculation(reportCalculation).pipe( + map((calc) => { + if (!calc) { + throw new Error('Report calculation not found'); + // TODO: can this really return undefined? Looks like it would throw instead (which seems a good way to handle it to me) + } + return calc as ReportCalculation; + }), + ), + this.reportStorage.reportCalculationUpdated, + ).pipe( + filter((calcUpdate) => calcUpdate?.id === reportCalculation.id), + filter( + (calcUpdate) => + calcUpdate?.status === ReportCalculationStatus.FINISHED_SUCCESS || + calcUpdate?.status === ReportCalculationStatus.FINISHED_ERROR, + ), + take(1), + ); + } +} + +export type CreateReportCalculationOutcome = + | CreateReportCalculationSuccess + | CreateReportCalculationFailed; + +export class CreateReportCalculationSuccess { + constructor(public result: ReportCalculation) {} +} + +export class CreateReportCalculationFailed { + constructor( + public errorMessage: string, + public errorCode: CreateReportCalculationError, + public error?: any, + ) {} +} + +export enum CreateReportCalculationError { + NotImplemented, +} diff --git a/src/report/report.module.ts b/src/report/report.module.ts index 53cc2e0..b916f34 100644 --- a/src/report/report.module.ts +++ b/src/report/report.module.ts @@ -9,6 +9,7 @@ import { ReportCalculationTask } from './tasks/report-calculation-task.service'; import { ReportCalculationProcessor } from './tasks/report-calculation-processor.service'; import { SqsReportCalculator } from './core/sqs-report-calculator.service'; import { CouchDbClient } from '../couchdb/couch-db-client.service'; +import { CreateReportCalculationUseCase } from './core/use-cases/create-report-calculation-use-case.service'; @Module({ controllers: [ReportController, ReportCalculationController], @@ -21,6 +22,8 @@ import { CouchDbClient } from '../couchdb/couch-db-client.service'; ReportCalculationProcessor, SqsReportCalculator, CouchDbClient, + CreateReportCalculationUseCase, ], + exports: [DefaultReportStorage, CreateReportCalculationUseCase], }) export class ReportModule {} diff --git a/src/report/repository/report-repository.service.ts b/src/report/repository/report-repository.service.ts index d4955cc..17f2c75 100644 --- a/src/report/repository/report-repository.service.ts +++ b/src/report/repository/report-repository.service.ts @@ -47,7 +47,9 @@ export class ReportRepository { this.authHeaderValue = `Basic ${authHeader}`; } - fetchReports(authToken: string): Observable { + fetchReports( + authToken: string = this.authHeaderValue, + ): Observable { return this.http .get(`${this.dbUrl}/app/_all_docs`, { params: { @@ -68,7 +70,10 @@ export class ReportRepository { ); } - fetchReport(authToken: string, reportId: string): Observable { + fetchReport( + reportId: string, + authToken: string = this.authHeaderValue, + ): Observable { return this.http .get(`${this.dbUrl}/app/${reportId}`, { headers: { diff --git a/src/report/storage/report-storage.service.ts b/src/report/storage/report-storage.service.ts index d885bbe..254a654 100644 --- a/src/report/storage/report-storage.service.ts +++ b/src/report/storage/report-storage.service.ts @@ -2,7 +2,7 @@ import { Reference } from '../../domain/reference'; import { Report } from '../../domain/report'; import { ReportStorage } from '../core/report-storage'; import { ReportRepository } from '../repository/report-repository.service'; -import { map, Observable, switchMap } from 'rxjs'; +import { map, Observable, Subject, switchMap, tap } from 'rxjs'; import { Injectable, NotFoundException } from '@nestjs/common'; import { ReportCalculation, @@ -21,6 +21,8 @@ export class DefaultReportStorage implements ReportStorage { private reportCalculationRepository: ReportCalculationRepository, ) {} + reportCalculationUpdated = new Subject(); + fetchAllReports(authToken: string, mode = 'sql'): Observable { return this.reportRepository.fetchReports(authToken).pipe( map((response) => { @@ -31,7 +33,11 @@ export class DefaultReportStorage implements ReportStorage { return response.rows .filter((row) => row.doc.mode === mode) .map((reportEntity) => - new Report(reportEntity.id, reportEntity.doc.title).setSchema({ + new Report( + reportEntity.id, + reportEntity.doc.title, + reportEntity.doc.aggregationDefinitions, + ).setSchema({ fields: reportEntity.doc.aggregationDefinitions, // todo generate actual fields here }), ); @@ -39,11 +45,18 @@ export class DefaultReportStorage implements ReportStorage { ); } - fetchReport(authToken: string, reportRef: Reference): Observable { - return this.reportRepository.fetchReport(authToken, reportRef.id).pipe( - map((reportDoc) => { - return new Report(reportDoc._id, reportDoc.title).setSchema({ - fields: reportDoc.aggregationDefinitions, // todo generate actual fields here + fetchReport( + reportRef: Reference, + authToken?: string | undefined, + ): Observable { + return this.reportRepository.fetchReport(reportRef.id, authToken).pipe( + map((report) => { + return new Report( + report._id, + report.title, + report.aggregationDefinitions, + ).setSchema({ + fields: report.aggregationDefinitions, // todo generate actual fields here }); }), ); @@ -126,6 +139,7 @@ export class DefaultReportStorage implements ReportStorage { return value; } }), + tap((calculation) => this.reportCalculationUpdated.next(calculation)), ); }