Poll heartbeat rate #250
Conversation
Stellar work, we're very close to finally fixing this issue! :D
}
feedTask.runAsync(cb)
val startConsuming =
  Resource
We immediately use the Resource, so I feel like bracket would be a better fit.
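As a rough sketch of the suggested shape (the createConsumer and runLoop signatures here are hypothetical stand-ins, not the PR's actual code), Task.bracket ties acquisition, use, and release together when the resource is consumed on the spot:

```scala
import monix.eval.Task
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.concurrent.blocking

// Hypothetical stand-ins for the PR's actual consumer setup and run loop.
def createConsumer: Task[KafkaConsumer[String, String]] = ???
def runLoop(consumer: KafkaConsumer[String, String]): Task[Unit] = ???

// bracket acquires, uses and releases in one expression, so wrapping the
// acquisition in a Resource is unnecessary when it is used immediately.
val startConsuming: Task[Unit] =
  createConsumer.bracket(runLoop) { consumer =>
    Task.evalAsync(consumer.synchronized(blocking(consumer.close())))
  }
```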
@@ -1,3 +1,22 @@ kafka {
monix.observable.commit.type = "sync"
# Possible values: before-ack, after-ack or no-ack
monix.observable.commit.order = "after-ack"
# Internal kafka heartbeat that avoids partition reassignment when the upstream subscriber is slow.
monix.observable.poll.heartbeat.rate.ms = 100
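For illustration only, a minimal sketch of how this setting could be read with Typesafe Config; the path and the 100ms default come from the snippet above, the surrounding code is hypothetical:

```scala
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

// Reads the new setting from application.conf; 100ms is the default shown above.
val conf = ConfigFactory.load()
val pollHeartbeatRate: FiniteDuration =
  conf.getInt("kafka.monix.observable.poll.heartbeat.rate.ms").millis
```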
Is there any recommended ratio between poll heartbeats and max.poll.interval.ms?
@Avasil in order to respond to that question, I have created a benchmark with different poll intervals:
Benchmark Mode Cnt Score Error Units
ConsumerBenchmark.monix_auto_commit10ms thrpt 10 17.266 ± 2.231 ops/s
ConsumerBenchmark.monix_manual_commit_heartbeat1000ms thrpt 10 2.971 ± 0.009 ops/s
ConsumerBenchmark.monix_manual_commit_heartbeat100ms thrpt 10 9.477 ± 0.064 ops/s
ConsumerBenchmark.monix_manual_commit_heartbeat10ms thrpt 10 14.710 ± 1.660 ops/s
ConsumerBenchmark.monix_manual_commit_heartbeat1ms thrpt 10 15.494 ± 4.163 ops/s
Not sure why the performance degrades as the pollHeartBeat interval increases; from the numbers, I would say that 10.ms would be the best fit. I have made the property configurable only internally, since I think the user does not need to know about it. Any advice?
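For context, a minimal sketch of the shape such a JMH benchmark could take; the class name and the consumeFixedBatch helper are hypothetical, only the mode and iteration/fork/thread counts mirror the run above:

```scala
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Measurement(iterations = 10)
@Fork(1)
@Threads(1)
class ConsumerBenchmarkSketch {

  // Hypothetical helper: consumes a fixed batch of records from a local
  // broker with the given internal pollHeartbeatRate, committing manually.
  def consumeFixedBatch(heartbeatMs: Int): Unit = ???

  @Benchmark
  def monix_manual_commit_heartbeat10ms(): Unit =
    consumeFixedBatch(heartbeatMs = 10)
}
```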
I would expect that we could do this heartbeat very rarely.
Is it only for manual commit?
I think the commitAsync can only complete after poll, so that could be a potential cause? Perhaps we could do a heartbeat on commit as well? Akka Streams seems to be doing something like that but I have only had a quick glance and could be completely wrong.
I have made the property configurable only internally, since I think the user does not need to know about it.
Is this the current state?
Yes, I did that after seeing that altering this value could substantially impact performance, and the user should not be aware of it.
Let me try your suggestion of polling after commit!
}
}

test("slow batches autocommit processing doesn't cause rebalancing") {
What if a rebalance is caused by other reasons? For instance, if another consumer shuts down.
Can we test whether we're protected against such scenarios?
I'm thinking that maybe some tests with multiple consumers could be useful, or if there are some tools to force the repartitioning.
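A rough sketch of the kind of multi-consumer test being suggested, assuming monix-kafka's KafkaConsumerObservable; the topic, group id, counts, and timings are all illustrative:

```scala
import monix.eval.Task
import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}
import scala.concurrent.duration._

val cfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("127.0.0.1:9092"),
  groupId = "rebalance-test" // both consumers share the group
)

def consumer = KafkaConsumerObservable[String, String](cfg, List("topic"))

// First consumer keeps consuming while the second joins and then leaves,
// forcing two rebalances; the first should still complete its batch.
val scenario =
  for {
    first   <- consumer.take(100).toListL.start
    second  <- consumer.completedL.start
    _       <- Task.sleep(5.seconds)
    _       <- second.cancel // consumer shutdown triggers a rebalance
    records <- first.join
  } yield records
```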
@Avasil thank you for all the comments! Will address them and review again 🧐
@Avasil first of all, sorry for the amount of commits; it might make it difficult to track the changes during review. I assume they will be squashed at the end... I have written specific new cases in
Finally, I was also tempted to move from Travis to GitHub Actions and to split the tests by the different Kafka versions.
No worries, just let me know when you think it's good to re-review, thanks for the hard work!
I'll look at the benchmarks later, but I feel like they can be unreliable if everything is hosted on one machine, so it's hard to draw any conclusions.
👍 Yeah, it needs to be done and I could set up sbt ci-release
@Avasil I think it could already be re-reviewed :)
I didn't look too much into benchmarks but the code seems good to me!
Would be nice to give it a try in a real project though.
@allantl do you happen to still use monix-kafka, and would you be able to try these changes if we release a snapshot?
CHANGES.md
@@ -1,3 +1,22 @@

## Version 1.0.0-RC6 (January 23, 2021)
Seems like this one should be RC7, and Version 1.0.0-RC7 (April 18, 2020) below should be removed.
Corrected, sorry I only intended to add the entry for RC7 that was missing.
## RC7

### 10 iterations, 1 fork, 1 thread
Shouldn't we compare results of the same fork / threads setup?
Yep, they should; will address that, although I think I just added them more as a baseline for future API changes, since the producer logic was untouched.
Task.race(runLoop(c, out), pollHeartbeat(c).loopForever).void
} { consumer =>
  // Forced asynchronous boundary
  Task.evalAsync(consumer.synchronized(blocking(consumer.close()))).memoizeOnSuccess
memoize seems to be redundant; is there any situation where the finalizer would be called more than once?
Totally
That would be ideal; alternatively, we may ask in the main Monix Gitter channel too and see if someone is interested in testing it.
The main motivation for this PR is to tackle two existing issues, #94 and #101. It rebases the existing pending PR #104, so credits to its author, @voidconductor 🙏

#94 relates to poll.max.records. For #101, the consumer can be rebalanced if poll is not called before max.poll.interval.ms. The polling rate is only configurable internally, for performance analysis in the benchmarks repo.

This polling is achieved by performing poll(0) from a task running concurrently in the background; in case of error, the scheduler will report it without stopping the subscription. That should never happen, though, since the heartbeat poll is only called while the consumer is paused, and both processes are synchronized.
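A rough sketch of that mechanism under the stated assumptions; the names mirror the snippet discussed earlier, but the bodies here are illustrative, not the PR's exact code:

```scala
import monix.eval.Task
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.concurrent.duration._

// Heartbeat tick: poll(0) fetches no records but lets the broker see the
// consumer as alive. It only runs while the consumer is paused, and both
// the run loop and the heartbeat synchronize on the consumer.
def pollHeartbeat(c: KafkaConsumer[Array[Byte], Array[Byte]]): Task[Unit] =
  Task.sleep(100.millis) *> Task {
    c.synchronized {
      if (!c.paused().isEmpty) { c.poll(0L); () }
    }
  }

// Hypothetical stand-in for the actual consumption loop.
def runLoop(c: KafkaConsumer[Array[Byte], Array[Byte]]): Task[Unit] = ???

// Race the consumption loop against the endlessly repeating heartbeat;
// when the loop finishes, the heartbeat is cancelled with it.
def startConsuming(c: KafkaConsumer[Array[Byte], Array[Byte]]): Task[Unit] =
  Task.race(runLoop(c), pollHeartbeat(c).loopForever).void
```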