diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/config/AwsBeanBuilder.java b/src/main/java/it/pagopa/interop/signalhub/persister/config/AwsBeanBuilder.java index cb20932..27c3a05 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/config/AwsBeanBuilder.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/config/AwsBeanBuilder.java @@ -36,7 +36,7 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac return SqsMessageListenerContainerFactory .builder() .configure(options -> options - .acknowledgementMode(AcknowledgementMode.ON_SUCCESS) + //.acknowledgementMode(AcknowledgementMode.ON_SUCCESS) .maxConcurrentMessages(10) .maxMessagesPerPoll(10)) .sqsAsyncClient(sqsAsyncClient()) diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/config/ReactorConfiguration.java b/src/main/java/it/pagopa/interop/signalhub/persister/config/ReactorConfiguration.java new file mode 100644 index 0000000..81d368e --- /dev/null +++ b/src/main/java/it/pagopa/interop/signalhub/persister/config/ReactorConfiguration.java @@ -0,0 +1,20 @@ +package it.pagopa.interop.signalhub.persister.config; + + +import it.pagopa.interop.signalhub.persister.logging.ContextLifter; +import it.pagopa.interop.signalhub.persister.logging.MdcContextLifter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Operators; + +@Configuration +public class ReactorConfiguration { + + + @Bean + public void contextLifterConfiguration() { + Hooks.onEachOperator(MdcContextLifter.class.getSimpleName(), + Operators.lift((sc, sub) -> new ContextLifter<>(sub))); + } +} diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/logging/ContextLifter.java b/src/main/java/it/pagopa/interop/signalhub/persister/logging/ContextLifter.java new file mode 100644 index 0000000..cb1bb17 --- /dev/null +++ b/src/main/java/it/pagopa/interop/signalhub/persister/logging/ContextLifter.java @@ -0,0 +1,55 @@ +package it.pagopa.interop.signalhub.persister.logging; + +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.util.context.Context; + +public class ContextLifter implements CoreSubscriber { + private final CoreSubscriber actualSubscriber; + private final Context context; + + public ContextLifter(CoreSubscriber actualSubscriber) { + this.actualSubscriber = actualSubscriber; + this.context = actualSubscriber.currentContext(); + } + + @Override + public Context currentContext() { + return context; + } + + @Override + public void onSubscribe(Subscription subscription) { + actualSubscriber.onSubscribe(subscription); + } + + @Override + public void onNext(T t) { + MdcContextLifter.setContextToMdc(context); + try { + actualSubscriber.onNext(t); + } finally { + MdcContextLifter.clearMdc(); + } + } + + @Override + public void onError(Throwable throwable) { + MdcContextLifter.setContextToMdc(context); + try { + actualSubscriber.onError(throwable); + } finally { + MdcContextLifter.clearMdc(); + } + } + + @Override + public void onComplete() { + MdcContextLifter.setContextToMdc(context); + try { + actualSubscriber.onComplete(); + } finally { + MdcContextLifter.clearMdc(); + } + } +} diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/logging/MdcContextLifter.java b/src/main/java/it/pagopa/interop/signalhub/persister/logging/MdcContextLifter.java new file mode 100644 index 0000000..8e18356 --- /dev/null +++ b/src/main/java/it/pagopa/interop/signalhub/persister/logging/MdcContextLifter.java @@ -0,0 +1,40 @@ +package it.pagopa.interop.signalhub.persister.logging; + +import org.slf4j.MDC; +import reactor.core.publisher.Signal; +import reactor.util.context.Context; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; + +import static it.pagopa.interop.signalhub.persister.utils.Const.TRACE_ID_KEY; + +public class MdcContextLifter implements Consumer> { + + @Override + public void accept(Signal signal) { + if (!signal.isOnComplete() && !signal.isOnError()) { + Optional> context = signal.getContextView().stream() + .filter(cxt -> cxt.getKey().equals(TRACE_ID_KEY)) + .findFirst(); + + context.ifPresent(ctx -> MDC.put(TRACE_ID_KEY, (String)ctx.getValue())); + } else { + MDC.clear(); + } + } + + public static void setContextToMdc(Context context) { + context.stream().forEach(entry -> { + if (entry.getKey().equals(TRACE_ID_KEY)){ + MDC.put(TRACE_ID_KEY, (String) entry.getValue()); + } + }); + } + + public static void clearMdc(){ + MDC.clear(); + } + +} diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java b/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java index 0562bcd..9290da0 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java @@ -12,9 +12,13 @@ import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import reactor.util.context.Context; + import java.util.Map; import java.util.concurrent.CompletableFuture; +import static it.pagopa.interop.signalhub.persister.utils.Const.TRACE_ID_KEY; + @Slf4j @Component @@ -26,12 +30,13 @@ public class SqsInternalListener { @SqsListener(value = "${aws.internal-queue-name}") public CompletableFuture pullFromAwsInternalQueue(@Payload String node, @Headers Map headers) { - log.info("payloadBody: {}, headers: {}, PullFromInternalQueue received input", node, headers); String correlationId = (String) headers.get(SignalMapper.CORRELATION_ID_HEADER_KEY); return Mono.just(node) + .contextWrite(Context.of(TRACE_ID_KEY, correlationId)) + .doOnNext(json -> log.info("payloadBody: {}, headers: {}, PullFromInternalQueue received input", node, headers)) .map(json -> Utility.jsonToObject(node, SignalEvent.class)) - .map((signalEvent) -> signalMapper.signalEventToSignal(signalEvent, correlationId)) + .map(signalEvent -> signalMapper.signalEventToSignal(signalEvent, correlationId)) .flatMap(signalService::signalServiceFlow) .then() .toFuture(); diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/utils/Const.java b/src/main/java/it/pagopa/interop/signalhub/persister/utils/Const.java new file mode 100644 index 0000000..c117dc5 --- /dev/null +++ b/src/main/java/it/pagopa/interop/signalhub/persister/utils/Const.java @@ -0,0 +1,5 @@ +package it.pagopa.interop.signalhub.persister.utils; + +public class Const { + public static final String TRACE_ID_KEY = "traceId"; +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 9b14208..0af9b1e 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,6 +1,7 @@ logging.level.root=INFO spring.main.web-appplication-type=reactive management.endpoint.health.show-details=always +logging.pattern.level="%2p [%X{traceId:-}]" # Disable auto cloud formation cloud.aws.stack.auto=false diff --git a/src/test/java/it/pagopa/interop/signalhub/persister/mapper/SignalMapperTest.java b/src/test/java/it/pagopa/interop/signalhub/persister/mapper/SignalMapperTest.java index 45af576..b3ecad0 100644 --- a/src/test/java/it/pagopa/interop/signalhub/persister/mapper/SignalMapperTest.java +++ b/src/test/java/it/pagopa/interop/signalhub/persister/mapper/SignalMapperTest.java @@ -16,7 +16,6 @@ class SignalMapperTest { private String correlationId; private String eserviceId; private String objectType; - private Long indexSignal; private String signalType; @@ -32,9 +31,9 @@ void signalToSignalEventTest() { assertNotNull(signalEvent); assertEquals(signalEvent.getObjectId(), this.objectId); assertEquals(signalEvent.getEserviceId(), this.eserviceId); - assertEquals(signalEvent.getSignalType(), SignalType.CREATE); + assertEquals(SignalType.CREATE, signalEvent.getSignalType()); assertEquals(signalEvent.getObjectType(), this.objectType); - assertEquals(signalEvent.getIndexSignal(), this.indexSignal); + assertEquals(signalEvent.getSignalId(), this.signalId); } @Test @@ -57,7 +56,7 @@ void signalEventToSignalTest() { SignalEvent signalEvent = getSignalEvent(); Signal signal = signalMapper.signalEventToSignal(signalEvent, this.correlationId); assertNotNull(signal); - assertEquals(signal.getSignalId(), this.indexSignal); + assertEquals(signal.getSignalId(), this.signalId); assertEquals(signal.getObjectId(), this.objectId); assertEquals(signal.getEserviceId(), this.eserviceId); assertEquals(signal.getCorrelationId(), this.correlationId); @@ -76,7 +75,7 @@ void signalEventToSignalNullCaseTest() { SignalEvent signalEvent = getSignalEvent(); signal = signalMapper.signalEventToSignal(signalEvent, null); assertNotNull(signal); - assertEquals(signal.getSignalId(), this.indexSignal); + assertEquals(signal.getSignalId(), this.signalId); assertEquals(signal.getObjectId(), this.objectId); assertEquals(signal.getEserviceId(), this.eserviceId); assertEquals(signal.getObjectType(), this.objectType); @@ -108,7 +107,7 @@ private SignalEvent getSignalEvent() { signalEvent.setObjectType(this.objectType); signalEvent.setEserviceId(this.eserviceId); signalEvent.setObjectId(this.objectId); - signalEvent.setIndexSignal(this.indexSignal); + signalEvent.setSignalId(this.signalId); return signalEvent; } @@ -118,7 +117,6 @@ private void setUp() { this.correlationId = "0A"; this.objectType = "ESERVICE"; this.eserviceId = "OBJ1"; - this.indexSignal = 0L; this.signalType = SignalType.CREATE.toString(); this.signalMapper = Mappers.getMapper(SignalMapper.class); } diff --git a/src/test/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListenerTest.java b/src/test/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListenerTest.java index cf0370f..a692ec0 100644 --- a/src/test/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListenerTest.java +++ b/src/test/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListenerTest.java @@ -37,7 +37,7 @@ class SqsInternalListenerTest { private String correlationId; private String eserviceId; private String objectType; - private Long indexSignal; + private Long signalId; @BeforeEach @@ -160,7 +160,7 @@ private SignalEvent getSignalEvent() { signalEvent.setObjectType(this.objectType); signalEvent.setEserviceId(this.eserviceId); signalEvent.setObjectId(this.objectId); - signalEvent.setIndexSignal(this.indexSignal); + signalEvent.setSignalId(this.signalId); return signalEvent; } @@ -169,6 +169,6 @@ private void setUp() { this.correlationId = "0A"; this.eserviceId = "OBJ1"; this.objectType = "ESERVICE"; - this.indexSignal = 0L; + this.signalId = 0L; } } \ No newline at end of file diff --git a/src/test/java/it/pagopa/interop/signalhub/persister/utility/UtilityTest.java b/src/test/java/it/pagopa/interop/signalhub/persister/utility/UtilityTest.java index 9001abb..ec633d2 100644 --- a/src/test/java/it/pagopa/interop/signalhub/persister/utility/UtilityTest.java +++ b/src/test/java/it/pagopa/interop/signalhub/persister/utility/UtilityTest.java @@ -31,7 +31,7 @@ void jsonToObjectTest() { assertEquals("OBJ1", signalEvent.getObjectId()); assertEquals("T1", signalEvent.getObjectType()); assertEquals("E1", signalEvent.getEserviceId()); - assertEquals("1", signalEvent.getIndexSignal().toString()); + assertEquals("1", signalEvent.getSignalId().toString()); } @Test diff --git a/src/test/resources/schema-h2.sql b/src/test/resources/schema-h2.sql index d5cccba..6572b2a 100644 --- a/src/test/resources/schema-h2.sql +++ b/src/test/resources/schema-h2.sql @@ -1,86 +1,73 @@ CREATE TABLE IF NOT EXISTS ESERVICE ( - eservice_id VARCHAR (50) NOT NULL, - producer_id VARCHAR (50) NOT NULL, - descriptor_id VARCHAR (50) NOT NULL, - event_id BIGINT, - state VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - tmst_last_edit TIMESTAMP, - UNIQUE (eservice_id, producer_id, descriptor_id), - PRIMARY KEY (eservice_id, producer_id, descriptor_id) + eservice_id VARCHAR (255) NOT NULL, + producer_id VARCHAR (255) NOT NULL, + descriptor_id VARCHAR (255) NOT NULL, + event_id BIGINT, + state VARCHAR (255) NOT NULL, + tmst_insert TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + tmst_last_edit TIMESTAMP, + UNIQUE (eservice_id, producer_id, descriptor_id), + PRIMARY KEY (eservice_id, producer_id, descriptor_id) ); -/* -CREATE INDEX IF NOT EXISTS ESERVICE_INDEX_ID ON ESERVICE USING hash (eservice_id); -CREATE INDEX IF NOT EXISTS ESERVICE_INDEX_PRODUCER_ID ON ESERVICE USING hash (producer_id); -*/ - CREATE TABLE IF NOT EXISTS CONSUMER_ESERVICE ( - agreement_id VARCHAR (50) NOT NULL, - eservice_id VARCHAR (50) NOT NULL, - consumer_id VARCHAR (50) NOT NULL, - descriptor_id VARCHAR (50) NOT NULL, - event_id BIGINT, - state VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - tmst_last_edit TIMESTAMP, - UNIQUE (eservice_id, consumer_id, descriptor_id), - PRIMARY KEY (eservice_id, consumer_id, descriptor_id) + agreement_id VARCHAR (255) NOT NULL, + eservice_id VARCHAR (255) NOT NULL, + consumer_id VARCHAR (255) NOT NULL, + descriptor_id VARCHAR (255) NOT NULL, + event_id BIGINT, + state VARCHAR (255) NOT NULL, + tmst_insert TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + tmst_last_edit TIMESTAMP, + UNIQUE (eservice_id, consumer_id, descriptor_id), + PRIMARY KEY (eservice_id, consumer_id, descriptor_id) ); -/* -CREATE INDEX IF NOT EXISTS CONSUMER_ESERVICE_INDEX_ID ON CONSUMER_ESERVICE USING hash (eservice_id); -CREATE INDEX IF NOT EXISTS CONSUMER_ESERVICE_INDEX_CONSUMER_ID ON CONSUMER_ESERVICE USING hash (consumer_id); -CREATE INDEX IF NOT EXISTS CONSUMER_ESERVICE_INDEX_DESCRIPTOR_ID ON CONSUMER_ESERVICE USING hash (descriptor_id); -*/ CREATE TABLE IF NOT EXISTS SIGNAL ( - id SERIAL PRIMARY KEY, - correlation_id VARCHAR(50) NOT NULL, - signal_id BIGINT NOT NULL, - object_id VARCHAR (50) NOT NULL, - eservice_id VARCHAR (50) NOT NULL, - object_type VARCHAR (50) NOT NULL, - signal_type VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP NOT NULL, - UNIQUE (signal_id, eservice_id) + id SERIAL PRIMARY KEY, + correlation_id VARCHAR(255) NOT NULL, + signal_id BIGINT NOT NULL, + object_id VARCHAR (255) NOT NULL, + eservice_id VARCHAR (255) NOT NULL, + object_type VARCHAR (255) NOT NULL, + signal_type VARCHAR (255) NOT NULL, + tmst_insert TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE (signal_id, eservice_id) ); -/* H2 not support USING constructor -CREATE INDEX IF NOT EXISTS SIGNAL_INDEX_SIGNAL_ID ON SIGNAL USING hash (signal_id); -CREATE INDEX IF NOT EXISTS SIGNAL_INDEX_ESERVICE_ID ON SIGNAL USING hash (eservice_id); -*/ CREATE TABLE IF NOT EXISTS DEAD_SIGNAL ( - id SERIAL PRIMARY KEY, - correlation_id VARCHAR(50) NOT NULL, - signal_id BIGINT NOT NULL, - object_id VARCHAR (50) NOT NULL, - eservice_id VARCHAR (50) NOT NULL, - object_type VARCHAR (50) NOT NULL, - signal_type VARCHAR (50) NOT NULL, - tmst_insert TIMESTAMP NOT NULL, - error_reason VARCHAR(255) NOT NULL + id SERIAL PRIMARY KEY, + correlation_id VARCHAR(255) NOT NULL, + signal_id BIGINT NOT NULL, + object_id VARCHAR (255) NOT NULL, + eservice_id VARCHAR (255) NOT NULL, + object_type VARCHAR (255) NOT NULL, + signal_type VARCHAR (255) NOT NULL, + tmst_insert TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + error_reason VARCHAR(255) NOT NULL ); CREATE TABLE IF NOT EXISTS TRACING_BATCH ( - batch_id SERIAL PRIMARY KEY, - state VARCHAR (50) NOT NULL, - last_event_id BIGINT, - tmst_created TIMESTAMP NOT NULL + batch_id SERIAL PRIMARY KEY, + state VARCHAR (255) NOT NULL, + type VARCHAR (50) NOT NULL, + last_event_id BIGINT, + tmst_created TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS DEAD_EVENT ( - event_tmp_id SERIAL PRIMARY KEY, - tmst_insert TIMESTAMP, - error_reason VARCHAR(255) NOT NULL, - event_id BIGINT NOT NULL, - event_type VARCHAR (50) NOT NULL, - object_type VARCHAR (50) NOT NULL, - descriptor_id VARCHAR (50), - eservice_id VARCHAR (50), - agreement_id VARCHAR (50) + event_tmp_id SERIAL PRIMARY KEY, + tmst_insert TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + error_reason VARCHAR(255) NOT NULL, + event_id BIGINT NOT NULL, + event_type VARCHAR (255) NOT NULL, + object_type VARCHAR (255) NOT NULL, + descriptor_id VARCHAR (255), + eservice_id VARCHAR (255), + agreement_id VARCHAR (255) ); \ No newline at end of file