Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add getworkermetadata streaming rpc #27767

Merged
merged 1 commit into from
Sep 12, 2023

Conversation

m-trieu
Copy link
Contributor

@m-trieu m-trieu commented Aug 1, 2023

Adding a stream for GetWorkerMetadata

Does not include (the following will be in a seperate CL)

  1. Storing the metadata
  2. Using the metadata
  3. Adding stream to GrpcWindmillServer (I will add tests to GRPCWindmillServer as well)
  • Updated the type parameters for the *ResponseObserver.java files
  • Add unit tests (should do this for all of the stream classes)
  • make GetWorkerMetadata RPC a BiDi Stream

r: @scwhittle @dustin12


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch from 9e6fc7e to 523e487 Compare August 1, 2023 04:22
@github-actions
Copy link
Contributor

github-actions bot commented Aug 1, 2023

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

private final AtomicLong startTimeMs;
private final AtomicLong lastSendTimeMs;
protected final AtomicLong lastSendTimeMs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be reverted? don't see it accessed by subclass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@@ -73,9 +73,9 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final Executor executor;
private final BackOff backoff;
// Indicates if the current stream in requestObserver is closed by calling close() method
private final AtomicBoolean streamClosed;
protected AtomicBoolean streamClosed;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be reverted?
It seems at least it should be final, but also setting it as part of stream callback doesn't match the comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

/**
* GetWorkerMetadata is a server streaming API. There is no request observer since the client side
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that a lot of the changes here are necessary because this is not a bi-directional stream.

Beyond the code complexity of supporting this, I'm concerned about us not being able to send application-level heartbeat requests as we've had numerous issues in the past where sending was necessary to detect that the stream had died. See https://issues.apache.org/jira/browse/BEAM-10808 and #12688.

I'm thinking that we should change the metadata streaming rpc to be bi-directional instead of just server side streaming to make sure we don't hit similar issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch from 523e487 to 2cb875f Compare August 4, 2023 02:54
@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch 2 times, most recently from 07ad823 to 32ba5c2 Compare August 4, 2023 17:15
@@ -130,16 +127,18 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract void onNewStream();

/** Returns whether there are any pending requests that should be retried on a stream break. */
protected abstract boolean hasPendingRequests();
protected boolean hasPendingRequests() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert? Since this is related to correctness a default seems dangerous since it makes it possible to skip implementing correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


