Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WindmillConnectionsCache used to consume GetWorkerMetadata response and updates #28428

Closed
wants to merge 1 commit into from

Conversation

m-trieu
Copy link
Contributor

@m-trieu m-trieu commented Sep 12, 2023

WindmillConnectionsCache will be used to consume GetWorkerMetadata response and updates.

It exposes a separate read and write interface. The write interface will be used for the caller of GetWorkerMetadata stream to be able to pass the response along to the cache to consume. The read interface will be vended out to GetWorkStream, GetDataStream, and CommitWorkStream call sites in order to fetch connections directly to Streaming Engine Windmill Workers, or the Dispatcher.

WindmillConnectionCacheTokens will be used to attach to WorkItems to correctly call GetDataStream and CommitWorkStream.

Since the WindmillConnectionsCache's will be shared, it uses AtomicReference and synchronization around writes to the cache. Updates are all or nothing and done under a lock/synchronization, and reads fetch a snapshot of the of the AtomicReference's value to serve reads without locking.

r: @scwhittle


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

* com.google.auth.Credentials}. Note that this class should override every method that is not final
* and not static and call the delegate directly.
*
* <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@scwhittle do you have context on this (or knows who does)? I can just replace this class with whatever this comment is referring to, or add more text describing what needs to be done, but not quite sure what it is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it means some sort of annotation to generate a proxy class. So that if new methods are added to the Credentials, this doesn't have to be updated.

That said, I don't see why this class is needed at all from just looking here. It doesn't seem to modify any behavior, just wraps and forwards on to the underlying.

You could try removing it and see if it works (can be separate PR) or look through the history more, perhaps it used to do something and should have been removed when that functionality changed.

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore the nits for now, we can come back to them if still present with possible interface changes.

return new AutoValue_WindmillStream_GetWorkStream_GetWorkBudget.Builder();
}

public GetWorkBudget consumeBudgetUpdate(long bytes, long items) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consume made me think it was going to subtract

how about just increment or add? Doesn't really matter where it came from

Optional<CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub> getGlobalDataStub(
Windmill.GlobalDataId globalDataId);

Optional<WindmillApplianceGrpc.WindmillApplianceBlockingStub> getWindmillApplianceStub();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be part of this interface? Seems cleaner to keep appliance stuff separate.

* com.google.auth.Credentials}. Note that this class should override every method that is not final
* and not static and call the delegate directly.
*
* <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it means some sort of annotation to generate a proxy class. So that if new methods are added to the Credentials, this doesn't have to be updated.

That said, I don't see why this class is needed at all from just looking here. It doesn't seem to modify any behavior, just wraps and forwards on to the underlying.

You could try removing it and see if it works (can be separate PR) or look through the history more, perhaps it used to do something and should have been removed when that functionality changed.

}

static Channel remoteChannel(
WindmillServiceAddress windmillServiceAddress, int windmillServiceRpcChannelTimeoutSec) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

windmillServiceRpcChannelTimeoutSec is a little unclear (ie could be entire channel lifetime, connect timeout etc)

how about
channelKeepAliveTimeoutSec?

import java.util.UUID;

@AutoValue
public abstract class WindmillConnectionCacheToken {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the tokens for Java? With garbage collection perhaps we can just hand out per-endpoint objects to associate with work items. Then we don't have to go back to the cache to lookup based upon the token for subsequent getdata/commit , we just have the endpoint object itself.

The cache can keep the same endpoint objects as metadata updates come in if the endpoint remains. If the endpoint is removed, we could note that in the object and subsequent calls using it could fail fast.

private final Random rand;

/**
* Writes are guarded by synchronization on "this". Reads are done grabbing the {@link
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it guarded by synchronization on dispatcherConnectionLock?

* href=https://medium.com/google-cloud/streaming-engine-execution-model-1eb2eef69a8e>Windmill
* API</a>
*/
WindmillConnection getNewWindmillWorkerConnection();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the direct user-worker to windmill worker mode, there is a set of endpoints that get work needs to be sent to not just 1.

With the multiple endpoints, we need to think through when we find out about them. One mechanism would be to have the existing interface and poll it periodically. But that introduces latency so we might want to move to where this connections cache either notifies something else via a Consumer etc of endpoints deltas or contains the streams itself.

@scwhittle
Copy link
Contributor

Should we close this one?

@damccorm
Copy link
Contributor

@m-trieu what are next steps here?

@scwhittle
Copy link
Contributor

I belive this is replaced by the other PRs, closing this one. Repush after rebasing if there are things worht keeping in this one.

@scwhittle scwhittle closed this Dec 15, 2023
@m-trieu m-trieu deleted the mtrieu-consume-md branch June 10, 2024 15:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants