diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 00000000..16039333 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,45 @@ +name: build + +on: [push, pull_request] + +jobs: + + tests: + name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests + runs-on: ubuntu-latest + + strategy: + fail-fast: true + matrix: + java: [8] + scala: [2.11.12, 2.12.10] + + steps: + - uses: actions/checkout@v2 + - uses: olafurpg/setup-scala@v10 + with: + java-version: "adopt@1.${{ matrix.java }}" + + - name: Cache SBT Coursier directory + uses: actions/cache@v1 + with: + path: ~/.cache/coursier/v1 + key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }} + restore-keys: | + ${{ runner.os }}-coursier- + - name: Cache SBT directory + uses: actions/cache@v1 + with: + path: ~/.sbt + key: | + ${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }} + restore-keys: ${{ runner.os }}-sbt- + + - name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka10/test + + - name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka11/test + + - name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m kafka1x/test diff --git a/.gitignore b/.gitignore index 1310ac33..c4af8e55 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,7 @@ project/plugins/project/ .scala_dependencies .worksheet .idea + +.bsp/sbt.json + +.DS_Store diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index d284396e..00000000 --- a/.travis.yml +++ /dev/null @@ -1,37 +0,0 @@ -language: scala -sudo: required -dist: trusty -group: edge - -matrix: - include: - # Scala 2.11 - - jdk: oraclejdk8 - scala: 2.11.12 - env: COMMAND=ci - # Scala 2.12 - - jdk: oraclejdk8 - scala: 2.12.13 - env: COMMAND=ci - -env: - global: - - TRAVIS_NODE_VERSION="8.9" # LTS - -# http://austinpray.com/ops/2015/09/20/change-travis-node-version.html -install: -- rm -rf ~/.nvm && git clone https://github.com/creationix/nvm.git ~/.nvm && (cd ~/.nvm && git checkout `git describe --abbrev=0 --tags`) && source ~/.nvm/nvm.sh && nvm install $TRAVIS_NODE_VERSION - -script: -- export SBT_PROFILE=$COVERAGE -- sbt -J-Xmx6144m ++$TRAVIS_SCALA_VERSION $COMMAND - -cache: - directories: - - $HOME/.ivy2/cache - - $HOME/.coursier/cache - - $HOME/.sbt - -before_cache: -- find $HOME/.sbt -name "*.lock" -type f -delete -- find $HOME/.ivy2/cache -name "ivydata-*.properties" -type f -delete \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index 8860e9a8..c9b58030 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,13 @@ + +## Version 1.0.0-RC7 (January 23, 2021) + +Depends on Monix 3.3.0 + +Main changes: + +- PR #248: Enable '-Xfatal-warnings' flag (internal) +- PR #245: Use Kafka Producer Consumer interfaces + ## Version 1.0.0-RC6 (April 18, 2020) Depends on Monix 3.x diff --git a/README.md b/README.md index cfc2457e..5026496d 100644 --- a/README.md +++ b/README.md @@ -256,20 +256,18 @@ val observable = Enjoy! -### Caveats +### Internal poll heartbeat -[Issue#101](https://github.com/monix/monix-kafka/issues/101) -Starting from Kafka 0.10.1.0, there is `max.poll.interval.ms` setting: - - The maximum delay between invocations of poll() when using consumer group management. - This places an upper bound on the amount of time that the consumer can be idle before - fetching more records. 
If poll() is not called before expiration of this timeout,
-    then the consumer is considered failed and the group will rebalance in order
+Starting from Kafka _0.10.1.0_, there is a `max.poll.interval.ms` setting that defines the maximum delay between
+invocations of poll(). If poll() is not called within that interval, the consumer is considered failed and the group will rebalance in order
 to reassign the partitions to another member.
-Since, monix-kafka backpressures until all records has been processed.
-This could be a problem if processing takes time.
-You can reduce `max.poll.records` if you are experiencing this issue.
+This was an [issue](https://github.com/monix/monix-kafka/issues/101) in `monix-kafka`,
+since poll is not called until all previously consumed records have been processed, so slow downstream subscribers
+were at risk of being kicked out of the consumer group indefinitely.
+
+It is resolved in `1.0.0-RC8` by introducing an internal poll heartbeat interval
+that runs in the background, keeping the consumer alive.

 ## How can I contribute to Monix-Kafka?
diff --git a/benchmarks/docker-compose.yml b/benchmarks/docker-compose.yml
new file mode 100644
index 00000000..de7d1c73
--- /dev/null
+++ b/benchmarks/docker-compose.yml
@@ -0,0 +1,39 @@
+---
+version: '2'
+services:
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:4.1.1
+    hostname: zookeeper
+    container_name: zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  broker:
+    image: confluentinc/cp-server:5.5.0
+    hostname: broker
+    container_name: broker
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
+      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
+      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
+      CONFLUENT_METRICS_ENABLE: 'true'
+      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
+
diff --git a/benchmarks/project/build.properties b/benchmarks/project/build.properties
new file mode 100644
index 00000000..6adcdc75
--- /dev/null
+++ b/benchmarks/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.3.3
diff --git a/benchmarks/results/consumer.md b/benchmarks/results/consumer.md
new file mode 100644
index 00000000..a4c84e47
--- /dev/null
+++ b/benchmarks/results/consumer.md
@@ -0,0 +1,47 @@
+
+## Consumer Benchmarks
+
+The consumer benchmark covers the *manual* and *auto commit* consumer implementations of the different libraries.
+The manual commit benchmark also covers committing the consumed offsets back to Kafka.
+It also runs across a range of `pollHeartbeatRate` values [1, 10, 15, 100, 1000] (in milliseconds), which is an important configuration
+implemented in this `monix-kafka` library.
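+
+As a rough sketch of what a single benchmark iteration does (this is not part of the benchmark
+sources; `consumerConf`, `monixTopic` and the implicit `io` scheduler come from the `MonixFixture`
+trait shown later in this diff, and `withPollHeartBeatRate` is an internal API only reachable
+from the `monix.kafka` package scope):
+
+```scala
+import monix.kafka.KafkaConsumerObservable
+import scala.concurrent.duration._
+
+// Tune the internal poll heartbeat; 15ms is the library default.
+val conf = consumerConf.value()
+  .copy(maxPollRecords = 1)
+  .withPollHeartBeatRate(15.millis)
+
+// Consume 1000 records, committing each offset manually.
+KafkaConsumerObservable
+  .manualCommit[Integer, Integer](conf, List(monixTopic))
+  .mapEvalF(_.committableOffset.commitAsync())
+  .take(1000)
+  .completedL
+  .runSyncUnsafe()
+```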
+
+## RC7
+### 1 fork 1 thread
+Benchmark                                   Mode  Cnt   Score   Error  Units
+ConsumerBenchmark.monix_auto_commit_async  thrpt    7  13.097 ± 0.827  ops/s
+ConsumerBenchmark.monix_auto_commit_sync   thrpt    7  12.486 ± 1.087  ops/s
+ConsumerBenchmark.monix_manual_commit      thrpt   10  11.519 ± 2.247  ops/s
+
+### 1 fork 3 threads
+Benchmark                              Mode  Cnt   Score   Error  Units
+ConsumerBenchmark.monix_auto_commit    thrpt   10  16.186 ± 0.920  ops/s
+ConsumerBenchmark.monix_manual_commit  thrpt   10  16.319 ± 1.465  ops/s
+
+## RC8 - (Introduces PollHeartbeatRate)
+### 1 fork 1 thread
+---
+Benchmark                                            Mode  Cnt   Score   Error  Units
+ConsumerBenchmark.monixAsyncAutoCommitHeartbeat15ms  thrpt    7  13.126 ± 1.737  ops/s
+ConsumerBenchmark.monixSyncAutoCommitHeartbeat15ms   thrpt    7  12.102 ± 1.214  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat1000ms   thrpt    7   2.978 ± 0.006  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat100ms    thrpt    7   9.961 ± 1.317  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat10ms     thrpt    7  13.346 ± 0.716  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat15ms     thrpt    7  13.454 ± 2.680  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat1ms      thrpt    7  14.281 ± 1.591  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat50ms     thrpt    7  11.900 ± 0.698  ops/s
+---
+### 1 fork 3 threads
+Benchmark                                            Mode  Cnt   Score   Error  Units
+ConsumerBenchmark.monixAsyncAutoCommitHeartbeat15ms  thrpt    7  16.966 ± 2.659  ops/s
+ConsumerBenchmark.monixSyncAutoCommitHeartbeat15ms   thrpt    7  15.083 ± 4.242  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat1000ms   thrpt    7   2.978 ± 0.006  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat100ms    thrpt    7   9.961 ± 1.317  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat10ms     thrpt    7  13.346 ± 0.716  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat15ms     thrpt    7  13.454 ± 2.680  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat1ms      thrpt    7  14.281 ± 1.591  ops/s
+ConsumerBenchmark.monixManualCommitHeartbeat50ms     thrpt    7  11.900 ± 0.698  ops/s
+
+```sbt
+sbt 'benchmarks/jmh:run -i 5 -wi 1 -f1 -t1 monix.kafka.benchmarks.ConsumerBenchmark.*'
+```
\ No newline at end of file
diff --git a/benchmarks/results/producer.md b/benchmarks/results/producer.md
new file mode 100644
index 00000000..38aec700
--- /dev/null
+++ b/benchmarks/results/producer.md
@@ -0,0 +1,21 @@
+## Producer benchmarks
+
+This section includes benchmarks for single and sink producers.
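+
+As a minimal sketch of the two modes being measured (not part of the benchmark sources;
+`producerConf`, `monixTopic` and the implicit `io` scheduler come from the `MonixFixture`
+trait shown later in this diff):
+
+```scala
+import monix.eval.Task
+import monix.kafka.{KafkaProducer, KafkaProducerSink}
+import monix.reactive.Observable
+import org.apache.kafka.clients.producer.ProducerRecord
+
+// Single producer: sends the records one by one.
+val producer = KafkaProducer[Integer, Integer](producerConf, io)
+Task.traverse((0 until 1000).toList)(i => producer.send(topic = monixTopic, i))
+  .runSyncUnsafe()
+
+// Sink producer: groups the records in batches of 50 before sending them.
+Observable
+  .from(1 to 1000)
+  .map(i => new ProducerRecord[Integer, Integer](monixTopic, i))
+  .bufferTumbling(50)
+  .consumeWith(KafkaProducerSink(producerConf, io))
+  .runSyncUnsafe()
+```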
+ +## RC7 + +### 10iterations 1fork 1thread +Benchmark Mode Cnt Score Error Units +ProducerBenchmark.monix_single_producer thrpt 3 0.504 ± 0.668 ops/s +ProducerBenchmark.monix_sink_producer thrpt 3 1.861 ± 5.475 ops/s + +## RC8 + +### 10iterations 1fork 1thread +ProducerBenchmark.monix_single_producer thrpt 10 0.473 ± 0.070 ops/s +ProducerBenchmark.monix_sink_producer thrpt 10 2.009 ± 0.277 ops/s + +### 10iterations 1fork 3threads +Benchmark Mode Cnt Score Error Units +ProducerBenchmark.monix_single_producer thrpt 10 0.981 ± 0.202 ops/s +ProducerBenchmark.monix_sink_producer thrpt 10 3.241 ± 0.191 ops/s diff --git a/benchmarks/src/main/resources/logback.xml b/benchmarks/src/main/resources/logback.xml new file mode 100644 index 00000000..327dc7d7 --- /dev/null +++ b/benchmarks/src/main/resources/logback.xml @@ -0,0 +1,25 @@ + + + + + + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/benchmarks/src/main/scala/monix/kafka/benchmarks/BaseFixture.scala b/benchmarks/src/main/scala/monix/kafka/benchmarks/BaseFixture.scala new file mode 100644 index 00000000..5ac3f2d2 --- /dev/null +++ b/benchmarks/src/main/scala/monix/kafka/benchmarks/BaseFixture.scala @@ -0,0 +1,11 @@ +package monix.kafka.benchmarks + +import monix.eval.Coeval + +import scala.util.Random + +trait BaseFixture { + protected val brokerUrl = "127.0.0.1:9092" + protected val randomId: Coeval[String] = Coeval(Random.alphanumeric.filter(_.isLetter).take(20).mkString) + protected val monixTopic = "monix_topic" +} diff --git a/benchmarks/src/main/scala/monix/kafka/benchmarks/ConsumerBenchmark.scala b/benchmarks/src/main/scala/monix/kafka/benchmarks/ConsumerBenchmark.scala new file mode 100644 index 00000000..ad8dfee3 --- /dev/null +++ b/benchmarks/src/main/scala/monix/kafka/benchmarks/ConsumerBenchmark.scala @@ -0,0 +1,138 @@ +package monix.kafka.benchmarks + +import monix.kafka.config.ObservableCommitOrder.BeforeAck +import monix.kafka.config.ObservableCommitType + +import java.util.concurrent.TimeUnit +import monix.kafka.{KafkaConsumerObservable, KafkaProducerSink} +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.openjdk.jmh.annotations.{BenchmarkMode, Fork, Measurement, Mode, OutputTimeUnit, Scope, State, Threads, Warmup, _} +import scala.concurrent.duration._ + +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.Throughput)) +@OutputTimeUnit(TimeUnit.SECONDS) +@Measurement(iterations = 1) +@Warmup(iterations = 1) +@Fork(1) +@Threads(3) +class ConsumerBenchmark extends MonixFixture { + + var totalRecords: Int = 1100 + val consumedRecords = 1000 + var maxPollRecords: Int = 1 + + @Setup + def setup(): Unit = { + // preparing test data + Observable + .from(0 to totalRecords) + .map(i => new ProducerRecord[Integer, Integer](monixTopic, i)) + .bufferTumbling(totalRecords) + .consumeWith(KafkaProducerSink(producerConf.copy(monixSinkParallelism = 10), io)) + .runSyncUnsafe() + } + + @Benchmark + def monixAsyncAutoCommitHeartbeat15ms(): Unit = { + val conf = consumerConf.value().copy( + maxPollRecords = maxPollRecords, + observableCommitType = ObservableCommitType.Async, + observableCommitOrder = BeforeAck) + .withPollHeartBeatRate(15.millis) + + KafkaConsumerObservable[Integer, Integer](conf, List(monixTopic)) + .take(consumedRecords) + .headL + .runSyncUnsafe() + } + + @Benchmark + def monixSyncAutoCommitHeartbeat15ms(): Unit = { + val conf = consumerConf.value().copy( + maxPollRecords = maxPollRecords, + 
observableCommitType = ObservableCommitType.Sync, + observableCommitOrder = BeforeAck) + .withPollHeartBeatRate(15.millis) + + KafkaConsumerObservable[Integer, Integer](conf, List(monixTopic)) + .take(consumedRecords) + .headL + .runSyncUnsafe() + } + + @Benchmark + def monixManualCommitHeartbeat1ms(): Unit = { + val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(1.millis) + + KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic)) + .mapEvalF(_.committableOffset.commitAsync()) + .take(consumedRecords) + .headL + .runSyncUnsafe() + } + + @Benchmark + def monixManualCommitHeartbeat10ms(): Unit = { + val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(10.millis) + + KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic)) + .mapEvalF(_.committableOffset.commitAsync()) + .take(consumedRecords) + .headL + .runSyncUnsafe() + } + + @Benchmark + def monixManualCommitHeartbeat15ms(): Unit = { + val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(15.millis) + + KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic)) + .mapEvalF(_.committableOffset.commitAsync()) + .take(consumedRecords) + .headL + .runSyncUnsafe() + } + + @Benchmark + def monixManualCommitHeartbeat50ms(): Unit = { + val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(50.millis) + + KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic)) + .mapEvalF(_.committableOffset.commitAsync()) + .take(consumedRecords) + .headL + .runSyncUnsafe() + } + + @Benchmark + def monixManualCommitHeartbeat100ms(): Unit = { + val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(100.millis) + + KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic)) + .mapEvalF(_.committableOffset.commitAsync()) + .take(consumedRecords) + .headL + .runSyncUnsafe() + } + + @Benchmark + def monixManualCommitHeartbeat1000ms(): Unit = { + val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(1000.millis) + + KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic)) + .mapEvalF(_.committableOffset.commitAsync()) + .take(consumedRecords) + .headL + .runSyncUnsafe() + } + + +} diff --git a/benchmarks/src/main/scala/monix/kafka/benchmarks/MonixFixture.scala b/benchmarks/src/main/scala/monix/kafka/benchmarks/MonixFixture.scala new file mode 100644 index 00000000..e99d9a6a --- /dev/null +++ b/benchmarks/src/main/scala/monix/kafka/benchmarks/MonixFixture.scala @@ -0,0 +1,27 @@ +package monix.kafka.benchmarks + +import monix.eval.Coeval +import monix.execution.Scheduler +import monix.kafka.config.{AutoOffsetReset => MonixAutoOffsetReset} +import monix.kafka.{KafkaConsumerConfig, KafkaProducerConfig} + +trait MonixFixture extends BaseFixture{ + + implicit lazy val io = Scheduler.io("monix-kafka-benchmark") + + val producerConf = KafkaProducerConfig.default.copy( + bootstrapServers = List(brokerUrl), + clientId = monixTopic, + ) + + // we set a different group id every time so then the consumer will always + // read from the beginning + val consumerConf = Coeval(KafkaConsumerConfig.default.copy( + bootstrapServers = List(brokerUrl), + clientId = monixTopic, + groupId = randomId.value(), + enableAutoCommit = false, + autoOffsetReset = MonixAutoOffsetReset.Earliest + )) + +} diff --git 
a/benchmarks/src/main/scala/monix/kafka/benchmarks/ProducerBenchmark.scala b/benchmarks/src/main/scala/monix/kafka/benchmarks/ProducerBenchmark.scala new file mode 100644 index 00000000..c29c05db --- /dev/null +++ b/benchmarks/src/main/scala/monix/kafka/benchmarks/ProducerBenchmark.scala @@ -0,0 +1,44 @@ +package monix.kafka.benchmarks + +import org.openjdk.jmh.annotations.{BenchmarkMode, Fork, Measurement, Mode, OutputTimeUnit, Scope, State, Threads, Warmup} + +import java.util.concurrent.TimeUnit +import monix.eval.Task +import monix.kafka.{KafkaProducer, KafkaProducerSink} +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.openjdk.jmh.annotations._ + +import scala.concurrent.duration._ +import scala.concurrent.Await + +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.Throughput)) +@OutputTimeUnit(TimeUnit.SECONDS) +@Measurement(iterations = 3) +@Warmup(iterations = 1) +@Fork(1) +@Threads(1) +class ProducerBenchmark extends MonixFixture { + + var size: Int = 1000 + val producer = KafkaProducer[Integer, Integer](producerConf, io) + + @Benchmark + def monix_single_producer(): Unit = { + val f = Task.traverse((0 until size).toList)(i => producer.send(topic = monixTopic, i)).runToFuture(io) + Await.ready(f, Duration.Inf) + } + + @Benchmark + def monix_sink_producer(): Unit = { + val f = Observable + .from(1 to size) + .map(i => new ProducerRecord[Integer, Integer](monixTopic, i)) + .bufferTumbling(50) + .consumeWith(KafkaProducerSink(producerConf.copy(monixSinkParallelism = 10), io)) + .runToFuture(io) + Await.ready(f, Duration.Inf) + } + +} diff --git a/benchmarks/src/test/scala/monix/kafka/benchmarks/ConsumerSpec.scala b/benchmarks/src/test/scala/monix/kafka/benchmarks/ConsumerSpec.scala new file mode 100644 index 00000000..b2009471 --- /dev/null +++ b/benchmarks/src/test/scala/monix/kafka/benchmarks/ConsumerSpec.scala @@ -0,0 +1,51 @@ +package monix.kafka.benchmarks + +import monix.execution.Scheduler.Implicits.global +import monix.kafka.{KafkaConsumerObservable, KafkaProducer, KafkaProducerSink} +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class ConsumerSpec extends FlatSpec with MonixFixture with Matchers with BeforeAndAfterAll { + + + override def beforeAll(): Unit = { + super.beforeAll() + Observable + .from(1 to 1000) + .map(i => new ProducerRecord[Integer, Integer](monixTopic, i)) + .bufferTumbling(100) + .consumeWith(KafkaProducerSink(producerConf.copy(monixSinkParallelism = 10), io)) + .runSyncUnsafe() + } + + s"Monix ${monixTopic}" should "exist" in { + val producer = KafkaProducer[String, String](producerConf, global) + val t = producer.send(topic = monixTopic, "test") + + t.runSyncUnsafe().isDefined shouldBe true + } + + it should "consume with autocommit" in { + val conf = consumerConf.value() + + val f2 = KafkaConsumerObservable[Integer, Integer](conf, List(monixTopic)).take(1000).toListL.runToFuture(io) + + val elements = Await.result(f2, 10.seconds) + elements.size shouldBe 1000 + } + + it should "consume with manual commit" in { + val conf = consumerConf.value() + + val f2 = KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic)) + .take(1000).toListL.runToFuture(io) + + val elements = Await.result(f2, 10.seconds) + elements.size shouldBe 1000 + } + +} diff --git 
a/benchmarks/src/test/scala/monix/kafka/benchmarks/ProducerSpec.scala b/benchmarks/src/test/scala/monix/kafka/benchmarks/ProducerSpec.scala
new file mode 100644
index 00000000..c5c292ca
--- /dev/null
+++ b/benchmarks/src/test/scala/monix/kafka/benchmarks/ProducerSpec.scala
@@ -0,0 +1,17 @@
+package monix.kafka.benchmarks
+
+import monix.kafka.KafkaProducer
+import monix.execution.Scheduler.Implicits.global
+import org.scalatest.{FlatSpec, Matchers}
+
+class ProducerSpec extends FlatSpec with MonixFixture with Matchers {
+
+  val producer = KafkaProducer[String, String](producerConf, global)
+
+  s"Monix ${monixTopic}" should "exist before running Producer Benchmark" in {
+    val t = producer.send(topic = monixTopic, "test")
+
+    t.runSyncUnsafe().isDefined shouldBe true
+  }
+
+}
diff --git a/benchmarks/start-kafka-cluster.sh b/benchmarks/start-kafka-cluster.sh
new file mode 100644
index 00000000..e20ff4fa
--- /dev/null
+++ b/benchmarks/start-kafka-cluster.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+set -e
+
+function create_topic {
+  TOPIC_NAME=$1
+  PARTITIONS=$2
+  REPLICATION_FACTOR=$3
+  echo "Creating topic ${TOPIC_NAME} with ${PARTITIONS} partitions and replication factor of ${REPLICATION_FACTOR}."
+  docker-compose -f ./docker-compose.yml exec -T broker kafka-topics --create --topic ${TOPIC_NAME} --partitions ${PARTITIONS} --replication-factor ${REPLICATION_FACTOR} --if-not-exists --zookeeper zookeeper:2181
+}
+
+echo "Starting Kafka cluster..."
+docker-compose -f ./docker-compose.yml up -d zookeeper broker
+
+echo -e "Docker ps..."
+docker ps
+
+sleep 15
+
+create_topic monix_topic 2 1
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 354105d1..e17931e4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,3 +1,5 @@
+import pl.project13.scala.sbt.JmhPlugin
+
 val monixVersion = "3.3.0"

 val allProjects = List(
@@ -197,8 +199,7 @@ lazy val commonDependencies = Seq(
     // For testing ...
"ch.qos.logback" % "logback-classic" % "1.2.3" % "test", "org.scalatest" %% "scalatest" % "3.0.9" % "test", - "org.scalacheck" %% "scalacheck" % "1.15.2" % "test" - ) + "org.scalacheck" %% "scalacheck" % "1.15.2" % "test") ) lazy val monixKafka = project.in(file(".")) @@ -213,7 +214,7 @@ lazy val kafka1x = project.in(file("kafka-1.0.x")) .settings( name := "monix-kafka-1x", libraryDependencies ++= { - if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j")) + if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.1.1" % "test" exclude ("log4j", "log4j")) else Seq.empty[ModuleID] }, libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.0.2" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") @@ -226,7 +227,7 @@ lazy val kafka11 = project.in(file("kafka-0.11.x")) .settings( name := "monix-kafka-11", libraryDependencies ++= { - if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("log4j", "log4j")) + if (!(scalaVersion.value startsWith "2.13")) Seq("net.manub" %% "scalatest-embedded-kafka" % "1.1.1" % "test" exclude ("log4j", "log4j")) else Seq.empty[ModuleID] }, libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.3" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") @@ -256,6 +257,22 @@ lazy val kafka9 = project.in(file("kafka-0.9.x")) ) ) +lazy val benchmarks = project.in(file("benchmarks")) + .settings(sharedSettings) + .settings(commonDependencies) + .settings( + scalacOptions += "-Ypartial-unification", + name := "benchmarks", + organization := "io.monix", + scalaVersion := "2.12.10", + libraryDependencies ++= Seq("org.scala-lang.modules" %% "scala-collection-compat" % "2.3.2") + ) + .enablePlugins(JmhPlugin) + .aggregate(kafka1x) + .dependsOn(kafka1x) + +scalacOptions += "-Ypartial-unification" + //------------- For Release enablePlugins(GitVersioning) diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala index ba56a332..b174ef76 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala @@ -18,22 +18,18 @@ package monix.kafka import monix.eval.Task import org.apache.kafka.common.TopicPartition -import org.apache.kafka.clients.consumer.OffsetCommitCallback /** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. 
*/ trait Commit { def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] - def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] - final def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = commitBatchAsync(batch, null) + def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] } private[kafka] object Commit { val empty: Commit = new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit - - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit } } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala index 71b7da86..9bf00254 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -17,7 +17,6 @@ package monix.kafka import monix.eval.Task -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition /** Represents offset for specified topic and partition that can be @@ -46,12 +45,6 @@ final class CommittableOffset private[kafka] ( * to use batched commit with [[CommittableOffsetBatch]] class. */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) - - /** Asynchronously commits [[offset]] to Kafka. It is recommended - * to use batched commit with [[CommittableOffsetBatch]] class. - */ - def commitAsync(callback: OffsetCommitCallback): Task[Unit] = - commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback) } object CommittableOffset { diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala index 4bfee4ad..7d7d2bad 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala @@ -17,7 +17,6 @@ package monix.kafka import monix.eval.Task -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition /** Batch of Kafka offsets which can be committed together. @@ -45,10 +44,6 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** Asynchronously commits [[offsets]] to Kafka - */ - def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified * for same topic and partition. 
*/ diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index 82178bcc..007c694a 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -18,8 +18,8 @@ package monix.kafka import java.io.File import java.util.Properties - import com.typesafe.config.{Config, ConfigFactory} +import monix.execution.internal.InternalApi import monix.kafka.config._ import scala.jdk.CollectionConverters._ @@ -286,8 +286,16 @@ final case class KafkaConsumerConfig( "retry.backoff.ms" -> retryBackoffTime.toMillis.toString ) + private[kafka] var pollHeartbeatRate: FiniteDuration = 15.millis + + @InternalApi + private[kafka] def withPollHeartBeatRate(interval: FiniteDuration): KafkaConsumerConfig = { + pollHeartbeatRate = interval + this + } + def toJavaMap: java.util.Map[String, Object] = - toMap.filter(_._2 != null).map{case (a, b) =>(a, b.asInstanceOf[AnyRef])}.asJava + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava def toProperties: Properties = { val props = new Properties() diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index 04d1e409..dea323ef 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -18,16 +18,16 @@ package monix.kafka import monix.eval.Task import monix.execution.Ack.{Continue, Stop} -import monix.execution.{Ack, Callback, Cancelable} +import monix.execution.{Ack, Callback, Cancelable, Scheduler} import monix.kafka.config.ObservableCommitOrder import monix.reactive.Observable import monix.reactive.observers.Subscriber import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer} - import scala.jdk.CollectionConverters._ import scala.concurrent.blocking +import scala.concurrent.duration.DurationInt import scala.util.matching.Regex /** Exposes an `Observable` that consumes a Kafka stream by @@ -39,7 +39,10 @@ import scala.util.matching.Regex */ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig - protected def consumer: Task[Consumer[K, V]] + protected def consumerT: Task[Consumer[K, V]] + + @volatile + protected var isAcked = true /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement @@ -62,23 +65,25 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { private def feedTask(out: Subscriber[Out]): Task[Unit] = { Task.create { (scheduler, cb) => implicit val s = scheduler - val feedTask = consumer.flatMap { c => - // Skipping all available messages on all partitions - if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) - else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) - // A task to execute on both cancellation and normal termination - val onCancel = cancelTask(c) - runLoop(c, out).guarantee(onCancel) - } - feedTask.runAsync(cb) + val startConsuming = + consumerT.bracket { c => + // Skipping all available messages on all partitions + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) + else if 
(config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection)
+          Task.race(runLoop(c, out), pollHeartbeat(c).loopForever).void
+        } { consumer =>
+          // Forced asynchronous boundary
+          Task.evalAsync(consumer.synchronized(blocking(consumer.close())))
+        }
+      startConsuming.runAsync(cb)
     }
   }

-  /* Returns a task that continuously polls the `KafkaConsumer` for
-   * new messages and feeds the given subscriber.
-   *
-   * Creates an asynchronous boundary on every poll.
-   */
+  /** Returns a task that continuously polls the `KafkaConsumer` for
+    * new messages and feeds the given subscriber.
+    *
+    * Creates an asynchronous boundary on every poll.
+    */
   private def runLoop(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Unit] = {
     ackTask(consumer, out).flatMap {
       case Stop => Task.unit
@@ -86,20 +91,35 @@
     }
   }

-  /* Returns a `Task` that triggers the closing of the
-   * Kafka Consumer connection.
-   */
-  private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = {
-    // Forced asynchronous boundary
-    val cancelTask = Task.evalAsync {
-      consumer.synchronized(blocking(consumer.close()))
-    }
-
-    // By applying memoization, we are turning this
-    // into an idempotent action, such that we are
-    // guaranteed that consumer.close() happens
-    // at most once
-    cancelTask.memoize
+  /** Returns task that constantly polls the `KafkaConsumer` in case subscriber
+    * is still processing last fed batch.
+    * This allows the consumer to process commit calls and also keeps it alive even
+    * with long batch processing.
+    *
+    * If polling fails, the error is reported to the scheduler.
+    *
+    * @see [[https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]]
+    */
+  private def pollHeartbeat(consumer: Consumer[K, V])(implicit scheduler: Scheduler): Task[Unit] = {
+    Task.sleep(config.pollHeartbeatRate) >>
+      Task.eval {
+        if (!isAcked) {
+          consumer.synchronized {
+            // needed in order to ensure that the consumer assignment
+            // is paused, meaning that no messages will get lost.
+            val assignment = consumer.assignment()
+            consumer.pause(assignment)
+            val records = blocking(consumer.poll(0))
+            if (!records.isEmpty) {
+              val errorMsg = s"Received ${records.count()} unexpected messages."
+ throw new IllegalStateException(errorMsg) + } + } + } + }.onErrorHandleWith { ex => + Task.now(scheduler.reportFailure(ex)) >> + Task.sleep(1.second) + } } } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala index 6406c224..911bd97a 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala @@ -34,7 +34,7 @@ import scala.util.{Failure, Success} */ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( override protected val config: KafkaConsumerConfig, - override protected val consumer: Task[Consumer[K, V]]) + override protected val consumerT: Task[Consumer[K, V]]) extends KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] { /* Based on the [[KafkaConsumerConfig.observableCommitType]] it @@ -68,12 +68,16 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( s.executeAsync { () => val ackFuture = try consumer.synchronized { + val assignment = consumer.assignment() if (cancelable.isCanceled) Stop else { + consumer.resume(assignment) val next = blocking(consumer.poll(pollTimeoutMillis)) + consumer.pause(assignment) if (shouldCommitBefore) consumerCommit(consumer) // Feeding the observer happens on the Subscriber's scheduler // if any asynchronous boundaries happen + isAcked = false Observer.feed(out, next.asScala)(out.scheduler) } } catch { @@ -83,6 +87,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( ackFuture.syncOnComplete { case Success(ack) => + isAcked = true // The `streamError` flag protects against contract violations // (i.e. onSuccess/onError should happen only once). 
// Not really required, but we don't want to depend on the @@ -106,6 +111,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( } case Failure(ex) => + isAcked = true asyncCb.onError(ex) } } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 58d7302b..1f924b96 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -25,6 +25,7 @@ import monix.reactive.observers.Subscriber import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback} import org.apache.kafka.common.TopicPartition +import java.util import scala.jdk.CollectionConverters._ import scala.concurrent.{blocking, Future} import scala.util.control.NonFatal @@ -36,7 +37,7 @@ import scala.util.{Failure, Success} */ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( override protected val config: KafkaConsumerConfig, - override protected val consumer: Task[Consumer[K, V]]) + override protected val consumerT: Task[Consumer[K, V]]) extends KafkaConsumerObservable[K, V, CommittableMessage[K, V]] { // Caching value to save CPU cycles @@ -49,16 +50,26 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( k -> new OffsetAndMetadata(v) }.asJava)))) - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task { - blocking( - consumer.synchronized( - consumer.commitAsync( - batch.map { case (k, v) => - k -> new OffsetAndMetadata(v) - }.asJava, - callback))) - } + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = { + Task + .async0[Unit] { (s, cb) => + val asyncCb = Callback.forked(cb)(s) + s.executeAsync { () => + val offsets = batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava + val offsetCommitCallback = new OffsetCommitCallback { + def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit = + if (ex != null && !cb.tryOnError(ex)) { s.reportFailure(ex) } + else { cb.tryOnSuccess(()) } + } + try { + consumer.synchronized(consumer.commitAsync(offsets, offsetCommitCallback)) + } catch { + case NonFatal(ex) => + if (!asyncCb.tryOnError(ex)) { s.reportFailure(ex) } + } + } + } + } } override protected def ackTask(consumer: Consumer[K, V], out: Subscriber[CommittableMessage[K, V]]): Task[Ack] = @@ -66,18 +77,18 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( implicit val s = scheduler val asyncCb = Callback.forked(cb) val cancelable = BooleanCancelable() - val commit: Commit = CommitWithConsumer(consumer) // Forced asynchronous boundary (on the I/O scheduler) s.executeAsync { () => - val ackFuture = - try consumer.synchronized { - if (cancelable.isCanceled) Stop - else { + val ackFuture: Future[Ack] = + if (cancelable.isCanceled) Stop + else { + try consumer.synchronized { + val assignment = consumer.assignment() + consumer.resume(assignment) val next = blocking(consumer.poll(pollTimeoutMillis)) - // Feeding the observer happens on the Subscriber's scheduler - // if any asynchronous boundaries happen + consumer.pause(assignment) val result = next.asScala.map { record => CommittableMessage( record, @@ -86,15 +97,19 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( record.offset() + 1, commit)) } + // Feeding the observer 
happens on the Subscriber's scheduler + // if any asynchronous boundaries happen + isAcked = false Observer.feed(out, result)(out.scheduler) + } catch { + case NonFatal(ex) => + Future.failed(ex) } - } catch { - case NonFatal(ex) => - Future.failed(ex) } ackFuture.syncOnComplete { case Success(ack) => + isAcked = true // The `streamError` flag protects against contract violations // (i.e. onSuccess/onError should happen only once). // Not really required, but we don't want to depend on the @@ -117,6 +132,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( } case Failure(ex) => + isAcked = true asyncCb.onError(ex) } } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala index 90656598..de704730 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala @@ -272,7 +272,7 @@ case class KafkaProducerConfig( ) def toJavaMap: java.util.Map[String, Object] = - toMap.filter(_._2 != null).map{case (a, b) =>(a, b.asInstanceOf[AnyRef])}.asJava + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava def toProperties: Properties = { val props = new Properties() diff --git a/kafka-0.10.x/src/test/resources/logback.xml b/kafka-0.10.x/src/test/resources/logback.xml index 2beb83c9..6b2cd9ed 100644 --- a/kafka-0.10.x/src/test/resources/logback.xml +++ b/kafka-0.10.x/src/test/resources/logback.xml @@ -8,4 +8,15 @@ + + + + + + + + + + + diff --git a/kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala index 6a03f5b4..7d854a66 100644 --- a/kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala +++ b/kafka-0.10.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala @@ -9,7 +9,6 @@ import org.scalatest.{FunSuite, Matchers} import scala.concurrent.duration._ import scala.concurrent.Await import monix.execution.Scheduler.Implicits.global -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition import org.scalacheck.Gen import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks @@ -18,9 +17,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val commitCallbacks: List[Commit] = List.fill(4)(new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit - - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit }) val committableOffsetsGen: Gen[CommittableOffset] = for { diff --git a/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala b/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala index d1810ffe..02bd40fe 100644 --- a/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala +++ b/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala @@ -24,6 +24,7 @@ import monix.reactive.Observable import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.producer.ProducerRecord import org.scalatest.FunSuite + import scala.concurrent.Await import scala.concurrent.duration._ import scala.collection.JavaConverters._ @@ -155,4 +156,37 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { 
assert(first.isDefined && second.isRight && third.isEmpty) } } + + test("manual async commit consumer test when subscribed to topics list") { + withRunningKafka { + + val count = 10000 + val topicName = "monix-kafka-manual-commit-tests" + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](consumerCfg, List(topicName)) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .executeOn(io) + .bufferTumbling(count) + .map { messages => + messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) + } + .mapEval { case (values, batch) => Task.shift *> batch.commitAsync().map(_ => values -> batch.offsets) } + .headL + + val ((result, offsets), _) = + Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 100.seconds) + + val properOffsets = Map(new TopicPartition(topicName, 0) -> 10000) + assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets) + } + } + } diff --git a/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala b/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala index d62077b0..f09c6a1b 100644 --- a/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala +++ b/kafka-0.10.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala @@ -45,8 +45,8 @@ class MonixKafkaTopicRegexTest extends FunSuite with KafkaTestKit { ) test("publish one message when subscribed to topics regex") { - withRunningKafka { + val producer = KafkaProducer[String, String](producerCfg, io) val consumerTask = KafkaConsumerObservable.createConsumer[String, String](consumerCfg, topicsRegex).executeOn(io) val consumer = Await.result(consumerTask.runToFuture, 60.seconds) @@ -66,6 +66,7 @@ class MonixKafkaTopicRegexTest extends FunSuite with KafkaTestKit { } test("listen for one message when subscribed to topics regex") { + withRunningKafka { val producer = KafkaProducer[String, String](producerCfg, io) val consumer = KafkaConsumerObservable[String, String](consumerCfg, topicsRegex).executeOn(io) diff --git a/kafka-0.10.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala b/kafka-0.10.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala new file mode 100644 index 00000000..2d4ab8fb --- /dev/null +++ b/kafka-0.10.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2014-2019 by its authors. Some rights reserved. + * See the project homepage at: https://github.com/monix/monix-kafka + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monix.kafka + +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.scalatest.FunSuite +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +class PollHeartbeatTest extends FunSuite with KafkaTestKit with ScalaFutures { + + val topicName = "monix-kafka-tests" + + val producerCfg = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-producer-test" + ) + + val consumerCfg = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "kafka-tests", + clientId = "monix-kafka-1-0-consumer-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + test("slow downstream with small poll heartbeat and manual async commit keeps the consumer assignment") { + withRunningKafka { + + val count = 250 + val topicName = "monix-kafka-manual-commit-tests" + val delay = 200.millis + val pollHeartbeat = 2.millis + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = 200.millis, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .doOnNextF { committableMessage => + Task.sleep(delay) *> + CommittableOffsetBatch(Seq(committableMessage.committableOffset)).commitAsync().executeAsync + } + .take(count) + .toListL + + val (committableMessages, _) = Task.parZip2(listT.executeAsync, pushT.executeAsync).runSyncUnsafe(100.seconds) + val CommittableMessage(lastRecord, lastCommittableOffset) = committableMessages.last + assert(pollHeartbeat * 10 < delay) + assert((1 to count).sum === committableMessages.map(_.record.value().toInt).sum) + assert(lastRecord.value().toInt === count) + assert(count === lastCommittableOffset.offset) + assert(new TopicPartition(topicName, 0) === lastCommittableOffset.topicPartition) + } + } + + test("auto committable consumer with slow processing doesn't cause rebalancing") { + withRunningKafka { + val count = 10000 + + val consumerConfig = consumerCfg.copy( + maxPollInterval = 200.millis, + heartbeatInterval = 10.millis, + maxPollRecords = 1 + ) + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerConfig, List(topicName)).executeOn(io) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .take(count) + .map(_.value()) + .bufferTumbling(count / 4) + .mapEval(s => Task.sleep(2.second) >> Task.delay(s)) + .flatMap(Observable.fromIterable) + .toListL + + val (result, _) = Task.parZip2(listT.executeAsync, pushT.delayExecution(1.second).executeAsync).runSyncUnsafe() + assert(result.map(_.toInt).sum === (0 until count).sum) + } + } + + test("slow committable downstream with small poll heartbeat does not cause rebalancing") { + withRunningKafka { + val totalRecords = 1000 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 
40.millis + val pollHeartbeat = 1.millis + val maxPollInterval = 10.millis + val maxPollRecords = 1 + val fastPollHeartbeatConfig = + consumerCfg + .copy(maxPollInterval = 200.millis, maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .mapEvalF { committableMessage => + val manualCommit = Task + .defer(committableMessage.committableOffset.commitAsync()) + .as(committableMessage) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + val (committableMessages, _) = + Task.parZip2(listT.executeAsync, pushT.delayExecution(100.millis).executeAsync).runSyncUnsafe() + val CommittableMessage(lastRecord, lastCommittableOffset) = committableMessages.last + assert(pollHeartbeat * 10 < downstreamLatency) + assert(pollHeartbeat < maxPollInterval) + assert(maxPollInterval < downstreamLatency) + assert((1 to totalRecords).sum === committableMessages.map(_.record.value().toInt).sum) + assert(lastRecord.value().toInt === totalRecords) + assert(totalRecords === lastCommittableOffset.offset) + assert(new TopicPartition(topicName, 0) === lastCommittableOffset.topicPartition) + } + } + + //unhappy scenario + test("slow committable downstream with small `maxPollInterval` and high `pollHeartBeat` causes consumer rebalance") { + withRunningKafka { + val totalRecords = 200 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 2.seconds + val pollHeartbeat = 15.seconds + val maxPollInterval = 100.millis + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = maxPollInterval, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .mapEvalF { committableMessage => + val manualCommit = Task + .defer(committableMessage.committableOffset.commitAsync()) + .as(committableMessage) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + assert(pollHeartbeat > downstreamLatency) + assert(maxPollInterval < downstreamLatency) + assert(fastPollHeartbeatConfig.pollHeartbeatRate === pollHeartbeat) + + val f = Task.parZip2(listT.executeAsync, pushT.executeAsync).map(_._1).delayResult(50.seconds).runToFuture + + /* checks that value never returns, + * which is correct in this scenario since if the `maxPollInterval` + * is higher than `pollHeartBeat` and `downstreamLatency` + * on the other hand, it would be ideal to receive the following error message from kafka + * "the group has already rebalanced and assigned the partitions to another member" + * as it happens from kafka-client 1.1.0, see tests from kafka1x. 
+ */ + assert(f.value === None) + } + } + + test("super slow committable downstream causes consumer rebalance") { + withRunningKafka { + val totalRecords = 3 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 55.seconds + val pollHeartbeat = 5.seconds + val maxPollInterval = 4.seconds + // the downstreamLatency is higher than the `maxPollInterval` + // but smaller than `pollHeartBeat`, kafka will trigger rebalance + // and the consumer will be kicked out of the consumer group. + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = maxPollInterval, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .doOnNextF { committableMessage => + val manualCommit = Task.defer(committableMessage.committableOffset.commitAsync()) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + assert(pollHeartbeat * 10 < downstreamLatency) + assert(maxPollInterval * 10 < downstreamLatency) + assert(fastPollHeartbeatConfig.pollHeartbeatRate === pollHeartbeat) + + implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 100.milliseconds) + + val f = Task.parZip2(listT.executeAsync, pushT.executeAsync).map(_._1).delayResult(50.seconds).runToFuture + + /* checks that value never returns, + * which is correct in this scenario since if the `maxPollInterval` + * is higher than `pollHeartBeat` and `downstreamLatency` + * on the other hand, it would be ideal to receive the following error message from kafka + * "the group has already rebalanced and assigned the partitions to another member" + * as it happens from kafka-client 1.1.0, see tests from kafka1x. + */ + assert(f.value === None) + } + } + +} diff --git a/kafka-0.10.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-0.10.x/src/test/scala/monix/kafka/SerializationTest.scala index e1941115..1a80c91d 100644 --- a/kafka-0.10.x/src/test/scala/monix/kafka/SerializationTest.scala +++ b/kafka-0.10.x/src/test/scala/monix/kafka/SerializationTest.scala @@ -96,7 +96,7 @@ class SerializationTest extends FunSuite with KafkaTestKit { .map(_.value()) .toListL - val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 70.seconds) assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum) } } diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala index ba56a332..b174ef76 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala @@ -18,22 +18,18 @@ package monix.kafka import monix.eval.Task import org.apache.kafka.common.TopicPartition -import org.apache.kafka.clients.consumer.OffsetCommitCallback /** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. 
*/ trait Commit { def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] - def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] - final def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = commitBatchAsync(batch, null) + def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] } private[kafka] object Commit { val empty: Commit = new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit - - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit } } diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffset.scala index 71b7da86..9bf00254 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffset.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -17,7 +17,6 @@ package monix.kafka import monix.eval.Task -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition /** Represents offset for specified topic and partition that can be @@ -46,12 +45,6 @@ final class CommittableOffset private[kafka] ( * to use batched commit with [[CommittableOffsetBatch]] class. */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) - - /** Asynchronously commits [[offset]] to Kafka. It is recommended - * to use batched commit with [[CommittableOffsetBatch]] class. - */ - def commitAsync(callback: OffsetCommitCallback): Task[Unit] = - commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback) } object CommittableOffset { diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index e1229c12..7dc0e606 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -18,8 +18,8 @@ package monix.kafka import java.io.File import java.util.Properties - import com.typesafe.config.{Config, ConfigFactory} +import monix.execution.internal.InternalApi import monix.kafka.config._ import scala.jdk.CollectionConverters._ @@ -292,8 +292,16 @@ final case class KafkaConsumerConfig( "retry.backoff.ms" -> retryBackoffTime.toMillis.toString ) + private[kafka] var pollHeartbeatRate: FiniteDuration = 15.millis + + @InternalApi + private[kafka] def withPollHeartBeatRate(interval: FiniteDuration): KafkaConsumerConfig = { + pollHeartbeatRate = interval + this + } + def toJavaMap: java.util.Map[String, Object] = - toMap.filter(_._2 != null).map{case (a, b) =>(a, b.asInstanceOf[AnyRef])}.asJava + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava def toProperties: Properties = { val props = new Properties() diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index bf457267..2e7db3f5 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -18,7 +18,7 @@ package monix.kafka import monix.eval.Task import monix.execution.Ack.{Continue, Stop} -import monix.execution.{Ack, Callback, Cancelable} +import monix.execution.{Ack, Callback, 
Cancelable, Scheduler} import monix.kafka.config.ObservableCommitOrder import monix.reactive.Observable import monix.reactive.observers.Subscriber @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer} import scala.jdk.CollectionConverters._ +import scala.concurrent.duration._ import scala.concurrent.blocking import scala.util.matching.Regex @@ -38,7 +39,11 @@ import scala.util.matching.Regex */ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig - protected def consumer: Task[Consumer[K, V]] + + protected def consumerT: Task[Consumer[K, V]] + + @volatile + protected var isAcked = true /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement @@ -51,6 +56,7 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { val callback = new Callback[Throwable, Unit] { def onSuccess(value: Unit): Unit = out.onComplete() + def onError(ex: Throwable): Unit = out.onError(ex) } @@ -61,23 +67,25 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { private def feedTask(out: Subscriber[Out]): Task[Unit] = { Task.create { (scheduler, cb) => implicit val s = scheduler - val feedTask = consumer.flatMap { c => - // Skipping all available messages on all partitions - if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) - else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) - // A task to execute on both cancellation and normal termination - val onCancel = cancelTask(c) - runLoop(c, out).guarantee(onCancel) - } - feedTask.runAsync(cb) + val startConsuming = + consumerT.bracket { c => + // Skipping all available messages on all partitions + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) + else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) + Task.race(runLoop(c, out), pollHeartbeat(c).loopForever).void + } { consumer => + // Forced asynchronous boundary + Task.evalAsync(consumer.synchronized(blocking(consumer.close()))) + } + startConsuming.runAsync(cb) } } - /* Returns a task that continuously polls the `KafkaConsumer` for - * new messages and feeds the given subscriber. - * - * Creates an asynchronous boundary on every poll. - */ + /** Returns a task that continuously polls the `KafkaConsumer` for + * new messages and feeds the given subscriber. + * + * Creates an asynchronous boundary on every poll. + */ private def runLoop(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Unit] = { ackTask(consumer, out).flatMap { case Stop => Task.unit @@ -85,20 +93,35 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { } } - /* Returns a `Task` that triggers the closing of the - * Kafka Consumer connection. - */ - private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = { - // Forced asynchronous boundary - val cancelTask = Task.evalAsync { - consumer.synchronized(blocking(consumer.close())) - } - - // By applying memoization, we are turning this - // into an idempotent action, such that we are - // guaranteed that consumer.close() happens - // at most once - cancelTask.memoize + /** Returns task that constantly polls the `KafkaConsumer` in case subscriber + * is still processing last fed batch. 
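+ *
+ * A simplified sketch of how it is raced against the main consumer loop in `feedTask` above (taken verbatim from this change):
+ * {{{
+ *   Task.race(runLoop(c, out), pollHeartbeat(c).loopForever).void
+ * }}}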
+ * This allows commit calls to be processed and also keeps the consumer alive, even + * during long batch processing. + * + * If polling fails, the error is reported to the subscriber through the scheduler. + * + * @see [[https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]] + */ + private def pollHeartbeat(consumer: Consumer[K, V])(implicit scheduler: Scheduler): Task[Unit] = { + Task.sleep(config.pollHeartbeatRate) >> + Task.eval { + if (!isAcked) { + consumer.synchronized { + // needed in order to ensure that the consumer assignment + // is paused, meaning that no messages will get lost. + val assignment = consumer.assignment() + consumer.pause(assignment) + val records = blocking(consumer.poll(0)) + if (!records.isEmpty) { + val errorMsg = s"Received ${records.count()} unexpected messages." + throw new IllegalStateException(errorMsg) + } + } + } + }.onErrorHandleWith { ex => + Task.now(scheduler.reportFailure(ex)) >> + Task.sleep(1.seconds) + } + } } @@ -106,13 +129,12 @@ object KafkaConsumerObservable { /** Builds a [[KafkaConsumerObservable]] instance. * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. * @param consumer is a factory for the - * `org.apache.kafka.clients.consumer.KafkaConsumer` - * instance to use for consuming from Kafka + * `org.apache.kafka.clients.consumer.KafkaConsumer` + * instance to use for consuming from Kafka */ def apply[K, V]( cfg: KafkaConsumerConfig, @@ -121,10 +143,9 @@ object KafkaConsumerObservable { /** Builds a [[KafkaConsumerObservable]] instance. * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. * @param topics is the list of Kafka topics to subscribe to. */ def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit @@ -137,10 +158,9 @@ object KafkaConsumerObservable { /** Builds a [[KafkaConsumerObservable]] instance. * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. * @param topicsRegex is the pattern of Kafka topics to subscribe to. */ def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit @@ -165,14 +185,13 @@ object KafkaConsumerObservable { * .subscribe() * }}} * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. Auto commit will disabled and - * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used.
Auto commit will be disabled and + * observable commit order will be turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! * @param consumer is a factory for the - * `org.apache.kafka.clients.consumer.KafkaConsumer` - * instance to use for consuming from Kafka + * `org.apache.kafka.clients.consumer.KafkaConsumer` + * instance to use for consuming from Kafka */ def manualCommit[K, V]( cfg: KafkaConsumerConfig, consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { @@ -196,11 +215,10 @@ * .subscribe() * }}} * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. Auto commit will disabled and - * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will be disabled and + * observable commit order will be turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! * @param topics is the list of Kafka topics to subscribe to. */ def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit @@ -225,11 +243,10 @@ * .subscribe() * }}} * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. Auto commit will disabled and - * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will be disabled and + * observable commit order will be turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! * @param topicsRegex is the pattern of Kafka topics to subscribe to.
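 *
 * For example (a hedged sketch; the pattern below is illustrative):
 * {{{
 *   KafkaConsumerObservable
 *     .manualCommit[String, String](cfg, "monix-kafka-.*".r)
 * }}}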
*/ def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala index 6406c224..911bd97a 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala @@ -34,7 +34,7 @@ import scala.util.{Failure, Success} */ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( override protected val config: KafkaConsumerConfig, - override protected val consumer: Task[Consumer[K, V]]) + override protected val consumerT: Task[Consumer[K, V]]) extends KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] { /* Based on the [[KafkaConsumerConfig.observableCommitType]] it @@ -68,12 +68,16 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( s.executeAsync { () => val ackFuture = try consumer.synchronized { + val assignment = consumer.assignment() if (cancelable.isCanceled) Stop else { + consumer.resume(assignment) val next = blocking(consumer.poll(pollTimeoutMillis)) + consumer.pause(assignment) if (shouldCommitBefore) consumerCommit(consumer) // Feeding the observer happens on the Subscriber's scheduler // if any asynchronous boundaries happen + isAcked = false Observer.feed(out, next.asScala)(out.scheduler) } } catch { @@ -83,6 +87,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( ackFuture.syncOnComplete { case Success(ack) => + isAcked = true // The `streamError` flag protects against contract violations // (i.e. onSuccess/onError should happen only once). // Not really required, but we don't want to depend on the @@ -106,6 +111,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( } case Failure(ex) => + isAcked = true asyncCb.onError(ex) } } diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 58d7302b..11331b2e 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -25,6 +25,7 @@ import monix.reactive.observers.Subscriber import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback} import org.apache.kafka.common.TopicPartition +import java.util import scala.jdk.CollectionConverters._ import scala.concurrent.{blocking, Future} import scala.util.control.NonFatal @@ -36,7 +37,7 @@ import scala.util.{Failure, Success} */ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( override protected val config: KafkaConsumerConfig, - override protected val consumer: Task[Consumer[K, V]]) + override protected val consumerT: Task[Consumer[K, V]]) extends KafkaConsumerObservable[K, V, CommittableMessage[K, V]] { // Caching value to save CPU cycles @@ -49,16 +50,26 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( k -> new OffsetAndMetadata(v) }.asJava)))) - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task { - blocking( - consumer.synchronized( - consumer.commitAsync( - batch.map { case (k, v) => - k -> new OffsetAndMetadata(v) - }.asJava, - callback))) - } + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = { + Task 
+ .async0[Unit] { (s, cb) => + val asyncCb = Callback.forked(cb)(s) + s.executeAsync { () => + val offsets = batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava + val offsetCommitCallback = new OffsetCommitCallback { + def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit = + if (ex != null && !cb.tryOnError(ex)) { s.reportFailure(ex) } + else { cb.tryOnSuccess(()) } + } + try { + consumer.synchronized(consumer.commitAsync(offsets, offsetCommitCallback)) + } catch { + case NonFatal(ex) => + if (!asyncCb.tryOnError(ex)) { s.reportFailure(ex) } + } + } + } + } } override protected def ackTask(consumer: Consumer[K, V], out: Subscriber[CommittableMessage[K, V]]): Task[Ack] = @@ -71,13 +82,14 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( // Forced asynchronous boundary (on the I/O scheduler) s.executeAsync { () => - val ackFuture = - try consumer.synchronized { - if (cancelable.isCanceled) Stop - else { + val ackFuture: Future[Ack] = + if (cancelable.isCanceled) Stop + else { + try consumer.synchronized { + val assignment = consumer.assignment() + consumer.resume(assignment) val next = blocking(consumer.poll(pollTimeoutMillis)) - // Feeding the observer happens on the Subscriber's scheduler - // if any asynchronous boundaries happen + consumer.pause(assignment) val result = next.asScala.map { record => CommittableMessage( record, @@ -86,15 +98,19 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( record.offset() + 1, commit)) } + // Feeding the observer happens on the Subscriber's scheduler + // if any asynchronous boundaries happen + isAcked = false Observer.feed(out, result)(out.scheduler) + } catch { + case NonFatal(ex) => + Future.failed(ex) } - } catch { - case NonFatal(ex) => - Future.failed(ex) } ackFuture.syncOnComplete { case Success(ack) => + isAcked = true // The `streamError` flag protects against contract violations // (i.e. onSuccess/onError should happen only once). 
// Not really required, but we don't want to depend on the @@ -117,6 +133,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( } case Failure(ex) => + isAcked = true asyncCb.onError(ex) } } diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala index 3afea389..efdea9f9 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala @@ -272,7 +272,7 @@ case class KafkaProducerConfig( ) def toJavaMap: java.util.Map[String, Object] = - toMap.filter(_._2 != null).map{case (a, b) =>(a, b.asInstanceOf[AnyRef])}.asJava + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava def toProperties: Properties = { val props = new Properties() diff --git a/kafka-0.11.x/src/test/resources/logback.xml b/kafka-0.11.x/src/test/resources/logback.xml index 2beb83c9..327dc7d7 100644 --- a/kafka-0.11.x/src/test/resources/logback.xml +++ b/kafka-0.11.x/src/test/resources/logback.xml @@ -1,11 +1,25 @@ - - - %d{yyyyMMdd-HH:mm:ss.SSSZ} [%thread] %-5level %logger - %msg%n - + + + + + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + + - - + + + + + + + + + + + + - + + \ No newline at end of file diff --git a/kafka-0.11.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-0.11.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala index 6a03f5b4..7d854a66 100644 --- a/kafka-0.11.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala +++ b/kafka-0.11.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala @@ -9,7 +9,6 @@ import org.scalatest.{FunSuite, Matchers} import scala.concurrent.duration._ import scala.concurrent.Await import monix.execution.Scheduler.Implicits.global -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition import org.scalacheck.Gen import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks @@ -18,9 +17,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val commitCallbacks: List[Commit] = List.fill(4)(new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit - - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit }) val committableOffsetsGen: Gen[CommittableOffset] = for { diff --git a/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala b/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala index 46006cce..8d44c672 100644 --- a/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala +++ b/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala @@ -106,7 +106,7 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { } } - test("manual commit consumer test when subscribed to topics list") { + test("manual commitSync consumer test when subscribed to topics list") { withRunningKafka { val count = 10000 val topicName = "monix-kafka-manual-commit-tests" @@ -124,13 +124,45 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { .executeOn(io) .bufferTumbling(count) .map { messages => messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) } - .mapEval { case (values, batch) => Task.shift *> batch.commitSync().map(_ => 
values -> batch.offsets) } + .mapEval { case (values, batch) => batch.commitSync().map(_ => values -> batch.offsets) } .headL val ((result, offsets), _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) - val properOffsets = Map(new TopicPartition(topicName, 0) -> 10000) + val properOffsets = Map(new TopicPartition(topicName, 0) -> count) + assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets) + } + } + + test("manual async commit consumer test when subscribed to topics list") { + withRunningKafka { + + val count = 10000 + val topicName = "monix-kafka-manual-commit-tests" + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](consumerCfg, List(topicName)) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .executeOn(io) + .bufferTumbling(count) + .map { messages => + messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) + } + .mapEval { case (values, batch) => Task.shift *> batch.commitAsync().map(_ => values -> batch.offsets) } + .headL + + val ((result, offsets), _) = + Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 60.seconds) + + val properOffsets = Map(new TopicPartition(topicName, 0) -> count) assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets) } } @@ -152,4 +184,5 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { assert(first.isDefined && second.isRight && third.isEmpty) } } + } diff --git a/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala b/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala index f91c8bfe..17e2b090 100644 --- a/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala +++ b/kafka-0.11.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala @@ -89,7 +89,9 @@ class MonixKafkaTopicRegexTest extends FunSuite with KafkaTestKit { val count = 10000 val producer = KafkaProducerSink[String, String](producerCfg, io) - val consumer = KafkaConsumerObservable[String, String](consumerCfg, topicsRegex).executeOn(io).take(count) + val consumer = KafkaConsumerObservable[String, String](consumerCfg, topicsRegex) + .executeOn(io) + .take(count) val pushT = Observable .range(0, count) diff --git a/kafka-0.11.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala b/kafka-0.11.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala new file mode 100644 index 00000000..7db73855 --- /dev/null +++ b/kafka-0.11.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2014-2019 by its authors. Some rights reserved. + * See the project homepage at: https://github.com/monix/monix-kafka + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monix.kafka + +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.scalatest.FunSuite +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +class PollHeartbeatTest extends FunSuite with KafkaTestKit with ScalaFutures { + + val topicName = "monix-kafka-tests" + + val producerCfg = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-producer-test" + ) + + val consumerCfg = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "kafka-tests", + clientId = "monix-kafka-1-0-consumer-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + test("slow downstream with small poll heartbeat and manual async commit keeps the consumer assignment") { + withRunningKafka { + + val count = 250 + val topicName = "monix-kafka-manual-commit-tests" + val delay = 200.millis + val pollHeartbeat = 2.millis + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = 200.millis, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .doOnNextF { committableMessage => + Task.sleep(delay) *> + CommittableOffsetBatch(Seq(committableMessage.committableOffset)).commitAsync().executeAsync + } + .take(count) + .toListL + + val (committableMessages, _) = Task.parZip2(listT.executeAsync, pushT.executeAsync).runSyncUnsafe(100.seconds) + val CommittableMessage(lastRecord, lastCommittableOffset) = committableMessages.last + assert(pollHeartbeat * 10 < delay) + assert((1 to count).sum === committableMessages.map(_.record.value().toInt).sum) + assert(lastRecord.value().toInt === count) + assert(count === lastCommittableOffset.offset) + assert(new TopicPartition(topicName, 0) === lastCommittableOffset.topicPartition) + } + } + + test("auto committable consumer with slow processing doesn't cause rebalancing") { + withRunningKafka { + val count = 10000 + + val consumerConfig = consumerCfg.copy( + maxPollInterval = 200.millis, + heartbeatInterval = 10.millis, + maxPollRecords = 1 + ) + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerConfig, List(topicName)).executeOn(io) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .take(count) + .map(_.value()) + .bufferTumbling(count / 4) + .mapEval(s => Task.sleep(2.second) >> Task.delay(s)) + .flatMap(Observable.fromIterable) + .toListL + + val result = Task.parZip2(listT.executeAsync, pushT.delayExecution(1.second)).map(_._1).runSyncUnsafe() + assert(result.map(_.toInt).sum === (0 until count).sum) + } + } + + test("slow committable downstream with small poll heartbeat does not cause rebalancing") { + withRunningKafka { + val totalRecords = 1000 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 
40.millis + val pollHeartbeat = 1.millis + val maxPollInterval = 10.millis + val maxPollRecords = 1 + val fastPollHeartbeatConfig = + consumerCfg + .copy(maxPollInterval = 200.millis, maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .mapEvalF { committableMessage => + val manualCommit = Task + .defer(committableMessage.committableOffset.commitAsync()) + .as(committableMessage) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + val (committableMessages, _) = + Task.parZip2(listT.executeAsync, pushT.delayExecution(100.millis).executeAsync).runSyncUnsafe() + val CommittableMessage(lastRecord, lastCommittableOffset) = committableMessages.last + assert(pollHeartbeat * 10 < downstreamLatency) + assert(pollHeartbeat < maxPollInterval) + assert(maxPollInterval < downstreamLatency) + assert((1 to totalRecords).sum === committableMessages.map(_.record.value().toInt).sum) + assert(lastRecord.value().toInt === totalRecords) + assert(totalRecords === lastCommittableOffset.offset) + assert(new TopicPartition(topicName, 0) === lastCommittableOffset.topicPartition) + } + } + + //unhappy scenario + test("slow committable downstream with small `maxPollInterval` and high `pollHeartBeat` causes consumer rebalance") { + withRunningKafka { + val totalRecords = 200 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 2.seconds + val pollHeartbeat = 15.seconds + val maxPollInterval = 100.millis + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = maxPollInterval, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .mapEvalF { committableMessage => + val manualCommit = Task + .defer(committableMessage.committableOffset.commitAsync()) + .as(committableMessage) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + assert(pollHeartbeat > downstreamLatency) + assert(maxPollInterval < downstreamLatency) + assert(fastPollHeartbeatConfig.pollHeartbeatRate === pollHeartbeat) + + val f = Task.parZip2(listT.executeAsync, pushT.executeAsync).map(_._1).delayResult(50.seconds).runToFuture + + /* checks that the future never completes, which is expected in this scenario, + * since `pollHeartbeat` is much higher than `maxPollInterval` + * and the consumer gets kicked out of the consumer group. + * Ideally we would receive the following error message from kafka: + * "the group has already rebalanced and assigned the partitions to another member", + * as happens from kafka-client 1.1.0 onwards, see the tests in kafka1x. + */ + assert(f.value === None) + } + } + + test("super slow committable downstream causes consumer rebalance") { + withRunningKafka { + val totalRecords = 3 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 55.seconds + val pollHeartbeat = 5.seconds + val maxPollInterval = 4.seconds + // `downstreamLatency` is far higher than `maxPollInterval`, + // and `pollHeartbeat` also exceeds `maxPollInterval`, so kafka will trigger a rebalance + // and the consumer will be kicked out of the consumer group. + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = maxPollInterval, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .doOnNextF { committableMessage => + val manualCommit = Task.defer(committableMessage.committableOffset.commitAsync()) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + assert(pollHeartbeat * 10 < downstreamLatency) + assert(maxPollInterval * 10 < downstreamLatency) + assert(fastPollHeartbeatConfig.pollHeartbeatRate === pollHeartbeat) + + implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 100.milliseconds) + + val f = Task.parZip2(listT.executeAsync, pushT.executeAsync).map(_._1).delayResult(50.seconds).runToFuture + + /* checks that the future never completes, which is expected in this scenario, + * since both `pollHeartbeat` and `downstreamLatency` exceed `maxPollInterval`, + * so the consumer gets kicked out of the consumer group. + * Ideally we would receive the following error message from kafka: + * "the group has already rebalanced and assigned the partitions to another member", + * as happens from kafka-client 1.1.0 onwards, see the tests in kafka1x. + */ + assert(f.value === None) + } + } + +} diff --git a/kafka-0.11.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-0.11.x/src/test/scala/monix/kafka/SerializationTest.scala index e1941115..1a80c91d 100644 --- a/kafka-0.11.x/src/test/scala/monix/kafka/SerializationTest.scala +++ b/kafka-0.11.x/src/test/scala/monix/kafka/SerializationTest.scala @@ -96,7 +96,7 @@ class SerializationTest extends FunSuite with KafkaTestKit { .map(_.value()) .toListL - val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 70.seconds) assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum) } } diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala index ba56a332..8a9a4539 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala @@ -18,14 +18,12 @@ package monix.kafka import monix.eval.Task import org.apache.kafka.common.TopicPartition -import org.apache.kafka.clients.consumer.OffsetCommitCallback /** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context.
*/ trait Commit { def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] - def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] - final def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = commitBatchAsync(batch, null) + def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] } private[kafka] object Commit { @@ -33,7 +31,6 @@ private[kafka] object Commit { val empty: Commit = new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit } } diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffset.scala index 71b7da86..741d6c0e 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffset.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -17,7 +17,6 @@ package monix.kafka import monix.eval.Task -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition /** Represents offset for specified topic and partition that can be @@ -47,11 +46,6 @@ */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) - /** Asynchronously commits [[offset]] to Kafka. It is recommended - * to use batched commit with [[CommittableOffsetBatch]] class. - */ - def commitAsync(callback: OffsetCommitCallback): Task[Unit] = - commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback) } object CommittableOffset { diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index d90b1473..7bcfa171 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -18,7 +18,6 @@ package monix.kafka import java.io.File import java.util.Properties - import com.typesafe.config.{Config, ConfigFactory} import monix.kafka.config._ @@ -279,7 +278,7 @@ final case class KafkaConsumerConfig( ) def toJavaMap: java.util.Map[String, Object] = - toMap.filter(_._2 != null).map{case (a, b) =>(a, b.asInstanceOf[AnyRef])}.asJava + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava def toProperties: Properties = { val props = new Properties() diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index c9a44a0c..d131e181 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -16,6 +16,7 @@ package monix.kafka +import cats.effect.Resource import monix.eval.Task import monix.execution.Ack.{Continue, Stop} import monix.execution.{Ack, Callback, Cancelable} @@ -38,6 +39,10 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig protected def consumer: Task[Consumer[K, V]] + /** Signals whether the last batch fed downstream has been acknowledged by the subscriber. */ + @volatile + protected var isAcked = true + /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting
acknowledgement */ @@ -59,23 +64,28 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { private def feedTask(out: Subscriber[Out]): Task[Unit] = { Task.create { (scheduler, cb) => implicit val s = scheduler - val feedTask = consumer.flatMap { c => - // Skipping all available messages on all partitions - if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd() - else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning() - // A task to execute on both cancellation and normal termination - val onCancel = cancelTask(c) - runLoop(c, out).guarantee(onCancel) - } - feedTask.runAsync(cb) + val startConsuming = + Resource + .make(consumer) { c => + // Forced asynchronous boundary + Task.evalAsync(consumer.synchronized(blocking(c.close()))) + } + .use { c => + // Skipping all available messages on all partitions + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd() + else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning() + // A task to execute on both cancellation and normal termination + runLoop(c, out) + } + startConsuming.runAsync(cb) } } - /* Returns a task that continuously polls the `KafkaConsumer` for - * new messages and feeds the given subscriber. - * - * Creates an asynchronous boundary on every poll. - */ + /** Returns a task that continuously polls the `KafkaConsumer` for + * new messages and feeds the given subscriber. + * + * Creates an asynchronous boundary on every poll. + */ private def runLoop(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Unit] = { ackTask(consumer, out).flatMap { case Stop => Task.unit @@ -83,21 +93,6 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { } } - /* Returns a `Task` that triggers the closing of the - * Kafka Consumer connection. - */ - private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = { - // Forced asynchronous boundary - val cancelTask = Task.evalAsync { - consumer.synchronized(blocking(consumer.close())) - } - - // By applying memoization, we are turning this - // into an idempotent action, such that we are - // guaranteed that consumer.close() happens - // at most once - cancelTask.memoize - } } object KafkaConsumerObservable { diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala index 6406c224..8828a083 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala @@ -68,12 +68,16 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( s.executeAsync { () => val ackFuture = try consumer.synchronized { + val assignment = consumer.assignment().asScala.toArray if (cancelable.isCanceled) Stop else { + consumer.resume(assignment: _*) val next = blocking(consumer.poll(pollTimeoutMillis)) + consumer.pause(assignment: _*) if (shouldCommitBefore) consumerCommit(consumer) // Feeding the observer happens on the Subscriber's scheduler // if any asynchronous boundaries happen + isAcked = false Observer.feed(out, next.asScala)(out.scheduler) } } catch { @@ -83,6 +87,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( ackFuture.syncOnComplete { case Success(ack) => + isAcked = true // The `streamError` flag protects against contract violations // (i.e. onSuccess/onError should happen only once). 
// Not really required, but we don't want to depend on the @@ -106,6 +111,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( } case Failure(ex) => + isAcked = true asyncCb.onError(ex) } } diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 58d7302b..2ad32cd8 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -25,6 +25,7 @@ import monix.reactive.observers.Subscriber import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback} import org.apache.kafka.common.TopicPartition +import java.util import scala.jdk.CollectionConverters._ import scala.concurrent.{blocking, Future} import scala.util.control.NonFatal @@ -49,16 +50,26 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( k -> new OffsetAndMetadata(v) }.asJava)))) - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task { - blocking( - consumer.synchronized( - consumer.commitAsync( - batch.map { case (k, v) => - k -> new OffsetAndMetadata(v) - }.asJava, - callback))) - } + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = { + Task + .async0[Unit] { (s, cb) => + val asyncCb = Callback.forked(cb)(s) + s.executeAsync { () => + val offsets = batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava + val offsetCommitCallback = new OffsetCommitCallback { + def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit = + if (ex != null && !cb.tryOnError(ex)) { s.reportFailure(ex) } + else { cb.tryOnSuccess(()) } + } + try { + consumer.synchronized(consumer.commitAsync(offsets, offsetCommitCallback)) + } catch { + case NonFatal(ex) => + if (!asyncCb.tryOnError(ex)) { s.reportFailure(ex) } + } + } + } + } } override protected def ackTask(consumer: Consumer[K, V], out: Subscriber[CommittableMessage[K, V]]): Task[Ack] = @@ -71,13 +82,14 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( // Forced asynchronous boundary (on the I/O scheduler) s.executeAsync { () => - val ackFuture = - try consumer.synchronized { - if (cancelable.isCanceled) Stop - else { + val ackFuture: Future[Ack] = + if (cancelable.isCanceled) Stop + else { + try consumer.synchronized { + val assignment = consumer.assignment().asScala.toList + consumer.resume(assignment: _*) val next = blocking(consumer.poll(pollTimeoutMillis)) - // Feeding the observer happens on the Subscriber's scheduler - // if any asynchronous boundaries happen + consumer.pause(assignment: _*) val result = next.asScala.map { record => CommittableMessage( record, @@ -86,15 +98,19 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( record.offset() + 1, commit)) } + // Feeding the observer happens on the Subscriber's scheduler + // if any asynchronous boundaries happen + isAcked = false Observer.feed(out, result)(out.scheduler) + } catch { + case NonFatal(ex) => + Future.failed(ex) } - } catch { - case NonFatal(ex) => - Future.failed(ex) } ackFuture.syncOnComplete { case Success(ack) => + isAcked = true // The `streamError` flag protects against contract violations // (i.e. onSuccess/onError should happen only once). 
// Not really required, but we don't want to depend on the @@ -117,6 +133,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( } case Failure(ex) => + isAcked = true asyncCb.onError(ex) } } diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala index 2d221c12..9ecfe392 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala @@ -266,7 +266,7 @@ case class KafkaProducerConfig( ) def toJavaMap: java.util.Map[String, Object] = - toMap.filter(_._2 != null).map{case (a, b) =>(a, b.asInstanceOf[AnyRef])}.asJava + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava def toProperties: Properties = { val props = new Properties() diff --git a/kafka-0.9.x/src/test/resources/logback.xml b/kafka-0.9.x/src/test/resources/logback.xml index 5a3e8a64..327dc7d7 100644 --- a/kafka-0.9.x/src/test/resources/logback.xml +++ b/kafka-0.9.x/src/test/resources/logback.xml @@ -1,11 +1,25 @@ - - - %d{yyyyMMdd-HH:mm:ss.SSSZ} [%thread] %-5level %logger - %msg%n - + + + + + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + + - - + + + + + + + + + + + + - + + \ No newline at end of file diff --git a/kafka-0.9.x/src/test/scala/monix/kafka/MonixKafkaTest.scala b/kafka-0.9.x/src/test/scala/monix/kafka/MonixKafkaTest.scala index 604bd196..213a854b 100644 --- a/kafka-0.9.x/src/test/scala/monix/kafka/MonixKafkaTest.scala +++ b/kafka-0.9.x/src/test/scala/monix/kafka/MonixKafkaTest.scala @@ -127,6 +127,36 @@ class MonixKafkaTest extends FunSuite { assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets) } + test("manual async commit consumer test when subscribed to topics list") { + + val count = 10000 + val topicName = "monix-kafka-manual-commit-tests" + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](consumerCfg, List(topicName)) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .executeOn(io) + .bufferTumbling(count) + .map { messages => + messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) + } + .mapEval { case (values, batch) => Task.shift *> batch.commitAsync().map(_ => values -> batch.offsets) } + .headL + + val ((result, offsets), _) = + Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 60.seconds) + + val properOffsets = Map(new TopicPartition(topicName, 0) -> 10000) + assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets) + } + test("publish to closed producer when subscribed to topics list") { val producer = KafkaProducer[String, String](producerCfg, io) val sendTask = producer.send(topicName, "test-message") @@ -185,4 +215,30 @@ class MonixKafkaTest extends FunSuite { .headL Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) } + + test("slow batches processing doesn't cause rebalancing") { + val count = 10000 + + val consumerConfig = consumerCfg.copy(sessionTimeout = 200.millis) + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerConfig, List(topicName)).executeOn(io) + + val pushT = Observable + .range(0, count) + .map(msg => new 
ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .take(count) + .map(_.value()) + .bufferTumbling(count / 4) + .mapEval(s => Task.sleep(2.second) >> Task.delay(s)) + .flatMap(Observable.fromIterable) + .toListL + + val (result, _) = Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 60.seconds) + assert(result.map(_.toInt).sum === (0 until count).sum) + } } diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala index ba56a332..b174ef76 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala @@ -18,22 +18,18 @@ package monix.kafka import monix.eval.Task import org.apache.kafka.common.TopicPartition -import org.apache.kafka.clients.consumer.OffsetCommitCallback /** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. */ trait Commit { def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] - def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] - final def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = commitBatchAsync(batch, null) + def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] } private[kafka] object Commit { val empty: Commit = new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit - - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit } } diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala index 71b7da86..9bf00254 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -17,7 +17,6 @@ package monix.kafka import monix.eval.Task -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition /** Represents offset for specified topic and partition that can be @@ -46,12 +45,6 @@ final class CommittableOffset private[kafka] ( * to use batched commit with [[CommittableOffsetBatch]] class. */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) - - /** Asynchronously commits [[offset]] to Kafka. It is recommended - * to use batched commit with [[CommittableOffsetBatch]] class. - */ - def commitAsync(callback: OffsetCommitCallback): Task[Unit] = - commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback) } object CommittableOffset { diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala index 4bfee4ad..7d7d2bad 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala @@ -17,7 +17,6 @@ package monix.kafka import monix.eval.Task -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition /** Batch of Kafka offsets which can be committed together. 
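 *
 * A minimal batching sketch (assuming a collection of consumed `messages`, as in the tests of this change):
 * {{{
 *   val batch = CommittableOffsetBatch(messages.map(_.committableOffset))
 *   val commit: Task[Unit] = batch.commitAsync()
 * }}}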
@@ -45,10 +44,6 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** Asynchronously commits [[offsets]] to Kafka - */ - def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified * for same topic and partition. */ diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index 8fd10421..b1254626 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -18,8 +18,8 @@ package monix.kafka import java.io.File import java.util.Properties - import com.typesafe.config.{Config, ConfigFactory} +import monix.execution.internal.InternalApi import monix.kafka.config._ import scala.jdk.CollectionConverters._ @@ -212,7 +212,7 @@ import scala.concurrent.duration._ * by this object can be set via the map, but in case of a duplicate * a value set on the case class will overwrite value set via properties. */ -final case class KafkaConsumerConfig( +case class KafkaConsumerConfig( bootstrapServers: List[String], fetchMinBytes: Int, fetchMaxBytes: Int, @@ -299,8 +299,16 @@ final case class KafkaConsumerConfig( "retry.backoff.ms" -> retryBackoffTime.toMillis.toString ) + private[kafka] var pollHeartbeatRate: FiniteDuration = 15.millis + + @InternalApi + private[kafka] def withPollHeartBeatRate(interval: FiniteDuration): KafkaConsumerConfig = { + pollHeartbeatRate = interval + this + } + def toJavaMap: java.util.Map[String, Object] = - toMap.filter(_._2 != null).map{case (a, b) =>(a, b.asInstanceOf[AnyRef])}.asJava + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava def toProperties: Properties = { val props = new Properties() diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index a65eee76..2b38df73 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -18,7 +18,7 @@ package monix.kafka import monix.eval.Task import monix.execution.Ack.{Continue, Stop} -import monix.execution.{Ack, Callback, Cancelable} +import monix.execution.{Ack, Callback, Cancelable, Scheduler} import monix.kafka.config.ObservableCommitOrder import monix.reactive.Observable import monix.reactive.observers.Subscriber @@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsume import scala.jdk.CollectionConverters._ import scala.concurrent.blocking import scala.util.matching.Regex +import scala.concurrent.duration._ /** Exposes an `Observable` that consumes a Kafka stream by * means of a Kafka Consumer client. @@ -36,8 +37,12 @@ import scala.util.matching.Regex * (in the resource files) that is exposing all default values. 
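 *
 * A minimal consumption sketch (the bootstrap address, group id and topic name are assumptions for illustration):
 * {{{
 *   val cfg = KafkaConsumerConfig.default.copy(
 *     bootstrapServers = List("127.0.0.1:9092"),
 *     groupId = "my-consumer-group")
 *   val records = KafkaConsumerObservable[String, String](cfg, List("my-topic"))
 * }}}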
*/ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { + protected def config: KafkaConsumerConfig - protected def consumer: Task[Consumer[K, V]] + protected def consumerT: Task[Consumer[K, V]] + + @volatile + protected var isAcked = true /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement @@ -50,6 +55,7 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { val callback = new Callback[Throwable, Unit] { def onSuccess(value: Unit): Unit = out.onComplete() + def onError(ex: Throwable): Unit = out.onError(ex) } @@ -60,23 +66,25 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { private def feedTask(out: Subscriber[Out]): Task[Unit] = { Task.create { (scheduler, cb) => implicit val s = scheduler - val feedTask = consumer.flatMap { c => - // Skipping all available messages on all partitions - if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) - else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) - // A task to execute on both cancellation and normal termination - val onCancel = cancelTask(c) - runLoop(c, out).guarantee(onCancel) - } - feedTask.runAsync(cb) + val startConsuming = + consumerT.bracket { c => + // Skipping all available messages on all partitions + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) + else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) + Task.race(runLoop(c, out), pollHeartbeat(c).loopForever).void + } { consumer => + // Forced asynchronous boundary + Task.evalAsync(consumer.synchronized(blocking(consumer.close()))) + } + startConsuming.runAsync(cb) } } - /* Returns a task that continuously polls the `KafkaConsumer` for - * new messages and feeds the given subscriber. - * - * Creates an asynchronous boundary on every poll. - */ + /** Returns a task that continuously polls the `KafkaConsumer` for + * new messages and feeds the given subscriber. + * + * Creates an asynchronous boundary on every poll. + */ private def runLoop(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Unit] = { ackTask(consumer, out).flatMap { case Stop => Task.unit @@ -84,34 +92,49 @@ } } - /* Returns a `Task` that triggers the closing of the - * Kafka Consumer connection. - */ - private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = { - // Forced asynchronous boundary - val cancelTask = Task.evalAsync { - consumer.synchronized(blocking(consumer.close())) - } - - // By applying memoization, we are turning this - // into an idempotent action, such that we are - // guaranteed that consumer.close() happens - // at most once - cancelTask.memoize + /** Returns a task that constantly polls the `KafkaConsumer` while the subscriber + * is still processing the last fed batch. + * This allows commit calls to be processed and also keeps the consumer alive, even + * during long batch processing. + * + * If polling fails, the error is reported to the subscriber through the scheduler.
+ * + * @see [[https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]] + */ + private def pollHeartbeat(consumer: Consumer[K, V])(implicit scheduler: Scheduler): Task[Unit] = { + Task.sleep(config.pollHeartbeatRate) >> + Task.eval { + if (!isAcked) { + consumer.synchronized { + // needed in order to ensure that the consumer assignment + // is paused, meaning that no messages will get lost. + val assignment = consumer.assignment() + consumer.pause(assignment) + val records = blocking(consumer.poll(0)) + if (!records.isEmpty) { + val errorMsg = s"Received ${records.count()} unexpected messages." + throw new IllegalStateException(errorMsg) + } + } + } + }.onErrorHandleWith { ex => + Task.now(scheduler.reportFailure(ex)) >> + Task.sleep(1.seconds) + } } + } object KafkaConsumerObservable { /** Builds a [[KafkaConsumerObservable]] instance. * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. * @param consumer is a factory for the - * `org.apache.kafka.clients.consumer.KafkaConsumer` - * instance to use for consuming from Kafka + * `org.apache.kafka.clients.consumer.KafkaConsumer` + * instance to use for consuming from Kafka */ def apply[K, V]( cfg: KafkaConsumerConfig, @@ -120,10 +143,9 @@ object KafkaConsumerObservable { /** Builds a [[KafkaConsumerObservable]] instance. * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. * @param topics is the list of Kafka topics to subscribe to. */ def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit @@ -136,10 +158,9 @@ object KafkaConsumerObservable { /** Builds a [[KafkaConsumerObservable]] instance. * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. * @param topicsRegex is the pattern of Kafka topics to subscribe to. */ def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit @@ -164,20 +185,21 @@ object KafkaConsumerObservable { * .subscribe() * }}} * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. Auto commit will disabled and - * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! - * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will disabled and + * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! 
* @param consumer is a factory for the - * `org.apache.kafka.clients.consumer.KafkaConsumer` - * instance to use for consuming from Kafka + * `org.apache.kafka.clients.consumer.KafkaConsumer` + * instance to use for consuming from Kafka */ def manualCommit[K, V]( cfg: KafkaConsumerConfig, consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { - val manualCommitConfig = cfg.copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false) + val manualCommitConfig = cfg + .copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false) + .withPollHeartBeatRate(cfg.pollHeartbeatRate) new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer) } @@ -195,11 +217,10 @@ object KafkaConsumerObservable { * .subscribe() * }}} * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. Auto commit will disabled and - * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! - + + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will be disabled and + * the observable commit order will be forcibly set to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]]! * @param topics is the list of Kafka topics to subscribe to. */ def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit @@ -224,11 +245,10 @@ object KafkaConsumerObservable { * .subscribe() * }}} * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. Auto commit will disabled and - * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! - + + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will be disabled and + * the observable commit order will be forcibly set to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]]! * @param topicsRegex is the pattern of Kafka topics to subscribe to.
*/ def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala index 6406c224..911bd97a 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala @@ -34,7 +34,7 @@ import scala.util.{Failure, Success} */ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( override protected val config: KafkaConsumerConfig, - override protected val consumer: Task[Consumer[K, V]]) + override protected val consumerT: Task[Consumer[K, V]]) extends KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] { /* Based on the [[KafkaConsumerConfig.observableCommitType]] it @@ -68,12 +68,16 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( s.executeAsync { () => val ackFuture = try consumer.synchronized { + val assignment = consumer.assignment() if (cancelable.isCanceled) Stop else { + consumer.resume(assignment) val next = blocking(consumer.poll(pollTimeoutMillis)) + consumer.pause(assignment) if (shouldCommitBefore) consumerCommit(consumer) // Feeding the observer happens on the Subscriber's scheduler // if any asynchronous boundaries happen + isAcked = false Observer.feed(out, next.asScala)(out.scheduler) } } catch { @@ -83,6 +87,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( ackFuture.syncOnComplete { case Success(ack) => + isAcked = true // The `streamError` flag protects against contract violations // (i.e. onSuccess/onError should happen only once). // Not really required, but we don't want to depend on the @@ -106,6 +111,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( } case Failure(ex) => + isAcked = true asyncCb.onError(ex) } } diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index cb3dc5cf..4f83a455 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -25,6 +25,7 @@ import monix.reactive.observers.Subscriber import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback} import org.apache.kafka.common.TopicPartition +import java.util import scala.concurrent.{blocking, Future} import scala.util.control.NonFatal import scala.util.{Failure, Success} @@ -36,7 +37,7 @@ import scala.jdk.CollectionConverters._ */ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( override protected val config: KafkaConsumerConfig, - override protected val consumer: Task[Consumer[K, V]]) + override protected val consumerT: Task[Consumer[K, V]]) extends KafkaConsumerObservable[K, V, CommittableMessage[K, V]] { // Caching value to save CPU cycles @@ -49,16 +50,26 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( k -> new OffsetAndMetadata(v) }.asJava)))) - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = - Task { - blocking( - consumer.synchronized( - consumer.commitAsync( - batch.map { case (k, v) => - k -> new OffsetAndMetadata(v) - }.asJava, - callback))) - } + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = { + Task + 
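// Task.async0 exposes both the Scheduler and a low-level Callback, so the + // commit result can be completed from Kafka's commit-callback thread. +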
.async0[Unit] { (s, cb) => + val asyncCb = Callback.forked(cb)(s) + s.executeAsync { () => + val offsets = batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava + val offsetCommitCallback = new OffsetCommitCallback { + def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit = + if (ex != null && !cb.tryOnError(ex)) { s.reportFailure(ex) } + else { cb.tryOnSuccess(()) } + } + try { + consumer.synchronized(consumer.commitAsync(offsets, offsetCommitCallback)) + } catch { + case NonFatal(ex) => + if (!asyncCb.tryOnError(ex)) { s.reportFailure(ex) } + } + } + } + } } override protected def ackTask(consumer: Consumer[K, V], out: Subscriber[CommittableMessage[K, V]]): Task[Ack] = @@ -71,13 +82,14 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( // Forced asynchronous boundary (on the I/O scheduler) s.executeAsync { () => - val ackFuture = - try consumer.synchronized { - if (cancelable.isCanceled) Stop - else { + val ackFuture: Future[Ack] = + if (cancelable.isCanceled) Stop + else { + try consumer.synchronized { + val assignment = consumer.assignment() + consumer.resume(assignment) val next = blocking(consumer.poll(pollTimeoutMillis)) - // Feeding the observer happens on the Subscriber's scheduler - // if any asynchronous boundaries happen + consumer.pause(assignment) val result = next.asScala.map { record => CommittableMessage( record, @@ -86,15 +98,20 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( record.offset() + 1, commit)) } + // Feeding the observer happens on the Subscriber's scheduler + // if any asynchronous boundaries happen + isAcked = false Observer.feed(out, result)(out.scheduler) + } catch { + case NonFatal(ex) => + Future.failed(ex) } - } catch { - case NonFatal(ex) => - Future.failed(ex) + } ackFuture.syncOnComplete { case Success(ack) => + isAcked = true // The `streamError` flag protects against contract violations // (i.e. onSuccess/onError should happen only once). // Not really required, but we don't want to depend on the @@ -117,6 +134,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( } case Failure(ex) => + isAcked = true asyncCb.onError(ex) } } diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala index 90656598..20f87485 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala @@ -194,7 +194,7 @@ import scala.concurrent.duration._ * by this object can be set via the map, but in case of a duplicate * a value set on the case class will overwrite the value set via properties.
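* For example (hypothetical values): if `properties` maps "bootstrap.servers" * to "localhost:9091" while `bootstrapServers` is List("localhost:9092"), the * effective setting passed to the underlying producer is "localhost:9092".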
*/ -case class KafkaProducerConfig( +final case class KafkaProducerConfig( bootstrapServers: List[String], acks: Acks, bufferMemoryInBytes: Int, @@ -272,7 +272,7 @@ case class KafkaProducerConfig( ) def toJavaMap: java.util.Map[String, Object] = - toMap.filter(_._2 != null).map{case (a, b) =>(a, b.asInstanceOf[AnyRef])}.asJava + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava def toProperties: Properties = { val props = new Properties() diff --git a/kafka-1.0.x/src/test/resources/logback-test.xml b/kafka-1.0.x/src/test/resources/logback-test.xml index cc97f771..1e022eb2 100644 --- a/kafka-1.0.x/src/test/resources/logback-test.xml +++ b/kafka-1.0.x/src/test/resources/logback-test.xml @@ -9,8 +9,17 @@ diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala index 6a03f5b4..56cdf168 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala @@ -9,7 +9,6 @@ import org.scalatest.{FunSuite, Matchers} import scala.concurrent.duration._ import scala.concurrent.Await import monix.execution.Scheduler.Implicits.global -import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition import org.scalacheck.Gen import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks @@ -19,7 +18,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe val commitCallbacks: List[Commit] = List.fill(4)(new Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit - override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit }) diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala index f09c6a1b..d6f2335e 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala @@ -66,7 +66,6 @@ class MonixKafkaTopicRegexTest extends FunSuite with KafkaTestKit { } test("listen for one message when subscribed to topics regex") { - withRunningKafka { val producer = KafkaProducer[String, String](producerCfg, io) val consumer = KafkaConsumerObservable[String, String](consumerCfg, topicsRegex).executeOn(io) diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala new file mode 100644 index 00000000..2be9232e --- /dev/null +++ b/kafka-1.0.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala @@ -0,0 +1,236 @@ +package monix.kafka + +import monix.eval.Task +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import monix.execution.Scheduler.Implicits.global +import org.scalactic.source +import org.scalatest.FunSuite +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +class PollHeartbeatTest extends FunSuite with KafkaTestKit with ScalaFutures { + + val topicName = "monix-kafka-tests" + + override implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 100.milliseconds) + + val
producerCfg: KafkaProducerConfig = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-producer-test" + ) + + val consumerCfg: KafkaConsumerConfig = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "kafka-tests", + clientId = "monix-kafka-1-0-consumer-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + test("slow downstream with small poll heartbeat and manual async commit keeps the consumer assignment") { + withRunningKafka { + + val count = 250 + val topicName = "monix-kafka-manual-commit-tests" + val delay = 200.millis + val pollHeartbeat = 2.millis + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = 200.millis, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .doOnNextF { committableMessage => + Task.sleep(delay) *> + CommittableOffsetBatch(Seq(committableMessage.committableOffset)).commitAsync().executeAsync + } + .take(count) + .toListL + + val (committableMessages, _) = Task.parZip2(listT.executeAsync, pushT.executeAsync).runSyncUnsafe(100.seconds) + val CommittableMessage(lastRecord, lastCommittableOffset) = committableMessages.last + assert(pollHeartbeat * 10 < delay) + assert((1 to count).sum === committableMessages.map(_.record.value().toInt).sum) + assert(lastRecord.value().toInt === count) + assert(count === lastCommittableOffset.offset) + assert(new TopicPartition(topicName, 0) === lastCommittableOffset.topicPartition) + } + } + + test("auto committable consumer with slow processing doesn't cause rebalancing") { + withRunningKafka { + val count = 10000 + + val consumerConfig = consumerCfg.copy( + maxPollInterval = 200.millis, + heartbeatInterval = 10.millis, + maxPollRecords = 1 + ) + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerConfig, List(topicName)).executeOn(io) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .take(count) + .map(_.value()) + .bufferTumbling(count / 4) + .mapEval(s => Task.sleep(2.second) >> Task.delay(s)) + .flatMap(Observable.fromIterable) + .toListL + + val (result, _) = Task.parZip2(listT.executeAsync, pushT.delayExecution(1.second).executeAsync).runSyncUnsafe() + assert(result.map(_.toInt).sum === (0 until count).sum) + } + } + + test("slow committable downstream with small poll heartbeat does not cause rebalancing") { + withRunningKafka { + val totalRecords = 1000 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 40.millis + val pollHeartbeat = 1.millis + val maxPollInterval = 10.millis + val maxPollRecords = 1 + val fastPollHeartbeatConfig = + consumerCfg + .copy(maxPollInterval = maxPollInterval, maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to
totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .mapEvalF { committableMessage => + val manualCommit = Task + .defer(committableMessage.committableOffset.commitAsync()) + .as(committableMessage) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + val (committableMessages, _) = + Task.parZip2(listT.executeAsync, pushT.delayExecution(100.millis).executeAsync).runSyncUnsafe() + val CommittableMessage(lastRecord, lastCommittableOffset) = committableMessages.last + assert(pollHeartbeat * 10 < downstreamLatency) + assert(pollHeartbeat < maxPollInterval) + assert(maxPollInterval < downstreamLatency) + assert((1 to totalRecords).sum === committableMessages.map(_.record.value().toInt).sum) + assert(lastRecord.value().toInt === totalRecords) + assert(totalRecords === lastCommittableOffset.offset) + assert(new TopicPartition(topicName, 0) === lastCommittableOffset.topicPartition) + } + } + + // unhappy scenario + test("slow committable downstream with small `maxPollInterval` and high `pollHeartBeat` causes consumer rebalance") { + withRunningKafka { + val totalRecords = 200 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 2.seconds + val pollHeartbeat = 15.seconds + val maxPollInterval = 100.millis + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = maxPollInterval, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .mapEvalF { committableMessage => + val manualCommit = Task + .defer(committableMessage.committableOffset.commitAsync()) + .as(committableMessage) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + assert(pollHeartbeat > downstreamLatency) + assert(maxPollInterval < downstreamLatency) + assert(fastPollHeartbeatConfig.pollHeartbeatRate === pollHeartbeat) + + val t = Task.parZip2(listT.executeAsync, pushT.executeAsync).map(_._1) + whenReady(t.runToFuture.failed) { ex => + assert(ex.getMessage.contains("the group has already rebalanced and assigned the partitions to another member")) + } + + } + } + + test("super slow committable downstream causes consumer rebalance") { + withRunningKafka { + val totalRecords = 3 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 55.seconds + val pollHeartbeat = 5.seconds + val maxPollInterval = 4.seconds + // the downstreamLatency is higher than `maxPollInterval`, + // and `pollHeartBeat` is bigger than `maxPollInterval`, so the + // heartbeat poll arrives too late: kafka will trigger a rebalance + // and the consumer will be kicked out of the consumer group.
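+ // (With these values the first background poll can only happen after + // pollHeartbeat = 5 seconds, one second past the 4-second + // max.poll.interval.ms deadline, so the broker evicts this member.)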
+ val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = maxPollInterval, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .doOnNextF { committableMessage => + val manualCommit = Task.defer(committableMessage.committableOffset.commitAsync()) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + assert(pollHeartbeat * 10 < downstreamLatency) + assert(maxPollInterval * 10 < downstreamLatency) + assert(fastPollHeartbeatConfig.pollHeartbeatRate === pollHeartbeat) + + implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 100.milliseconds) + + val t = Task.parZip2(listT.executeAsync, pushT.executeAsync).map(_._1) + whenReady(t.runToFuture.failed) { ex => + assert(ex.getMessage.contains("the group has already rebalanced and assigned the partitions to another member")) + }(PatienceConfig(200.seconds, 1.seconds), source.Position.here) + } + } + +} diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala index e1941115..1a80c91d 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/SerializationTest.scala @@ -96,7 +96,7 @@ class SerializationTest extends FunSuite with KafkaTestKit { .map(_.value()) .toListL - val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 70.seconds) assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum) } } diff --git a/project/plugins.sbt b/project/plugins.sbt index 86808a1e..3026984f 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -5,3 +5,4 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.8.1") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.7") \ No newline at end of file
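A minimal usage sketch of the poll-heartbeat behaviour introduced in this changeset, assuming the APIs shown above (`KafkaConsumerConfig.default`, `withPollHeartBeatRate`, `KafkaConsumerObservable.manualCommit` and `CommittableOffset.commitAsync`); the broker address, topic name and timings are illustrative only:

import monix.eval.Task
import monix.execution.Scheduler
import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}

import scala.concurrent.duration._

object PollHeartbeatUsage extends App {
  implicit val io: Scheduler = Scheduler.io("kafka-example")

  // Keep the heartbeat well below max.poll.interval.ms so the background
  // poll always fires before the broker's rebalance deadline.
  val consumerCfg = KafkaConsumerConfig.default
    .copy(
      bootstrapServers = List("127.0.0.1:9092"),
      groupId = "poll-heartbeat-example",
      maxPollInterval = 5.seconds,
      maxPollRecords = 1)
    .withPollHeartBeatRate(100.millis)

  // Each record takes longer to process than max.poll.interval.ms, yet the
  // internal heartbeat keeps this consumer inside the group.
  KafkaConsumerObservable
    .manualCommit[String, String](consumerCfg, List("example-topic"))
    .mapEval { msg =>
      Task.sleep(10.seconds) *> msg.committableOffset.commitAsync()
    }
    .completedL
    .runSyncUnsafe()
}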