add getworkermetadata streaming rpc #27767
Force-pushed from 9e6fc7e to 523e487.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
.../java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java
   private final AtomicLong startTimeMs;
-  private final AtomicLong lastSendTimeMs;
+  protected final AtomicLong lastSendTimeMs;
can this be reverted? don't see it accessed by subclass
done.
@@ -73,9 +73,9 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
   private final Executor executor;
   private final BackOff backoff;
   // Indicates if the current stream in requestObserver is closed by calling close() method
-  private final AtomicBoolean streamClosed;
+  protected AtomicBoolean streamClosed;
can this be reverted?
It seems at least it should be final, but also setting it as part of stream callback doesn't match the comment
done.
  }

  /**
   * GetWorkerMetadata is a server streaming API. There is no request observer since the client side
It seems that a lot of the changes here are necessary because this is not a bi-directional stream.
Beyond the code complexity of supporting this, I'm concerned about us not being able to send application-level heartbeat requests as we've had numerous issues in the past where sending was necessary to detect that the stream had died. See https://issues.apache.org/jira/browse/BEAM-10808 and #12688.
I'm thinking that we should change the metadata streaming rpc to be bi-directional instead of just server side streaming to make sure we don't hit similar issues.
done.
Force-pushed from 523e487 to 2cb875f.
Force-pushed from 07ad823 to 32ba5c2.
@@ -130,16 +127,18 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void onNewStream();

   /** Returns whether there are any pending requests that should be retried on a stream break. */
-  protected abstract boolean hasPendingRequests();
+  protected boolean hasPendingRequests() {
revert? Since this is related to correctness a default seems dangerous since it makes it possible to skip implementing correctly.
done.
@AutoValue
public abstract class WindmillEndpoints {
  public abstract ImmutableMap<String, String> globalDataServers();
Comment on what key and value are
done
public abstract class WindmillEndpoints {
  public abstract ImmutableMap<String, String> globalDataServers();

  public abstract ImmutableList<String> windmillServers();
add a comment about what string is
alternatively should it be some sort of endpoint object instead of a string?
i guess we can convert the windmill server string to an object let me find the format
@@ -56,7 +56,8 @@ final class GrpcCommitWorkStream
   private final int streamingRpcBatchLimit;

   private GrpcCommitWorkStream(
-      CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub,
+      Function<StreamObserver<StreamingCommitResponse>, StreamObserver<StreamingCommitWorkRequest>>
+          commitWorkRpc,
nit: commitWorkRpc is a confusing name to me for a function. What about createCommitWorkRpcFn or startCommitWorkRpcFn etc?
ditto for the other files
done.
@Override
protected void onResponse(WorkerMetadataResponse response) {
  metadataVersion.getAndUpdate(
      current -> updateMetadataVersion(current, response.getMetadataVersion()));
if we get a version before our current version this seems to leave metadataVersion at the more current version but then it goes ahead and returns the stale response instead of filtering it.
done. good catch! tried to use the Atomic* non blocking to do the filtering.
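For illustration, a minimal sketch of non-blocking stale-version filtering with `AtomicLong`. This is not the code from the PR: `StaleVersionFilter`, `onVersion`, and the `LongConsumer` downstream are hypothetical names, and the response is reduced to its version number.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical helper (not a Beam class): forwards only versions newer than any seen so far. */
final class StaleVersionFilter {
  private final AtomicLong latestVersion;
  private final LongConsumer downstream;

  StaleVersionFilter(long initialVersion, LongConsumer downstream) {
    this.latestVersion = new AtomicLong(initialVersion);
    this.downstream = downstream;
  }

  /** Returns true if the version was forwarded, false if it was dropped as stale. */
  boolean onVersion(long incoming) {
    // Atomically store max(current, incoming) and get back the value held before the update.
    long previous = latestVersion.getAndAccumulate(incoming, Math::max);
    if (incoming <= previous) {
      return false; // Stale or duplicate: do not forward.
    }
    downstream.accept(incoming);
    return true;
  }

  long latestVersion() {
    return latestVersion.get();
  }
}
```

The stored version never moves backwards and stale responses are not forwarded. The sketch does not order the downstream calls across threads, so it only fits a single caller or a downstream that tolerates reordering.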
return stubList
    .get(rand.nextInt(stubList.size()))
    .withDeadlineAfter(
        AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
this deadline is not being applied to stub returned above if only 1 stub
done
"stale_global_data", | ||
WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("staleGlobalData").build()); | ||
|
||
testStub.injectWorkerMetadata( |
see comment in code. It seems like this test should fail with the current logic. It would also pass if this injection is somehow not reaching the stream so maybe some wiring is wrong?
it failed (i didnt rerun it after i removed the locking)
passes now that the logic is fixed
Force-pushed from 32ba5c2 to 061e846.
Assigning reviewers. If you would like to opt out of this review, comment R: @lostluck added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
@@ -149,7 +146,7 @@ private StreamObserver<RequestT> requestObserver() {
   }

   /** Send a request to the server. */
-  protected final void send(RequestT request) {
+  protected void send(RequestT request) {
can this remain final? don't see new override
missed it in the revert. done.
    */
   protected abstract void startThrottleTimer();

-  private StreamObserver<RequestT> requestObserver() {
+  protected StreamObserver<RequestT> requestObserver() {
can this remain private? don't see new call site
done.
@@ -252,7 +249,7 @@ public final Instant startTime() {
     return new Instant(startTimeMs.get());
   }

-  private class ResponseObserver implements StreamObserver<ResponseT> {
+  public class ResponseObserver implements StreamObserver<ResponseT> {
ditto, seems like this could remain private?
done.
Force-pushed from 061e846 to 711830e.
waiting for changes to be finalized in windmill api proto before continuing to work on this @scwhittle
changes to add worker token support are independent, this review can proceed
Force-pushed from 147b6c8 to 99b89a0.
  */
-final class ForwardingClientResponseObserver<ReqT, RespT>
-    implements ClientResponseObserver<RespT, ReqT> {
+final class ForwardingClientResponseObserver<ResponseT, RequestT>
Can you rebase? I think a lot of this has already been merged with your previous PR
even after rebase this is still showing up. hmmm, when i look at master (https://github.com/apache/beam/blame/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java) it still shows the old code?
   */
  public abstract ImmutableList<Endpoint> windmillServers();

  @AutoValue
just checking that Autovalue automatically creates equals/hash methods
    Set<AbstractWindmillStream<?, ?>> streamRegistry,
    int logEveryNStreamFailures,
    JobHeader jobHeader,
    AtomicLong metadataVersion,
integer? Can keep the AtomicLong internal for non-testing case to ensure it isn't modified externally
done
still AtomicLong here
whoops misread the comment as for test only. done.
TestGetWorkMetadataRequestObserver requestObserver =
    new TestGetWorkMetadataRequestObserver(mockResponse);
GetWorkerMetadataTestStub testStub = new GetWorkerMetadataTestStub(requestObserver);
AtomicLong metadataVersion = new AtomicLong(0);
nit: use some non-0 version, 0 might be correct if just default initialized on accident instead of propagated for example.
(sorry if some of these are duplicated, I was putting comments on some other snapshot I think, not sure where they'll show up).
done. no worries!
@Override
public void onNext(WorkerMetadataRequest workerMetadataRequest) {
  responseObserver.onNext(response);
what about removing this way to send responses and just having tests explicitly call injectWorkerMetadata?
I think it will be easier to see what tests are doing
done.
@@ -758,7 +758,8 @@ message WorkerMetadataResponse {
   // CommitWorkStream. Each response on this stream replaces the previous, and
   // connections to endpoints that are no longer present should be closed.
   message Endpoint {
-    optional string endpoint = 1;
+    optional string direct_endpoint = 1;
+    optional string worker_token = 2;
I think we want to remove direct_path_endpoints and just use work_endpoints
done.
Force-pushed from ab7544e to b4a7a05.
@scwhittle do you have any more comments for this pull request? Thank you!
private final AtomicLong metadataVersion;
private final WorkerMetadataRequest workerMetadataRequest;
private final ThrottleTimer getWorkerMetadataThrottleTimer;
private final Consumer<WindmillEndpoints> serverMappingUpdater;
nit: serverMappingConsumer or serverMappingObserver?
Updater makes it sound a little like it is the other direction to me
done
long currentMetadataVersion = metadataVersion.get();
long updatedMetadataVersion =
    metadataVersion.updateAndGet(
        current -> updateMetadataVersion(current, response.getMetadataVersion()));
how about inlining the non-logging part of updateMetadataVersion here, and moving the logging below to else? Seems clearer than logging in the separate method.
done
        current -> updateMetadataVersion(current, response.getMetadataVersion()));

if (updatedMetadataVersion > currentMetadataVersion) {
  serverMappingUpdater.accept(WindmillEndpoints.from(response));
Do we need the atomic? I think that there is only one stream calling onResponse and thus those should be ordered. If we did have concurrent callers, this could be racy because you could have
current version is 0
T1: atomically updates to 1
T2: atomically updates to 2, calls accept
T1: resumes and calls accept
then the consumer has last observed the payload for 1.
So either:
- remove the atomic if it isn't needed and document the thread-safety
- or ensure that the ordering is maintained (ie could use synchronized and update version and call accept under it)
done
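To make the second option concrete, here is a sketch that keeps ordering by performing the version check, the version update, and the consumer call under one lock. `VersionedUpdate` and `OrderedUpdatePublisher` are hypothetical names used only for illustration, and the sketch assumes the consumer call is cheap enough to run while the lock is held.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for a versioned metadata response. */
final class VersionedUpdate {
  final long version;
  final String payload;

  VersionedUpdate(long version, String payload) {
    this.version = version;
    this.payload = payload;
  }
}

/** Hypothetical publisher: the version check, the update, and the callback share one lock. */
final class OrderedUpdatePublisher {
  private final Object lock = new Object();
  private final Consumer<VersionedUpdate> consumer;
  private long version; // Guarded by lock.

  OrderedUpdatePublisher(long initialVersion, Consumer<VersionedUpdate> consumer) {
    this.version = initialVersion;
    this.consumer = consumer;
  }

  /** Returns true if the update was newer and was delivered, false if it was stale. */
  boolean publish(VersionedUpdate update) {
    synchronized (lock) {
      if (update.version <= version) {
        return false;
      }
      version = update.version;
      // Still holding the lock, so no other thread can deliver a newer update first.
      consumer.accept(update);
      return true;
    }
  }
}
```

With this shape the T1/T2 interleaving described above cannot leave the consumer holding the payload for version 1 after version 2 was accepted. The cost is that a slow consumer blocks other callers of `publish`.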
@Override
protected void appendSpecificHtml(PrintWriter writer) {
  writer.format(
      "GetWorkerMetadataStream: version=[%d] , job_header=[%s]",
could show the latest response? could be useful.
done.
    (stubList.size() == 1 ? stubList.get(0) : stubList.get(rand.nextInt(stubList.size())));

return stub.withDeadlineAfter(
    AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
see other cl about withDeadlineAfter
either use some other mechanism or document that result of stub() should just be used and not cached.
could name stubWithAbsoluteDeadlineSet to make it obvious to callers instead of a comment.
done, decided to keep it the same and change the name for simplicity
Looking again, the AbstractWindmillStream are used for longer than the deadline, they reconnect grpc streams internally. So I think you need to use the observer approach and not a fixed deadline.
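The hazard behind this thread is that gRPC's `withDeadlineAfter` fixes an absolute deadline at the moment it is called, so a stub configured once and held for longer than that duration is already expired when it is reused. The sketch below models only that behavior with plain Java. `DeadlineBoundStub` and the injected clock are hypothetical stand-ins rather than gRPC types, and the sketch does not implement the observer approach suggested above.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a stub that carries an absolute deadline. */
final class DeadlineBoundStub {
  private final long deadlineNanos;
  private final LongSupplier clock;

  private DeadlineBoundStub(long deadlineNanos, LongSupplier clock) {
    this.deadlineNanos = deadlineNanos;
    this.clock = clock;
  }

  /** The deadline is computed once, at the moment this method is called. */
  static DeadlineBoundStub withDeadlineAfter(long durationNanos, LongSupplier clock) {
    return new DeadlineBoundStub(clock.getAsLong() + durationNanos, clock);
  }

  /** True once the clock has reached the deadline captured at construction. */
  boolean isExpired() {
    return clock.getAsLong() >= deadlineNanos;
  }
}
```

A stub created at time 0 with a duration of 10 reports expired at time 15, while one created at time 15 does not. A long-lived stream that reconnects internally therefore cannot rely on a deadline captured when it was first built.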
  return endpointBuilder.build();
}

public abstract Optional<String> directEndpoint();
is there a better way to represent ipv6 address than String?
lets use Inet6Address, i will update
added
Force-pushed from 632aef6 to 5728ff5.
It looks like you added a merge commit, instead could you rebase your change?
return directEndpointAddress == null
    ? Optional.empty()
    : Optional.of((Inet6Address) directEndpointAddress);
this cast seems wrong if the above instanceof fails.
seems we should return empty in that case
since null will be false for instanceof you could just do a single check using that.
done.
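A sketch of the single-check form suggested above, relying on `instanceof` being false for `null`. `DirectEndpointAddresses`, `asIpv6`, and `parseIpv6` are hypothetical names chosen for illustration and are not the code that ended up in the PR.

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;

/** Hypothetical helper (not the PR's code) for turning an address into an optional IPv6 value. */
final class DirectEndpointAddresses {
  private DirectEndpointAddresses() {}

  /** A null address and a non-IPv6 address both yield an empty Optional. */
  static Optional<Inet6Address> asIpv6(InetAddress address) {
    // instanceof is false for null, so one check covers both the null and the wrong-type case.
    return address instanceof Inet6Address
        ? Optional.of((Inet6Address) address)
        : Optional.empty();
  }

  /** Resolves the given string; anything that cannot be resolved yields an empty Optional. */
  static Optional<Inet6Address> parseIpv6(String literal) {
    try {
      // For IP literals, getByName only validates the address format.
      return asIpv6(InetAddress.getByName(literal));
    } catch (UnknownHostException e) {
      return Optional.empty();
    }
  }
}
```

Because the cast only happens inside the branch where the `instanceof` check succeeded, an IPv4 address can no longer cause a `ClassCastException`.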
      metadataVersion);
}

this.latestResponse = response;
don't update latestResponse in the case it is stale
done.
protected void onResponse(WorkerMetadataResponse response) {
  if (response.getMetadataVersion() > metadataVersion) {
    metadataVersion = response.getMetadataVersion();
    serverMappingConsumer.accept(WindmillEndpoints.from(response));
if you add synchronization for html rendering (see below), it would be good to keep the accept call out of the synchronized block.
From AbstractStream header it sounds like you should not synchronize on this but some other object:
"Synchronization on this is used to synchronize the gRpc stream state and internal data structures. Since grpc channel operations may block, synchronization on this stream may also block. This is generally not a problem since streams are used in a single-threaded manner. However, some accessors used for status page and other debugging need to take care not to require synchronizing on this."
i opted to use StampedLock https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html cause it supports optimistic locking and allows for multiple readers.
protected void appendSpecificHtml(PrintWriter writer) {
  writer.format(
      "GetWorkerMetadataStream: version=[%d] , job_header=[%s], latest_response=[%s]",
      metadataVersion, workerMetadataRequest.getHeader(), latestResponse);
I think you need some synchronization for latestResponse etc since I believe the html rendering can be concurrent
done. added locking
// DirectPath endpoints to be used by user workers for streaming engine jobs.
// DirectPath endpoints here are virtual IPv6 addresses of the windmill
// workers.
repeated Endpoint direct_path_endpoints = 4;
reserve the tag # so it is not reused
reserved 4;
done.
I don't see a newer commit. You're likely not done addressing the other comments yet but just fyi in case you're waiting on me.
yep changes are still in the repo, was just using this to keep track of what i was finished with will tag you when finished thanks!
Force-pushed from 94b56e1 to 7c2bf12.
@scwhittle ready for another round of reviews! i had the WindmillServiceAddress.java file in the next CL in the chain, but I moved it to this one since it fits with the updates.
Force-pushed from 7c2bf12 to d97c48c.
/** Guards access to metadataVersion and latestResponse. */
private final StampedLock metadataLock;

private long metadataVersion;
use GuardedBy annotations
    metadataVersion,
    new ThrottleTimer(),
    serverMappingUpdater,
    lock);
what about just using @VisibleForTesting annotation instead of injecting lower-level things like the lock which seems like it adds boilerplate here and in the test as well as making it less obvious that it is just for testing.
or add a getMetadataVersion() method that the tests use
done, just reused the create method in the test.
 */
@Override
protected void onResponse(WorkerMetadataResponse response) {
  long readStamp = metadataLock.tryOptimisticRead();
I don't think the complication of the StampedLock is beneficial here. Both the responses and status page are fairly infrequent and the logic in the critical section is minimal.
More complex locks can perform worse for such cases and is certainly more error-prone.
done. using an object and synchronizing on it for read/write access
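As a rough sketch of that simpler shape: a dedicated lock object guards the version and the latest response, the consumer is called after the lock is released, and the status accessor copies a snapshot under the same lock. `GuardedMetadataState` and its members are hypothetical, responses are reduced to strings, and the sketch assumes a single thread calls `onResponse` while status rendering may run concurrently.

```java
import java.util.function.Consumer;

/** Hypothetical state holder: a dedicated lock object guards the version and latest response. */
final class GuardedMetadataState {
  private final Object metadataLock = new Object();
  private final Consumer<String> consumer;
  private long metadataVersion; // Guarded by metadataLock.
  private String latestResponse = ""; // Guarded by metadataLock.

  GuardedMetadataState(long initialVersion, Consumer<String> consumer) {
    this.metadataVersion = initialVersion;
    this.consumer = consumer;
  }

  /** Assumed to be called from a single stream thread. */
  void onResponse(long version, String response) {
    synchronized (metadataLock) {
      if (version <= metadataVersion) {
        return; // Stale: keep the current version and latest response.
      }
      metadataVersion = version;
      latestResponse = response;
    }
    // Called outside the lock so a slow consumer never blocks status rendering.
    consumer.accept(response);
  }

  /** May be called concurrently, for example by a status page. */
  String statusLine() {
    long version;
    String response;
    synchronized (metadataLock) {
      version = metadataVersion;
      response = latestResponse;
    }
    return String.format("version=[%d], latest_response=[%s]", version, response);
  }
}
```

The critical sections only copy or assign two fields, which is the case where a plain monitor tends to be easier to reason about than an optimistic lock.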
  return Optional.ofNullable((Inet6Address) directEndpointAddress);
}

public abstract Optional<WindmillServiceAddress> directEndpoint();
nit: move public accessors above all the private methods?
add some comments about how these should be used, or refer to the proto file and add a comment there?
done. added comments and linked to proto and java files.
Force-pushed from 81fe06a to f92c3e6.
back to you @scwhittle thank you!
Force-pushed from f92c3e6 to 8c3b580.
LGTM except for question about improving test with matchers. Feel free to ask someone else to review/merge
assertEquals(
    GLOBAL_DATA_ENDPOINTS.size(), testWindmillEndpointsConsumer.globalDataEndpoints.size());
testWindmillEndpointsConsumer.globalDataEndpoints.forEach(
are there matchers you could use instead? This is where having someone more familiar with Java could help review.
found them adding the dependency for the gradle file. I will update the tests to reflect and then submit
using google.Truth
Force-pushed from 8c3b580 to 3152fac.
@scwhittle thanks for reviewing!
Adding a stream for GetWorkerMetadata
Does not include (the following will be in a separate CL)
r: @scwhittle @dustin12