Refactor StateFetcher #28755
6e12a07 to 52553bb
Checks are failing. Will not request review until checks are succeeding.
42e0f78 to 1a6bcc7
Assigning reviewers. R: @AnandInguva added as fallback since no labels match configuration.
The PR bot will only process comments in the main thread (not review comments).
assign to next reviewer
I am not much of a Java expert. Assigning it to the next reviewer.
}
}

@SuppressWarnings("deprecation")
localize and comment (for all suppressions)
done!
a lot of the SuppressWarnings around deprecation is needed for the side input cache, and creating the values/keys for the cache.
if there is a non-deprecated way of getting that data, i can change it to the new way and remove the annotation.
Run Dataflow Streaming ValidatesRunner
Reminder, please take a look at this pr: @AnandInguva
@AnandInguva @kennknowles any other comments for this PR? Thank you!
r: @scwhittle
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
r: @scwhittle
 *
 * <p>If the side input was ready and null, returns {@literal Optional.absent()}. If the side
 * input was ready and non-null returns {@literal Optional.present(...)}.
 * <p>If the side input cached, throws {@code IllegalStateException} if the state is {@literal
should this be: If the side input was not cached?
done.
 */
private @Nullable <T> Optional<T> fetchSideInput(
@SuppressWarnings({"deprecation", "unchecked"})
can you move the unchecked suppression to where cast is made (similar to before)?
what is deprecation suppression required for?
getTagInternal is deprecated with message:
this method will be removed entirely. The {@link PCollection} underlying a side input, is part of the side input's specification with a {@link ParDo} transform, which will obtain that information via a package-private channel.
done.
"Expected side input to be cached. Tag: " + view.getTagInternal().getId());
}

return seenSideInput.orElseGet(
the orElseGet seems more confusing than just structuring like:
if (seenSideInput.isPresent()) {
  return seenSideInput;
}
if (state == CACHED_IN_WORK_ITEM) {
  throw ...
}
return fetchSideInputFromWindmill();
// or just inline that method
done.
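For readers following the thread, the early-return structure suggested above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: `SideInputLookupSketch`, the string-keyed cache, the `fetchFromBackend` supplier, and the `KNOWN_READY` and `UNKNOWN` state names are assumptions made for the example; only `CACHED_IN_WORK_ITEM` and the error message come from the review.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in for the fetcher under review; all names are illustrative.
final class SideInputLookupSketch {
  // Only CACHED_IN_WORK_ITEM appears in the review; the other values are assumed.
  enum State { CACHED_IN_WORK_ITEM, KNOWN_READY, UNKNOWN }

  private final Map<String, String> cache = new HashMap<>();

  void put(String tag, String value) {
    cache.put(tag, value);
  }

  Optional<String> fetch(
      String tag, State state, Supplier<Optional<String>> fetchFromBackend) {
    Optional<String> cachedSideInput = Optional.ofNullable(cache.get(tag));
    // 1. Cache hit: return immediately.
    if (cachedSideInput.isPresent()) {
      return cachedSideInput;
    }
    // 2. The caller claimed the value was cached, so a miss is a bug.
    if (state == State.CACHED_IN_WORK_ITEM) {
      throw new IllegalStateException("Expected side input to be cached. Tag: " + tag);
    }
    // 3. Otherwise fall back to the (assumed) remote fetch and remember the result.
    Optional<String> fetched = fetchFromBackend.get();
    fetched.ifPresent(v -> cache.put(tag, v));
    return fetched;
  }
}
```

Compared with chaining everything through `orElseGet`, each branch here has one exit, so the exceptional case is visible at a glance.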
Map<BoundedWindow, SideInput<?>> tagCache =
    sideInputCache.computeIfAbsent(view.getTagInternal(), k -> new HashMap<>());

Optional<SideInput<T>> seenSideInput =
nit: cachedSideInput?
done.
private final AtomicLong totalMillisInState = new AtomicLong();

// The worker that created this state. Used to report lulls back to the worker.
@SuppressWarnings("unused") // Affects a public api
can we remove the member variable and put the suppression on the public constructor parameter instead?
done.
...ker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInput.java
}

@SuppressWarnings({
  "unchecked" // cacheLoaderFn loads SideInput<T>, so value for Key is always SideInput<T>.
the comment here and elsewhere doesn't entirely explain why the cast is safe.
If we're using the cacheLoaderFn from this invocation then it's safe, but if there was a previous call to get for the same key with a different type and that was cached, the cast would be incorrect.
So I think the comment should be that the runner enforces that the same type is used for the same key. (we could perhaps enforce this with some instanceof check or making type part of the cache key?)
done, i added the PCollectionView.viewFn.typeDescriptor to the cache key.
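The idea of making the type part of the cache key can be illustrated with a small sketch. It is an assumption-laden simplification: it uses a plain `Class<T>` where the PR uses the view's type descriptor, and `TypedCacheSketch` and its `Key` class are hypothetical names that do not exist in Beam.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical cache whose key carries the value type, so a lookup can never
// return a value that was cached under the same tag with a different type.
final class TypedCacheSketch {
  static final class Key<T> {
    final String tag;
    final Class<T> type; // simplification: the PR uses a type descriptor instead

    Key(String tag, Class<T> type) {
      this.tag = tag;
      this.type = type;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key<?> other = (Key<?>) o;
      return tag.equals(other.tag) && type.equals(other.type);
    }

    @Override
    public int hashCode() {
      return Objects.hash(tag, type);
    }
  }

  private final Map<Key<?>, Object> cache = new HashMap<>();

  <T> T get(Key<T> key, Supplier<T> loader) {
    Object value = cache.computeIfAbsent(key, k -> loader.get());
    // Checked cast: safe because the type is part of the key that selected the entry.
    return key.type.cast(value);
  }
}
```

With this shape, a second lookup for the same tag but a different type misses the cache and invokes its own loader instead of returning a mistyped value, and the cast needs no `unchecked` suppression.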
String unknownMaterializationFormatErrorMessage =
    String.format(
        "Unknown side input materialization format requested '%s'",
        view.getViewFn().getMaterialization().getUrn());
just use viewMaterializationUrn and remove suppression here.
done.
    String.format(
        "Unknown side input materialization format requested '%s'",
        view.getViewFn().getMaterialization().getUrn());
throw new IllegalStateException(unknownMaterializationFormatErrorMessage);
nit: just inline String.format or just inline +
done.
@SuppressWarnings({
  "deprecation" // Underlying ViewFn is needed to validate the materialization URN.
})
String materializationUrn = view.getViewFn().getMaterialization().getUrn();
how about a private helper method to get urn from view so you don't have to repeat suppression and explanation a lot
done.
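A private helper of the kind suggested above might look like the sketch below. The `SketchView`, `SketchViewFn`, and `SketchMaterialization` interfaces are hypothetical stand-ins so the example compiles on its own; they are not the Beam types, and `urnOf` and `checkSupported` are illustrative names. The error message text is taken from the snippet under review.

```java
import java.util.Set;

// Hypothetical stand-ins for the view types discussed in the review.
interface SketchMaterialization {
  String getUrn();
}

interface SketchViewFn {
  SketchMaterialization getMaterialization();
}

interface SketchView {
  @Deprecated
  SketchViewFn getViewFn();
}

final class MaterializationUrnSketch {
  private MaterializationUrnSketch() {}

  // The single place that touches the deprecated accessor, so the suppression
  // and its explanation are written once.
  @SuppressWarnings("deprecation") // Underlying ViewFn is needed to read the materialization URN.
  static String urnOf(SketchView view) {
    return view.getViewFn().getMaterialization().getUrn();
  }

  // Callers need no suppression because they only go through urnOf.
  static void checkSupported(SketchView view, Set<String> supportedUrns) {
    String urn = urnOf(view);
    if (!supportedUrns.contains(urn)) {
      throw new IllegalStateException(
          String.format("Unknown side input materialization format requested '%s'", urn));
    }
  }
}
```

Keeping the suppression on a one-line helper also means that if a non-deprecated accessor becomes available, only the helper has to change.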
798a661 to 6060ac0
looks like you need to rebase as well, merge conflict
import javax.annotation.Nullable;

/**
 * Entry in the side input cache that stores the value (null if not ready), and the encoded size of
ping
@@ -312,9 +349,9 @@ private Windmill.GlobalData buildGlobalDataResponse(
   return builder.build();
 }

-private Windmill.GlobalDataRequest buildGlobalDataRequest(String tag, ByteString version) {
+private Windmill.GlobalDataRequest buildGlobalDataRequest(String tag) {
seems like you removed version because we were always setting it to ByteString.EMPTY when calling this. It might be good to have a test where we set a different version and verify it is plumbed as expected.
I believe the version is tied to the encoded window so this could reflect that all the tests use a global window. Having a test with different interval windows and verifying they are cached separately would be nice (could do in separate PR if you'd rather since this isn't a regression in test coverage).
ack added todo
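As a rough picture of what the suggested follow-up test would verify, the sketch below caches values per tag and per window and counts loader invocations. It is not the Beam test or cache: `PerWindowCacheSketch`, `IntervalWindowKey`, and the `loads` counter are assumptions for illustration, loosely mirroring the nested tag-to-window map shown earlier in the review.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical two-level cache: tag -> window -> value.
final class PerWindowCacheSketch {
  // Illustrative stand-in for an interval window, identified by its bounds.
  static final class IntervalWindowKey {
    final long startMillis;
    final long endMillis;

    IntervalWindowKey(long startMillis, long endMillis) {
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof IntervalWindowKey)) {
        return false;
      }
      IntervalWindowKey other = (IntervalWindowKey) o;
      return startMillis == other.startMillis && endMillis == other.endMillis;
    }

    @Override
    public int hashCode() {
      return Objects.hash(startMillis, endMillis);
    }
  }

  private final Map<String, Map<IntervalWindowKey, String>> cache = new HashMap<>();
  int loads = 0; // number of times a loader actually ran

  String get(String tag, IntervalWindowKey window, Supplier<String> loader) {
    Map<IntervalWindowKey, String> tagCache = cache.computeIfAbsent(tag, k -> new HashMap<>());
    return tagCache.computeIfAbsent(
        window,
        w -> {
          loads++;
          return loader.get();
        });
  }
}
```

A test along these lines would request the same tag for two different windows, expect two loads with distinct values, then repeat the first request and expect the load count to stay unchanged.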
6060ac0 to 74d45b7
done.
74d45b7 to 7732ecf
Refactor and cleanup of StateFetcher in preparation for future changes
Cleaning up of StateFetcher
Changes will need to be made to this (or possibly just make a new class that takes in a WindmillWorkItemClient instead of MetricTrackingWindmillServer to make GetData calls) to support direct path, so doing some cleanup before introducing further changes in later CLs.
- Remove null as a valid state. Currently null is being used to represent the side input not being ready alongside Optional<T>, so the interface was returning a @Nullable Optional<T>. Opted to expose the SideInput instance directly and check for the ready state instead of null.
- Move StateFetcher into its own directory under /streaming/sideinput for organization.
R: scwhittle
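To make the "ready state instead of null" point concrete, here is a minimal sketch of a value holder that separates "not ready" from "ready but empty". `SideInputSketch` and its method names are assumptions for illustration; the class in the PR also tracks the encoded size and may differ in other details.

```java
import java.util.Optional;

// Hypothetical holder distinguishing three cases without using null as a signal:
// not ready, ready with no value, ready with a value.
final class SideInputSketch<T> {
  private final boolean ready;
  private final Optional<T> value;

  private SideInputSketch(boolean ready, Optional<T> value) {
    this.ready = ready;
    this.value = value;
  }

  // A ready side input may legitimately hold no value, hence ofNullable.
  static <T> SideInputSketch<T> ready(T valueOrNull) {
    return new SideInputSketch<>(true, Optional.ofNullable(valueOrNull));
  }

  static <T> SideInputSketch<T> notReady() {
    return new SideInputSketch<>(false, Optional.empty());
  }

  boolean isReady() {
    return ready;
  }

  Optional<T> value() {
    return value;
  }
}
```

Callers then branch on `isReady()` and never have to null-check an `Optional`, which is the ambiguity the description says the old `@Nullable Optional<T>` return type carried.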