This repository has been archived by the owner on Jul 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
324 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
/target | ||
/*.iml | ||
/.idea |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>com.calclab.kafka</groupId> | ||
<artifactId>kafka-logback-appender</artifactId> | ||
<packaging>jar</packaging> | ||
<version>1.0-SNAPSHOT</version> | ||
<name>kafka-logback-appender</name> | ||
|
||
<licenses> | ||
<license> | ||
<name>MIT License</name> | ||
<url>http://www.opensource.org/licenses/mit-license.php</url> | ||
<distribution>repo</distribution> | ||
</license> | ||
</licenses> | ||
|
||
<developers> | ||
<developer> | ||
<id>malex</id> | ||
<name>Marcus Spiegel</name> | ||
<email>[email protected]</email> | ||
</developer> | ||
</developers> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>4.12</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>ch.qos.logback</groupId> | ||
<artifactId>logback-classic</artifactId> | ||
<version>1.1.7</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-api</artifactId> | ||
<version>1.7.21</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>ch.qos.logback</groupId> | ||
<artifactId>logback-core</artifactId> | ||
<version>1.1.7</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-clients</artifactId> | ||
<version>0.10.0.0</version> | ||
</dependency> | ||
</dependencies> | ||
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package com.calclab.kafka; | ||
|
||
|
||
import ch.qos.logback.classic.spi.ILoggingEvent; | ||
|
||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.clients.producer.RecordMetadata; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.util.Properties; | ||
import java.util.concurrent.Future; | ||
|
||
public class KafkaLogbackAppender extends KafkaLogbackConfigBase<ILoggingEvent> { | ||
|
||
protected Producer producer = null; | ||
|
||
public void start() { | ||
super.start(); | ||
Properties props = getProducerProperties(); | ||
this.producer = createKafkaProducer(props); | ||
addInfo("Kafka producer connected to " + brokerList); | ||
addInfo("Logging for topic: " + topic); | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
super.stop(); | ||
if (producer != null) { | ||
producer.close(); | ||
} | ||
} | ||
|
||
protected void append(ILoggingEvent event) { | ||
byte[] message = null; | ||
if (encoder != null) { | ||
try { | ||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
encoder.init(baos); | ||
encoder.setContext(getContext()); | ||
encoder.doEncode(event); | ||
message = baos.toByteArray(); | ||
} catch (IOException ex) { | ||
addError("Error encoding event", ex); | ||
} | ||
} else { | ||
message = event.getMessage().getBytes(); | ||
} | ||
Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[], byte[]>(topic, message)); | ||
if (syncSend) { | ||
try { | ||
response.get(); | ||
} catch (Exception ex) { | ||
addError("Error waiting for Kafka response", ex); | ||
} | ||
} | ||
} | ||
|
||
protected Producer createKafkaProducer(Properties props) { | ||
return new KafkaProducer<byte[], byte[]>(props); | ||
} | ||
} |
195 changes: 195 additions & 0 deletions
195
src/main/java/com/calclab/kafka/KafkaLogbackConfigBase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
package com.calclab.kafka; | ||
|
||
import ch.qos.logback.classic.spi.ILoggingEvent; | ||
import ch.qos.logback.core.Layout; | ||
import ch.qos.logback.core.UnsynchronizedAppenderBase; | ||
import ch.qos.logback.core.encoder.Encoder; | ||
import ch.qos.logback.core.encoder.LayoutWrappingEncoder; | ||
import ch.qos.logback.core.spi.DeferredProcessingAware; | ||
import org.apache.kafka.clients.CommonClientConfigs; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.common.config.ConfigException; | ||
import org.apache.kafka.common.config.SaslConfigs; | ||
|
||
import java.util.Properties; | ||
|
||
import static ch.qos.logback.core.CoreConstants.CODES_URL; | ||
import static org.apache.kafka.common.config.SslConfigs.*; | ||
|
||
/** | ||
* Base class holding all configuration parameters of the appender (the underlying KafkaProducer | ||
* and the appender itself). | ||
*/ | ||
public abstract class KafkaLogbackConfigBase<I extends DeferredProcessingAware> extends UnsynchronizedAppenderBase<ILoggingEvent> { | ||
protected static final String TOPIC = null; | ||
private static final String BOOTSTRAP_SERVERS_CONFIG = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; | ||
private static final String COMPRESSION_TYPE_CONFIG = ProducerConfig.COMPRESSION_TYPE_CONFIG; | ||
private static final String ACKS_CONFIG = ProducerConfig.ACKS_CONFIG; | ||
private static final String RETRIES_CONFIG = ProducerConfig.RETRIES_CONFIG; | ||
private static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; | ||
private static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; | ||
private static final String SECURITY_PROTOCOL = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; | ||
private static final String SSL_TRUSTSTORE_LOCATION = SSL_TRUSTSTORE_LOCATION_CONFIG; | ||
private static final String SSL_TRUSTSTORE_PASSWORD = SSL_TRUSTSTORE_PASSWORD_CONFIG; | ||
private static final String SSL_KEYSTORE_TYPE = SSL_KEYSTORE_TYPE_CONFIG; | ||
private static final String SSL_KEYSTORE_LOCATION = SSL_KEYSTORE_LOCATION_CONFIG; | ||
private static final String SSL_KEYSTORE_PASSWORD = SSL_KEYSTORE_PASSWORD_CONFIG; | ||
private static final String SASL_KERBEROS_SERVICE_NAME = SaslConfigs.SASL_KERBEROS_SERVICE_NAME; | ||
|
||
protected String brokerList = null; | ||
protected String topic = null; | ||
protected String compressionType = null; | ||
protected String keySerializerClass = null; | ||
protected String valueSerializerClass = null; | ||
protected String securityProtocol = null; | ||
protected String sslTruststoreLocation = null; | ||
protected String sslTruststorePassword = null; | ||
protected String sslKeystoreType = null; | ||
protected String sslKeystoreLocation = null; | ||
protected String sslKeystorePassword = null; | ||
protected String saslKerberosServiceName = null; | ||
protected String clientJaasConfPath = null; | ||
protected String kerb5ConfPath = null; | ||
|
||
protected int retries = 0; | ||
protected int requiredNumAcks = Integer.MAX_VALUE; | ||
protected boolean syncSend = false; | ||
|
||
protected Encoder<ILoggingEvent> encoder; | ||
|
||
/** | ||
* Return the configuration properties for the KafkaProducer. | ||
* @return Properties with configured producer params | ||
*/ | ||
protected Properties getProducerProperties() { | ||
// check for config parameter validity | ||
Properties props = new Properties(); | ||
if (brokerList != null) | ||
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList); | ||
if (props.isEmpty()) | ||
throw new ConfigException("The bootstrap servers property should be specified"); | ||
if (topic == null) | ||
throw new ConfigException("Topic must be specified by the Kafka Logback appender"); | ||
if (compressionType != null) | ||
props.put(COMPRESSION_TYPE_CONFIG, compressionType); | ||
if (requiredNumAcks != Integer.MAX_VALUE) | ||
props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks)); | ||
if (retries > 0) | ||
props.put(RETRIES_CONFIG, retries); | ||
if (securityProtocol != null) { | ||
props.put(SECURITY_PROTOCOL, securityProtocol); | ||
} | ||
if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && | ||
sslTruststorePassword != null) { | ||
props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation); | ||
props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword); | ||
|
||
if (sslKeystoreType != null && sslKeystoreLocation != null && | ||
sslKeystorePassword != null) { | ||
props.put(SSL_KEYSTORE_TYPE, sslKeystoreType); | ||
props.put(SSL_KEYSTORE_LOCATION, sslKeystoreLocation); | ||
props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword); | ||
} | ||
} | ||
if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) { | ||
props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName); | ||
System.setProperty("java.security.auth.login.config", clientJaasConfPath); | ||
if (kerb5ConfPath != null) { | ||
System.setProperty("java.security.krb5.conf", kerb5ConfPath); | ||
} | ||
} | ||
if (keySerializerClass != null) { | ||
props.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass); | ||
} else { | ||
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||
} | ||
if (valueSerializerClass != null) { | ||
props.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass); | ||
} else { | ||
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); | ||
} | ||
|
||
return props; | ||
} | ||
|
||
// Producer parameters | ||
|
||
public void setBrokerList(String brokerList) { | ||
this.brokerList = brokerList; | ||
} | ||
|
||
public void setRequiredNumAcks(int requiredNumAcks) { | ||
this.requiredNumAcks = requiredNumAcks; | ||
} | ||
|
||
public void setRetries(int retries) { | ||
this.retries = retries; | ||
} | ||
|
||
|
||
public void setCompressionType(String compressionType) { | ||
this.compressionType = compressionType; | ||
} | ||
|
||
public void setTopic(String topic) { | ||
this.topic = topic; | ||
} | ||
|
||
public void setSyncSend(boolean syncSend) { | ||
this.syncSend = syncSend; | ||
} | ||
|
||
public void setKeySerializerClass(String clazz) { this.keySerializerClass = clazz; } | ||
|
||
public void setValueSerializerClass(String clazz) { this.valueSerializerClass = clazz; } | ||
|
||
public void setSecurityProtocol(String securityProtocol) { | ||
this.securityProtocol = securityProtocol; | ||
} | ||
|
||
public void setSslTruststoreLocation(String sslTruststoreLocation) { | ||
this.sslTruststoreLocation = sslTruststoreLocation; | ||
} | ||
|
||
public void setSslTruststorePassword(String sslTruststorePassword) { | ||
this.sslTruststorePassword = sslTruststorePassword; | ||
} | ||
|
||
public void setSslKeystorePassword(String sslKeystorePassword) { | ||
this.sslKeystorePassword = sslKeystorePassword; | ||
} | ||
|
||
public void setSslKeystoreType(String sslKeystoreType) { | ||
this.sslKeystoreType = sslKeystoreType; | ||
} | ||
|
||
public void setSslKeystoreLocation(String sslKeystoreLocation) { | ||
this.sslKeystoreLocation = sslKeystoreLocation; | ||
} | ||
|
||
public void setSaslKerberosServiceName(String saslKerberosServiceName) { | ||
this.saslKerberosServiceName = saslKerberosServiceName; | ||
} | ||
|
||
public void setClientJaasConfPath(String clientJaasConfPath) { | ||
this.clientJaasConfPath = clientJaasConfPath; | ||
} | ||
|
||
public void setKerb5ConfPath(String kerb5ConfPath) { | ||
this.kerb5ConfPath = kerb5ConfPath; | ||
} | ||
|
||
// Appender configuration parameters | ||
|
||
public void setLayout(Layout<ILoggingEvent> layout) { | ||
addWarn("This appender no longer admits a layout as a sub-component, set an encoder instead."); | ||
addWarn("To ensure compatibility, wrapping your layout in LayoutWrappingEncoder."); | ||
addWarn("See also " + CODES_URL + "#layoutInsteadOfEncoder for details"); | ||
LayoutWrappingEncoder<ILoggingEvent> lwe = new LayoutWrappingEncoder<ILoggingEvent>(); | ||
lwe.setLayout(layout); | ||
lwe.setContext(context); | ||
this.encoder = lwe; | ||
} | ||
|
||
public void setEncoder(Encoder<ILoggingEvent> encoder) { this.encoder = encoder; } | ||
} |