Skip to content

Commit

Permalink
Merge pull request #225 from raynigon/feature/kafka-logging
Browse files Browse the repository at this point in the history
Add Logging for Kafka
  • Loading branch information
raynigon authored Jun 9, 2023
2 parents a2e26ea + ec3b5da commit 84ab4c7
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 1 deletion.
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,41 @@
# Raynigon Spring Boot Libraries

[![Codacy Badge](https://api.codacy.com/project/badge/Grade/967b93564acf4b55811b08d9544b44a6)](https://app.codacy.com/gh/raynigon/spring-boot?utm_source=github.com&utm_medium=referral&utm_content=raynigon/spring-boot&utm_campaign=Badge_Grade_Settings)
[![Codacy Badge](https://app.codacy.com/project/badge/Coverage/0764696c15a941c78bef58fef5082d06)](https://www.codacy.com/gh/raynigon/spring-boot/dashboard?utm_source=github.com&utm_medium=referral&utm_content=raynigon/spring-boot&utm_campaign=Badge_Coverage)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.raynigon.spring-boot/gzip-request-filter-starter/badge.svg)](https://search.maven.org/search?q=com.raynigon.spring-boot)

This repository contains some usefull libraries which can enhance your spring boot experience.

## ECS Logging
The ecs-logging-* libraries provide the functionality to log messages as json documents in the [ECS format](https://www.elastic.co/guide/en/ecs/current/index.html).

The ecs-logging-* libraries provide the functionality to log messages as json documents in
the [ECS format](https://www.elastic.co/guide/en/ecs/current/index.html).

### ECS Logging - App

This library provides application logs in ECS format.

### ECS Logging - Access

This library provides access log functionality for tomcat.

### ECS Logging - Audit

This library provides audit log functionality for the application.
These loggers need to be managed manually.

### ECS Logging - Async

This library provides the functionality of the ECS application logs,
for asynchronous processes (e.g. futures, or @Async annotated methods).

### ECS Logging - Kafka

This library enhances the application logs by adding MDC attributes for incoming messages.
This works best for single record polls, but batch processing for consumers is also supported.
The producer sets the transaction id and the producer name on the record headers.

## Gzip Request Filter

The gzip-request-filter enabled the processing of gzip compressed requests.
E.g. Post requests with much data.
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ public class LoggingConstants {
public static final String TRANSACTION_ID_PROPERTY = "TRANSACTION_ID";
public static final String SESSION_ID_PROPERTY = "SESSION_ID";

public static final String KAFKA_PRODUCER_NAME_HEADER = "x-producer";
public static final String KAFKA_TRANSACTION_ID_HEADER = "x-request-id";
public static final String KAFKA_TOPIC_PROPERTY = "KAFKA_TOPIC";
public static final String KAFKA_KAFKA_KEY_PROPERTY = "KAFKA_KEY";
}
12 changes: 12 additions & 0 deletions ecs-logging-kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dependencies {
implementation project(':ecs-logging-base')
implementation("org.slf4j:slf4j-api")
implementation("jakarta.annotation:jakarta.annotation-api")
compileOnly("org.springframework.kafka:spring-kafka")

testImplementation("org.springframework.kafka:spring-kafka")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:spock")
testImplementation("org.testcontainers:kafka")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.raynigon.ecs.logging.kafka;


import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
public class ApplicationNameProvider {

private static String sApplicationName = "unknown";

@Value("${spring.application.name}")
private String applicationName;

@PostConstruct
void init(){
sApplicationName = applicationName;
}

public static String getApplicationName(){
return sApplicationName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.raynigon.ecs.logging.kafka.consumer;

import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.MDC;

import java.util.Map;
import java.util.UUID;

import static com.raynigon.ecs.logging.LoggingConstants.*;

public class EcsConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {


@Override
@SneakyThrows
public void configure(Map<String, ?> configs) {
// Nothing has to be done here
}

@Override
public void close() {
// Nothing has to be done here
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
MDC.remove(TRANSACTION_ID_PROPERTY);
MDC.remove(KAFKA_TOPIC_PROPERTY);
MDC.remove(KAFKA_KAFKA_KEY_PROPERTY);
}

@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
// No detailed logging is possible for multiple records
if (records.count() > 1) {
MDC.put(TRANSACTION_ID_PROPERTY, UUID.randomUUID().toString());
return records;
}
ConsumerRecord<K, V> record = records.iterator().next();
// Add MDC Tags and debug log for traceability
MDC.put(KAFKA_TOPIC_PROPERTY, record.topic());
MDC.put(KAFKA_KAFKA_KEY_PROPERTY, record.key().toString());
return records;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.raynigon.ecs.logging.kafka.producer;

public class EcsProducerConfigs {

public static final String PRODUCER_NAME_CONFIG = "ecs.producer.name";
public static final String PRODUCER_NAME_DOC = "The name of the producer application which will be automatically inserted as header. If no name is given, the name is guessed from the environment.";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.raynigon.ecs.logging.kafka.producer;


import com.raynigon.ecs.logging.LoggingConstants;
import com.raynigon.ecs.logging.kafka.ApplicationNameProvider;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.MDC;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

public class EcsProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

private String producerName = "unknown";

@Override
public void configure(Map<String, ?> configs) {
Object value = configs.get(EcsProducerConfigs.PRODUCER_NAME_CONFIG);
if (value instanceof String) {
producerName = (String) value;
return;
}
producerName = ApplicationNameProvider.getApplicationName();
}

@Override
public void close() {
// Nothing has to be done here
}

@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
String transactionId = MDC.get(LoggingConstants.TRANSACTION_ID_PROPERTY);
if (transactionId == null) {
transactionId = UUID.randomUUID().toString();
}
Headers headers = record.headers();
headers.add(LoggingConstants.KAFKA_PRODUCER_NAME_HEADER, producerName.getBytes(StandardCharsets.UTF_8));
headers.add(LoggingConstants.KAFKA_TRANSACTION_ID_HEADER, transactionId.getBytes(StandardCharsets.UTF_8));
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// Nothing has to be done here
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.raynigon.ecs.logging.kafka.ApplicationNameProvider
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ include 'ecs-logging-app'
include 'ecs-logging-access'
include 'ecs-logging-async'
include 'ecs-logging-audit'
include 'ecs-logging-kafka'
include 'gzip-request-filter-starter'

0 comments on commit 84ab4c7

Please sign in to comment.