-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CommitAsync fix #104
CommitAsync fix #104
Conversation
|
Looked at fs2 and noticed there's empty |
Thanks for the help, I really appreciate it. :) Sorry I'm not very active here, I have very busy time and whenever I spend it on Open Source I'm prioritizing Monix 3.0.0. Although I should have much more free time in 1-2 weeks and plan to update
If we end up doing it this way than I think it is necessary because I'm sure it would confuse everyone
Maybe we could share
I think so, it could fix #101 as well |
Added periodic consumer polling, not sure though if more strict checking required inside As far as I understand there's no way |
kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you guys once again. :) To my eye it looks good, have you had a chance to test it in regard to #101 ?
kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala
Outdated
Show resolved
Hide resolved
kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
Show resolved
Hide resolved
I am going to try and add a test for this case, and after apply changes for previous versions |
Private conversation summary
My proposalAdd What kind of problems did I miss? I'm not sure that this decision will help with #101. |
if (!isAcked) { | ||
Task.evalAsync { | ||
consumer.synchronized { | ||
blocking(consumer.poll(0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Avasil @voidconductor Looking at the alpakka implementation, they raise an error here if the poll result size != 0 (we might lose messages if its > 0). Should we do the same thing? Did you test that it won't overlap with the poll during ackTask
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All polls are synchronized on consumer, therefore they can't overlap.
About probability of losing records - consumer is paused right after poll
, according to pause
javadoc:
* Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
* any records from these partitions until they have been resumed using {@link #resume(TopicPartition...)}.
Here we call poll(0)
without resuming, so it shouldn't return any records, and if it does - it will be apache kafka bug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like many implementations do this check (though they don't use synchronized
I think), I will try to double check if it concerns us or not in the following days
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've looked around a bit, it seems like it is possible that poll(0)
with paused partitions could receive new partition assignment with new records for it
I haven't reproduced it yet, I will have to setup something because unfortunately I don't have any codebase to play around right now :D
If you'd like to experiment, I published a snapshot from this branch:
1.0.0-RC5-d7546d6-SNAPSHOT
Few links from other libraries:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've found out we can use commitAsync(Map.empty, null)
instead of poll(0)
to complete commit callbacks, as it also calls coordinator's invokeCompletedOffsetCommitCallbacks
internally
This means no risk of getting unexpected messages or losing data
There are downsides I see in this approach:
- looks like metadata is not requested and rebalance is not performed before next poll
- it won't refresh
max.poll.interval
timeout, so Consumer can be rebalanced if poll is not called beforemax.poll.interval.ms
#101 will not be solved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as it also calls coordinator's invokeCompletedOffsetCommitCallbacks internally
I'm also worried about depending on implementation detail (unless docs somewhere guarantee it) that could perhaps silently change between versions
@poslegm From the docs, its mentioned that when you poll with 0, it should:
So, it shouldn't fetch any records (if it does, we'll lose messages). My assumption is that it will just refresh the max poll interval ms timer. This is provided that it is behaving as whats documented. So, the actual poll should still happen during |
@voidconductor @Avasil @poslegm it's been long time since this one, I believe it is still to fix that bug, shall we rebase the PR branch onto master and get it reviewed once again? |
A little bit of both, so would appreciate if someone finishes this 🙂 I was thinking about storing records fetched with |
proper commitAsync implementation
removed callback from methods arguments as tasks itself now completed in this callback, and it was also never used actually used in previous implementations, so probably no api changes
had to change return type as there's quite unobvious logic of callbacks invocations (see related issue Manual commitAsync completes before actual commit #94)
I also have a few questions:
Do I need to clarify why there's
Task[Task[Unit]]
in scaladoc ofcommitAsync
?Is test I provided ok? I had to leave observable running or otherwise callbacks are never called.
Will add implementations and tests to older versions after review.