
Fix Kafka with Redistribute #32344

Merged
merged 5 commits into from
Sep 17, 2024

Conversation

Naireen
Contributor

@Naireen Naireen commented Aug 28, 2024

Change KafkaIO read withDuplicates to warn but still commit offsets if configured

Two ways to enable commits,

  1. explicitly via commitOffsetsInFinalize()
  2. Via consumer config ENABLE_AUTO_COMMIT_CONFIG=true
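For illustration, the two configurations above might look like this on a KafkaIO read. This is a sketch only: topic and bootstrap values are placeholders, deserializer setup is omitted, and `ImmutableMap` here is Guava's; the two reads show the two options separately, not combined.

```java
// (1) explicit: Beam finalizes and commits offsets itself
KafkaIO.Read<Long, String> explicitCommits =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")   // placeholder
        .withTopic("my-topic")                    // placeholder
        .commitOffsetsInFinalize();

// (2) via consumer config: the underlying Kafka consumer auto-commits
KafkaIO.Read<Long, String> autoCommits =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")   // placeholder
        .withTopic("my-topic")                    // placeholder
        .withConsumerConfigUpdates(
            ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true));
```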

If the first is true, redistribute can't be enabled.
If the second is true, the pipeline can still run, but it isn't optimal: if the runner is allowed to produce duplicates, there is no point in adding the extra overhead of checkpointing messages within Kafka itself, though doing so is not semantically incorrect. The first option uses internal Beam state to track which messages have been processed in order to prevent duplicates, which doesn't make sense if we also have a runner hint that duplicates are allowed.

Before this fix, customers simply couldn't set withRedistribute on the transform at all, even when commits weren't configured.
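As a standalone sketch (plain Java, not the actual KafkaIO code; the flag names only mirror the PR's getters), the behavior after this change is: redistribute plus finalize-based commits plus allowed duplicates warns instead of failing.

```java
// Hypothetical model of the check described above, for illustration only.
public class RedistributeCheck {
    static String check(
            boolean redistributed, boolean commitInFinalize, boolean allowDuplicates) {
        if (redistributed && commitInFinalize && allowDuplicates) {
            // Previously this combination was rejected outright; now it only warns
            // while commits still proceed.
            return "WARN: committed offsets may not capture all processed work";
        }
        return "OK";
    }

    public static void main(String[] args) {
        System.out.println(check(true, true, true));   // warns, but allowed
        System.out.println(check(true, true, false));  // no warning
    }
}
```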

Updated tests to catch this.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@Naireen Naireen marked this pull request as ready for review August 28, 2024 01:25
@Naireen
Contributor Author

Naireen commented Aug 28, 2024

Reviewers: @scwhittle

Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @ahmedabu98 for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@scwhittle
Contributor

One other benefit of committed offsets is customer visibility into the progress of partitions, since committed offsets can be queried/displayed externally to Dataflow.

Do we need to disable the committing of offsets? I can see the argument that it might not make sense from an exactly-once perspective, but given that there are other reasons to commit and that the customer is configuring it explicitly, can we perhaps just log a warning that the offsets may not reflect all processed data, but still perform them?

@Naireen
Contributor Author

Naireen commented Aug 28, 2024

One other benefit of committed offsets is customer visibility into the progress of partitions, since committed offsets can be queried/displayed externally to Dataflow.

That is a good point, and an argument to still allow it. What I don't understand, though, is how reshuffling/redistributing works with committing offsets. Here is the current graph with offset commits enabled (pulled from KafkaIO.java, in case the formatting of the graph below isn't clear):

PCollection
  --> ParDo(ReadFromKafkaDoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord>>)
  --> Reshuffle()
        |--> Map(output KafkaRecord)
        |--> KafkaCommitOffset

At that point, if redistribute is enabled, does it make more sense to substitute the Redistribute transform for the Reshuffle here, instead of inserting a reshuffle after the Map? (The latter would introduce another shuffle, depending on the runner implementation.)

If we go with the former approach, commits will still occur, but the commit requests can contain duplicates (need to investigate what that can cause, or will it just be a no-op if we attempt to commit the same offset twice internally in Beam?)
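On the duplicate-commit question: committing the same offset twice is typically harmless, since a commit just records the latest position per partition. A toy model (not Kafka's or Beam's actual implementation, all names hypothetical) of why a duplicate is effectively a no-op:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a committed-offset store, keyed by partition. Re-committing the
// same offset overwrites the entry with an equal value, so a duplicate commit
// changes nothing; using Math::max also keeps a stale (lower) duplicate from
// moving the committed position backwards.
public class OffsetStore {
    private final Map<Integer, Long> committed = new HashMap<>();

    public long commit(int partition, long offset) {
        committed.merge(partition, offset, Math::max);
        return committed.get(partition);
    }
}
```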

@scwhittle
Contributor

My PR to remove the Reshuffle for the commit offsets was just merged, so I think the question of whether it should be a Redistribute when configured may be moot now. But we could still disallow commit offsets together with redistribute in the expand path, since it is still an issue there.

@Naireen Naireen force-pushed the fix_redistribute_with_kafka branch from e3af50a to aeb694c Compare August 30, 2024 21:55
@Naireen Naireen force-pushed the fix_redistribute_with_kafka branch from aeb694c to 9c37865 Compare August 31, 2024 03:58
@Naireen
Contributor Author

Naireen commented Aug 31, 2024

Perfect, that's great!

Contributor

github-actions bot commented Sep 7, 2024

Reminder, please take a look at this pr: @kennknowles @ahmedabu98

@ahmedabu98
Contributor

R: @scwhittle

Contributor

github-actions bot commented Sep 7, 2024

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

        LOG.warn(
            "commitOffsetsInFinalize() will not capture all work processed if set with withRedistribute()");
      }
      if (Boolean.TRUE.equals(
Contributor

haven't seen this before, is this to handle null as a one-liner?

Contributor Author

Yeah, it occurs a few more times in this file as well.
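For reference, the one-liner null handling in question (plain Java, standalone): `Boolean.TRUE.equals(x)` tolerates a null `x`, which matters because a consumer-config lookup returns `Object` and may be null.

```java
public class NullSafeBoolean {
    public static void main(String[] args) {
        // A config lookup such as consumerConfig.get(...) may return null.
        Boolean fromConfig = null;

        // (boolean) fromConfig or fromConfig.booleanValue() would throw a
        // NullPointerException here; Boolean.TRUE.equals treats null as false.
        System.out.println(Boolean.TRUE.equals(fromConfig));    // false
        System.out.println(Boolean.TRUE.equals(Boolean.TRUE));  // true
        System.out.println(Boolean.TRUE.equals(Boolean.FALSE)); // false
    }
}
```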

      if (Boolean.TRUE.equals(
          kafkaRead.getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
        LOG.warn(
            "config.ENABLE_AUTO_COMMIT_CONFIG doesn't need to be set with withRedistribute()");
Contributor

It doesn't need to be, but it seems like it could still be desirable if they are using the offsets for monitoring.

Should we just remove this log? It seems the correctness is the same as if we were shuffling persistently though the window is perhaps larger if a drain is being performed.

Contributor Author

Ack, removed it.

@Naireen
Contributor Author

Naireen commented Sep 11, 2024

Run Java_Kafka_IO_Direct PreCommit

@Naireen Naireen force-pushed the fix_redistribute_with_kafka branch from c8f6dad to 3f6fc4e Compare September 11, 2024 20:18
@scwhittle (Contributor) left a comment

Can you change the PR description to be a little clearer too, maybe
"Change KafkaIO read withDuplicates to warn but still commit offsets if configured"

@@ -648,6 +669,29 @@ public void testCommitOffsetsInFinalizeAndRedistributeWarnings() {
         "Offsets committed due to usage of commitOffsetsInFinalize() may not capture all work processed due to use of withRedistribute()");
   }

+  @Test
+  public void testCommitOffsetsInFinalizeAndRedistributeNoWarningsWithAllowDuplicates() {
Contributor

seems like it should be NoWarningsWithNoAllowDuplicates

Contributor Author

I do want to mention that commits are enabled, since if we do not commit offsets, enabling allow duplicates also produces no warning.
Updated the test name to NoWarningsWithNoAllowDuplicatesAndCommitOffsets, and updated the other test name so it's clearer that the two tests cover behaviour with and without allowDuplicates=true.

@@ -1696,7 +1696,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
   }

   if (kafkaRead.isRedistributed()) {
-    if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
+    if (kafkaRead.isCommitOffsetsInFinalizeEnabled() && kafkaRead.isAllowDuplicates()) {
       LOG.warn(
           "Offsets committed due to usage of commitOffsetsInFinalize() may not capture all work processed due to use of withRedistribute()");
Contributor

update the log to reflect allow duplicates.

Contributor Author

Done.

@Naireen Naireen force-pushed the fix_redistribute_with_kafka branch from 87fe8e2 to 2bb6e3c Compare September 16, 2024 22:06
@scwhittle scwhittle merged commit 1b2d21a into apache:master Sep 17, 2024
20 checks passed
reeba212 pushed a commit to reeba212/beam that referenced this pull request Dec 4, 2024