
Implement Cross-Bundle BatchElements #29175

Merged · 14 commits · Nov 1, 2023

Conversation

@jrmccluskey (Contributor):

Implements a stateful version of BatchElements that works across bundles, allowing for streaming pipelines that can batch elements in a dynamic fashion.
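To make the idea concrete, here is a minimal, Beam-free sketch of what cross-bundle batching means: elements accumulate in per-key state that outlives a single bundle, a batch is emitted when a size threshold is reached, and a timer flush drains any remainder. The names (`StatefulBatcher`, `max_batch_size`, `on_expiry`) are illustrative stand-ins, not the actual Beam API.

```python
class StatefulBatcher:
    """Toy model of stateful batching that survives bundle boundaries."""

    def __init__(self, max_batch_size):
        self.max_batch_size = max_batch_size
        self.state = []  # stands in for a Beam BagState that outlives bundles

    def process(self, element):
        """Buffer one element; return a full batch when the threshold is hit."""
        self.state.append(element)
        if len(self.state) >= self.max_batch_size:
            batch, self.state = self.state, []
            return batch
        return None

    def on_expiry(self):
        """Stand-in for a Beam timer firing: flush whatever is buffered."""
        batch, self.state = self.state, []
        return batch or None


batcher = StatefulBatcher(max_batch_size=3)
batches = [b for e in range(7) if (b := batcher.process(e)) is not None]
# two full batches of 3; the timer flush picks up the straggler
leftover = batcher.on_expiry()
```

In a streaming pipeline the timer is what keeps latency bounded: without it, a slow source could leave a partial batch buffered indefinitely.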



@jrmccluskey (Contributor, Author): Run Python_Coverage PreCommit

@jrmccluskey (Contributor, Author): Run Python_Dataframes PreCommit

@jrmccluskey (Contributor, Author): Run Python_Examples PreCommit

@jrmccluskey (Contributor, Author): Run Python_Transforms PreCommit

codecov bot commented Oct 30, 2023

Codecov Report

Merging #29175 (a58498c) into master (8911595) will decrease coverage by 0.08%.
Report is 172 commits behind head on master.
The diff coverage is 4.08%.

@@            Coverage Diff             @@
##           master   #29175      +/-   ##
==========================================
- Coverage   38.38%   38.31%   -0.08%     
==========================================
  Files         686      688       +2     
  Lines      101640   101907     +267     
==========================================
+ Hits        39018    39041      +23     
- Misses      61042    61286     +244     
  Partials     1580     1580              
Flag     Coverage Δ
python   29.91% <4.08%> (-0.09%) ⬇️

Flags with carried forward coverage won't be shown.

Files Coverage Δ
sdks/python/apache_beam/transforms/util.py 37.63% <4.08%> (-2.34%) ⬇️

... and 20 files with indirect coverage changes


@jrmccluskey jrmccluskey changed the title [WIP] Implement Cross-Bundle BatchElements Implement Cross-Bundle BatchElements Oct 31, 2023
@jrmccluskey (Contributor, Author): R: @damccorm


Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@jrmccluskey (Contributor, Author): Run Python_Coverage PreCommit

@jrmccluskey jrmccluskey added this to the 2.52.0 Release milestone Oct 31, 2023
@damccorm (Contributor) left a comment:

Just a couple comments (one non-blocking for the future), otherwise LGTM


def expand(self, pcoll):
  if getattr(pcoll.pipeline.runner, 'is_streaming', False):
    raise NotImplementedError("Requires stateful processing (BEAM-2687)")
  elif self._max_batch_dur is not None:
    coder = coders.registry.get_coder(pcoll)
    return pcoll | WithKeys(0) | ParDo(
@damccorm (Contributor):
Non-blocking for this PR, but something we may want to consider: rather than using a single fixed key, does it make sense to try to have a single key per worker somehow? (One way to do this would be using multi_process_shared.py.)

That way we're still batching per machine in a parallelizable way, but we get stateful batching across bundles.

The current implementation is likely still useful for many use cases where batching is not the expensive part (e.g. RunInference) or there are few workers.
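The per-worker-key idea above can be sketched without Beam: instead of `WithKeys(0)` funneling every element through one key (and one serialized state cell), each worker process lazily picks a stable key of its own, so stateful batching still spans bundles but parallelizes across machines. This is a hypothetical illustration; the function name `per_worker_key` and the key format are not from the PR.

```python
import os
import uuid

_worker_key = None  # lazily chosen once per worker process


def per_worker_key(element):
    """Return (key, element) with a key that is stable within this process."""
    global _worker_key
    if _worker_key is None:
        # Any process-stable value works; pid plus a random suffix avoids
        # collisions when worker processes are recycled and pids are reused.
        _worker_key = f"{os.getpid()}-{uuid.uuid4().hex[:8]}"
    return _worker_key, element


keyed = [per_worker_key(e) for e in ["a", "b", "c"]]
keys = {k for k, _ in keyed}
# within one process every element shares a key, so state is per-worker
```

The trade-off is that state and timers become per-worker rather than global, which is exactly what makes batching parallel again at the cost of smaller, machine-local batches.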

@jrmccluskey (Contributor, Author):

At one point I suggested keying with worker IDs; it may be worth coming back to that idea.

@damccorm (Contributor):
Oh I missed that (or forgot), hopefully I wasn't against it initially?

Regardless, I like having this as our first pass; we can see how it performs and go from there.

sdks/python/apache_beam/transforms/util.py (outdated; resolved)
@damccorm (Contributor) left a comment:

LGTM once checks pass

@damccorm damccorm merged commit 5d6c182 into apache:master Nov 1, 2023
73 of 74 checks passed