diff --git a/.gitignore b/.gitignore index 71d3af0..ba7880a 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ docker/logs docker/src docker/elasticsearch/data +**/*DS_Store *.iml kafkaESConsumerLocal.properties dependency-reduced-pom.xml diff --git a/build.gradle b/build.gradle index 90ab446..8434984 100644 --- a/build.gradle +++ b/build.gradle @@ -38,8 +38,8 @@ ext { spring_version = '4.3.8.RELEASE' slf4j_version = '1.7.25' logback_version = '1.2.3' - kafka_version = '2.1.1' - elasticsearch_version = '6.6.0' + kafka_version = '2.5.1' + elasticsearch_version = '7.8.1' } configurations.all { @@ -79,6 +79,8 @@ dependencies { compile group: 'ch.qos.logback', name: 'logback-classic', version: logback_version // https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.6' + // missing in JDK since Java 11 + compile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2' testCompile group: 'junit', name: 'junit', version: '4.12' testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19' diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index bae6615..cb798d9 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index b10af04..0b0acf1 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Thu Jul 13 16:56:59 EEST 2017 +#Thu Jan 23 14:28:37 EST 2020 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.0-all.zip diff --git 
a/src/main/java/org/elasticsearch/kafka/indexer/service/IBatchMessageProcessor.java b/src/main/java/org/elasticsearch/kafka/indexer/service/IBatchMessageProcessor.java index 4c5b47e..63d486d 100644 --- a/src/main/java/org/elasticsearch/kafka/indexer/service/IBatchMessageProcessor.java +++ b/src/main/java/org/elasticsearch/kafka/indexer/service/IBatchMessageProcessor.java @@ -54,7 +54,7 @@ public interface IBatchMessageProcessor { * Returning FALSE means that the offsets will NOT be committed and events from the previous poll will * be re-processed * @param consumerId - * @param previousPollEndPosition + * @param pollEndPosition * @return boolean shouldCommitThisPoll * @throws Exception */ diff --git a/src/main/resources/config/kafka-es-indexer.properties b/src/main/resources/config/kafka-es-indexer.properties index b3f1213..edecb2b 100644 --- a/src/main/resources/config/kafka-es-indexer.properties +++ b/src/main/resources/config/kafka-es-indexer.properties @@ -1,27 +1,21 @@ ### Kafka properties #################################### -# all properties starting with this prefix - will be added to KafkaPRoperties object +# all properties starting with this prefix - will be added to KafkaProperties object # with the property name = original property name minus the prefix kafka.consumer.property.prefix=consumer.kafka.property. 
# Kafka Brokers host:port list: :,…,: -# default: localhost:9092 -# old: kafka.consumer.brokers.list=localhost:9092 consumer.kafka.property.bootstrap.servers=localhost:9092 # Kafka Consumer group name prefix - -# each indexer job will have a clientId = kafka.consumer.group.name + "_" + partitionNumber -# default: kafka_es_indexer -# old: kafka.consumer.group.name=kafka_es_indexer +# each job will have a clientId = group.id + "_" + partitionNumber consumer.kafka.property.group.id=kafka-batch-consumer # kafka session timeout in ms - is kafka broker does not get a heartbeat from a consumer during this interval - # consumer is marked as 'dead' and re-balancing is kicking off # default: 30s x 1000 = 30000 ms -# old: kafka.consumer.session.timeout.ms=30000 consumer.kafka.property.session.timeout.ms=30000 # Max number of bytes to fetch in one poll request PER partition # default: 1M = 1048576 -# old: kafka.consumer.max.partition.fetch.bytes=1048576 consumer.kafka.property.max.partition.fetch.bytes=1048576 # application instance name: @@ -29,14 +23,13 @@ consumer.kafka.property.max.partition.fetch.bytes=1048576 application.id=app1 # Kafka Topic from which the message has to be processed -# mandatory property, no default value specified. kafka.consumer.source.topic=my_log_topic #Number of consumer threads kafka.consumer.pool.count=5 # time in ms to wait for new messages to arrive when calling poll() on Kafka brokers , if there are no messages right away -# WARNING: make sure this value is not higher than kafka.consumer.session.timeout.ms !!! +# WARNING: make sure this value is not higher than session.timeout.ms !!! 
# default: 10 sec = 10 x 1000 = 10000 ms kafka.consumer.poll.interval.ms=10000 @@ -44,36 +37,36 @@ kafka.consumer.poll.interval.ms=10000 # from the IBatchMessageProcessor.beforeCommitCallBack() method kafka.consumer.poll.retry.limit=5 -# time delay in ms before retires of the poll records in the event of a recoverable exception +# time delay in ms before retries of the poll records in the event of a recoverable exception # from the IBatchMessageProcessor.beforeCommitCallBack() method kafka.consumer.poll.retry.delay.interval.ms=1000 # in the case when the max limit of recoverable exceptions was reached: # if set to TRUE - ignore the exception and continue processing the next poll() -# if set to FALSE - throw ConcumerUnrecoverableException and shutdown the Consumer +# if set to FALSE - throw ConsumerUnrecoverableException and shut down the Consumer kafka.consumer.ignore.overlimit.recoverable.errors=false ### ElasticSearch properties #################################### -# ElasticSearch Host and Port List for all the nodes -# Example: elasticsearch.hosts.list=machine_1_ip:9300,machine_2_ip:9300 +# ElasticSearch host and port list for all the nodes +# example: elasticsearch.hosts.list=machine_1_ip:9300,machine_2_ip:9300 elasticsearch.hosts.list=localhost:9300 -# Name of the ElasticSearch Cluster that messages will be posted to; -# Tip: Its not a good idea to use the default name "ElasticSearch" as your cluster name. 
+# name of the ElasticSearch Cluster that messages will be posted to; elasticsearch.cluster.name=KafkaESCluster -# ES Index Name that messages will be posted/indexed to; this can be customized via using a custom IndexHandler implementation class -# Default: "kafkaESIndex" -elasticsearch.index.name=kafkaESIndex +# ES Index Name that messages will be posted/indexed to; +# this can be customized in your own implementation of a batch processor, for example in the processMessage() method elasticsearch.index.name=kafka-es-index -# ES Index Type that messages will be posted/indexed to; this can be customized via using a custom IndexHandler implementation class -# Default: “kafkaESType” +# TODO deprecate this +# ES Index Type that messages will be posted/indexed to; this can be customized in your own implementation of a batch processor elasticsearch.index.type=kafkaESType -#Sleep time in ms between re-attempts of sending batch to ES , in case of SERVICE_UNAVAILABLE response -# Default: 10000 +# Sleep time in ms between re-attempts of sending batch to ES, in case of SERVICE_UNAVAILABLE response +# default: 10s = 10*1000 = 10000ms elasticsearch.reconnect.attempt.wait.ms=10000 # number of times to try to re-connect to ES when performing batch indexing , if connection to ES fails elasticsearch.indexing.retry.attempts=2 # sleep time in ms between attempts to connect to ES +# default: 10s = 10*1000 = 10000ms elasticsearch.indexing.retry.sleep.ms=10000 diff --git a/src/test/java/org/elasticsearch/kafka/indexer/service/ElasticSearchBatchServiceTest.java b/src/test/java/org/elasticsearch/kafka/indexer/service/ElasticSearchBatchServiceTest.java index 07d66e7..4debdb2 100644 --- a/src/test/java/org/elasticsearch/kafka/indexer/service/ElasticSearchBatchServiceTest.java +++ b/src/test/java/org/elasticsearch/kafka/indexer/service/ElasticSearchBatchServiceTest.java @@ -83,12 +83,7 @@ public void testAddEventToBulkRequest_withUUID_withRouting() {
Mockito.verify(mockedIndexRequestBuilder, Mockito.times(1)).setRouting(eventUUID); } - /** - * Test method for - * {@link org.elasticsearch.kafka.indexer.service.impl.BasicMessageHandler#postToElasticSearch()} - * . - */ @Test public void testPostOneBulkRequestToES_NoNodeException() { // simulate failure due to ES cluster (or part of it) not being