From e562964496f532d0801e1906882ea2f450d492d7 Mon Sep 17 00:00:00 2001 From: Tom Winter Date: Fri, 9 Aug 2024 13:47:42 +0200 Subject: [PATCH 1/4] feat: prevent new ReportCalculation if one is still pending --- .../changes/jobs/CouchDbChangeDetectionJob.kt | 2 +- ...efaultReportDocumentChangeEventConsumer.kt | 29 +++++++++++-------- .../DefaultCreateReportCalculationUseCase.kt | 28 +++++++++++++----- .../DefaultReportCalculationChangeUseCase.kt | 5 ++-- .../src/main/resources/application.yaml | 6 +++- 5 files changed, 46 insertions(+), 24 deletions(-) diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/jobs/CouchDbChangeDetectionJob.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/jobs/CouchDbChangeDetectionJob.kt index de65493..0dbb6f6 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/jobs/CouchDbChangeDetectionJob.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/jobs/CouchDbChangeDetectionJob.kt @@ -17,7 +17,7 @@ class CouchDbChangeDetectionJob( private var MAX_ERROR_COUNT: Int = 5 } - @Scheduled(fixedDelay = 15000) + @Scheduled(fixedDelay = 8000) fun checkForCouchDbChanges() { if (ERROR_COUNTER >= MAX_ERROR_COUNT) { return diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt index b278333..d1c574c 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt @@ -26,7 +26,9 @@ class DefaultReportDocumentChangeEventConsumer( @RabbitListener( queues = [ReportQueueConfiguration.DOCUMENT_CHANGES_REPORT_QUEUE], - ackMode = "MANUAL" + ackMode = "MANUAL", + concurrency = "1-1", + batch = "1" ) override fun consume(rawMessage: String, message: Message, channel: Channel): Mono { val type = try { @@ -70,19 +72,22 @@ class DefaultReportDocumentChangeEventConsumer( documentChangeEvent = payload ) .flatMap { affectedReports -> - Mono.zip( - affectedReports.map { report -> - createReportCalculationUseCase - .createReportCalculation( - request = CreateReportCalculationRequest( - report = report, - args = mutableMapOf() - ) + Mono.zip(affectedReports.map { report -> + createReportCalculationUseCase + .createReportCalculation( + request = CreateReportCalculationRequest( + report = report, + args = mutableMapOf() ) - } - ) { it.map { } } + ) + }) { + it.iterator() + } + } + .flatMap { Mono.empty() } + .doOnError { + logger.error(it.localizedMessage) } - .flatMap { Mono.empty() } } diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt index c9bc301..323751c 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt @@ -21,13 +21,28 @@ class DefaultCreateReportCalculationUseCase( args = request.args ) - return reportingStorage.storeCalculation(calculation) - .map { - handleResponse(it) + return reportingStorage.fetchCalculations(request.report).flatMap { reportCalculations -> + val i = reportCalculations.filter { reportCalculation -> + reportCalculation.status == ReportCalculationStatus.PENDING && + reportCalculation.args == calculation.args } - .onErrorResume { - handleError(it) + + if (i.isNotEmpty()) { + Mono.just( + CreateReportCalculationResult.Success( + DomainReference( + id = i.first().id + ) + ) + ) + } else { + reportingStorage.storeCalculation(calculation).map { + handleResponse(it) + } } + }.onErrorResume { + handleError(it) + } } private fun handleResponse(reportCalculation: ReportCalculation): CreateReportCalculationResult { @@ -39,8 +54,7 @@ class DefaultCreateReportCalculationUseCase( private fun handleError(it: Throwable): Mono { return Mono.just( CreateReportCalculationResult.Failure( - errorCode = CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR, - cause = it + errorCode = CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR, cause = it ) ) } diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultReportCalculationChangeUseCase.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultReportCalculationChangeUseCase.kt index 51758ef..68cb873 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultReportCalculationChangeUseCase.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultReportCalculationChangeUseCase.kt @@ -36,11 +36,10 @@ class DefaultReportCalculationChangeUseCase( .sortedBy { it.calculationCompleted } } .flatMap { - val existingDigest = it.last().attachments["data.json"]?.digest + val existingDigest = it.lastOrNull()?.attachments?.get("data.json")?.digest val currentDigest = currentReportCalculation.attachments["data.json"]?.digest - if (it.isEmpty() - || existingDigest != currentDigest + if (existingDigest != currentDigest ) { notificationService.sendNotifications( report = currentReportCalculation.report, diff --git a/application/aam-backend-service/src/main/resources/application.yaml b/application/aam-backend-service/src/main/resources/application.yaml index d865b84..55c61b6 100644 --- a/application/aam-backend-service/src/main/resources/application.yaml +++ b/application/aam-backend-service/src/main/resources/application.yaml @@ -10,6 +10,10 @@ spring: url: r2dbc:h2:file://././data/dbh2;DB_CLOSE_DELAY=-1 username: local password: local + rabbitmq: + listener: + simple: + prefetch: 1 management: endpoint: @@ -27,7 +31,7 @@ sqs-client-configuration: max-in-memory-size-in-mega-bytes: 16 couch-db-client-configuration: - max-in-memory-size-in-mega-bytes: 16 + max-in-memory-size-in-mega-bytes: 64 --- From ff47994ef6290f8c86f2bbbe79f48987b1b90e83 Mon Sep 17 00:00:00 2001 From: Tom Winter Date: Mon, 12 Aug 2024 15:43:46 +0200 Subject: [PATCH 2/4] test: add DefaultReportDocumentChangeEventConsumerTest --- .../aam-backend-service/build.gradle.kts | 1 + .../queue/DefaultChangeEventPublisher.kt | 5 +- .../core/DefaultNotificationEventPublisher.kt | 3 +- ...efaultReportDocumentChangeEventConsumer.kt | 12 +- ...ltReportDocumentChangeEventConsumerTest.kt | 103 ++++++++++++++++++ 5 files changed, 114 insertions(+), 10 deletions(-) create mode 100644 application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumerTest.kt diff --git a/application/aam-backend-service/build.gradle.kts b/application/aam-backend-service/build.gradle.kts index 3fe25c5..bd0afec 100644 --- a/application/aam-backend-service/build.gradle.kts +++ b/application/aam-backend-service/build.gradle.kts @@ -63,6 +63,7 @@ dependencies { testImplementation("io.cucumber:cucumber-spring:7.14.0") testImplementation("org.junit.vintage:junit-vintage-engine:5.10.2") + testImplementation("org.mockito.kotlin:mockito-kotlin:5.4.0") testImplementation("org.junit.jupiter:junit-jupiter-engine:5.10.2") testImplementation("io.projectreactor:reactor-test") diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/queue/DefaultChangeEventPublisher.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/queue/DefaultChangeEventPublisher.kt index 7e3d642..6a3569c 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/queue/DefaultChangeEventPublisher.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/changes/queue/DefaultChangeEventPublisher.kt @@ -7,7 +7,6 @@ import com.aamdigital.aambackendservice.reporting.changes.core.ChangeEventPublis import com.aamdigital.aambackendservice.reporting.domain.event.DatabaseChangeEvent import com.aamdigital.aambackendservice.reporting.domain.event.DocumentChangeEvent import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.slf4j.LoggerFactory import org.springframework.amqp.AmqpException import org.springframework.amqp.rabbit.core.RabbitTemplate @@ -50,7 +49,7 @@ class DefaultChangeEventPublisher( logger.trace( "[DefaultDatabaseChangeEventPublisher]: publish message to channel '{}' Payload: {}", channel, - jacksonObjectMapper().writeValueAsString(message) + objectMapper.writeValueAsString(message) ) return message } @@ -82,7 +81,7 @@ class DefaultChangeEventPublisher( logger.trace( "[DefaultDocumentChangeEventPublisher]: publish message to channel '{}' Payload: {}", exchange, - jacksonObjectMapper().writeValueAsString(message) + objectMapper.writeValueAsString(message) ) return message } diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/notification/core/DefaultNotificationEventPublisher.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/notification/core/DefaultNotificationEventPublisher.kt index 358705b..9e35517 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/notification/core/DefaultNotificationEventPublisher.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/notification/core/DefaultNotificationEventPublisher.kt @@ -5,7 +5,6 @@ import com.aamdigital.aambackendservice.error.InternalServerException import com.aamdigital.aambackendservice.queue.core.QueueMessage import com.aamdigital.aambackendservice.reporting.domain.event.NotificationEvent import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.slf4j.LoggerFactory import org.springframework.amqp.AmqpException import org.springframework.amqp.rabbit.core.RabbitTemplate @@ -48,7 +47,7 @@ class DefaultNotificationEventPublisher( logger.trace( "[DefaultNotificationEventPublisher]: publish message to channel '{}' Payload: {}", channel, - jacksonObjectMapper().writeValueAsString(message) + objectMapper.writeValueAsString(message) ) return message diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt index d1c574c..7b9dc6d 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt @@ -39,13 +39,13 @@ class DefaultReportDocumentChangeEventConsumer( when (type.qualifiedName) { DocumentChangeEvent::class.qualifiedName -> { - val payload = messageParser.getPayload( + val payload: DocumentChangeEvent = messageParser.getPayload( body = rawMessage.toByteArray(), kClass = DocumentChangeEvent::class ) if (payload.documentId.startsWith("ReportConfig:")) { - logger.info(payload.toString()) + logger.trace(payload.toString()) // todo if aggregationDefinition is different, skip trigger ReportCalculation @@ -97,9 +97,11 @@ class DefaultReportDocumentChangeEventConsumer( type.qualifiedName, ) - throw AmqpRejectAndDontRequeueException( - "[NO_USECASE_CONFIGURED] Could not found matching use case for: ${type.qualifiedName}", - ) + return Mono.error { + throw AmqpRejectAndDontRequeueException( + "[NO_USECASE_CONFIGURED] Could not found matching use case for: ${type.qualifiedName}", + ) + } } } } diff --git a/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumerTest.kt b/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumerTest.kt new file mode 100644 index 0000000..9888f10 --- /dev/null +++ b/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumerTest.kt @@ -0,0 +1,103 @@ +package com.aamdigital.aambackendservice.reporting.report.core + +import com.aamdigital.aambackendservice.error.InternalServerException +import com.aamdigital.aambackendservice.queue.core.QueueMessageParser +import com.aamdigital.aambackendservice.reporting.reportcalculation.core.CreateReportCalculationUseCase +import com.aamdigital.aambackendservice.reporting.reportcalculation.core.ReportCalculationChangeUseCase +import com.rabbitmq.client.Channel +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.reset +import org.mockito.kotlin.whenever +import org.springframework.amqp.AmqpRejectAndDontRequeueException +import org.springframework.amqp.core.Message +import reactor.test.StepVerifier + +@ExtendWith(MockitoExtension::class) +class DefaultReportDocumentChangeEventConsumerTest { + + private lateinit var service: ReportDocumentChangeEventConsumer + + @Mock + lateinit var messageParser: QueueMessageParser + + @Mock + lateinit var mockMessage: Message + + @Mock + lateinit var mockChannel: Channel + + @Mock + lateinit var createReportCalculationUseCase: CreateReportCalculationUseCase + + @Mock + lateinit var reportCalculationChangeUseCase: ReportCalculationChangeUseCase + + @Mock + lateinit var identifyAffectedReportsUseCase: IdentifyAffectedReportsUseCase + + @BeforeEach + fun setUp() { + reset( + messageParser, + createReportCalculationUseCase, + reportCalculationChangeUseCase, + identifyAffectedReportsUseCase + ) + + service = DefaultReportDocumentChangeEventConsumer( + messageParser = messageParser, + createReportCalculationUseCase = createReportCalculationUseCase, + reportCalculationChangeUseCase = reportCalculationChangeUseCase, + identifyAffectedReportsUseCase = identifyAffectedReportsUseCase + ) + } + + @Test + fun `should return MonoError with AmqpRejectAndDontRequeueException when MessageParser throws exception`() { + // given + val rawMessage = "foo" + + whenever(messageParser.getTypeKClass(any())) + .thenAnswer { + throw InternalServerException() + } + + StepVerifier + // when + .create(service.consume(rawMessage, mockMessage, mockChannel)) + // then + .expectErrorSatisfies { + assertThat(it).isInstanceOf(AmqpRejectAndDontRequeueException::class.java) + Assertions.assertTrue(it.localizedMessage.startsWith("[INTERNAL_SERVER_ERROR]")) + } + .verify() + } + + @Test + fun `should return MonoError with AmqpRejectAndDontRequeueException when EventType is unknown`() { + // given + val rawMessage = "foo" + + whenever(messageParser.getTypeKClass(any())) + .thenAnswer { + String::class + } + + StepVerifier + // when + .create(service.consume(rawMessage, mockMessage, mockChannel)) + // then + .expectErrorSatisfies { + assertThat(it).isInstanceOf(AmqpRejectAndDontRequeueException::class.java) + Assertions.assertTrue(it.localizedMessage.startsWith("[NO_USECASE_CONFIGURED]")) + } + .verify() + } +} From ce56cbaf792120dbf6c1e12a69c9e1fb24b91f3f Mon Sep 17 00:00:00 2001 From: Tom Winter Date: Mon, 12 Aug 2024 16:23:11 +0200 Subject: [PATCH 3/4] test: add tests --- .../DefaultCreateReportCalculationUseCase.kt | 36 ++++++----- .../storage/DefaultReportingStorage.kt | 1 - ...faultCreateReportCalculationUseCaseTest.kt | 63 +++++++++++++++++++ 3 files changed, 82 insertions(+), 18 deletions(-) create mode 100644 application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCaseTest.kt diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt index 323751c..3775872 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCase.kt @@ -21,28 +21,30 @@ class DefaultCreateReportCalculationUseCase( args = request.args ) - return reportingStorage.fetchCalculations(request.report).flatMap { reportCalculations -> - val i = reportCalculations.filter { reportCalculation -> - reportCalculation.status == ReportCalculationStatus.PENDING && - reportCalculation.args == calculation.args - } + return reportingStorage.fetchCalculations(request.report) + .flatMap { reportCalculations -> + val i = reportCalculations.filter { reportCalculation -> + reportCalculation.status == ReportCalculationStatus.PENDING && + reportCalculation.args == calculation.args + } - if (i.isNotEmpty()) { - Mono.just( - CreateReportCalculationResult.Success( - DomainReference( - id = i.first().id + if (i.isNotEmpty()) { + Mono.just( + CreateReportCalculationResult.Success( + DomainReference( + id = i.first().id + ) ) ) - ) - } else { - reportingStorage.storeCalculation(calculation).map { - handleResponse(it) + } else { + reportingStorage.storeCalculation(calculation).map { + handleResponse(it) + } } } - }.onErrorResume { - handleError(it) - } + .onErrorResume { + handleError(it) + } } private fun handleResponse(reportCalculation: ReportCalculation): CreateReportCalculationResult { diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/storage/DefaultReportingStorage.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/storage/DefaultReportingStorage.kt index 965861c..f7cb4ce 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/storage/DefaultReportingStorage.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/storage/DefaultReportingStorage.kt @@ -101,6 +101,5 @@ class DefaultReportingStorage( calculationCompleted = entity.doc.calculationCompleted, args = entity.doc.args, attachments = entity.doc.attachments - ) } diff --git a/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCaseTest.kt b/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCaseTest.kt new file mode 100644 index 0000000..3e2ee89 --- /dev/null +++ b/application/aam-backend-service/src/test/kotlin/com/aamdigital/aambackendservice/reporting/reportcalculation/core/DefaultCreateReportCalculationUseCaseTest.kt @@ -0,0 +1,63 @@ +package com.aamdigital.aambackendservice.reporting.reportcalculation.core + +import com.aamdigital.aambackendservice.domain.DomainReference +import com.aamdigital.aambackendservice.error.InternalServerException +import com.aamdigital.aambackendservice.reporting.report.core.ReportingStorage +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.reset +import org.mockito.kotlin.whenever +import reactor.core.publisher.Mono +import reactor.test.StepVerifier + +@ExtendWith(MockitoExtension::class) +class DefaultCreateReportCalculationUseCaseTest { + + private lateinit var service: CreateReportCalculationUseCase + + @Mock + lateinit var reportingStorage: ReportingStorage + + @BeforeEach + fun setUp() { + reset(reportingStorage) + service = DefaultCreateReportCalculationUseCase(reportingStorage) + } + + @Test + fun `should return Failure when ReportingStorage throws error`() { + // given + whenever(reportingStorage.fetchCalculations(any())) + .thenAnswer { + Mono.error> { + InternalServerException() + } + } + + StepVerifier + // when + .create( + service.createReportCalculation( + CreateReportCalculationRequest( + report = DomainReference("Report:1"), + args = mutableMapOf() + ) + ) + ) + // then + .assertNext { + assertThat(it).isInstanceOf(CreateReportCalculationResult.Failure::class.java) + Assertions.assertEquals( + CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR, + (it as CreateReportCalculationResult.Failure).errorCode + ) + } + .verifyComplete() + } +} From 906f791cc6f95a35de2df6984d0ab1b8d1cd29e7 Mon Sep 17 00:00:00 2001 From: Tom Winter Date: Tue, 13 Aug 2024 14:19:08 +0200 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Sebastian --- .../report/core/DefaultReportDocumentChangeEventConsumer.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt index 7b9dc6d..6b2868e 100644 --- a/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt +++ b/application/aam-backend-service/src/main/kotlin/com/aamdigital/aambackendservice/reporting/report/core/DefaultReportDocumentChangeEventConsumer.kt @@ -27,6 +27,7 @@ class DefaultReportDocumentChangeEventConsumer( @RabbitListener( queues = [ReportQueueConfiguration.DOCUMENT_CHANGES_REPORT_QUEUE], ackMode = "MANUAL", + // avoid concurrent processing so that we do not trigger multiple calculations for same data unnecessarily concurrency = "1-1", batch = "1" ) @@ -99,7 +100,7 @@ class DefaultReportDocumentChangeEventConsumer( return Mono.error { throw AmqpRejectAndDontRequeueException( - "[NO_USECASE_CONFIGURED] Could not found matching use case for: ${type.qualifiedName}", + "[NO_USECASE_CONFIGURED] Could not find matching use case for: ${type.qualifiedName}", ) } }