upgrade to Kafka 2.5.1, ES 7.8, Java 11 #110

Open: wants to merge 2 commits into master
1 change: 1 addition & 0 deletions .gitignore
@@ -11,6 +11,7 @@ docker/logs
docker/src
docker/elasticsearch/data

+**/*DS_Store
*.iml
kafkaESConsumerLocal.properties
dependency-reduced-pom.xml
6 changes: 4 additions & 2 deletions build.gradle
@@ -38,8 +38,8 @@ ext {
spring_version = '4.3.8.RELEASE'
slf4j_version = '1.7.25'
logback_version = '1.2.3'
-kafka_version = '2.1.1'
-elasticsearch_version = '6.6.0'
+kafka_version = '2.5.1'
+elasticsearch_version = '7.8.1'
}

configurations.all {
@@ -79,6 +79,8 @@ dependencies {
compile group: 'ch.qos.logback', name: 'logback-classic', version: logback_version
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.6'
+// missing in JDK since Java 11
+compile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'

testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19'
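Note on the new dependency: the javax.annotation package (which provides @PostConstruct and @PreDestroy) shipped with the JDK through Java 10 but was removed in Java 11, so it must now be declared explicitly. A minimal sketch of the kind of container-managed bean that stops compiling without it; the class and method names below are hypothetical:

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

// Hypothetical bean: on Java 11 this compiles only with the
// javax.annotation-api dependency on the classpath, because the JDK
// module that used to provide these annotations was removed.
public class IndexerLifecycle {

    @PostConstruct
    public void start() {
        // called by the container after dependency injection completes
    }

    @PreDestroy
    public void stop() {
        // called by the container before the bean is shut down
    }
}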
Binary file modified gradle/wrapper/gradle-wrapper.jar
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Thu Jul 13 16:56:59 EEST 2017
+#Thu Jan 23 14:28:37 EST 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.1-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.0-all.zip

Review comment on the distributionUrl change: 6.6 is the latest one

@@ -54,7 +54,7 @@ public interface IBatchMessageProcessor {
* Returning FALSE means that the offsets will NOT be committed and events from the previous poll will
* be re-processed
* @param consumerId
-* @param previousPollEndPosition
+* @param pollEndPosition
* @return boolean shouldCommitThisPoll
* @throws Exception
*/
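For reference, a sketch of how an implementor would use the renamed parameter. The signature of beforeCommitCallBack() is assumed from the javadoc above, and the Map<TopicPartition, OffsetAndMetadata> type for pollEndPosition is an assumption, not copied from the project:

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical implementor: the boolean return value decides whether the
// consumer commits the offsets of the poll that was just processed.
public class LoggingBatchProcessor {

    public boolean beforeCommitCallBack(
            int consumerId,
            Map<TopicPartition, OffsetAndMetadata> pollEndPosition) throws Exception {
        // a real implementation would verify here that its ES batch was
        // flushed before allowing the commit
        boolean allFlushed = pollEndPosition != null && !pollEndPosition.isEmpty();
        // returning false leaves the offsets uncommitted, so the events
        // from this poll are re-processed on the next cycle
        return allFlushed;
    }
}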
39 changes: 16 additions & 23 deletions src/main/resources/config/kafka-es-indexer.properties
@@ -1,79 +1,72 @@
### Kafka properties ####################################
-# all properties starting with this prefix - will be added to KafkaPRoperties object
+# all properties starting with this prefix - will be added to KafkaProperties object
# with the property name = original property name minus the prefix
kafka.consumer.property.prefix=consumer.kafka.property.
# Kafka Brokers host:port list: <host1>:<port1>,…,<hostN>:<portN>
-# default: localhost:9092
-# old: kafka.consumer.brokers.list=localhost:9092
consumer.kafka.property.bootstrap.servers=localhost:9092

-# Kafka Consumer group name prefix -
-# each indexer job will have a clientId = kafka.consumer.group.name + "_" + partitionNumber
-# default: kafka_es_indexer
-# old: kafka.consumer.group.name=kafka_es_indexer
+# each job will have a clientId = kafka.consumer.group.name + "_" + partitionNumber
consumer.kafka.property.group.id=kafka-batch-consumer

# kafka session timeout in ms - if the kafka broker does not get a heartbeat from a consumer during this interval -
# the consumer is marked as 'dead' and re-balancing kicks off
# default: 30s x 1000 = 30000 ms
-# old: kafka.consumer.session.timeout.ms=30000
consumer.kafka.property.session.timeout.ms=30000

# Max number of bytes to fetch in one poll request PER partition
# default: 1M = 1048576
-# old: kafka.consumer.max.partition.fetch.bytes=1048576
consumer.kafka.property.max.partition.fetch.bytes=1048576

# application instance name:
# used as a common name prefix of all consumer threads
application.id=app1

# Kafka Topic from which the message has to be processed
# mandatory property, no default value specified.
kafka.consumer.source.topic=my_log_topic

#Number of consumer threads
kafka.consumer.pool.count=5

# time in ms to wait for new messages to arrive when calling poll() on Kafka brokers, if there are no messages right away
-# WARNING: make sure this value is not higher than kafka.consumer.session.timeout.ms !!!
+# WARNING: make sure this value is not higher than session.timeout.ms !!!
# default: 10 sec = 10 x 1000 = 10000 ms
kafka.consumer.poll.interval.ms=10000

# number of times poll records will be attempted to be re-processed in the event of a recoverable exception
# from the IBatchMessageProcessor.beforeCommitCallBack() method
kafka.consumer.poll.retry.limit=5

-# time delay in ms before retires of the poll records in the event of a recoverable exception
+# time delay in ms before retries of the poll records in the event of a recoverable exception
# from the IBatchMessageProcessor.beforeCommitCallBack() method
kafka.consumer.poll.retry.delay.interval.ms=1000
# in the case when the max limit of recoverable exceptions was reached:
# if set to TRUE - ignore the exception and continue processing the next poll()
-# if set to FALSE - throw ConcumerUnrecoverableException and shutdown the Consumer
+# if set to FALSE - throw ConsumerUnrecoverableException and shutdown the Consumer
kafka.consumer.ignore.overlimit.recoverable.errors=false
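As described at the top of this file, every property under the consumer.kafka.property. prefix is handed to the Kafka consumer with the prefix stripped. A minimal sketch of that mechanism, assuming a simple Properties-based implementation; the class and method names are illustrative, not the project's actual code:

import java.util.Properties;

// Illustrative prefix stripping: "consumer.kafka.property.bootstrap.servers"
// is passed to the Kafka consumer as "bootstrap.servers".
public class KafkaPropertyExtractor {

    static final String PREFIX = "consumer.kafka.property.";

    public static Properties extract(Properties appProperties) {
        Properties kafkaProperties = new Properties();
        for (String name : appProperties.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                kafkaProperties.setProperty(
                        name.substring(PREFIX.length()),
                        appProperties.getProperty(name));
            }
        }
        return kafkaProperties;
    }
}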

### ElasticSearch properties ####################################
-# ElasticSearch Host and Port List for all the nodes
-# Example: elasticsearch.hosts.list=machine_1_ip:9300,machine_2_ip:9300
+# ElasticSearch host and port List for all the nodes
+# example: elasticsearch.hosts.list=machine_1_ip:9300,machine_2_ip:9300
elasticsearch.hosts.list=localhost:9300

-# Name of the ElasticSearch Cluster that messages will be posted to;
-# Tip: Its not a good idea to use the default name "ElasticSearch" as your cluster name.
+# name of the ElasticSearch Cluster that messages will be posted to;
elasticsearch.cluster.name=KafkaESCluster

-# ES Index Name that messages will be posted/indexed to; this can be customized via using a custom IndexHandler implementation class
-# Default: "kafkaESIndex"
-elasticsearch.index.name=kafkaESIndex
+# ES Index Name that messages will be posted/indexed to;
+# this can be customized in your own implementation of a batch processor, for example in the processMessage() method
+elasticsearch.index.name=kafka-es-index

-# ES Index Type that messages will be posted/indexed to; this can be customized via using a custom IndexHandler implementation class
-# Default: “kafkaESType”
+# TODO deprecate this
+# ES Index Type that messages will be posted/indexed to; this can be customized in your own implementation of a batch processor
elasticsearch.index.type=kafkaESType

-#Sleep time in ms between re-attempts of sending batch to ES , in case of SERVICE_UNAVAILABLE response
-# Default: 10000
+# Sleep time in ms between re-attempts of sending batch to ES , in case of SERVICE_UNAVAILABLE response
+# default: 10s = 10*1000 = 10000ms
elasticsearch.reconnect.attempt.wait.ms=10000

# number of times to try to re-connect to ES when performing batch indexing , if connection to ES fails
elasticsearch.indexing.retry.attempts=2
# sleep time in ms between attempts to connect to ES
# default: 10s = 10*1000 = 10000ms
elasticsearch.indexing.retry.sleep.ms=10000
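On the TODO above: mapping types are deprecated as of Elasticsearch 7.0, so elasticsearch.index.type has little effect against a 7.8 cluster. A hedged sketch of a typeless index request using the 7.x Java client's IndexRequest; whether this project indexes through that API is an assumption:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class TypelessIndexRequestExample {

    // ES 7.x: an IndexRequest takes only the index name; documents fall
    // under the single implicit "_doc" type, so no index type is supplied.
    public static IndexRequest build(String jsonMessage) {
        return new IndexRequest("kafka-es-index")
                .source(jsonMessage, XContentType.JSON);
    }
}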
@@ -83,12 +83,6 @@ public void testAddEventToBulkRequest_withUUID_withRouting() {
Mockito.verify(mockedIndexRequestBuilder, Mockito.times(1)).setRouting(eventUUID);
}

-/**
-* Test method for
-* {@link org.elasticsearch.kafka.indexer.service.impl.BasicMessageHandler#postToElasticSearch()}
-* .
-*/
-
@Test
public void testPostOneBulkRequestToES_NoNodeException() {
// simulate failure due to ES cluster (or part of it) not being