From ba797bbaddd3d3a12ad98cabafbc6ef950ded77d Mon Sep 17 00:00:00 2001 From: Tom Winter Date: Tue, 13 Aug 2024 14:22:57 +0200 Subject: [PATCH] feat: prevent new ReportCalculation if one is still pending (#32) --- .../aam-backend-service/build.gradle.kts | 1 + .../changes/jobs/CouchDbChangeDetectionJob.kt | 2 +- .../queue/DefaultChangeEventPublisher.kt | 5 +- .../core/DefaultNotificationEventPublisher.kt | 3 +- ...efaultReportDocumentChangeEventConsumer.kt | 42 ++++--- .../DefaultCreateReportCalculationUseCase.kt | 26 ++++- .../DefaultReportCalculationChangeUseCase.kt | 5 +- .../storage/DefaultReportingStorage.kt | 1 - .../src/main/resources/application.yaml | 6 +- ...ltReportDocumentChangeEventConsumerTest.kt | 103 ++++++++++++++++++ ...faultCreateReportCalculationUseCaseTest.kt | 63 +++++++++++ 11 files changed, 224 insertions(+), 33 deletions(-) create mode 100644 application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumerTest.kt create mode 100644 application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCaseTest.kt diff --git a/application/aam-backend-service/build.gradle.kts b/application/aam-backend-service/build.gradle.kts index 3fe25c5..bd0afec 100644 --- a/application/aam-backend-service/build.gradle.kts +++ b/application/aam-backend-service/build.gradle.kts @@ -63,6 +63,7 @@ dependencies { testImplementation("io.cucumber:cucumber-spring:7.14.0") testImplementation("org.junit.vintage:junit-vintage-engine:5.10.2") + testImplementation("org.mockito.kotlin:mockito-kotlin:5.4.0") testImplementation("org.junit.jupiter:junit-jupiter-engine:5.10.2") testImplementation("io.projectreactor:reactor-test") diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/jobs/CouchDbChangeDetectionJob.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/jobs/CouchDbChangeDetectionJob.kt index de65493..0dbb6f6 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/jobs/CouchDbChangeDetectionJob.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/jobs/CouchDbChangeDetectionJob.kt @@ -17,7 +17,7 @@ class CouchDbChangeDetectionJob( private var MAX_ERROR_COUNT: Int = 5 } - @Scheduled(fixedDelay = 15000) + @Scheduled(fixedDelay = 8000) fun checkForCouchDbChanges() { if (ERROR_COUNTER >= MAX_ERROR_COUNT) { return diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/queue/DefaultChangeEventPublisher.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/queue/DefaultChangeEventPublisher.kt index 7e3d642..6a3569c 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/queue/DefaultChangeEventPublisher.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/queue/DefaultChangeEventPublisher.kt @@ -7,7 +7,6 @@ import com.aamdigital.aambackendservice.reporting.changes.core.ChangeEventPublis import com.aamdigital.aambackendservice.reporting.domain.event.DatabaseChangeEvent import com.aamdigital.aambackendservice.reporting.domain.event.DocumentChangeEvent import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.slf4j.LoggerFactory import org.springframework.amqp.AmqpException import org.springframework.amqp.rabbit.core.RabbitTemplate @@ -50,7 +49,7 @@ class DefaultChangeEventPublisher( logger.trace( "[DefaultDatabaseChangeEventPublisher]: publish message to channel '{}' Payload: {}", channel, - jacksonObjectMapper().writeValueAsString(message) + objectMapper.writeValueAsString(message) ) return message } @@ -82,7 +81,7 @@ class DefaultChangeEventPublisher( logger.trace( "[DefaultDocumentChangeEventPublisher]: publish message to channel '{}' Payload: {}", exchange, - jacksonObjectMapper().writeValueAsString(message) + objectMapper.writeValueAsString(message) ) return message } diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/notification/core/DefaultNotificationEventPublisher.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/notification/core/DefaultNotificationEventPublisher.kt index 358705b..9e35517 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/notification/core/DefaultNotificationEventPublisher.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/notification/core/DefaultNotificationEventPublisher.kt @@ -5,7 +5,6 @@ import com.aamdigital.aambackendservice.error.InternalServerException import com.aamdigital.aambackendservice.queue.core.QueueMessage import com.aamdigital.aambackendservice.reporting.domain.event.NotificationEvent import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.slf4j.LoggerFactory import org.springframework.amqp.AmqpException import org.springframework.amqp.rabbit.core.RabbitTemplate @@ -48,7 +47,7 @@ class DefaultNotificationEventPublisher( logger.trace( "[DefaultNotificationEventPublisher]: publish message to channel '{}' Payload: {}", channel, - jacksonObjectMapper().writeValueAsString(message) + objectMapper.writeValueAsString(message) ) return message diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt index b278333..6b2868e 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt @@ -26,7 +26,10 @@ class DefaultReportDocumentChangeEventConsumer( @RabbitListener( queues = [ReportQueueConfiguration.DOCUMENT_CHANGES_REPORT_QUEUE], - ackMode = "MANUAL" + ackMode = "MANUAL", + // avoid concurrent processing so that we do not trigger multiple calculations for same data unnecessarily + concurrency = "1-1", + batch = "1" ) override fun consume(rawMessage: String, message: Message, channel: Channel): Mono { val type = try { @@ -37,13 +40,13 @@ class DefaultReportDocumentChangeEventConsumer( when (type.qualifiedName) { DocumentChangeEvent::class.qualifiedName -> { - val payload = messageParser.getPayload( + val payload: DocumentChangeEvent = messageParser.getPayload( body = rawMessage.toByteArray(), kClass = DocumentChangeEvent::class ) if (payload.documentId.startsWith("ReportConfig:")) { - logger.info(payload.toString()) + logger.trace(payload.toString()) // todo if aggregationDefinition is different, skip trigger ReportCalculation @@ -70,19 +73,22 @@ class DefaultReportDocumentChangeEventConsumer( documentChangeEvent = payload ) .flatMap { affectedReports -> - Mono.zip( - affectedReports.map { report -> - createReportCalculationUseCase - .createReportCalculation( - request = CreateReportCalculationRequest( - report = report, - args = mutableMapOf() - ) + Mono.zip(affectedReports.map { report -> + createReportCalculationUseCase + .createReportCalculation( + request = CreateReportCalculationRequest( + report = report, + args = mutableMapOf() ) - } - ) { it.map { } } + ) + }) { + it.iterator() + } + } + .flatMap { Mono.empty() } + .doOnError { + logger.error(it.localizedMessage) } - .flatMap { Mono.empty() } } @@ -92,9 +98,11 @@ class DefaultReportDocumentChangeEventConsumer( type.qualifiedName, ) - throw AmqpRejectAndDontRequeueException( - "[NO_USECASE_CONFIGURED] Could not found matching use case for: ${type.qualifiedName}", - ) + return Mono.error { + throw AmqpRejectAndDontRequeueException( + "[NO_USECASE_CONFIGURED] Could not find matching use case for: ${type.qualifiedName}", + ) + } } } } diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt index c9bc301..3775872 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt @@ -21,9 +21,26 @@ class DefaultCreateReportCalculationUseCase( args = request.args ) - return reportingStorage.storeCalculation(calculation) - .map { - handleResponse(it) + return reportingStorage.fetchCalculations(request.report) + .flatMap { reportCalculations -> + val i = reportCalculations.filter { reportCalculation -> + reportCalculation.status == ReportCalculationStatus.PENDING && + reportCalculation.args == calculation.args + } + + if (i.isNotEmpty()) { + Mono.just( + CreateReportCalculationResult.Success( + DomainReference( + id = i.first().id + ) + ) + ) + } else { + reportingStorage.storeCalculation(calculation).map { + handleResponse(it) + } + } } .onErrorResume { handleError(it) @@ -39,8 +56,7 @@ class DefaultCreateReportCalculationUseCase( private fun handleError(it: Throwable): Mono { return Mono.just( CreateReportCalculationResult.Failure( - errorCode = CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR, - cause = it + errorCode = CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR, cause = it ) ) } diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultReportCalculationChangeUseCase.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultReportCalculationChangeUseCase.kt index 51758ef..68cb873 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultReportCalculationChangeUseCase.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultReportCalculationChangeUseCase.kt @@ -36,11 +36,10 @@ class DefaultReportCalculationChangeUseCase( .sortedBy { it.calculationCompleted } } .flatMap { - val existingDigest = it.last().attachments["data.json"]?.digest + val existingDigest = it.lastOrNull()?.attachments?.get("data.json")?.digest val currentDigest = currentReportCalculation.attachments["data.json"]?.digest - if (it.isEmpty() - || existingDigest != currentDigest + if (existingDigest != currentDigest ) { notificationService.sendNotifications( report = currentReportCalculation.report, diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/storage/DefaultReportingStorage.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/storage/DefaultReportingStorage.kt index 965861c..f7cb4ce 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/storage/DefaultReportingStorage.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/storage/DefaultReportingStorage.kt @@ -101,6 +101,5 @@ class DefaultReportingStorage( calculationCompleted = entity.doc.calculationCompleted, args = entity.doc.args, attachments = entity.doc.attachments - ) } diff --git a/application/aam-backend-service/src/main/resources/application.yaml b/application/aam-backend-service/src/main/resources/application.yaml index d865b84..55c61b6 100644 --- a/application/aam-backend-service/src/main/resources/application.yaml +++ b/application/aam-backend-service/src/main/resources/application.yaml @@ -10,6 +10,10 @@ spring: url: r2dbc:h2:file://././data/dbh2;DB_CLOSE_DELAY=-1 username: local password: local + rabbitmq: + listener: + simple: + prefetch: 1 management: endpoint: @@ -27,7 +31,7 @@ sqs-client-configuration: max-in-memory-size-in-mega-bytes: 16 couch-db-client-configuration: - max-in-memory-size-in-mega-bytes: 16 + max-in-memory-size-in-mega-bytes: 64 --- diff --git a/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumerTest.kt b/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumerTest.kt new file mode 100644 index 0000000..9888f10 --- /dev/null +++ b/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumerTest.kt @@ -0,0 +1,103 @@ +package com.aamdigital.aambackendservice.reporting.report.core + +import com.aamdigital.aambackendservice.error.InternalServerException +import com.aamdigital.aambackendservice.queue.core.QueueMessageParser +import com.aamdigital.aambackendservice.reporting.reportcalculation.core.CreateReportCalculationUseCase +import com.aamdigital.aambackendservice.reporting.reportcalculation.core.ReportCalculationChangeUseCase +import com.rabbitmq.client.Channel +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.reset +import org.mockito.kotlin.whenever +import org.springframework.amqp.AmqpRejectAndDontRequeueException +import org.springframework.amqp.core.Message +import reactor.test.StepVerifier + +@ExtendWith(MockitoExtension::class) +class DefaultReportDocumentChangeEventConsumerTest { + + private lateinit var service: ReportDocumentChangeEventConsumer + + @Mock + lateinit var messageParser: QueueMessageParser + + @Mock + lateinit var mockMessage: Message + + @Mock + lateinit var mockChannel: Channel + + @Mock + lateinit var createReportCalculationUseCase: CreateReportCalculationUseCase + + @Mock + lateinit var reportCalculationChangeUseCase: ReportCalculationChangeUseCase + + @Mock + lateinit var identifyAffectedReportsUseCase: IdentifyAffectedReportsUseCase + + @BeforeEach + fun setUp() { + reset( + messageParser, + createReportCalculationUseCase, + reportCalculationChangeUseCase, + identifyAffectedReportsUseCase + ) + + service = DefaultReportDocumentChangeEventConsumer( + messageParser = messageParser, + createReportCalculationUseCase = createReportCalculationUseCase, + reportCalculationChangeUseCase = reportCalculationChangeUseCase, + identifyAffectedReportsUseCase = identifyAffectedReportsUseCase + ) + } + + @Test + fun `should return MonoError with AmqpRejectAndDontRequeueException when MessageParser throws exception`() { + // given + val rawMessage = "foo" + + whenever(messageParser.getTypeKClass(any())) + .thenAnswer { + throw InternalServerException() + } + + StepVerifier + // when + .create(service.consume(rawMessage, mockMessage, mockChannel)) + // then + .expectErrorSatisfies { + assertThat(it).isInstanceOf(AmqpRejectAndDontRequeueException::class.java) + Assertions.assertTrue(it.localizedMessage.startsWith("[INTERNAL_SERVER_ERROR]")) + } + .verify() + } + + @Test + fun `should return MonoError with AmqpRejectAndDontRequeueException when EventType is unknown`() { + // given + val rawMessage = "foo" + + whenever(messageParser.getTypeKClass(any())) + .thenAnswer { + String::class + } + + StepVerifier + // when + .create(service.consume(rawMessage, mockMessage, mockChannel)) + // then + .expectErrorSatisfies { + assertThat(it).isInstanceOf(AmqpRejectAndDontRequeueException::class.java) + Assertions.assertTrue(it.localizedMessage.startsWith("[NO_USECASE_CONFIGURED]")) + } + .verify() + } +} diff --git a/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCaseTest.kt b/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCaseTest.kt new file mode 100644 index 0000000..3e2ee89 --- /dev/null +++ b/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCaseTest.kt @@ -0,0 +1,63 @@ +package com.aamdigital.aambackendservice.reporting.reportcalculation.core + +import com.aamdigital.aambackendservice.domain.DomainReference +import com.aamdigital.aambackendservice.error.InternalServerException +import com.aamdigital.aambackendservice.reporting.report.core.ReportingStorage +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.reset +import org.mockito.kotlin.whenever +import reactor.core.publisher.Mono +import reactor.test.StepVerifier + +@ExtendWith(MockitoExtension::class) +class DefaultCreateReportCalculationUseCaseTest { + + private lateinit var service: CreateReportCalculationUseCase + + @Mock + lateinit var reportingStorage: ReportingStorage + + @BeforeEach + fun setUp() { + reset(reportingStorage) + service = DefaultCreateReportCalculationUseCase(reportingStorage) + } + + @Test + fun `should return Failure when ReportingStorage throws error`() { + // given + whenever(reportingStorage.fetchCalculations(any())) + .thenAnswer { + Mono.error> { + InternalServerException() + } + } + + StepVerifier + // when + .create( + service.createReportCalculation( + CreateReportCalculationRequest( + report = DomainReference("Report:1"), + args = mutableMapOf() + ) + ) + ) + // then + .assertNext { + assertThat(it).isInstanceOf(CreateReportCalculationResult.Failure::class.java) + Assertions.assertEquals( + CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR, + (it as CreateReportCalculationResult.Failure).errorCode + ) + } + .verifyComplete() + } +}