From f4aa267e18c9408926512acafe6b33e21ac1ad23 Mon Sep 17 00:00:00 2001 From: Marcus Spiegel Date: Tue, 28 Jun 2016 09:30:32 +0200 Subject: [PATCH] First implementation --- .gitignore | 3 + pom.xml | 62 ++++++ .../calclab/kafka/KafkaLogbackAppender.java | 64 ++++++ .../calclab/kafka/KafkaLogbackConfigBase.java | 195 ++++++++++++++++++ 4 files changed, 324 insertions(+) create mode 100644 .gitignore create mode 100644 pom.xml create mode 100644 src/main/java/com/calclab/kafka/KafkaLogbackAppender.java create mode 100644 src/main/java/com/calclab/kafka/KafkaLogbackConfigBase.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f8239b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +/*.iml +/.idea diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f62a95d --- /dev/null +++ b/pom.xml @@ -0,0 +1,62 @@ + + 4.0.0 + + com.calclab.kafka + kafka-logback-appender + jar + 1.0-SNAPSHOT + kafka-logback-appender + + + + MIT License + http://www.opensource.org/licenses/mit-license.php + repo + + + + + + malex + Marcus Spiegel + malesch@gmail.com + + + + + + junit + junit + 4.12 + test + + + + ch.qos.logback + logback-classic + 1.1.7 + provided + + + + org.slf4j + slf4j-api + 1.7.21 + provided + + + + ch.qos.logback + logback-core + 1.1.7 + provided + + + + org.apache.kafka + kafka-clients + 0.10.0.0 + + + diff --git a/src/main/java/com/calclab/kafka/KafkaLogbackAppender.java b/src/main/java/com/calclab/kafka/KafkaLogbackAppender.java new file mode 100644 index 0000000..14f4c32 --- /dev/null +++ b/src/main/java/com/calclab/kafka/KafkaLogbackAppender.java @@ -0,0 +1,64 @@ +package com.calclab.kafka; + + +import ch.qos.logback.classic.spi.ILoggingEvent; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.Future; + +public class KafkaLogbackAppender extends KafkaLogbackConfigBase { + + protected Producer producer = null; + + public void start() { + super.start(); + Properties props = getProducerProperties(); + this.producer = createKafkaProducer(props); + addInfo("Kafka producer connected to " + brokerList); + addInfo("Logging for topic: " + topic); + } + + @Override + public void stop() { + super.stop(); + if (producer != null) { + producer.close(); + } + } + + protected void append(ILoggingEvent event) { + byte[] message = null; + if (encoder != null) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + encoder.init(baos); + encoder.setContext(getContext()); + encoder.doEncode(event); + message = baos.toByteArray(); + } catch (IOException ex) { + addError("Error encoding event", ex); + } + } else { + message = event.getMessage().getBytes(); + } + Future response = producer.send(new ProducerRecord(topic, message)); + if (syncSend) { + try { + response.get(); + } catch (Exception ex) { + addError("Error waiting for Kafka response", ex); + } + } + } + + protected Producer createKafkaProducer(Properties props) { + return new KafkaProducer(props); + } +} diff --git a/src/main/java/com/calclab/kafka/KafkaLogbackConfigBase.java b/src/main/java/com/calclab/kafka/KafkaLogbackConfigBase.java new file mode 100644 index 0000000..9260b4d --- /dev/null +++ b/src/main/java/com/calclab/kafka/KafkaLogbackConfigBase.java @@ -0,0 +1,195 @@ +package com.calclab.kafka; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Layout; +import ch.qos.logback.core.UnsynchronizedAppenderBase; +import ch.qos.logback.core.encoder.Encoder; +import ch.qos.logback.core.encoder.LayoutWrappingEncoder; +import ch.qos.logback.core.spi.DeferredProcessingAware; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; + +import java.util.Properties; + +import static ch.qos.logback.core.CoreConstants.CODES_URL; +import static org.apache.kafka.common.config.SslConfigs.*; + +/** + * Base class holding all configuration parameters of the appender (the underlying KafkaProducer + * and the appender itself). + */ +public abstract class KafkaLogbackConfigBase extends UnsynchronizedAppenderBase { + protected static final String TOPIC = null; + private static final String BOOTSTRAP_SERVERS_CONFIG = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; + private static final String COMPRESSION_TYPE_CONFIG = ProducerConfig.COMPRESSION_TYPE_CONFIG; + private static final String ACKS_CONFIG = ProducerConfig.ACKS_CONFIG; + private static final String RETRIES_CONFIG = ProducerConfig.RETRIES_CONFIG; + private static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; + private static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + private static final String SECURITY_PROTOCOL = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; + private static final String SSL_TRUSTSTORE_LOCATION = SSL_TRUSTSTORE_LOCATION_CONFIG; + private static final String SSL_TRUSTSTORE_PASSWORD = SSL_TRUSTSTORE_PASSWORD_CONFIG; + private static final String SSL_KEYSTORE_TYPE = SSL_KEYSTORE_TYPE_CONFIG; + private static final String SSL_KEYSTORE_LOCATION = SSL_KEYSTORE_LOCATION_CONFIG; + private static final String SSL_KEYSTORE_PASSWORD = SSL_KEYSTORE_PASSWORD_CONFIG; + private static final String SASL_KERBEROS_SERVICE_NAME = SaslConfigs.SASL_KERBEROS_SERVICE_NAME; + + protected String brokerList = null; + protected String topic = null; + protected String compressionType = null; + protected String keySerializerClass = null; + protected String valueSerializerClass = null; + protected String securityProtocol = null; + protected String sslTruststoreLocation = null; + protected String sslTruststorePassword = null; + protected String sslKeystoreType = null; + protected String sslKeystoreLocation = null; + protected String sslKeystorePassword = null; + protected String saslKerberosServiceName = null; + protected String clientJaasConfPath = null; + protected String kerb5ConfPath = null; + + protected int retries = 0; + protected int requiredNumAcks = Integer.MAX_VALUE; + protected boolean syncSend = false; + + protected Encoder encoder; + + /** + * Return the configuration properties for the KafkaProducer. + * @return Properties with configured producer params + */ + protected Properties getProducerProperties() { + // check for config parameter validity + Properties props = new Properties(); + if (brokerList != null) + props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList); + if (props.isEmpty()) + throw new ConfigException("The bootstrap servers property should be specified"); + if (topic == null) + throw new ConfigException("Topic must be specified by the Kafka Logback appender"); + if (compressionType != null) + props.put(COMPRESSION_TYPE_CONFIG, compressionType); + if (requiredNumAcks != Integer.MAX_VALUE) + props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks)); + if (retries > 0) + props.put(RETRIES_CONFIG, retries); + if (securityProtocol != null) { + props.put(SECURITY_PROTOCOL, securityProtocol); + } + if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && + sslTruststorePassword != null) { + props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation); + props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword); + + if (sslKeystoreType != null && sslKeystoreLocation != null && + sslKeystorePassword != null) { + props.put(SSL_KEYSTORE_TYPE, sslKeystoreType); + props.put(SSL_KEYSTORE_LOCATION, sslKeystoreLocation); + props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword); + } + } + if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) { + props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName); + System.setProperty("java.security.auth.login.config", clientJaasConfPath); + if (kerb5ConfPath != null) { + System.setProperty("java.security.krb5.conf", kerb5ConfPath); + } + } + if (keySerializerClass != null) { + props.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass); + } else { + props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + } + if (valueSerializerClass != null) { + props.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass); + } else { + props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + } + + return props; + } + + // Producer parameters + + public void setBrokerList(String brokerList) { + this.brokerList = brokerList; + } + + public void setRequiredNumAcks(int requiredNumAcks) { + this.requiredNumAcks = requiredNumAcks; + } + + public void setRetries(int retries) { + this.retries = retries; + } + + + public void setCompressionType(String compressionType) { + this.compressionType = compressionType; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setSyncSend(boolean syncSend) { + this.syncSend = syncSend; + } + + public void setKeySerializerClass(String clazz) { this.keySerializerClass = clazz; } + + public void setValueSerializerClass(String clazz) { this.valueSerializerClass = clazz; } + + public void setSecurityProtocol(String securityProtocol) { + this.securityProtocol = securityProtocol; + } + + public void setSslTruststoreLocation(String sslTruststoreLocation) { + this.sslTruststoreLocation = sslTruststoreLocation; + } + + public void setSslTruststorePassword(String sslTruststorePassword) { + this.sslTruststorePassword = sslTruststorePassword; + } + + public void setSslKeystorePassword(String sslKeystorePassword) { + this.sslKeystorePassword = sslKeystorePassword; + } + + public void setSslKeystoreType(String sslKeystoreType) { + this.sslKeystoreType = sslKeystoreType; + } + + public void setSslKeystoreLocation(String sslKeystoreLocation) { + this.sslKeystoreLocation = sslKeystoreLocation; + } + + public void setSaslKerberosServiceName(String saslKerberosServiceName) { + this.saslKerberosServiceName = saslKerberosServiceName; + } + + public void setClientJaasConfPath(String clientJaasConfPath) { + this.clientJaasConfPath = clientJaasConfPath; + } + + public void setKerb5ConfPath(String kerb5ConfPath) { + this.kerb5ConfPath = kerb5ConfPath; + } + + // Appender configuration parameters + + public void setLayout(Layout layout) { + addWarn("This appender no longer admits a layout as a sub-component, set an encoder instead."); + addWarn("To ensure compatibility, wrapping your layout in LayoutWrappingEncoder."); + addWarn("See also " + CODES_URL + "#layoutInsteadOfEncoder for details"); + LayoutWrappingEncoder lwe = new LayoutWrappingEncoder(); + lwe.setLayout(layout); + lwe.setContext(context); + this.encoder = lwe; + } + + public void setEncoder(Encoder encoder) { this.encoder = encoder; } +}