From a1953db6232db6e5575106343a20f1597cdfd322 Mon Sep 17 00:00:00 2001 From: Simon Schneider <10846939+raynigon@users.noreply.github.com> Date: Fri, 9 Jun 2023 09:14:42 +0200 Subject: [PATCH 1/3] Add Logging for Kafka --- .../ecs/logging/LoggingConstants.java | 4 ++ ecs-logging-kafka/build.gradle | 12 +++++ .../kafka/ApplicationNameProvider.java | 26 ++++++++++ .../consumer/EcsConsumerInterceptor.java | 46 +++++++++++++++++ .../kafka/producer/EcsProducerConfigs.java | 8 +++ .../producer/EcsProducerInterceptor.java | 51 +++++++++++++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + settings.gradle | 1 + 8 files changed, 149 insertions(+) create mode 100644 ecs-logging-kafka/build.gradle create mode 100644 ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/ApplicationNameProvider.java create mode 100644 ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/consumer/EcsConsumerInterceptor.java create mode 100644 ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/producer/EcsProducerConfigs.java create mode 100644 ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/producer/EcsProducerInterceptor.java create mode 100644 ecs-logging-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports diff --git a/ecs-logging-base/src/main/java/com/raynigon/ecs/logging/LoggingConstants.java b/ecs-logging-base/src/main/java/com/raynigon/ecs/logging/LoggingConstants.java index a6454aa..665670e 100644 --- a/ecs-logging-base/src/main/java/com/raynigon/ecs/logging/LoggingConstants.java +++ b/ecs-logging-base/src/main/java/com/raynigon/ecs/logging/LoggingConstants.java @@ -11,4 +11,8 @@ public class LoggingConstants { public static final String TRANSACTION_ID_PROPERTY = "TRANSACTION_ID"; public static final String SESSION_ID_PROPERTY = "SESSION_ID"; + public static final String KAFKA_PRODUCER_NAME_HEADER = "x-producer"; + public static final String KAFKA_TRANSACTION_ID_HEADER = "x-request-id"; + public static final String KAFKA_TOPIC_PROPERTY = "KAFKA_TOPIC"; + public static final String KAFKA_KAFKA_KEY_PROPERTY = "KAFKA_KEY"; } diff --git a/ecs-logging-kafka/build.gradle b/ecs-logging-kafka/build.gradle new file mode 100644 index 0000000..71cc71c --- /dev/null +++ b/ecs-logging-kafka/build.gradle @@ -0,0 +1,12 @@ +dependencies { + implementation project(':ecs-logging-base') + implementation("org.slf4j:slf4j-api") + implementation("jakarta.annotation:jakarta.annotation-api") + compileOnly("org.springframework.kafka:spring-kafka") + + testImplementation("org.springframework.kafka:spring-kafka") + testImplementation("org.springframework.kafka:spring-kafka-test") + testImplementation("org.testcontainers:junit-jupiter") + testImplementation("org.testcontainers:spock") + testImplementation("org.testcontainers:kafka") +} \ No newline at end of file diff --git a/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/ApplicationNameProvider.java b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/ApplicationNameProvider.java new file mode 100644 index 0000000..f260aab --- /dev/null +++ b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/ApplicationNameProvider.java @@ -0,0 +1,26 @@ +package com.raynigon.ecs.logging.kafka; + + +import jakarta.annotation.PostConstruct; +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + + +@Component +public class ApplicationNameProvider { + + private static String sApplicationName = "unknown"; + + @Value("${spring.application.name}") + private String applicationName; + + @PostConstruct + void init(){ + sApplicationName = applicationName; + } + + public static String getApplicationName(){ + return sApplicationName; + } +} diff --git a/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/consumer/EcsConsumerInterceptor.java b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/consumer/EcsConsumerInterceptor.java new file mode 100644 index 0000000..53aa767 --- /dev/null +++ b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/consumer/EcsConsumerInterceptor.java @@ -0,0 +1,46 @@ +package com.raynigon.ecs.logging.kafka.consumer; + +import lombok.SneakyThrows; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.MDC; + +import java.util.Map; +import java.util.UUID; + +import static com.raynigon.ecs.logging.LoggingConstants.*; + +public class EcsConsumerInterceptor implements ConsumerInterceptor { + + + @Override + @SneakyThrows + public void configure(Map configs) {} + + @Override + public void close() {} + + @Override + public void onCommit(Map offsets) { + MDC.remove(TRANSACTION_ID_PROPERTY); + MDC.remove(KAFKA_TOPIC_PROPERTY); + MDC.remove(KAFKA_KAFKA_KEY_PROPERTY); + } + + @Override + public ConsumerRecords onConsume(ConsumerRecords records) { + // No detailed logging is possible for multiple records + if (records.count() > 1) { + MDC.put(TRANSACTION_ID_PROPERTY, UUID.randomUUID().toString()); + return records; + } + ConsumerRecord record = records.iterator().next(); + // Add MDC Tags and debug log for traceability + MDC.put(KAFKA_TOPIC_PROPERTY, record.topic()); + MDC.put(KAFKA_KAFKA_KEY_PROPERTY, record.key().toString()); + return records; + } +} diff --git a/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/producer/EcsProducerConfigs.java b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/producer/EcsProducerConfigs.java new file mode 100644 index 0000000..0622c5d --- /dev/null +++ b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/producer/EcsProducerConfigs.java @@ -0,0 +1,8 @@ +package com.raynigon.ecs.logging.kafka.producer; + +public class EcsProducerConfigs { + + public static final String PRODUCER_NAME_CONFIG = "ecs.producer.name"; + public static final String PRODUCER_NAME_DOC = "The name of the producer application which will be automatically inserted as header. If no name is given, the name is guessed from the environment."; + +} diff --git a/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/producer/EcsProducerInterceptor.java b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/producer/EcsProducerInterceptor.java new file mode 100644 index 0000000..1427829 --- /dev/null +++ b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/producer/EcsProducerInterceptor.java @@ -0,0 +1,51 @@ +package com.raynigon.ecs.logging.kafka.producer; + + +import com.raynigon.ecs.logging.LoggingConstants; +import com.raynigon.ecs.logging.kafka.ApplicationNameProvider; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Headers; +import org.slf4j.MDC; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.UUID; + +public class EcsProducerInterceptor implements ProducerInterceptor { + + private String producerName = "unknown"; + + @Override + public void configure(Map configs) { + Object value = configs.get(EcsProducerConfigs.PRODUCER_NAME_CONFIG); + if (value instanceof String) { + producerName = (String) value; + return; + } + producerName = ApplicationNameProvider.getApplicationName(); + } + + @Override + public void close() { + // Nothing has to be done here + } + + @Override + public ProducerRecord onSend(ProducerRecord record) { + String transactionId = MDC.get(LoggingConstants.TRANSACTION_ID_PROPERTY); + if (transactionId == null) { + transactionId = UUID.randomUUID().toString(); + } + Headers headers = record.headers(); + headers.add(LoggingConstants.KAFKA_PRODUCER_NAME_HEADER, producerName.getBytes(StandardCharsets.UTF_8)); + headers.add(LoggingConstants.KAFKA_TRANSACTION_ID_HEADER, transactionId.getBytes(StandardCharsets.UTF_8)); + return record; + } + + @Override + public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + // Nothing has to be done here + } +} diff --git a/ecs-logging-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ecs-logging-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..d1b668b --- /dev/null +++ b/ecs-logging-kafka/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +com.raynigon.ecs.logging.kafka.ApplicationNameProvider \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 1033c0e..67ed8f5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -5,4 +5,5 @@ include 'ecs-logging-app' include 'ecs-logging-access' include 'ecs-logging-async' include 'ecs-logging-audit' +include 'ecs-logging-kafka' include 'gzip-request-filter-starter' \ No newline at end of file From 335acf8d17b07f829c4c4e3676d71de0805450a6 Mon Sep 17 00:00:00 2001 From: Simon Schneider <10846939+raynigon@users.noreply.github.com> Date: Fri, 9 Jun 2023 09:17:57 +0200 Subject: [PATCH 2/3] Update Readme --- README.md | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ab6e199..c6131e8 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # Raynigon Spring Boot Libraries + [![Codacy Badge](https://api.codacy.com/project/badge/Grade/967b93564acf4b55811b08d9544b44a6)](https://app.codacy.com/gh/raynigon/spring-boot?utm_source=github.com&utm_medium=referral&utm_content=raynigon/spring-boot&utm_campaign=Badge_Grade_Settings) [![Codacy Badge](https://app.codacy.com/project/badge/Coverage/0764696c15a941c78bef58fef5082d06)](https://www.codacy.com/gh/raynigon/spring-boot/dashboard?utm_source=github.com&utm_medium=referral&utm_content=raynigon/spring-boot&utm_campaign=Badge_Coverage) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.raynigon.spring-boot/gzip-request-filter-starter/badge.svg)](https://search.maven.org/search?q=com.raynigon.spring-boot) @@ -6,22 +7,35 @@ This repository contains some usefull libraries which can enhance your spring boot experience. ## ECS Logging -The ecs-logging-* libraries provide the functionality to log messages as json documents in the [ECS format](https://www.elastic.co/guide/en/ecs/current/index.html). + +The ecs-logging-* libraries provide the functionality to log messages as json documents in +the [ECS format](https://www.elastic.co/guide/en/ecs/current/index.html). ### ECS Logging - App + This library provides application logs in ECS format. ### ECS Logging - Access + This library provides access log functionality for tomcat. ### ECS Logging - Audit + This library provides audit log functionality for the application. These loggers need to be managed manually. ### ECS Logging - Async + This library provides the functionality of the ECS application logs, for asynchronous processes (e.g. futures, or @Async annotated methods). +### ECS Logging - Kafka + +This library enhances the application logs by adding MDC attributes for incoming messages. +This works best for single record polls, but batch processing for consumers is also supported. +The producer sets the transaction id and the producer name on the record headers. + ## Gzip Request Filter + The gzip-request-filter enabled the processing of gzip compressed requests. E.g. Post requests with much data. From ec3b5da44717422e8dced0d7de1fa7f22fe96d8c Mon Sep 17 00:00:00 2001 From: Simon Schneider <10846939+raynigon@users.noreply.github.com> Date: Fri, 9 Jun 2023 09:26:25 +0200 Subject: [PATCH 3/3] Fix Format --- .../ecs/logging/kafka/ApplicationNameProvider.java | 1 - .../logging/kafka/consumer/EcsConsumerInterceptor.java | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/ApplicationNameProvider.java b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/ApplicationNameProvider.java index f260aab..43077d2 100644 --- a/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/ApplicationNameProvider.java +++ b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/ApplicationNameProvider.java @@ -2,7 +2,6 @@ import jakarta.annotation.PostConstruct; -import lombok.Getter; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; diff --git a/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/consumer/EcsConsumerInterceptor.java b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/consumer/EcsConsumerInterceptor.java index 53aa767..f70439f 100644 --- a/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/consumer/EcsConsumerInterceptor.java +++ b/ecs-logging-kafka/src/main/java/com/raynigon/ecs/logging/kafka/consumer/EcsConsumerInterceptor.java @@ -18,10 +18,14 @@ public class EcsConsumerInterceptor implements ConsumerInterceptor { @Override @SneakyThrows - public void configure(Map configs) {} + public void configure(Map configs) { + // Nothing has to be done here + } @Override - public void close() {} + public void close() { + // Nothing has to be done here + } @Override public void onCommit(Map offsets) {