Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in redistribute option for Kafka Read #31347

Merged
merged 1 commit into from
Jul 9, 2024

Conversation

Naireen
Copy link
Contributor

@Naireen Naireen commented May 20, 2024

Add option to enable Kafka Read with redistribute afterwards to increase paralellism


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@Naireen Naireen force-pushed the kafka_alo branch 7 times, most recently from 4a4da1c to b1098df Compare May 22, 2024 19:42
@Naireen
Copy link
Contributor Author

Naireen commented May 22, 2024

Run Python_PVR_Flink PreCommit

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes to the translation make sense to me. I would have expected it to at least basically work without it but it is good to update too.

addCountingAsserts(input, numElements);

kafkaIOExpectedLogs.verifyWarn(
"This will redistribute the load across the same number of shards as the Kafka source.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't Redistribute.arbitrarily currently create a random key for each record and just create way too many keys and that is the problem? In other words, numShards is used to decrease the number of keys, not to increase. And that is an implementation detail that actually could/should change in the future if we do something clever and actually make it a black box that uses e.g. least loaded algorithms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct, I have removed this comment, and updated it to

"This will create a key per record, which is sub-optimal for most use cases." -> this is still an implementation detail, but is a beam implementation detail rather than from the underlying runner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean about the implementation detail is that it is Dataflow-specific that we need fewer keys instead of doing something intelligent like bundling across keys (which is very much allowed by the model).

@Naireen Naireen force-pushed the kafka_alo branch 3 times, most recently from 7c1e99d to 4ada08d Compare May 24, 2024 22:28
Copy link

codecov bot commented May 24, 2024

Codecov Report

Attention: Patch coverage is 0% with 1 line in your changes missing coverage. Please review.

Project coverage is 71.41%. Comparing base (5c30b1d) to head (4ada08d).
Report is 122 commits behind head on master.

Current head 4ada08d differs from pull request most recent head f7c1974

Please upload reports for the commit f7c1974 to get more accurate results.

Files Patch % Lines
sdks/python/apache_beam/io/kafka.py 0.00% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #31347       +/-   ##
=============================================
+ Coverage     68.55%   71.41%    +2.86%     
+ Complexity    14921     1474    -13447     
=============================================
  Files          2636      910     -1726     
  Lines        222080   113845   -108235     
  Branches      11825     1076    -10749     
=============================================
- Hits         152238    81299    -70939     
+ Misses        63647    30526    -33121     
+ Partials       6195     2020     -4175     
Flag Coverage Δ
python 81.37% <0.00%> (-0.10%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Naireen Naireen force-pushed the kafka_alo branch 6 times, most recently from 3d5db64 to be0bf2a Compare May 28, 2024 19:37
@Naireen
Copy link
Contributor Author

Naireen commented May 28, 2024

Run Java_Examples_Dataflow PreCommit

@Naireen Naireen changed the title [WIP] Add in redistribute option for Kafka Read Add in redistribute option for Kafka Read May 28, 2024
@Naireen Naireen marked this pull request as ready for review May 28, 2024 21:13
@Naireen
Copy link
Contributor Author

Naireen commented May 28, 2024

Run Yaml_Xlang_Direct PreCommit

1 similar comment
@Naireen
Copy link
Contributor Author

Naireen commented May 28, 2024

Run Yaml_Xlang_Direct PreCommit

Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@Naireen
Copy link
Contributor Author

Naireen commented May 28, 2024

@reviewers

Will periodically repeat current tests, but failures look unrelated to current changes (example workflow is failling due to oath issues + unable to find source files for tests + issues with translation beam:external:java:sql:v1)

@Naireen
Copy link
Contributor Author

Naireen commented May 29, 2024

Run Yaml_Xlang_Direct PreCommit

@Naireen
Copy link
Contributor Author

Naireen commented May 29, 2024

Run Java_Examples_Dataflow PreCommit

if (kafkaRead.isRedistributed()) {
if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
LOG.warn(
"Both isRedistribute and isCommitOffsetsInFinalizeEnabled are enabled, isRedistribute will take precendence");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redistribute is not guaranteed to do the checkpoint that is required by commitOffsetsInFinalize. Either we need to make both work correctly, or this should be a rejected configuration (throw an exception)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since its meant to be a hint to the runner, and the runner can choose what to do, I decided to fail this combination for now (instead of only failing if kafkaRead.isRedistributed() + allowDuplicates ==true)

@@ -904,6 +929,8 @@ public static class Configuration {
private Boolean commitOffsetInFinalize;
private Long consumerPollingTimeout;
private String timestampPolicy;
private Long redistributeNumShards;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you pass this to Redistribute you cast it to an int. So just use Integer here if it has to be that small (which is obviously plenty big since the goal is to reduce the number of keys).

Actually we should rename all of this to numKeys because "shard" is an element of processing and it is a Dataflow-specific quirk that key == shard even when it is not required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this.redistributeNumShards = redistributeNumShards;
}

public void setRedistribute(Boolean redistribute) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also need to have allowDuplicates as a parameter. It makes sense to also Redistribute for exactly once use case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -948,6 +975,14 @@ public void setTimestampPolicy(String timestampPolicy) {
public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
this.consumerPollingTimeout = consumerPollingTimeout;
}

public void setRedistributeNumShards(Long redistributeNumShards) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is boxed, you can make it @Nullable and that way you can check whether it has been set or not. It is fine to rely on defaults to zero also, but then make it unboxed (if you can - I don't know how autovalue works with that)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left it as defaulting to zero.

Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm for label python.
R: @damondouglas for label java.
R: @shunping for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Naireen Naireen force-pushed the kafka_alo branch 5 times, most recently from 4075df9 to f7c1974 Compare June 3, 2024 16:59
Copy link
Contributor

Reminder, please take a look at this pr: @damccorm @damondouglas @shunping

@damccorm
Copy link
Contributor

R: @kennknowles

@damccorm
Copy link
Contributor

damccorm commented Jun 14, 2024

(since you've started the review, I'm inclined to keep it sticky/stop the bot - feel free to say otherwise if appropriate)

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@kennknowles
Copy link
Member

Agree, keep it sticky.

@kennknowles kennknowles merged commit cf37997 into apache:master Jul 9, 2024
103 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants