Fix Reshuffle implementation in Java SDK #28853
24360e1 to b7c954b
b7c954b to c474e53
R: @robertwb
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
run java precommit
The failure in the AWS2 tests was the same Spark resume-from-checkpoint test. It shouldn't even have been run in that suite. It looks like our invocations are a bit messed up (@damccorm FYI, but I'm assuming this is just the same as it was in Jenkins).
run java precommit
I get the impression this somehow actually slows things down. Everything looks like a timeout. Or perhaps it just invalidates too much of the Gradle cache?
I haven't figured out if this is flaky or environmental. I can't repro it locally.
run java precommit
@robertwb finally green. I don't think this accomplishes everything from the dev thread, just the SDK implementation. It does not impact runners that override or specially translate Reshuffle.
run dataflow validatesrunner |
run flink validatesrunner |
run spark validatesrunner |
run samza validatesrunner |
new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
  @Override
  public Duration getAllowedTimestampSkew() {
    return Duration.millis(Long.MAX_VALUE);
The runner doesn't read/act on this value anywhere, does it?
Noting that this is not a change but a move. But no, the runner does not act on it. In fact, it is unsafe for this reason: the runner may well drop data that you have allowed to be output.
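A toy model may make the point in this reply concrete. It assumes, for illustration only, that the allowed skew governs just the DoFn-side check on output timestamps, while a separate runner-side check treats anything behind the watermark as late and therefore at risk of being dropped. Timestamps are plain epoch millis, and the class and method names (`SkewModel`, `outputAllowedBySkew`, `behindWatermark`) are hypothetical, not Beam APIs.

```java
// Toy model only: timestamps are epoch millis; names are hypothetical, not Beam APIs.
final class SkewModel {
  // DoFn-side check: may an output carry a timestamp earlier than its input?
  // Uses saturating subtraction so a skew of Long.MAX_VALUE cannot overflow.
  // Assumes skewMillis >= 0.
  static boolean outputAllowedBySkew(long inputTs, long outputTs, long skewMillis) {
    long earliest =
        (inputTs < Long.MIN_VALUE + skewMillis) ? Long.MIN_VALUE : inputTs - skewMillis;
    return outputTs >= earliest;
  }

  // Runner-side view, independent of the skew setting above:
  // an element behind the watermark is late and may be dropped downstream.
  static boolean behindWatermark(long outputTs, long watermark) {
    return outputTs < watermark;
  }

  public static void main(String[] args) {
    long input = 10_000L;
    long output = 1_000L;
    long watermark = 5_000L;
    // With unbounded skew the DoFn accepts the backdated output...
    System.out.println(outputAllowedBySkew(input, output, Long.MAX_VALUE)); // true
    // ...with zero skew the same output would be rejected...
    System.out.println(outputAllowedBySkew(input, output, 0L)); // false
    // ...but the runner still sees the element behind the watermark.
    System.out.println(behindWatermark(output, watermark)); // true
  }
}
```

The skew setting only widens what the DoFn is allowed to emit; it gives no guarantee that the runner will keep what was emitted.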
.apply(
    "RestoreOriginalWindows",
    Window.into(new RestoreWindowsFn<>(originalStrategy.getWindowFn())))
.apply("RestoreOriginalTimestamps", new RestoreTimestamps<>())
What about the PaneInfo information?
Ha, of course. That's the whole point.
I got caught up in the fact that I could not restore windows and timestamps in a single ParDo and had to rewindow. But in fact, I don't think there is a way to restore the PaneInfo without changing the model after all, at least to allow direct manipulation of it.
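The metadata loss under discussion can be reproduced in a plain-Java model. In this sketch, `Wv` is a hypothetical stand-in for a windowed value, windows and panes are reduced to strings, and `"NO_FIRING"` is just a placeholder for a default pane. The old-style path reifies and restores only the window and timestamp, so the pane falls back to the default, while carrying the whole windowed value through the shuffle keeps it. This illustrates the data flow only; it is not Beam's implementation.

```java
import java.util.Objects;

// Hypothetical model of the two expansions; not Beam code.
final class ReshuffleModel {
  static final String DEFAULT_PANE = "NO_FIRING"; // placeholder for a default pane

  // Old style: value, timestamp and window survive the shuffle; the pane is reset.
  static Wv reifyWindowAndTimestampOnly(Wv in) {
    String value = in.value;   // travels through the shuffle
    long ts = in.timestamp;    // reified, then restored
    String window = in.window; // reified, then restored by rewindowing
    return new Wv(value, ts, window, DEFAULT_PANE);
  }

  // New style: the full windowed value (including the pane) is carried and re-emitted.
  static Wv carryFullWindowedValue(Wv in) {
    return new Wv(in.value, in.timestamp, in.window, in.pane);
  }

  public static void main(String[] args) {
    Wv in = new Wv("a", 42L, "[0,60)", "ON_TIME");
    System.out.println(reifyWindowAndTimestampOnly(in).equals(in)); // false: pane lost
    System.out.println(carryFullWindowedValue(in).equals(in)); // true
  }
}

// Hypothetical stand-in for a windowed element: a value plus its metadata.
final class Wv {
  final String value;
  final long timestamp;
  final String window;
  final String pane;

  Wv(String value, long timestamp, String window, String pane) {
    this.value = value;
    this.timestamp = timestamp;
    this.window = window;
    this.pane = pane;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Wv)) return false;
    Wv w = (Wv) o;
    return value.equals(w.value) && timestamp == w.timestamp
        && window.equals(w.window) && pane.equals(w.pane);
  }

  @Override
  public int hashCode() {
    return Objects.hash(value, timestamp, window, pane);
  }
}
```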
PCollection<KV<K, ValueInSingleWindow<V>>> reified =
    input
        .apply("SetIdentityWindow", rewindow)
        .apply("ReifyOriginalMetadata", Reify.windowsInValue());
Won't this break backwards compatibility for everything using a shuffle (though I concede it's fixing a bug)?
I had that same concern and was coming down on the side of "we should change this anyhow". BUT looking into it, every single runner directly implements Reshuffle. So this is essentially just a fix for the reference implementation and the default for runners that don't implement it yet (like Dataflow v2).
So I actually need a ValidatesRunner test, which I presume most runners will fail. But we don't have to worry about update incompatibility unless/until those runners come into compliance. I still think this change should happen.
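The compatibility argument rests on runners replacing Reshuffle with their own translation, so the SDK's default expansion only runs where no override exists. A toy override table illustrates that reasoning. The runner names, the string-based "expansions", and `expandReshuffle` are all hypothetical and do not reflect how Beam actually wires transform overrides.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model of runner overrides; names and string "expansions" are hypothetical.
final class OverrideModel {
  // Runners that register their own Reshuffle translation.
  static final Map<String, UnaryOperator<String>> RESHUFFLE_OVERRIDES =
      Map.of("runnerWithOverride", in -> "native-shuffle(" + in + ")");

  // The SDK's default expansion, used only when a runner has no override.
  static String defaultReshuffle(String in) {
    return "sdk-default(" + in + ")";
  }

  static String expandReshuffle(String runner, String in) {
    UnaryOperator<String> override = RESHUFFLE_OVERRIDES.get(runner);
    return override != null ? override.apply(in) : defaultReshuffle(in);
  }

  public static void main(String[] args) {
    // Changing defaultReshuffle cannot affect this runner's graph...
    System.out.println(expandReshuffle("runnerWithOverride", "pc")); // native-shuffle(pc)
    // ...but it does change the graph for a runner without an override.
    System.out.println(expandReshuffle("runnerWithoutOverride", "pc")); // sdk-default(pc)
  }
}
```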
Could we test that this does not break compatibility (even if the test is a one-off manual test)?
c474e53 to 31a4714
31a4714 to 90ac268
OK, I added the capability to
Gotta go through and poke a few more files.
90ac268 to c23cdaa
2dfa6a0 to 657b7e4
What's the status on this?
Java precommit timed out, perhaps due to cache invalidation from modifying the core SDK.
@@ -669,6 +670,15 @@ public void output(RestrictionT part) {
  public void outputWithTimestamp(RestrictionT part, Instant timestamp) {
    throw new UnsupportedOperationException();
  }

  @Override
  public void outputWindowedValue(
It might be easier to give outputWindowedValue a default implementation in OutputReceiver, to simplify this change for all the test implementations of OutputReceiver.
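The reviewer's suggestion relies on Java default interface methods. Below is a minimal sketch using a hypothetical, simplified `Receiver` interface rather than Beam's real `OutputReceiver`; the parameter types (windows and panes reduced to strings, timestamps as millis) are assumptions. Because the new method ships with a default body, an existing implementation written before the method existed, such as a test receiver, compiles unchanged.

```java
import java.util.ArrayList;
import java.util.List;

final class DefaultMethodDemo {
  // An existing "test" receiver written before outputWindowedValue existed.
  static final class CollectingReceiver implements Receiver<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void output(String value) {
      seen.add(value);
    }

    @Override
    public void outputWithTimestamp(String value, long timestampMillis) {
      throw new UnsupportedOperationException();
    }
  }

  public static void main(String[] args) {
    CollectingReceiver r = new CollectingReceiver();
    r.output("x");
    try {
      r.outputWindowedValue("y", 0L, "global", "NO_FIRING");
    } catch (UnsupportedOperationException e) {
      System.out.println("default method threw: " + e.getMessage());
    }
    System.out.println(r.seen); // [x]
  }
}

// Hypothetical, simplified receiver interface; not Beam's actual OutputReceiver.
interface Receiver<T> {
  void output(T value);

  void outputWithTimestamp(T value, long timestampMillis);

  // Added later with a default body, so existing implementations need no change.
  default void outputWindowedValue(T value, long timestampMillis, String window, String pane) {
    throw new UnsupportedOperationException(
        "outputWindowedValue not supported by " + getClass().getSimpleName());
  }
}
```

Throwing by default, instead of delegating to `outputWithTimestamp`, avoids silently discarding the window and pane, which is the kind of metadata loss this PR is fixing.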
657b7e4 to adead2f
The
7817665 to 7a214a0
Failure is the error on
7a214a0 to ff6d4ba
ff6d4ba to 7827d1d
Noting that this was green, but I went ahead and added the old expansion as an option using the new update compatibility flag. I am going to use the flag to test compatibility on Dataflow v1. We expect the override to make this change irrelevant there, so we don't want to actually bug users. And on v2, we want the incompatibility.
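A version-gated choice between the old and new expansions might look roughly like the sketch below. It assumes the compatibility flag holds a dotted numeric release string and that requesting compatibility with a release older than some cutoff selects the old expansion. The cutoff values, `UpdateCompatModel`, `compareVersions`, and `chooseExpansion` are illustrative assumptions, not the flag values or APIs used in this PR.

```java
// Hypothetical sketch of gating an expansion on an update-compatibility version.
final class UpdateCompatModel {
  // Compares dotted numeric versions such as "2.52.0"; missing parts count as 0.
  static int compareVersions(String a, String b) {
    String[] pa = a.split("\\.");
    String[] pb = b.split("\\.");
    for (int i = 0; i < Math.max(pa.length, pb.length); i++) {
      int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
      int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
      if (x != y) return Integer.compare(x, y);
    }
    return 0;
  }

  // No flag set: use the new expansion. Flag older than the cutoff: keep the
  // old expansion so an updated pipeline produces the same graph as before.
  static String chooseExpansion(String requestedCompatVersion, String cutoff) {
    if (requestedCompatVersion == null
        || compareVersions(requestedCompatVersion, cutoff) >= 0) {
      return "new";
    }
    return "legacy";
  }

  public static void main(String[] args) {
    String cutoff = "1.5.0"; // illustrative cutoff only
    System.out.println(chooseExpansion(null, cutoff)); // new
    System.out.println(chooseExpansion("1.4.2", cutoff)); // legacy
    System.out.println(chooseExpansion("1.5.0", cutoff)); // new
  }
}
```

Comparing numerically part by part (rather than as strings) keeps "1.10" ordered after "1.5".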
Added an internal Dataflow test that reloads the pipeline while changing the compatibility flag.
It seems the newly added test
beam_PostCommit_Java_PVR_Samza: https://github.com/apache/beam/actions/runs/7399747764/job/20131866953
beam_PostCommit_Java_PVR_Spark3_Streaming: https://github.com/apache/beam/actions/runs/7399737943/job/20131836389
beam_PostCommit_Java_PVR_Spark3_Batch: https://github.com/apache/beam/runs/20340841429
beam_PostCommit_Java_ValidatesRunner_ULR also
This fixes the default Reshuffle implementation so that it preserves all metadata and is a true no-op; previously, it did not preserve the PaneInfo.
Regarding update compatibility, my understanding of runners that have long-lived streaming jobs and perform updates, as of this writing:
I went ahead and used the update compatibility flag anyhow just in case, so it can be reverted to exactly the old implementation on demand.