Skip to content

Commit

Permalink
fix: queue message parsing error
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwwinter committed Mar 11, 2024
1 parent 9433235 commit 1838858
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.aamdigital.aambackendservice.changes.core.DatabaseChangeEventConsumer
import com.aamdigital.aambackendservice.changes.queue.DefaultChangeEventPublisher
import com.aamdigital.aambackendservice.changes.queue.DefaultDatabaseChangeEventConsumer
import com.aamdigital.aambackendservice.queue.core.QueueMessageParser
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.FanoutExchange
Expand Down Expand Up @@ -51,8 +52,10 @@ class ChangesQueueConfiguration {

@Bean
fun defaultChangeEventPublisher(
objectMapper: ObjectMapper,
rabbitTemplate: RabbitTemplate,
): ChangeEventPublisher = DefaultChangeEventPublisher(
objectMapper = objectMapper,
rabbitTemplate = rabbitTemplate,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.aamdigital.aambackendservice.domain.event.DocumentChangeEvent
import com.aamdigital.aambackendservice.error.AamException
import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.queue.core.QueueMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpException
Expand All @@ -16,6 +17,7 @@ import java.time.format.DateTimeFormatter
import java.util.*

class DefaultChangeEventPublisher(
private val objectMapper: ObjectMapper,
private val rabbitTemplate: RabbitTemplate,
) : ChangeEventPublisher {

Expand All @@ -31,10 +33,11 @@ class DefaultChangeEventPublisher(
.atOffset(ZoneOffset.UTC)
.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
)

try {
rabbitTemplate.convertAndSend(
channel,
message
objectMapper.writeValueAsString(message)
)
} catch (ex: AmqpException) {
throw InternalServerException(
Expand Down Expand Up @@ -66,7 +69,7 @@ class DefaultChangeEventPublisher(
rabbitTemplate.convertAndSend(
exchange,
"",
message
objectMapper.writeValueAsString(message)
)
} catch (ex: AmqpException) {
throw InternalServerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpRejectAndDontRequeueException
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.messaging.handler.annotation.Payload
import reactor.core.publisher.Mono

class DefaultDatabaseChangeEventConsumer(
private val messageParser: QueueMessageParser,
private val useCase: CreateDocumentChangeUseCase,
) : DatabaseChangeEventConsumer {

private val logger = LoggerFactory.getLogger(javaClass)

@RabbitListener(
queues = [DB_CHANGES_QUEUE],
ackMode = "MANUAL"
)
override fun consume(@Payload rawMessage: String, message: Message, channel: Channel): Mono<Unit> {
override fun consume(rawMessage: String, message: Message, channel: Channel): Mono<Unit> {
val type = try {
messageParser.getTypeKClass(rawMessage.toByteArray())
} catch (ex: AamException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ class DefaultNotificationEventConsumer(
ackMode = "MANUAL"
)
override fun consume(rawMessage: String, message: Message, channel: Channel): Mono<Unit> {

val type = try {
messageParser.getTypeKClass(rawMessage.toByteArray())
} catch (ex: AamException) {
return Mono.error { throw AmqpRejectAndDontRequeueException("[${ex.code}] ${ex.localizedMessage}", ex) }
}

when (type.qualifiedName) {
NotificationEvent::class.qualifiedName -> {
val payload = messageParser.getPayload(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.aamdigital.aambackendservice.error.AamException
import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.notification.core.event.NotificationEvent
import com.aamdigital.aambackendservice.queue.core.QueueMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpException
Expand All @@ -14,6 +15,7 @@ import java.time.format.DateTimeFormatter
import java.util.*

class DefaultNotificationEventPublisher(
private val objectMapper: ObjectMapper,
private val rabbitTemplate: RabbitTemplate,
) : NotificationEventPublisher {

Expand All @@ -33,7 +35,7 @@ class DefaultNotificationEventPublisher(
try {
rabbitTemplate.convertAndSend(
channel,
message
objectMapper.writeValueAsString(message)
)
} catch (ex: AmqpException) {
throw InternalServerException(
Expand All @@ -48,7 +50,7 @@ class DefaultNotificationEventPublisher(
channel,
jacksonObjectMapper().writeValueAsString(message)
)

return message
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.aamdigital.aambackendservice.notification.core.NotificationEventConsu
import com.aamdigital.aambackendservice.notification.core.NotificationEventPublisher
import com.aamdigital.aambackendservice.notification.core.TriggerWebhookUseCase
import com.aamdigital.aambackendservice.queue.core.QueueMessageParser
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.FanoutExchange
Expand Down Expand Up @@ -50,7 +51,10 @@ class NotificationQueueConfiguration {
@Bean
fun defaultNotificationEventPublisher(
rabbitTemplate: RabbitTemplate,
): NotificationEventPublisher = DefaultNotificationEventPublisher(rabbitTemplate)
objectMapper: ObjectMapper,
): NotificationEventPublisher = DefaultNotificationEventPublisher(
objectMapper = objectMapper, rabbitTemplate = rabbitTemplate
)

@Bean
fun defaultNotificationEventConsumer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ class DefaultReportDocumentChangeEventConsumer(
private val reportCalculationChangeUseCase: ReportCalculationChangeUseCase,
private val identifyAffectedReportsUseCase: IdentifyAffectedReportsUseCase,
) : ReportDocumentChangeEventConsumer {

private val logger = LoggerFactory.getLogger(javaClass)

@RabbitListener(
queues = [ReportQueueConfiguration.DOCUMENT_CHANGES_REPORT_QUEUE],
ackMode = "MANUAL"
)
override fun consume(rawMessage: String, messsage: Message, channel: Channel): Mono<Unit> {
override fun consume(rawMessage: String, message: Message, channel: Channel): Mono<Unit> {
val type = try {
messageParser.getTypeKClass(rawMessage.toByteArray())
} catch (ex: AamException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import org.springframework.amqp.core.Message
import reactor.core.publisher.Mono

interface ReportDocumentChangeEventConsumer {
fun consume(rawMessage: String, messsage: Message, channel: Channel): Mono<Unit>
fun consume(rawMessage: String, message: Message, channel: Channel): Mono<Unit>
}

0 comments on commit 1838858

Please sign in to comment.