@AutoValue
public abstract class WindmillEndpoints {
public abstract ImmutableMap<String, String> globalDataServers();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on what key and value are

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

public abstract class WindmillEndpoints {
public abstract ImmutableMap<String, String> globalDataServers();

public abstract ImmutableList<String> windmillServers();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment about what string is

alternatively should it be some sort of endpoint object instead of a string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess we can convert the windmill server string to an object let me find the format

@@ -56,7 +56,8 @@ final class GrpcCommitWorkStream
private final int streamingRpcBatchLimit;

private GrpcCommitWorkStream(
CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub,
Function<StreamObserver<StreamingCommitResponse>, StreamObserver<StreamingCommitWorkRequest>>
commitWorkRpc,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: commitWorkRpc is a confusing name to me for a function. What about createCommitWorkRpcFn or startCommitWorkRpcFn etc?

ditto for the other files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@Override
protected void onResponse(WorkerMetadataResponse response) {
metadataVersion.getAndUpdate(
current -> updateMetadataVersion(current, response.getMetadataVersion()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we get a version before our current version this seems to leave metadataVersion at the more current version but then it goes ahead and returns the stale response instead of filtering it.

Copy link
Contributor Author

@m-trieu m-trieu Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. good catch! tried to use the Atomic* non blocking to do the filtering.

return stubList
.get(rand.nextInt(stubList.size()))
.withDeadlineAfter(
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this deadline is not being applied to stub returned above if only 1 stub

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"stale_global_data",
WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("staleGlobalData").build());

testStub.injectWorkerMetadata(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment in code. It seems like this test should fail with the current logic. It would also pass if this injection is somehow not reaching the stream so maybe some wiring is wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it failed (i didnt rerun it after i removed the locking)
passes now that the logic is fixed

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch from 32ba5c2 to 061e846 Compare August 11, 2023 05:49
@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @lostluck added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@@ -149,7 +146,7 @@ private StreamObserver<RequestT> requestObserver() {
}

/** Send a request to the server. */
protected final void send(RequestT request) {
protected void send(RequestT request) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this remain final? don't see new override

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed it in the revert done.

*/
protected abstract void startThrottleTimer();

private StreamObserver<RequestT> requestObserver() {
protected StreamObserver<RequestT> requestObserver() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this remain private? don't see new call site

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@@ -252,7 +249,7 @@ public final Instant startTime() {
return new Instant(startTimeMs.get());
}

private class ResponseObserver implements StreamObserver<ResponseT> {
public class ResponseObserver implements StreamObserver<ResponseT> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, seems like this could remain private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch from 061e846 to 711830e Compare August 11, 2023 21:24
@m-trieu
Copy link
Contributor Author

m-trieu commented Aug 11, 2023

waiting for changes to be finalized in windmill api proto before continuing to work on this @scwhittle

@m-trieu
Copy link
Contributor Author

m-trieu commented Aug 15, 2023

changes to add worker token support are independent, this review can proceed
thanks! @scwhittle

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch 3 times, most recently from 147b6c8 to 99b89a0 Compare August 16, 2023 01:07
*/
final class ForwardingClientResponseObserver<ReqT, RespT>
implements ClientResponseObserver<RespT, ReqT> {
final class ForwardingClientResponseObserver<ResponseT, RequestT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rebase? I think a lot of this has already been merged with your previous PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*/
public abstract ImmutableList<Endpoint> windmillServers();

@AutoValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just checking that Autovalue automatically creates equals/hash methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set<AbstractWindmillStream<?, ?>> streamRegistry,
int logEveryNStreamFailures,
JobHeader jobHeader,
AtomicLong metadataVersion,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

integer? Can keep the AtomicLong internal for non-testing case to ensure it isn't modified externally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still AtomicLong here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops misread the comment as for test only. done.

TestGetWorkMetadataRequestObserver requestObserver =
new TestGetWorkMetadataRequestObserver(mockResponse);
GetWorkerMetadataTestStub testStub = new GetWorkerMetadataTestStub(requestObserver);
AtomicLong metadataVersion = new AtomicLong(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use some non-0 version, 0 might be correct if just default initialized on accident instead of propagated for example.

(sorry if some of these are duplicated, I was putting comments on some other snapshot I think, not sure where they'll show up).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. no worries!


@Override
public void onNext(WorkerMetadataRequest workerMetadataRequest) {
responseObserver.onNext(response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about removing this way to send responses and just having tests explicitly call injectWorkerMetadata?
I think it will be easier to see what tests are doing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@@ -758,7 +758,8 @@ message WorkerMetadataResponse {
// CommitWorkStream. Each response on this stream replaces the previous, and
// connections to endpoints that are no longer present should be closed.
message Endpoint {
optional string endpoint = 1;
optional string direct_endpoint = 1;
optional string worker_token = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to remove direct_path_endpoints and just use work_endpoints

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch 2 times, most recently from ab7544e to b4a7a05 Compare August 23, 2023 16:37
@m-trieu
Copy link
Contributor Author

m-trieu commented Aug 25, 2023

@scwhittle do you have any more comments for this pull request? Thank you!

private final AtomicLong metadataVersion;
private final WorkerMetadataRequest workerMetadataRequest;
private final ThrottleTimer getWorkerMetadataThrottleTimer;
private final Consumer<WindmillEndpoints> serverMappingUpdater;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: serverMappingConsumer or serverMappingObserver?
Updater makes it sound a little like it is the other direction to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Set<AbstractWindmillStream<?, ?>> streamRegistry,
int logEveryNStreamFailures,
JobHeader jobHeader,
AtomicLong metadataVersion,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still AtomicLong here

long currentMetadataVersion = metadataVersion.get();
long updatedMetadataVersion =
metadataVersion.updateAndGet(
current -> updateMetadataVersion(current, response.getMetadataVersion()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about inlining the non-logging part of updateMetadataVersion here, and moving the logging below to else? Seems clearer than logging in the separate method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

current -> updateMetadataVersion(current, response.getMetadataVersion()));

if (updatedMetadataVersion > currentMetadataVersion) {
serverMappingUpdater.accept(WindmillEndpoints.from(response));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the atomic? I think that there is only one stream calling onResponse and thus those should be ordered. If we did have concurrent callers, this could be racy because you could have
current version is 0
T1: atomically updates to 1
T2: atomically updates to 2, calls accept
T1: resumes and calls accept
then the consumer has last observed the payload for 1.

So either:

  • remove the atomic if it isn't needed and document the thread-safety
  • or ensure that the ordering is maintained (ie could use synchronized and update version and call accept under it)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Override
protected void appendSpecificHtml(PrintWriter writer) {
writer.format(
"GetWorkerMetadataStream: version=[%d] , job_header=[%s]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could show the latest response? could be useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

(stubList.size() == 1 ? stubList.get(0) : stubList.get(rand.nextInt(stubList.size())));

return stub.withDeadlineAfter(
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see other cl about withDeadlineAfter

either use some other mechanism or document that result of stub() should just be used and not cached.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could name stubWithAbsoluteDeadlineSet to make it obvious to callers instead of a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, decided to keep it the same and change the name for simplicity

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking again, the AbstractWindmillStream are used for longer than the deadline, they reconnect grpc streams internally. So I think you need to use the observer approach and not a fixed deadline.

return endpointBuilder.build();
}

public abstract Optional<String> directEndpoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a better way to represent ipv6 address than String?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets use Inet6Address, i will update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch 3 times, most recently from 632aef6 to 5728ff5 Compare August 29, 2023 08:08
@scwhittle
Copy link
Contributor

It looks like you added a merge commit, instead could you rebase your change?


return directEndpointAddress == null
? Optional.empty()
: Optional.of((Inet6Address) directEndpointAddress);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this cast seems wrong if the above instanceof fails.
seems we should return empty in that case

since null will be false for instanceof you could just do a single check using that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

metadataVersion);
}

this.latestResponse = response;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't update lastResponse in the case it is stale

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

protected void onResponse(WorkerMetadataResponse response) {
if (response.getMetadataVersion() > metadataVersion) {
metadataVersion = response.getMetadataVersion();
serverMappingConsumer.accept(WindmillEndpoints.from(response));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you add sychronization for html rendering (see below), it would be good to keep the accept call out of the synchronized block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From AbstractStream header it sounds like you should not synchronize on this but some other object:

  • Synchronization on this is used to synchronize the gRpc stream state and internal data

  • structures. Since grpc channel operations may block, synchronization on this stream may also
  • block. This is generally not a problem since streams are used in a single-threaded manner.
  • However, some accessors used for status page and other debugging need to take care not to require
  • synchronizing on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i opted to use StampedLock https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html cause it supports optimistic locking and allows for multiple readers.

protected void appendSpecificHtml(PrintWriter writer) {
writer.format(
"GetWorkerMetadataStream: version=[%d] , job_header=[%s], latest_response=[%s]",
metadataVersion, workerMetadataRequest.getHeader(), latestResponse);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need some synchronization for latestResponse etc since I believe the html rendering can be concurrent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. added locking

(stubList.size() == 1 ? stubList.get(0) : stubList.get(rand.nextInt(stubList.size())));

return stub.withDeadlineAfter(
AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking again, the AbstractWindmillStream are used for longer than the deadline, they reconnect grpc streams internally. So I think you need to use the observer approach and not a fixed deadline.

// DirectPath endpoints to be used by user workers for streaming engine jobs.
// DirectPath endpoints here are virtual IPv6 addresses of the windmill
// workers.
repeated Endpoint direct_path_endpoints = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reserve the tag # so it is not reused

reserved 4;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a newer commit. You're likely not done addressing the other comments yet but just fyi in case you're waiting on me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep changes are still in the repo, was just using this to keep track of what i was finished with will tag you when finished thanks!

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch from 94b56e1 to 7c2bf12 Compare September 6, 2023 01:05
@m-trieu
Copy link
Contributor Author

m-trieu commented Sep 6, 2023

@scwhittle ready for another round of reviews!

i had the WindmillServiceAddress.java file in the next CL in the chain, but I moved it to this one since it fits with the updates.

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch from 7c2bf12 to d97c48c Compare September 6, 2023 01:10
/** Guards access to metadataVersion and latestResponse. */
private final StampedLock metadataLock;

private long metadataVersion;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use GuardedBy annotations

metadataVersion,
new ThrottleTimer(),
serverMappingUpdater,
lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about just using @VisibleForTesting annotation instead of injecting lower-level things like the lock which seems like it adds boilerplate here and in the test as well as making it less obvious that it is just for testing.

or add a getMetadataVersion() method that the tests use

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, just reused the create method in the test.

*/
@Override
protected void onResponse(WorkerMetadataResponse response) {
long readStamp = metadataLock.tryOptimisticRead();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the complication of the StampedLock is beneficial here. Both the responses and status page are fairly infrequent and the logic in the critical section is minimal.

More complex locks can perform worse for such cases and is certainly more error-prone.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. using an object and synchronizing on it for read/write access

return Optional.ofNullable((Inet6Address) directEndpointAddress);
}

public abstract Optional<WindmillServiceAddress> directEndpoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move public accessors above all the private methods?
add some comments about how these shoudl be used or refer to the proto file and add a comment there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. added comments and linked to proto and java files.

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch 2 times, most recently from 81fe06a to f92c3e6 Compare September 7, 2023 04:48
@m-trieu
Copy link
Contributor Author

m-trieu commented Sep 7, 2023

back to you @scwhittle thank you!

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch from f92c3e6 to 8c3b580 Compare September 7, 2023 05:11
Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except for question about improving test with matchers. Feel free to ask someone else to review/merge


assertEquals(
GLOBAL_DATA_ENDPOINTS.size(), testWindmillEndpointsConsumer.globalDataEndpoints.size());
testWindmillEndpointsConsumer.globalDataEndpoints.forEach(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there matchers you could use instead? This is where having someone more familiar with Java could help review.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

found them adding the dependency for the gradle file. I will update the tests to reflect and then submit

using google.Truth

@m-trieu m-trieu force-pushed the mtrieu-metadata-stream branch from 8c3b580 to 3152fac Compare September 11, 2023 23:34
@m-trieu
Copy link
Contributor Author

m-trieu commented Sep 11, 2023

@scwhittle thanks for reviewing!
@lostluck I had to add a dependency to the gradle file to use Google Truth assertions in a unit test and wanted to make sure i did that correctly. Other than that, ready to merge thank you!

@lostluck lostluck merged commit 9b8644d into apache:master Sep 12, 2023
@m-trieu m-trieu deleted the mtrieu-metadata-stream branch June 10, 2024 15:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants