Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prevent new ReportCalculation if one is still pending #32

Merged
merged 4 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions application/aam-backend-service/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
testImplementation("io.cucumber:cucumber-spring:7.14.0")
testImplementation("org.junit.vintage:junit-vintage-engine:5.10.2")

testImplementation("org.mockito.kotlin:mockito-kotlin:5.4.0")
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.10.2")
testImplementation("io.projectreactor:reactor-test")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class CouchDbChangeDetectionJob(
private var MAX_ERROR_COUNT: Int = 5
}

@Scheduled(fixedDelay = 15000)
@Scheduled(fixedDelay = 8000)
tomwwinter marked this conversation as resolved.
Show resolved Hide resolved
fun checkForCouchDbChanges() {
if (ERROR_COUNTER >= MAX_ERROR_COUNT) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.aamdigital.aambackendservice.reporting.changes.core.ChangeEventPublis
import com.aamdigital.aambackendservice.reporting.domain.event.DatabaseChangeEvent
import com.aamdigital.aambackendservice.reporting.domain.event.DocumentChangeEvent
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpException
import org.springframework.amqp.rabbit.core.RabbitTemplate
Expand Down Expand Up @@ -50,7 +49,7 @@ class DefaultChangeEventPublisher(
logger.trace(
"[DefaultDatabaseChangeEventPublisher]: publish message to channel '{}' Payload: {}",
channel,
jacksonObjectMapper().writeValueAsString(message)
objectMapper.writeValueAsString(message)
)
return message
}
Expand Down Expand Up @@ -82,7 +81,7 @@ class DefaultChangeEventPublisher(
logger.trace(
"[DefaultDocumentChangeEventPublisher]: publish message to channel '{}' Payload: {}",
exchange,
jacksonObjectMapper().writeValueAsString(message)
objectMapper.writeValueAsString(message)
)
return message
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.queue.core.QueueMessage
import com.aamdigital.aambackendservice.reporting.domain.event.NotificationEvent
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpException
import org.springframework.amqp.rabbit.core.RabbitTemplate
Expand Down Expand Up @@ -48,7 +47,7 @@ class DefaultNotificationEventPublisher(
logger.trace(
"[DefaultNotificationEventPublisher]: publish message to channel '{}' Payload: {}",
channel,
jacksonObjectMapper().writeValueAsString(message)
objectMapper.writeValueAsString(message)
)

return message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class DefaultReportDocumentChangeEventConsumer(

@RabbitListener(
queues = [ReportQueueConfiguration.DOCUMENT_CHANGES_REPORT_QUEUE],
ackMode = "MANUAL"
ackMode = "MANUAL",
// avoid concurrent processing so that we do not trigger multiple calculations for same data unnecessarily
concurrency = "1-1",
batch = "1"
tomwwinter marked this conversation as resolved.
Show resolved Hide resolved
)
override fun consume(rawMessage: String, message: Message, channel: Channel): Mono<Unit> {
val type = try {
Expand All @@ -37,13 +40,13 @@ class DefaultReportDocumentChangeEventConsumer(

when (type.qualifiedName) {
DocumentChangeEvent::class.qualifiedName -> {
val payload = messageParser.getPayload(
val payload: DocumentChangeEvent = messageParser.getPayload(
body = rawMessage.toByteArray(),
kClass = DocumentChangeEvent::class
)

if (payload.documentId.startsWith("ReportConfig:")) {
logger.info(payload.toString())
logger.trace(payload.toString())

// todo if aggregationDefinition is different, skip trigger ReportCalculation

Expand All @@ -70,19 +73,22 @@ class DefaultReportDocumentChangeEventConsumer(
documentChangeEvent = payload
)
.flatMap { affectedReports ->
Mono.zip(
affectedReports.map { report ->
createReportCalculationUseCase
.createReportCalculation(
request = CreateReportCalculationRequest(
report = report,
args = mutableMapOf()
)
Mono.zip(affectedReports.map { report ->
createReportCalculationUseCase
.createReportCalculation(
request = CreateReportCalculationRequest(
report = report,
args = mutableMapOf()
)
}
) { it.map { } }
)
}) {
it.iterator()
}
}
.flatMap { Mono.empty<Unit>() }
.doOnError {
logger.error(it.localizedMessage)
}
.flatMap { Mono.empty() }

}

Expand All @@ -92,9 +98,11 @@ class DefaultReportDocumentChangeEventConsumer(
type.qualifiedName,
)

throw AmqpRejectAndDontRequeueException(
"[NO_USECASE_CONFIGURED] Could not found matching use case for: ${type.qualifiedName}",
)
return Mono.error {
throw AmqpRejectAndDontRequeueException(
"[NO_USECASE_CONFIGURED] Could not find matching use case for: ${type.qualifiedName}",
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,26 @@ class DefaultCreateReportCalculationUseCase(
args = request.args
)

return reportingStorage.storeCalculation(calculation)
.map {
handleResponse(it)
return reportingStorage.fetchCalculations(request.report)
.flatMap { reportCalculations ->
val i = reportCalculations.filter { reportCalculation ->
reportCalculation.status == ReportCalculationStatus.PENDING &&
reportCalculation.args == calculation.args
}

if (i.isNotEmpty()) {
Mono.just(
CreateReportCalculationResult.Success(
tomwwinter marked this conversation as resolved.
Show resolved Hide resolved
DomainReference(
id = i.first().id
)
)
)
} else {
reportingStorage.storeCalculation(calculation).map {
handleResponse(it)
}
}
}
.onErrorResume {
handleError(it)
Expand All @@ -39,8 +56,7 @@ class DefaultCreateReportCalculationUseCase(
private fun handleError(it: Throwable): Mono<CreateReportCalculationResult> {
return Mono.just(
CreateReportCalculationResult.Failure(
errorCode = CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR,
cause = it
errorCode = CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR, cause = it
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ class DefaultReportCalculationChangeUseCase(
.sortedBy { it.calculationCompleted }
}
.flatMap {
val existingDigest = it.last().attachments["data.json"]?.digest
val existingDigest = it.lastOrNull()?.attachments?.get("data.json")?.digest
val currentDigest = currentReportCalculation.attachments["data.json"]?.digest

if (it.isEmpty()
|| existingDigest != currentDigest
if (existingDigest != currentDigest
) {
notificationService.sendNotifications(
report = currentReportCalculation.report,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,5 @@ class DefaultReportingStorage(
calculationCompleted = entity.doc.calculationCompleted,
args = entity.doc.args,
attachments = entity.doc.attachments

)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ spring:
url: r2dbc:h2:file://././data/dbh2;DB_CLOSE_DELAY=-1
username: local
password: local
rabbitmq:
listener:
simple:
prefetch: 1

management:
endpoint:
Expand All @@ -27,7 +31,7 @@ sqs-client-configuration:
max-in-memory-size-in-mega-bytes: 16

couch-db-client-configuration:
max-in-memory-size-in-mega-bytes: 16
max-in-memory-size-in-mega-bytes: 64

---

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.aamdigital.aambackendservice.reporting.report.core

import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.queue.core.QueueMessageParser
import com.aamdigital.aambackendservice.reporting.reportcalculation.core.CreateReportCalculationUseCase
import com.aamdigital.aambackendservice.reporting.reportcalculation.core.ReportCalculationChangeUseCase
import com.rabbitmq.client.Channel
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.reset
import org.mockito.kotlin.whenever
import org.springframework.amqp.AmqpRejectAndDontRequeueException
import org.springframework.amqp.core.Message
import reactor.test.StepVerifier

@ExtendWith(MockitoExtension::class)
class DefaultReportDocumentChangeEventConsumerTest {

private lateinit var service: ReportDocumentChangeEventConsumer

@Mock
lateinit var messageParser: QueueMessageParser

@Mock
lateinit var mockMessage: Message

@Mock
lateinit var mockChannel: Channel

@Mock
lateinit var createReportCalculationUseCase: CreateReportCalculationUseCase

@Mock
lateinit var reportCalculationChangeUseCase: ReportCalculationChangeUseCase

@Mock
lateinit var identifyAffectedReportsUseCase: IdentifyAffectedReportsUseCase

@BeforeEach
fun setUp() {
reset(
messageParser,
createReportCalculationUseCase,
reportCalculationChangeUseCase,
identifyAffectedReportsUseCase
)

service = DefaultReportDocumentChangeEventConsumer(
messageParser = messageParser,
createReportCalculationUseCase = createReportCalculationUseCase,
reportCalculationChangeUseCase = reportCalculationChangeUseCase,
identifyAffectedReportsUseCase = identifyAffectedReportsUseCase
)
}

@Test
fun `should return MonoError with AmqpRejectAndDontRequeueException when MessageParser throws exception`() {
// given
val rawMessage = "foo"

whenever(messageParser.getTypeKClass(any()))
.thenAnswer {
throw InternalServerException()
}

StepVerifier
// when
.create(service.consume(rawMessage, mockMessage, mockChannel))
// then
.expectErrorSatisfies {
assertThat(it).isInstanceOf(AmqpRejectAndDontRequeueException::class.java)
Assertions.assertTrue(it.localizedMessage.startsWith("[INTERNAL_SERVER_ERROR]"))
}
.verify()
}

@Test
fun `should return MonoError with AmqpRejectAndDontRequeueException when EventType is unknown`() {
// given
val rawMessage = "foo"

whenever(messageParser.getTypeKClass(any()))
.thenAnswer {
String::class
}

StepVerifier
// when
.create(service.consume(rawMessage, mockMessage, mockChannel))
// then
.expectErrorSatisfies {
assertThat(it).isInstanceOf(AmqpRejectAndDontRequeueException::class.java)
Assertions.assertTrue(it.localizedMessage.startsWith("[NO_USECASE_CONFIGURED]"))
}
.verify()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.aamdigital.aambackendservice.reporting.reportcalculation.core

import com.aamdigital.aambackendservice.domain.DomainReference
import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.reporting.report.core.ReportingStorage
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.reset
import org.mockito.kotlin.whenever
import reactor.core.publisher.Mono
import reactor.test.StepVerifier

@ExtendWith(MockitoExtension::class)
class DefaultCreateReportCalculationUseCaseTest {

private lateinit var service: CreateReportCalculationUseCase

@Mock
lateinit var reportingStorage: ReportingStorage

@BeforeEach
fun setUp() {
reset(reportingStorage)
service = DefaultCreateReportCalculationUseCase(reportingStorage)
}

@Test
fun `should return Failure when ReportingStorage throws error`() {
// given
whenever(reportingStorage.fetchCalculations(any()))
.thenAnswer {
Mono.error<List<*>> {
InternalServerException()
}
}

StepVerifier
// when
.create(
service.createReportCalculation(
CreateReportCalculationRequest(
report = DomainReference("Report:1"),
args = mutableMapOf()
)
)
)
// then
.assertNext {
assertThat(it).isInstanceOf(CreateReportCalculationResult.Failure::class.java)
Assertions.assertEquals(
CreateReportCalculationResult.ErrorCode.INTERNAL_SERVER_ERROR,
(it as CreateReportCalculationResult.Failure).errorCode
)
}
.verifyComplete()
}
}
Loading