Skip to content

Commit

Permalink
addded tracing log
Browse files Browse the repository at this point in the history
  • Loading branch information
CriMDev97 committed Nov 30, 2023
1 parent ab9d88e commit 0dcb5d0
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
return SqsMessageListenerContainerFactory
.builder()
.configure(options -> options
.acknowledgementMode(AcknowledgementMode.ON_SUCCESS)
//.acknowledgementMode(AcknowledgementMode.ON_SUCCESS)
.maxConcurrentMessages(10)
.maxMessagesPerPoll(10))
.sqsAsyncClient(sqsAsyncClient())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package it.pagopa.interop.signalhub.persister.config;


import it.pagopa.interop.signalhub.persister.logging.ContextLifter;
import it.pagopa.interop.signalhub.persister.logging.MdcContextLifter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;

@Configuration
public class ReactorConfiguration {


@Bean
public void contextLifterConfiguration() {
Hooks.onEachOperator(MdcContextLifter.class.getSimpleName(),
Operators.lift((sc, sub) -> new ContextLifter<>(sub)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package it.pagopa.interop.signalhub.persister.logging;

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

public class ContextLifter<T> implements CoreSubscriber<T> {
private final CoreSubscriber<T> actualSubscriber;
private final Context context;

public ContextLifter(CoreSubscriber<T> actualSubscriber) {
this.actualSubscriber = actualSubscriber;
this.context = actualSubscriber.currentContext();
}

@Override
public Context currentContext() {
return context;
}

@Override
public void onSubscribe(Subscription subscription) {
actualSubscriber.onSubscribe(subscription);
}

@Override
public void onNext(T t) {
MdcContextLifter.setContextToMdc(context);
try {
actualSubscriber.onNext(t);
} finally {
MdcContextLifter.clearMdc();
}
}

@Override
public void onError(Throwable throwable) {
MdcContextLifter.setContextToMdc(context);
try {
actualSubscriber.onError(throwable);
} finally {
MdcContextLifter.clearMdc();
}
}

@Override
public void onComplete() {
MdcContextLifter.setContextToMdc(context);
try {
actualSubscriber.onComplete();
} finally {
MdcContextLifter.clearMdc();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package it.pagopa.interop.signalhub.persister.logging;

import org.slf4j.MDC;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;

import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import static it.pagopa.interop.signalhub.persister.utils.Const.TRACE_ID_KEY;

public class MdcContextLifter implements Consumer<Signal<?>> {

@Override
public void accept(Signal<?> signal) {
if (!signal.isOnComplete() && !signal.isOnError()) {
Optional<Map.Entry<Object, Object>> context = signal.getContextView().stream()
.filter(cxt -> cxt.getKey().equals(TRACE_ID_KEY))
.findFirst();

context.ifPresent(ctx -> MDC.put(TRACE_ID_KEY, (String)ctx.getValue()));
} else {
MDC.clear();
}
}

public static void setContextToMdc(Context context) {
context.stream().forEach(entry -> {
if (entry.getKey().equals(TRACE_ID_KEY)){
MDC.put(TRACE_ID_KEY, (String) entry.getValue());
}
});
}

public static void clearMdc(){
MDC.clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static it.pagopa.interop.signalhub.persister.utils.Const.TRACE_ID_KEY;


@Slf4j
@Component
Expand All @@ -26,12 +30,13 @@ public class SqsInternalListener {

@SqsListener(value = "${aws.internal-queue-name}")
public CompletableFuture<Void> pullFromAwsInternalQueue(@Payload String node, @Headers Map<String, Object> headers) {
log.info("payloadBody: {}, headers: {}, PullFromInternalQueue received input", node, headers);
String correlationId = (String) headers.get(SignalMapper.CORRELATION_ID_HEADER_KEY);

return Mono.just(node)
.contextWrite(Context.of(TRACE_ID_KEY, correlationId))
.doOnNext(json -> log.info("payloadBody: {}, headers: {}, PullFromInternalQueue received input", node, headers))
.map(json -> Utility.jsonToObject(node, SignalEvent.class))
.map((signalEvent) -> signalMapper.signalEventToSignal(signalEvent, correlationId))
.map(signalEvent -> signalMapper.signalEventToSignal(signalEvent, correlationId))
.flatMap(signalService::signalServiceFlow)
.then()
.toFuture();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package it.pagopa.interop.signalhub.persister.utils;

public class Const {
public static final String TRACE_ID_KEY = "traceId";
}
1 change: 1 addition & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
logging.level.root=INFO
spring.main.web-appplication-type=reactive
management.endpoint.health.show-details=always
logging.pattern.level="%2p [%X{traceId:-}]"

# Disable auto cloud formation
cloud.aws.stack.auto=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class SignalMapperTest {
private String correlationId;
private String eserviceId;
private String objectType;
private Long indexSignal;
private String signalType;


Expand All @@ -32,9 +31,9 @@ void signalToSignalEventTest() {
assertNotNull(signalEvent);
assertEquals(signalEvent.getObjectId(), this.objectId);
assertEquals(signalEvent.getEserviceId(), this.eserviceId);
assertEquals(signalEvent.getSignalType(), SignalType.CREATE);
assertEquals(SignalType.CREATE, signalEvent.getSignalType());
assertEquals(signalEvent.getObjectType(), this.objectType);
assertEquals(signalEvent.getIndexSignal(), this.indexSignal);
assertEquals(signalEvent.getSignalId(), this.signalId);
}

@Test
Expand All @@ -57,7 +56,7 @@ void signalEventToSignalTest() {
SignalEvent signalEvent = getSignalEvent();
Signal signal = signalMapper.signalEventToSignal(signalEvent, this.correlationId);
assertNotNull(signal);
assertEquals(signal.getSignalId(), this.indexSignal);
assertEquals(signal.getSignalId(), this.signalId);
assertEquals(signal.getObjectId(), this.objectId);
assertEquals(signal.getEserviceId(), this.eserviceId);
assertEquals(signal.getCorrelationId(), this.correlationId);
Expand All @@ -76,7 +75,7 @@ void signalEventToSignalNullCaseTest() {
SignalEvent signalEvent = getSignalEvent();
signal = signalMapper.signalEventToSignal(signalEvent, null);
assertNotNull(signal);
assertEquals(signal.getSignalId(), this.indexSignal);
assertEquals(signal.getSignalId(), this.signalId);
assertEquals(signal.getObjectId(), this.objectId);
assertEquals(signal.getEserviceId(), this.eserviceId);
assertEquals(signal.getObjectType(), this.objectType);
Expand Down Expand Up @@ -108,7 +107,7 @@ private SignalEvent getSignalEvent() {
signalEvent.setObjectType(this.objectType);
signalEvent.setEserviceId(this.eserviceId);
signalEvent.setObjectId(this.objectId);
signalEvent.setIndexSignal(this.indexSignal);
signalEvent.setSignalId(this.signalId);
return signalEvent;
}

Expand All @@ -118,7 +117,6 @@ private void setUp() {
this.correlationId = "0A";
this.objectType = "ESERVICE";
this.eserviceId = "OBJ1";
this.indexSignal = 0L;
this.signalType = SignalType.CREATE.toString();
this.signalMapper = Mappers.getMapper(SignalMapper.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class SqsInternalListenerTest {
private String correlationId;
private String eserviceId;
private String objectType;
private Long indexSignal;
private Long signalId;


@BeforeEach
Expand Down Expand Up @@ -160,7 +160,7 @@ private SignalEvent getSignalEvent() {
signalEvent.setObjectType(this.objectType);
signalEvent.setEserviceId(this.eserviceId);
signalEvent.setObjectId(this.objectId);
signalEvent.setIndexSignal(this.indexSignal);
signalEvent.setSignalId(this.signalId);
return signalEvent;
}

Expand All @@ -169,6 +169,6 @@ private void setUp() {
this.correlationId = "0A";
this.eserviceId = "OBJ1";
this.objectType = "ESERVICE";
this.indexSignal = 0L;
this.signalId = 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void jsonToObjectTest() {
assertEquals("OBJ1", signalEvent.getObjectId());
assertEquals("T1", signalEvent.getObjectType());
assertEquals("E1", signalEvent.getEserviceId());
assertEquals("1", signalEvent.getIndexSignal().toString());
assertEquals("1", signalEvent.getSignalId().toString());
}

@Test
Expand Down
Loading

0 comments on commit 0dcb5d0

Please sign in to comment.