Skip to content

Commit

Permalink
Poll heartbeat rate (#250)
Browse files Browse the repository at this point in the history
* Async commit completed with callback (#94)

Add consumer polling (#94)

AutoCommit polling logic (#94)

Remove nested task loop

Remove redundant atomic refs

Add test for (#101)

Apply changes to older versions

* Tests passing

* Observable poll heartbeat rate

* Resolves merge conflicts

* Unused imports

* Multiple threads test failing

* Test small poll heartbeat and slow upstream

* ScalafmtAll

* Removes lambda anonymous class for scala 11 compatibility

* Scaladoc fix

* MR Feedback

* Unused import

* Adding poll heartbeat tests

* l

* Improved rebalancing test scenarios

* a

* Improved test

* Monix Kafka benchmarks for consumer, single and sink producer

Block


Fix


Binded connection


First benchmark for kafka producer


Added kafka benchmark strategy plan


Added sink and consumer benchmarks 


Producer results


Akka 


Removed references to akka


a


Final

* Benchmarks

* Fs2

* Heartbeat poll

* Benchmarks updated

* Private poll heartbeat rate

* Some improvements

* Fix unexpected consumed records

* Benchmarks with different pollHeartbeatIntervals

* New benchmark results

* More benchmarking

* Increase consumed records on perf test

* Set kafka-clients dependency to 1.1.1

* RC8 - ProducerBenchmarks

* Provided scala compat

* Downgrade scalacheck dependency

* Clean up

* Fixed topic regex tests

* Typo

* Update embedded kafka dependency

* Port back to older kafka subprojects

* A

* Make poll interval rate private on kafka 10 and 9

* Bit of clean up and updated readme

* Update changelog and trigger pipeline

* Test indentation fix

* Add logback in test

* Add unchanged topic partition test case

* Kafka tests running on github actions

* Renames github workflows folder

* Updates scala action

* Removes tests for kafka 0.9.x


Revmoves travis-ci

* Add benchmark with autocomit async and sync scenarios

* Removed benchmark results file

* Some clean up

* Use pollTimeout and updated benchmark results

* ScalafmtAll

* Ignore Ds_store file

* Removes logback file from main

* Remove unnecessary stuff

* Remove unused import

* Benchmarks readme

* Small correction

* MR Feedback
  • Loading branch information
paualarco authored Mar 24, 2022
1 parent c83a57a commit fd89177
Show file tree
Hide file tree
Showing 67 changed files with 1,945 additions and 389 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: build

on: [push, pull_request]

jobs:

tests:
name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests
runs-on: ubuntu-latest

strategy:
fail-fast: true
matrix:
java: [8]
scala: [2.11.12, 2.12.10]

steps:
- uses: actions/checkout@v2
- uses: olafurpg/setup-scala@v10
with:
java-version: "adopt@1.${{ matrix.java }}"

- name: Cache SBT Coursier directory
uses: actions/cache@v1
with:
path: ~/.cache/coursier/v1
key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }}
restore-keys: |
${{ runner.os }}-coursier-
- name: Cache SBT directory
uses: actions/cache@v1
with:
path: ~/.sbt
key: |
${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }}
restore-keys: ${{ runner.os }}-sbt-

- name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka10/test

- name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka11/test

- name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka1x/test
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ project/plugins/project/
.scala_dependencies
.worksheet
.idea

.bsp/sbt.json

.DS_Store
10 changes: 10 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@

## Version 1.0.0-RC7 (January 23, 2021)

Depends on Monix 3.3.0

Main changes:

- PR #248: Enable '-Xfatal-warnings' flag (internal)
- PR #245: Use Kafka Producer Consumer interfaces

## Version 1.0.0-RC6 (April 18, 2020)

Depends on Monix 3.x
Expand Down
20 changes: 9 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,18 @@ val observable =

Enjoy!

### Caveats
### Internal poll heartbeat

[Issue#101](https://github.com/monix/monix-kafka/issues/101)
Starting from Kafka 0.10.1.0, there is `max.poll.interval.ms` setting:

The maximum delay between invocations of poll() when using consumer group management.
This places an upper bound on the amount of time that the consumer can be idle before
fetching more records. If poll() is not called before expiration of this timeout,
then the consumer is considered failed and the group will rebalance in order
Starting from Kafka _0.10.1.0_, there is `max.poll.interval.ms` setting that defines the maximum delay between
invocations of poll(), if it is not called in that interval, then the consumer is considered failed and the group will rebalance in order
to reassign the partitions to another member.

Since, monix-kafka backpressures until all records has been processed.
This could be a problem if processing takes time.
You can reduce `max.poll.records` if you are experiencing this issue.
This was an [issue](https://github.com/monix/monix-kafka/issues/101) in `monix-kafka`,
since poll is not called until all previous consumed ones were processed, so that slow downstream subscribers
were in risk of being kicked off the consumer group indefinitely.

It is resolved in `1.0.0-RC8` by introducing an internal poll heartbeat interval
that runs in the background keeping the consumer alive.

## How can I contribute to Monix-Kafka?

Expand Down
39 changes: 39 additions & 0 deletions benchmarks/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
---
version: '2'
services:

zookeeper:
image: confluentinc/cp-zookeeper:4.1.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-server:5.5.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

1 change: 1 addition & 0 deletions benchmarks/project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.3.3
47 changes: 47 additions & 0 deletions benchmarks/results/consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

## Consumer Benchmarks

The consumer benchmark covers the *manual* and *auto commit* consumer implementations of the different libraries.
The manual commit will also cover producing committing back the consumed offsets.
It also runs on different range `pollHeartbeatRate` [1, 10, 15, 100, 1000], which is an important configuration
implemented in this `monix-kafka` library.

## RC7
### 1fork 1thread
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monix_auto_commit_async thrpt 7 13.097 ± 0.827 ops/s
ConsumerBenchmark.monix_auto_commit_sync thrpt 7 12.486 ± 1.087 ops/s
ConsumerBenchmark.monix_manual_commit thrpt 10 11.519 ± 2.247 ops/s

### 1 fork 3 threads
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monix_auto_commit thrpt 10 16.186 ± 0.920 ops/s
ConsumerBenchmark.monix_manual_commit thrpt 10 16.319 ± 1.465 ops/s

## RC8 - (Introduces PollHeartbeatRate)
### 1fork 1thread
---
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monixAsyncAutoCommitHeartbeat15ms thrpt 7 13.126 ± 1.737 ops/s
ConsumerBenchmark.monixSyncAutoCommitHeartbeat15ms thrpt 7 12.102 ± 1.214 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat1000ms thrpt 7 2.978 ± 0.006 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat100ms thrpt 7 9.961 ± 1.317 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat10ms thrpt 7 13.346 ± 0.716 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat15ms thrpt 7 13.454 ± 2.680 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat1ms thrpt 7 14.281 ± 1.591 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat50ms thrpt 7 11.900 ± 0.698 ops/s
---
### 1 fork 3 threads
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monixAsyncAutoCommitHeartbeat15ms thrpt 7 16.966 ± 2.659 ops/s
ConsumerBenchmark.monixSyncAutoCommitHeartbeat15ms thrpt 7 15.083 ± 4.242 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat1000ms thrpt 7 2.978 ± 0.006 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat100ms thrpt 7 9.961 ± 1.317 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat10ms thrpt 7 13.346 ± 0.716 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat15ms thrpt 7 13.454 ± 2.680 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat1ms thrpt 7 14.281 ± 1.591 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat50ms thrpt 7 11.900 ± 0.698 ops/s

```sbt
sbt 'benchmarks/jmh:run -i 5 -wi 1 -f1 -t1 monix.kafka.benchmarks.ConsumerBenchmark.*'
```
21 changes: 21 additions & 0 deletions benchmarks/results/producer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
## Producer benchmarks

This section includes benchmarks for single and sink producers.

## RC7

### 10iterations 1fork 1thread
Benchmark Mode Cnt Score Error Units
ProducerBenchmark.monix_single_producer thrpt 3 0.504 ± 0.668 ops/s
ProducerBenchmark.monix_sink_producer thrpt 3 1.861 ± 5.475 ops/s

## RC8

### 10iterations 1fork 1thread
ProducerBenchmark.monix_single_producer thrpt 10 0.473 ± 0.070 ops/s
ProducerBenchmark.monix_sink_producer thrpt 10 2.009 ± 0.277 ops/s

### 10iterations 1fork 3threads
Benchmark Mode Cnt Score Error Units
ProducerBenchmark.monix_single_producer thrpt 10 0.981 ± 0.202 ops/s
ProducerBenchmark.monix_sink_producer thrpt 10 3.241 ± 0.191 ops/s
25 changes: 25 additions & 0 deletions benchmarks/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<configuration>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>

<logger name="monix.kafka" level="info" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="org.apache.kafka" level="error" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="org.apache.zookeeper" level="error" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>

<root level="warn">
<appender-ref ref="CONSOLE"/>
</root>

</configuration>
11 changes: 11 additions & 0 deletions benchmarks/src/main/scala/monix/kafka/benchmarks/BaseFixture.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package monix.kafka.benchmarks

import monix.eval.Coeval

import scala.util.Random

trait BaseFixture {
protected val brokerUrl = "127.0.0.1:9092"
protected val randomId: Coeval[String] = Coeval(Random.alphanumeric.filter(_.isLetter).take(20).mkString)
protected val monixTopic = "monix_topic"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package monix.kafka.benchmarks

import monix.kafka.config.ObservableCommitOrder.BeforeAck
import monix.kafka.config.ObservableCommitType

import java.util.concurrent.TimeUnit
import monix.kafka.{KafkaConsumerObservable, KafkaProducerSink}
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord
import org.openjdk.jmh.annotations.{BenchmarkMode, Fork, Measurement, Mode, OutputTimeUnit, Scope, State, Threads, Warmup, _}
import scala.concurrent.duration._

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Measurement(iterations = 1)
@Warmup(iterations = 1)
@Fork(1)
@Threads(3)
class ConsumerBenchmark extends MonixFixture {

var totalRecords: Int = 1100
val consumedRecords = 1000
var maxPollRecords: Int = 1

@Setup
def setup(): Unit = {
// preparing test data
Observable
.from(0 to totalRecords)
.map(i => new ProducerRecord[Integer, Integer](monixTopic, i))
.bufferTumbling(totalRecords)
.consumeWith(KafkaProducerSink(producerConf.copy(monixSinkParallelism = 10), io))
.runSyncUnsafe()
}

@Benchmark
def monixAsyncAutoCommitHeartbeat15ms(): Unit = {
val conf = consumerConf.value().copy(
maxPollRecords = maxPollRecords,
observableCommitType = ObservableCommitType.Async,
observableCommitOrder = BeforeAck)
.withPollHeartBeatRate(15.millis)

KafkaConsumerObservable[Integer, Integer](conf, List(monixTopic))
.take(consumedRecords)
.headL
.runSyncUnsafe()
}

@Benchmark
def monixSyncAutoCommitHeartbeat15ms(): Unit = {
val conf = consumerConf.value().copy(
maxPollRecords = maxPollRecords,
observableCommitType = ObservableCommitType.Sync,
observableCommitOrder = BeforeAck)
.withPollHeartBeatRate(15.millis)

KafkaConsumerObservable[Integer, Integer](conf, List(monixTopic))
.take(consumedRecords)
.headL
.runSyncUnsafe()
}

@Benchmark
def monixManualCommitHeartbeat1ms(): Unit = {
val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords)
.withPollHeartBeatRate(1.millis)

KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic))
.mapEvalF(_.committableOffset.commitAsync())
.take(consumedRecords)
.headL
.runSyncUnsafe()
}

@Benchmark
def monixManualCommitHeartbeat10ms(): Unit = {
val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords)
.withPollHeartBeatRate(10.millis)

KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic))
.mapEvalF(_.committableOffset.commitAsync())
.take(consumedRecords)
.headL
.runSyncUnsafe()
}

