Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poll heartbeat rate #250

Merged
merged 62 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
0ad10cb
Async commit completed with callback (monix#94)
paualarco Jan 17, 2021
d56095d
Tests passing
paualarco Jan 17, 2021
bc18d91
Observable poll heartbeat rate
paualarco Jan 23, 2021
fa380f6
Resolves merge conflicts
paualarco Jan 23, 2021
7232568
Unused imports
paualarco Jan 23, 2021
32d238c
Multiple threads test failing
paualarco Jan 24, 2021
622b180
Test small poll heartbeat and slow upstream
paualarco Jan 24, 2021
1365c7c
ScalafmtAll
paualarco Jan 24, 2021
c6c1c32
Removes lambda anonymous class for scala 11 compatibility
paualarco Jan 24, 2021
512f56c
Scaladoc fix
paualarco Jan 24, 2021
2651712
MR Feedback
paualarco Mar 21, 2021
336a9f2
Unused import
paualarco Mar 21, 2021
b835310
Adding poll heartbeat tests
paualarco Mar 21, 2021
e612d64
l
paualarco Mar 21, 2021
ca5e98a
Improved rebalancing test scenarios
paualarco Mar 21, 2021
4bcf70c
a
paualarco Mar 21, 2021
f93ae9a
Improved test
paualarco Mar 22, 2021
1c8c5c1
Monix Kafka benchmarks for consumer, single and sink producer
paualarco Sep 27, 2020
0bf0bff
Benchmarks
paualarco Jan 24, 2021
4974c62
Fs2
paualarco Jan 27, 2021
f13d855
Heartbeat poll
paualarco Mar 22, 2021
d3a4425
Benchmarks updated
paualarco Mar 22, 2021
0dee12b
Private poll heartbeat rate
paualarco Mar 23, 2021
bc6eb42
Some improvements
paualarco Mar 23, 2021
eca6927
Fix unexpected consumed records
paualarco Mar 24, 2021
6165418
Benchmarks with different pollHeartbeatIntervals
paualarco Mar 25, 2021
610eb74
New benchmark results
paualarco Mar 25, 2021
f0861e7
More benchmarking
paualarco Mar 25, 2021
d37e3a2
Increase consumed records on perf test
paualarco Mar 26, 2021
fc1cc4a
Set kafka-clients dependency to 1.1.1
paualarco Mar 27, 2021
6a54a9c
RC8 - ProducerBenchmarks
paualarco Mar 27, 2021
450b086
Provided scala compat
paualarco Mar 27, 2021
20416f0
Downgrade scalacheck dependency
paualarco Mar 27, 2021
de79221
Clean up
paualarco Mar 27, 2021
2b3e3e0
Fixed topic regex tests
paualarco Mar 27, 2021
da099c4
Typo
paualarco Mar 27, 2021
a5e6577
Update embedded kafka dependency
paualarco Mar 28, 2021
874ea8a
Port back to older kafka subprojects
paualarco Mar 28, 2021
7bb610c
A
paualarco Mar 30, 2021
a3210c3
Make poll interval rate private on kafka 10 and 9
paualarco Mar 30, 2021
df39cd0
Bit of clean up and updated readme
paualarco Mar 30, 2021
ff7f422
Merge branch 'master' into poll-heartbeat-rate
paualarco Mar 30, 2021
6d1ebbf
Update changelog and trigger pipeline
paualarco Mar 31, 2021
38f7d47
Test indentation fix
paualarco Mar 31, 2021
bac3175
Add logback in test
paualarco Mar 31, 2021
444c2a2
Add unchanged topic partition test case
paualarco Apr 1, 2021
b0316b6
Kafka tests running on github actions
paualarco Apr 1, 2021
38b31c7
Renames github workflows folder
paualarco Apr 1, 2021
c61857f
Updates scala action
paualarco Apr 1, 2021
edeb179
Removes tests for kafka 0.9.x
paualarco Apr 2, 2021
8c09b6a
Add benchmark with autocomit async and sync scenarios
paualarco Apr 2, 2021
d54ed10
Removed benchmark results file
paualarco Apr 2, 2021
9bcd8ac
Some clean up
paualarco Apr 3, 2021
6f2fa50
Use pollTimeout and updated benchmark results
paualarco Apr 3, 2021
50c3d9d
ScalafmtAll
paualarco Apr 3, 2021
f7991ca
Ignore Ds_store file
paualarco Apr 3, 2021
1c71a68
Removes logback file from main
paualarco Apr 3, 2021
f05abe6
Remove unnecessary stuff
paualarco Apr 3, 2021
12e75a8
Remove unused import
paualarco Apr 3, 2021
10cbf9f
Benchmarks readme
paualarco Apr 9, 2021
32c5bd7
Small correction
paualarco Apr 10, 2021
1526702
MR Feedback
paualarco Apr 11, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: build

on: [push, pull_request]

jobs:

tests:
name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests
runs-on: ubuntu-latest

strategy:
fail-fast: true
matrix:
java: [8]
scala: [2.11.12, 2.12.10]

steps:
- uses: actions/checkout@v2
- uses: olafurpg/setup-scala@v10
with:
java-version: "adopt@1.${{ matrix.java }}"

- name: Cache SBT Coursier directory
uses: actions/cache@v1
with:
path: ~/.cache/coursier/v1
key: ${{ runner.os }}-coursier-${{ hashFiles('**/*.sbt') }}
restore-keys: |
${{ runner.os }}-coursier-
- name: Cache SBT directory
uses: actions/cache@v1
with:
path: ~/.sbt
key: |
${{ runner.os }}-sbt-${{ hashFiles('project/build.properties') }}-${{ hashFiles('project/plugins.sbt') }}
restore-keys: ${{ runner.os }}-sbt-

- name: Run Tests for Kafka 0.10.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka10/test

- name: Run Tests for Kafka 0.11.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka11/test

- name: Run Tests for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m kafka1x/test
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ project/plugins/project/
.scala_dependencies
.worksheet
.idea

.bsp/sbt.json

.DS_Store
37 changes: 0 additions & 37 deletions .travis.yml

This file was deleted.

19 changes: 19 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@

## Version 1.0.0-RC6 (January 23, 2021)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this one should be RC7 and Version 1.0.0-RC7 (April 18, 2020) below should be removed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected, sorry I only intended to add the entry for RC7 that was missing.


Depends on Monix 3.3.0

Main changes:

- PR #248: Enable '-Xfatal-warnings' flag (internal)
- PR #245: Use Kafka Producer Consumer interfaces

## Version 1.0.0-RC7 (April 18, 2020)

Depends on Monix 3.3.0

Main changes:

- PR #248: Enable '-Xfatal-warnings flag
- PR #245: Use Kafka Producer Consumer interfaces

## Version 1.0.0-RC6 (April 18, 2020)

Depends on Monix 3.x
Expand Down
20 changes: 9 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,18 @@ val observable =

Enjoy!

### Caveats
### Internal poll heartbeat

[Issue#101](https://github.com/monix/monix-kafka/issues/101)
Starting from Kafka 0.10.1.0, there is `max.poll.interval.ms` setting:

The maximum delay between invocations of poll() when using consumer group management.
This places an upper bound on the amount of time that the consumer can be idle before
fetching more records. If poll() is not called before expiration of this timeout,
then the consumer is considered failed and the group will rebalance in order
Starting from Kafka _0.10.1.0_, there is `max.poll.interval.ms` setting that defines the maximum delay between
invocations of poll(), if it is not called in that interval, then the consumer is considered failed and the group will rebalance in order
to reassign the partitions to another member.

Since, monix-kafka backpressures until all records has been processed.
This could be a problem if processing takes time.
You can reduce `max.poll.records` if you are experiencing this issue.
This was an [issue](https://github.com/monix/monix-kafka/issues/101) in `monix-kafka`,
since poll is not called until all previous consumed ones were processed, so that slow downstream subscribers
were in risk of being kicked off the consumer group indefinitely.

It is resolved in `1.0.0-RC8` by introducing an internal poll heartbeat interval
that runs in the background keeping the consumer alive.

## How can I contribute to Monix-Kafka?

Expand Down
39 changes: 39 additions & 0 deletions benchmarks/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
---
version: '2'
services:

zookeeper:
image: confluentinc/cp-zookeeper:4.1.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-server:5.5.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

1 change: 1 addition & 0 deletions benchmarks/project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.3.3
47 changes: 47 additions & 0 deletions benchmarks/results/consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

## Consumer Benchmarks

The consumer benchmark covers the *manual* and *auto commit* consumer implementations of the different libraries.
The manual commit will also cover producing committing back the consumed offsets.
It also runs on different range `pollHeartbeatRate` [1, 10, 15, 100, 1000], which is an important configuration
implemented in this `monix-kafka` library.

## RC7
### 1fork 1thread
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monix_auto_commit_async thrpt 7 13.097 ± 0.827 ops/s
ConsumerBenchmark.monix_auto_commit_sync thrpt 7 12.486 ± 1.087 ops/s
ConsumerBenchmark.monix_manual_commit thrpt 10 11.519 ± 2.247 ops/s

### 1 fork 3 threads
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monix_auto_commit thrpt 10 16.186 ± 0.920 ops/s
ConsumerBenchmark.monix_manual_commit thrpt 10 16.319 ± 1.465 ops/s

## RC8 - (Introduces PollHeartbeatRate)
### 1fork 1thread
---
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monixAsyncAutoCommitHeartbeat15ms thrpt 7 13.126 ± 1.737 ops/s
ConsumerBenchmark.monixSyncAutoCommitHeartbeat15ms thrpt 7 12.102 ± 1.214 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat1000ms thrpt 7 2.978 ± 0.006 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat100ms thrpt 7 9.961 ± 1.317 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat10ms thrpt 7 13.346 ± 0.716 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat15ms thrpt 7 13.454 ± 2.680 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat1ms thrpt 7 14.281 ± 1.591 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat50ms thrpt 7 11.900 ± 0.698 ops/s
---
### 1 fork 3 threads
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monixAsyncAutoCommitHeartbeat15ms thrpt 7 16.966 ± 2.659 ops/s
ConsumerBenchmark.monixSyncAutoCommitHeartbeat15ms thrpt 7 15.083 ± 4.242 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat1000ms thrpt 7 2.978 ± 0.006 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat100ms thrpt 7 9.961 ± 1.317 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat10ms thrpt 7 13.346 ± 0.716 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat15ms thrpt 7 13.454 ± 2.680 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat1ms thrpt 7 14.281 ± 1.591 ops/s
ConsumerBenchmark.monixManualCommitHeartbeat50ms thrpt 7 11.900 ± 0.698 ops/s

```sbt
sbt 'benchmarks/jmh:run -i 5 -wi 1 -f1 -t1 monix.kafka.benchmarks.ConsumerBenchmark.*'
```
17 changes: 17 additions & 0 deletions benchmarks/results/producer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## Producer benchmarks

This section includes benchmarks for single and sink producers.

## RC7

### 10iterations 1fork 1thread
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we compare results of the same fork / threads setup?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep they should, will address that, although I think I just added them more as a base for future api changes since the producer logic was untouched.

Benchmark Mode Cnt Score Error Units
ProducerBenchmark.monix_single_producer thrpt 3 0.504 ± 0.668 ops/s
ProducerBenchmark.monix_sink_producer thrpt 3 1.861 ± 5.475 ops/s

## RC8

### 10iterations 1fork 3threads
Benchmark Mode Cnt Score Error Units
ProducerBenchmark.monix_single_producer thrpt 10 0.981 ± 0.202 ops/s
ProducerBenchmark.monix_sink_producer thrpt 10 3.241 ± 0.191 ops/s
25 changes: 25 additions & 0 deletions benchmarks/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<configuration>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>

<logger name="monix.kafka" level="info" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="org.apache.kafka" level="error" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>
<logger name="org.apache.zookeeper" level="error" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>

<root level="warn">
<appender-ref ref="CONSOLE"/>
</root>

</configuration>
11 changes: 11 additions & 0 deletions benchmarks/src/main/scala/monix/kafka/benchmarks/BaseFixture.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package monix.kafka.benchmarks

import monix.eval.Coeval

import scala.util.Random

trait BaseFixture {
protected val brokerUrl = "127.0.0.1:9092"
protected val randomId: Coeval[String] = Coeval(Random.alphanumeric.filter(_.isLetter).take(20).mkString)
protected val monixTopic = "monix_topic"
}
Loading