Remove expensive shuffle of read data in KafkaIO when using sdf and commit offsets #31682

Merged: 4 commits, Aug 29, 2024

scwhittle
Contributor

The reshuffle adds a barrier so that the read records are committed before any offsets are committed. However, this is unnecessary: the DoFn is annotated with RequiresStableInput, and the fusion barrier can be introduced by shuffling just the offsets.

In both cases it is possible for an offset to be committed back to Kafka before the data has been entirely processed by the pipeline. However, with support for draining data from the pipeline (such as in Cloud Dataflow), this allows exactly-once semantics for KafkaIO via the offsets committed to the topics across pipeline drain and restart.


@scwhittle
Contributor Author

@kennknowles I'd appreciate it if you could take a look and let me know whether this makes sense with the Beam model in general. I don't think the Reshuffle adds any more guarantees about barriers for other runners than the combine itself would, but I could use some confirmation.

@scwhittle scwhittle requested a review from kennknowles June 25, 2024 10:46
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@kennknowles
Member

I think the problem is that no runner actually implements RequiresStableInput. If Dataflow implemented it, the implementation would be a shuffle.

Member

@kennknowles kennknowles left a comment

If we assume RequiresStableInput does not work (which I'm almost certain it does not), then this is incorrect, no? It is easy to see that KafkaCommitOffset might have written offsets to Kafka, so Kafka will never vend those messages again, but the main message processing could fail and need to retrieve them.
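
The failure mode described in this comment can be made concrete with a toy model. This is a sketch only, not Beam or Kafka code: `CommitRace` and `lostOffsets` are hypothetical names, and the model assumes the Kafka convention that a committed offset is the next offset to read after a restart.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Beam or Kafka API): which records are skipped
// if the pipeline restarts from the committed offset after a failure.
final class CommitRace {
  /**
   * @param committedNextOffset offset the consumer group resumes from after restart
   * @param durableNextOffset first offset whose processing was NOT durably persisted
   * @return offsets that are skipped on restart but were never durably processed
   */
  static List<Long> lostOffsets(long committedNextOffset, long durableNextOffset) {
    List<Long> lost = new ArrayList<>();
    for (long o = durableNextOffset; o < committedNextOffset; o++) {
      lost.add(o);
    }
    return lost;
  }

  public static void main(String[] args) {
    // Commit ran ahead of durable processing: offsets 3 and 4 are lost.
    System.out.println(lostOffsets(5, 3));
    // Commit trails durable processing: nothing is lost (3 and 4 are re-read instead).
    System.out.println(lostOffsets(3, 5));
  }
}
```

In this model, data loss occurs exactly when the commit gets ahead of durable processing, which is the ordering the barrier is meant to rule out.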

@scwhittle
Contributor Author

There is still a shuffle of the offsets due to the combine-per-key before the offset commit fn. Even if that shuffle were not there, having RequiresStableInput insert a shuffle would be OK, because it would shuffle only the commit offsets, not the read data, and would thus be cheap.

We need to guarantee that offsets are committed to Kafka only once the records have been processed by the system, so that no records are lost if the pipeline is drained.
For Dataflow streaming, all side effects and outputs of a fused stage are committed atomically, so it is sufficient for the offset commit to run in a subsequent fused stage (here enforced by combining per key to get the max offset): the processing of the records either completes in the fused stage that reads them, or its effects are part of the same atomic commit.
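
As a rough illustration of why shuffling only the offsets is cheap, the per-key combine reduces every record to a single maximum offset per partition. The sketch below is plain Java rather than Beam code; `MaxOffsetPerPartition` and its `combine` method are hypothetical names, and partitions are represented as plain strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a plain-Java stand-in for a per-key "max offset" combine.
final class MaxOffsetPerPartition {
  /** Reduces (partition, offset) pairs to the highest offset seen per partition. */
  static Map<String, Long> combine(String[] partitions, long[] offsets) {
    Map<String, Long> max = new HashMap<>();
    for (int i = 0; i < partitions.length; i++) {
      // Keep the larger of the stored offset and the new one.
      max.merge(partitions[i], offsets[i], Long::max);
    }
    return max;
  }

  public static void main(String[] args) {
    String[] partitions = {"topic-0", "topic-1", "topic-0"};
    long[] offsets = {41L, 7L, 42L};
    // One entry per partition crosses the stage boundary, not the records themselves.
    System.out.println(combine(partitions, offsets));
  }
}
```

Only this small map of maxima needs to cross the stage boundary, whereas the removed reshuffle moved the full record payloads.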

For other runners such as Flink, is Reshuffle more powerful than any other GroupByKey, i.e. does it insert some sort of checkpoint barrier?
If not, the existing solution of reshuffling the data + offsets doesn't seem to guarantee any better that the commit fn runs after the effects of record processing have been persisted in a checkpoint.
The fused graph with the reshuffle looks something like:

Read -> Reshuffled records + offsets + fused processing -> fused stage with more user processing
                                                       \-> per key combined fused stage committing offsets

With Flink, if data is just flowing through ahead of a checkpoint barrier, nothing prevents offsets from being committed to Kafka before the checkpoint passes, so it seems this reshuffle doesn't provide any further guarantees.

@kennknowles
Member

Yes, for Flink this really needs to be something that happens after the checkpoint is known to be durable. To do this right, we probably could use a semantic construct for that, which would also solve it for Dataflow.

Given that everything about this implementation depends on Dataflow's idiosyncrasies, I'm OK with modifying it to depend on them further in exchange for better performance.

@scwhittle scwhittle force-pushed the kafka_commits_without_shuffle branch 3 times, most recently from cc5e2b0 to f997145 Compare August 12, 2024 11:04
@scwhittle
Contributor Author

This change causes an error when updating the Dataflow pipeline from previous version, but the update can be allowed by passing:

--transformNameMapping={"KafkaIO.Read/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/KafkaIO.ReadSourceDescriptors/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey":""}

@kennknowles Is that sufficient or should I add some option to maintain the previous expensive behavior? Are there any other concerns with this change? Thanks!

@kennknowles
Member

kennknowles commented Aug 12, 2024

Thanks for raising this. I hate to bring it in, because it is a pain, but we do have a mechanism where the user can pass --updateCompatibilityVersion and for this one we really ought to use it. Example at https://github.com/apache/beam/pull/28853/files#diff-b8cf6c3051a36c566f2f28f525449f456a88b05b3b4c17c814e6a55ba2ce36e9R77

For you, the work is to keep the old code as-is in a deprecated fork of expand that is activated if the user requests it. Users who value compatibility can/should set this field to the value of the version they launched the pipeline with.

This allows us to make update-incompatible changes to the default codepath without breaking users with long-running pipelines who want to upgrade the SDK for some other reason. It is fine for users who want this new improvement to have to pass the parameter you mentioned, or even to have to drain.

@scwhittle scwhittle force-pushed the kafka_commits_without_shuffle branch from f997145 to 5b1c418 Compare August 19, 2024 20:15
@scwhittle
Contributor Author

@kennknowles PTAL, I added the legacy support back via the option you mentioned.

@scwhittle scwhittle force-pushed the kafka_commits_without_shuffle branch from 5b1c418 to 4f81e6c Compare August 19, 2024 20:23
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @damondouglas for label io.


if (Comparators.lexicographical(Comparator.<String>naturalOrder())
        .compare(requestedVersion, targetVersion)
    < 0) {
  useLegacyImplementation = true;
Member

Readability: I prefer the approach of a hard fork, where the first thing you do with expand is to take a different path depending on the version. This keeps the cyclomatic complexity of the main logic to a minimum. It is also more precise than "legacy" which has some connotations but isn't as good as expand_2_59_0. I can see how here you are only changing a tiny bit of logic based on this, but I would still prefer distinct methods called where each method is a straight-line implementation for a particular version range.
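
A minimal sketch of the hard-fork shape described here, under stated assumptions: `VersionedExpand`, `expand259`, and `expandCurrent` are hypothetical names, the `2.60.0` cutoff is assumed for illustration, and the methods return a label instead of building transforms. The comparison is numeric per component and assumes purely numeric dotted versions, which differs from the string ordering in the snippet under review (under string ordering, "9" sorts after "10").

```java
// Sketch only: hypothetical names; returns a label instead of building transforms.
final class VersionedExpand {
  // Assumed cutoff for illustration; the real target version is not stated in this thread.
  static final String TARGET_VERSION = "2.60.0";

  /** Compares dotted versions component by component, numerically. */
  static int compareVersions(String a, String b) {
    String[] x = a.split("\\.");
    String[] y = b.split("\\.");
    for (int i = 0; i < Math.max(x.length, y.length); i++) {
      int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
      int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
      if (xi != yi) {
        return Integer.compare(xi, yi);
      }
    }
    return 0;
  }

  /** Top-level fork: each branch is a straight-line implementation for one version range. */
  static String expand(String requestedVersion) {
    if (requestedVersion != null && compareVersions(requestedVersion, TARGET_VERSION) < 0) {
      return expand259();
    } else {
      return expandCurrent();
    }
  }

  static String expand259() {
    return "reshuffle records and offsets";
  }

  static String expandCurrent() {
    return "shuffle offsets only";
  }
}
```

Keeping the version check as the first and only branch means neither implementation needs a flag threaded through its body.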

Member

I'm OK to merge and get it in the release ASAP but I really care about this follow-up.

Contributor Author

Modified, PTAL. Fixed the test serialization issue. Running some manual update compatibility checks.

@scwhittle scwhittle force-pushed the kafka_commits_without_shuffle branch from 8716f29 to 10d8cd9 Compare August 29, 2024 11:54
@scwhittle
Contributor Author

Manually verified against the Dataflow service that:

  1. starting a pipeline without the change and then attempting to update with the change fails the update compatibility check.
  2. starting a pipeline without the change and then attempting to update with updateCompatibilityVersion set to 2.59.1 succeeds and is compatible.

@scwhittle scwhittle requested a review from kennknowles August 29, 2024 13:10

@RequiresStableInput
@ProcessElement
@SuppressWarnings("nullness") // startBundle guaranteed to initialize
Member

nit: don't suppress. Even though startBundle is guaranteed to initialize, it is not guaranteed that whoever is calling this class obeys the contract of calling methods in a particular order. In fact, it is incredibly common to get it wrong. (This style of class is bad, but it is too late now.)
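
One way to act on this nit is to check the field explicitly instead of suppressing the warning. The sketch below is not the PR's code: `CommitFnSketch` is a hypothetical class that only mimics a DoFn-style lifecycle, and a `StringBuilder` stands in for whatever client object startBundle would create.

```java
// Sketch only: hypothetical class mimicking a DoFn-style lifecycle.
final class CommitFnSketch {
  // Stands in for a client created in startBundle; null until then.
  private StringBuilder consumer;

  void startBundle() {
    consumer = new StringBuilder();
  }

  void processElement(long offset) {
    // Explicit check instead of a suppression: fails loudly if the caller
    // breaks the lifecycle contract and skips startBundle.
    StringBuilder c = consumer;
    if (c == null) {
      throw new IllegalStateException("processElement called before startBundle");
    }
    c.append(offset).append(';');
  }

  /** Returns the offsets recorded so far, or an empty string before startBundle. */
  String committed() {
    return consumer == null ? "" : consumer.toString();
  }
}
```

Copying the field into a local before the null check also lets a nullness checker see that the value used afterwards is non-null.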

if (Comparators.lexicographical(Comparator.<String>naturalOrder())
        .compare(requestedVersion, targetVersion)
    < 0) {
  return expand259Commits(
Member

nit: would be preferable to have the parallel constructs appear as parallel in the code, e.g.

if (...259) {
  expand259commits
} else {
  expandcommits
}

whereas now we have some inline and some factored even though they are analogous

@kennknowles kennknowles merged commit bcee5d0 into apache:master Aug 29, 2024
20 checks passed