@Benchmark
def monixManualCommitHeartbeat15ms(): Unit = {
val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords)
.withPollHeartBeatRate(15.millis)

KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic))
.mapEvalF(_.committableOffset.commitAsync())
.take(consumedRecords)
.headL
.runSyncUnsafe()
}

@Benchmark
def monixManualCommitHeartbeat50ms(): Unit = {
val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords)
.withPollHeartBeatRate(50.millis)

KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic))
.mapEvalF(_.committableOffset.commitAsync())
.take(consumedRecords)
.headL
.runSyncUnsafe()
}

@Benchmark
def monixManualCommitHeartbeat100ms(): Unit = {
val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords)
.withPollHeartBeatRate(100.millis)

KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic))
.mapEvalF(_.committableOffset.commitAsync())
.take(consumedRecords)
.headL
.runSyncUnsafe()
}

@Benchmark
def monixManualCommitHeartbeat1000ms(): Unit = {
val conf = consumerConf.value().copy(maxPollRecords = maxPollRecords)
.withPollHeartBeatRate(1000.millis)

KafkaConsumerObservable.manualCommit[Integer, Integer](conf, List(monixTopic))
.mapEvalF(_.committableOffset.commitAsync())
.take(consumedRecords)
.headL
.runSyncUnsafe()
}


}
Loading

0 comments on commit fd89177

Please sign in to comment.