Skip to content

Commit

Permalink
feat: prevent new ReportCalculation if one is still pending (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwwinter authored Aug 13, 2024
1 parent 2d5311a commit ba797bb
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 33 deletions.
1 change: 1 addition & 0 deletions application/aam-backend-service/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
testImplementation("io.cucumber:cucumber-spring:7.14.0")
testImplementation("org.junit.vintage:junit-vintage-engine:5.10.2")

testImplementation("org.mockito.kotlin:mockito-kotlin:5.4.0")
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.10.2")
testImplementation("io.projectreactor:reactor-test")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class CouchDbChangeDetectionJob(
private var MAX_ERROR_COUNT: Int = 5
}

@Scheduled(fixedDelay = 15000)
@Scheduled(fixedDelay = 8000)
fun checkForCouchDbChanges() {
if (ERROR_COUNTER >= MAX_ERROR_COUNT) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.aamdigital.aambackendservice.reporting.changes.core.ChangeEventPublis
import com.aamdigital.aambackendservice.reporting.domain.event.DatabaseChangeEvent
import com.aamdigital.aambackendservice.reporting.domain.event.DocumentChangeEvent
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpException
import org.springframework.amqp.rabbit.core.RabbitTemplate
Expand Down Expand Up @@ -50,7 +49,7 @@ class DefaultChangeEventPublisher(
logger.trace(
"[DefaultDatabaseChangeEventPublisher]: publish message to channel '{}' Payload: {}",
channel,
jacksonObjectMapper().writeValueAsString(message)
objectMapper.writeValueAsString(message)
)
return message
}
Expand Down Expand Up @@ -82,7 +81,7 @@ class DefaultChangeEventPublisher(
logger.trace(
"[DefaultDocumentChangeEventPublisher]: publish message to channel '{}' Payload: {}",
exchange,
jacksonObjectMapper().writeValueAsString(message)
objectMapper.writeValueAsString(message)
)
return message
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.queue.core.QueueMessage
import com.aamdigital.aambackendservice.reporting.domain.event.NotificationEvent
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpException
import org.springframework.amqp.rabbit.core.RabbitTemplate
Expand Down Expand Up @@ -48,7 +47,7 @@ class DefaultNotificationEventPublisher(
logger.trace(
"[DefaultNotificationEventPublisher]: publish message to channel '{}' Payload: {}",
channel,
jacksonObjectMapper().writeValueAsString(message)
objectMapper.writeValueAsString(message)
)

return message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class DefaultReportDocumentChangeEventConsumer(

@RabbitListener(
queues = [ReportQueueConfiguration.DOCUMENT_CHANGES_REPORT_QUEUE],
ackMode = "MANUAL"
ackMode = "MANUAL",
// avoid concurrent processing so that we do not trigger multiple calculations for same data unnecessarily
concurrency = "1-1",
batch = "1"
)
override fun consume(rawMessage: String, message: Message, channel: Channel): Mono<Unit> {
val type = try {
Expand All @@ -37,13 +40,13 @@ class DefaultReportDocumentChangeEventConsumer(

when (type.qualifiedName) {
DocumentChangeEvent::class.qualifiedName -> {
val payload = messageParser.getPayload(
val payload: DocumentChangeEvent = messageParser.getPayload(
body = rawMessage.toByteArray(),
kClass = DocumentChangeEvent::class
)

if (payload.documentId.startsWith("ReportConfig:")) {
logger.info(payload.toString())
logger.trace(payload.toString())

// todo if aggregationDefinition is different, skip trigger ReportCalculation

Expand All @@ -70,19 +73,22 @@ class DefaultReportDocumentChangeEventConsumer(
documentChangeEvent = payload
)
.flatMap { affectedReports ->
Mono.zip(
affectedReports.map { report ->
createReportCalculationUseCase
.createReportCalculation(
request = CreateReportCalculationRequest(
report = report,
args = mutableMapOf()
)
Mono.zip(affectedReports.map { report ->
createReportCalculationUseCase
.createReportCalculation(
request = CreateReportCalculationRequest(
report = report,
args = mutableMapOf()
)
}
) { it.map { } }
)
}) {
it.iterator()
}
}
.flatMap { Mono.empty<Unit>() }
.doOnError {
logger.error(it.localizedMessage)
}
.flatMap { Mono.empty() }

}

Expand All @@ -92,9 +98,11 @@ class DefaultReportDocumentChangeEventConsumer(
type.qualifiedName,
)

throw AmqpRejectAndDontRequeueException(
"[NO_USECASE_CONFIGURED] Could not found matching use case for: ${type.qualifiedName}",
)
return Mono.error {
throw AmqpRejectAndDontRequeueException(
"[NO_USECASE_CONFIGURED] Could not find matching use case for: ${type.qualifiedName}",
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,26 @@ class DefaultCreateReportCalculationUseCase(
args = request.args
)

return reportingStorage.storeCalculation(calculation)
.map {
handleResponse(it)
return reportingStorage.fetchCalculations(request.report)
.flatMap { reportCalculations ->
val i = reportCalculations.filter { reportCalculation ->
reportCalculation.status == ReportCalculationStatus.PENDING &&
reportCalculation.args == calculation.args
}

if (i.isNotEmpty()) {
Mono.just(
CreateReportCalculationResult.Success(
DomainReference(
id = i.first().id
)
)
)
} else {
reportingStorage.storeCalculation(calculation).map {
handleResponse(it)
}
}
}
.onErrorResume {
handleError(it)
Expand All @@ -39,8 +56,7 @@ class DefaultCreateReportCalculationUseCase(
private fun handleError(it: Throwable): Mono<CreateReportCalculationResult> {
return Mono.just(
CreateReportCalculationResult.Failure(
errorCode = CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR,
cause = it
errorCode = CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR, cause = it
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ class DefaultReportCalculationChangeUseCase(
.sortedBy { it.calculationCompleted }
}
.flatMap {
val existingDigest = it.last().attachments["data.json"]?.digest
val existingDigest = it.lastOrNull()?.attachments?.get("data.json")?.digest
val currentDigest = currentReportCalculation.attachments["data.json"]?.digest

if (it.isEmpty()
|| existingDigest != currentDigest
if (existingDigest != currentDigest
) {
notificationService.sendNotifications(
report = currentReportCalculation.report,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,5 @@ class DefaultReportingStorage(
calculationCompleted = entity.doc.calculationCompleted,
args = entity.doc.args,
attachments = entity.doc.attachments

)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ spring:
url: r2dbc:h2:file://././data/dbh2;DB_CLOSE_DELAY=-1
username: local
password: local
rabbitmq:
listener:
simple:
prefetch: 1

management:
endpoint:
Expand All @@ -27,7 +31,7 @@ sqs-client-configuration:
max-in-memory-size-in-mega-bytes: 16

couch-db-client-configuration:
max-in-memory-size-in-mega-bytes: 16
max-in-memory-size-in-mega-bytes: 64

---

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.aamdigital.aambackendservice.reporting.report.core

import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.queue.core.QueueMessageParser
import com.aamdigital.aambackendservice.reporting.reportcalculation.core.CreateReportCalculationUseCase
import com.aamdigital.aambackendservice.reporting.reportcalculation.core.ReportCalculationChangeUseCase
import com.rabbitmq.client.Channel
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.reset
import org.mockito.kotlin.whenever
import org.springframework.amqp.AmqpRejectAndDontRequeueException
import org.springframework.amqp.core.Message
import reactor.test.StepVerifier

@ExtendWith(MockitoExtension::class)
class DefaultReportDocumentChangeEventConsumerTest {

private lateinit var service: ReportDocumentChangeEventConsumer

@Mock
lateinit var messageParser: QueueMessageParser

@Mock
lateinit var mockMessage: Message

@Mock
lateinit var mockChannel: Channel

@Mock
lateinit var createReportCalculationUseCase: CreateReportCalculationUseCase

@Mock
lateinit var reportCalculationChangeUseCase: ReportCalculationChangeUseCase

@Mock
lateinit var identifyAffectedReportsUseCase: IdentifyAffectedReportsUseCase

@BeforeEach
fun setUp() {
reset(
messageParser,
createReportCalculationUseCase,
reportCalculationChangeUseCase,
identifyAffectedReportsUseCase
)

service = DefaultReportDocumentChangeEventConsumer(
messageParser = messageParser,
createReportCalculationUseCase = createReportCalculationUseCase,
reportCalculationChangeUseCase = reportCalculationChangeUseCase,
identifyAffectedReportsUseCase = identifyAffectedReportsUseCase
)
}

@Test
fun `should return MonoError with AmqpRejectAndDontRequeueException when MessageParser throws exception`() {
// given
val rawMessage = "foo"

whenever(messageParser.getTypeKClass(any()))
.thenAnswer {
throw InternalServerException()
}

StepVerifier
// when
.create(service.consume(rawMessage, mockMessage, mockChannel))
// then
.expectErrorSatisfies {
assertThat(it).isInstanceOf(AmqpRejectAndDontRequeueException::class.java)
Assertions.assertTrue(it.localizedMessage.startsWith("[INTERNAL_SERVER_ERROR]"))
}
.verify()
}

@Test
fun `should return MonoError with AmqpRejectAndDontRequeueException when EventType is unknown`() {
// given
val rawMessage = "foo"

whenever(messageParser.getTypeKClass(any()))
.thenAnswer {
String::class
}

StepVerifier
// when
.create(service.consume(rawMessage, mockMessage, mockChannel))
// then
.expectErrorSatisfies {
assertThat(it).isInstanceOf(AmqpRejectAndDontRequeueException::class.java)
Assertions.assertTrue(it.localizedMessage.startsWith("[NO_USECASE_CONFIGURED]"))
}
.verify()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.aamdigital.aambackendservice.reporting.reportcalculation.core

import com.aamdigital.aambackendservice.domain.DomainReference
import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.reporting.report.core.ReportingStorage
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.reset
import org.mockito.kotlin.whenever
import reactor.core.publisher.Mono
import reactor.test.StepVerifier

@ExtendWith(MockitoExtension::class)
class DefaultCreateReportCalculationUseCaseTest {

private lateinit var service: CreateReportCalculationUseCase

@Mock
lateinit var reportingStorage: ReportingStorage

@BeforeEach
fun setUp() {
reset(reportingStorage)
service = DefaultCreateReportCalculationUseCase(reportingStorage)
}

@Test
fun `should return Failure when ReportingStorage throws error`() {
// given
whenever(reportingStorage.fetchCalculations(any()))
.thenAnswer {
Mono.error<List<*>> {
InternalServerException()
}
}

StepVerifier
// when
.create(
service.createReportCalculation(
CreateReportCalculationRequest(
report = DomainReference("Report:1"),
args = mutableMapOf()
)
)
)
// then
.assertNext {
assertThat(it).isInstanceOf(CreateReportCalculationResult.Failure::class.java)
Assertions.assertEquals(
CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR,
(it as CreateReportCalculationResult.Failure).errorCode
)
}
.verifyComplete()
}
}

0 comments on commit ba797bb

Please sign in to comment.