-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate direct path #31902
Integrate direct path #31902
Conversation
Assigning reviewers. If you would like to opt out of this review, comment R: @shunping added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
R: @scwhittle |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
01e8586
to
ff287f2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not finished, mostly looking at stream stuff to help find what is causing stuckness
/** Send a request to the server. */ | ||
protected final void send(RequestT request) { | ||
lastSendTimeMs.set(Instant.now().getMillis()); | ||
synchronized (this) { | ||
// Check if we should send after we acquire the lock. | ||
if (isShutdown()) { | ||
LOG.warn("Send called on a shutdown stream."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this is possible, don't log as customers don't like warning logs and open issues about them
if this should not be possible, perhaps better to throw an exception so that we notice and fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
} | ||
|
||
/** Starts the underlying stream. */ | ||
protected final void startStream() { | ||
// Add the stream to the registry after it has been fully constructed. | ||
streamRegistry.add(this); | ||
while (true) { | ||
while (!isShutdown.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd just remove this check since you do it first thing below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return; | ||
} | ||
private void tryRestartStream() { | ||
if (!isShutdown()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks racy, we check shutdown above in isStreamDone with synchronization, but then if it is shutdown before here, we end up with an error but won't restart the stream or remove it from the registry.
I would remove this one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
setLastError(error); | ||
private synchronized boolean isStreamDone() { | ||
if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) { | ||
streamRegistry.remove(AbstractWindmillStream.this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this kind of side effect is confusing in method that just sounds like an accessor
how about maybeTeardownStream()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment on return value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -176,7 +180,7 @@ private StreamingDataflowWorker( | |||
DataflowWorkerHarnessOptions options, | |||
HotKeyLogger hotKeyLogger, | |||
Supplier<Instant> clock, | |||
StreamingWorkerStatusReporter workerStatusReporter, | |||
Function<Supplier<Long>, StreamingWorkerStatusReporter> streamingWorkerStatusReporterFactory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be a functional interface so you can document? it's unclear what the long supplier is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...va/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
Show resolved
Hide resolved
() -> { | ||
try { | ||
send(extension); | ||
} catch (IllegalStateException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we handle illegalstateexception internally? or rely on executeSafely to catch it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
Show resolved
Hide resolved
@@ -182,6 +184,12 @@ protected void onResponse(StreamingCommitResponse response) { | |||
} | |||
} | |||
|
|||
@Override | |||
protected void shutdownInternal() { | |||
pending.values().forEach(pendingRequest -> pendingRequest.onDone.accept(CommitStatus.ABORTED)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm worried if pending has somethign inserted between iterating and clear
can you instead use an iterator where you remove as you go so everything removed is guaranteed to be aborted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -163,7 +165,7 @@ protected void onResponse(StreamingCommitResponse response) { | |||
continue; | |||
} | |||
PendingRequest done = pending.remove(requestId); | |||
if (done == null) { | |||
if (done == null && !isShutdown()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is going to get nullptr exception below, instead move the isShutdown check to whether or not to log inside this if
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
bbf2778
to
1e02463
Compare
@@ -302,4 +301,12 @@ public Integer create(PipelineOptions options) { | |||
return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1; | |||
} | |||
} | |||
|
|||
/** EnableStreamingEngine defaults to false unless one of the two experiments is set. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like just 1 experiment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.../worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
Outdated
Show resolved
Hide resolved
.../worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
Outdated
Show resolved
Hide resolved
...taflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
Outdated
Show resolved
Hide resolved
...pache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
Outdated
Show resolved
Hide resolved
...pache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamShutdownException.java
Show resolved
Hide resolved
* Only send the next value if the phaser is not terminated by the time we acquire the lock since | ||
* the phaser can be terminated at any time. | ||
*/ | ||
private void tryOnNext(T value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure these phaser checks are necessary since the outboundObserver itself should stop blocking for onNext if the notifier is terminated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but if the phaser is terminated, we don't want to call outboundObserver.onNext() right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if you do check here there is still race between checking the phaser and calling onNext regardless. Internally the outboundObserver is already observing the phaser termination via getPhase() (and also blocking respecting phaser termination) so the extra check is just mental overhead I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
return; | ||
} | ||
} catch (TimeoutException e) { | ||
if (isReadyNotifier.isTerminated()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't think you need this, awaitAdvanceInterruptibly will return -1 if it's terminated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -39,7 +39,9 @@ | |||
@ThreadSafe | |||
public final class DirectStreamObserver<T> implements StreamObserver<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this change can be submitted separately? Would be nice to have a test for it showing the previous bug as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, I was thinking why DirectStreamObserver needs to be modified for direct path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to remove the blocking outside of the lock or else terminating the stream will be held up by the deadline waiting for a response
@@ -67,17 +70,18 @@ public <T extends GetWorkBudgetSpender> void distributeBudget( | |||
GetWorkBudgetSpender getWorkBudgetSpender = streamAndDesiredBudget.getKey(); | |||
GetWorkBudget desired = streamAndDesiredBudget.getValue(); | |||
GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget(); | |||
if (isBelowFiftyPercentOfTarget(remaining, desired)) { | |||
if (isBelowFiftyPercentOfTarget(remaining, desired) && isActiveWorkBudgetAware) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is unclear to me. It seems like in one case the adjustment is addtiive and another is resetting
Wouldn't we want to only increase by the desired-remaining in either case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whether its overriding or additive is depending on the internal implementation of GetWorkBudgetSpender.adjustBudget()
based on the current implementation that this is reflecting, we do not account for remaining budget here it is handled internally in adjustBudget.
4820b9f
to
d03b0bc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
didn't get through everything but sending comments I had
+ "<th>Active For</th>" | ||
+ "<th>State</th>" | ||
+ "<th>State Active For</th>" | ||
+ "<th>Produced By</th>" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: produced is confusing since we use that for shuffle terminology. How about just "Backend"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
getWorkBudgetRefresher.requestBudgetRefresh(); | ||
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget); | ||
|
||
// Close the streams outside the lock. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I don't know if we need to block on closing streams. I think we might want to block on creating the new ones.
one possible thing is that if we are leaking stuff and they never actually close we might not know. But if we have the stream registry I think we'd see that. Or we can add some logging to closeAllStreams that it is taking a long time.
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Show resolved
Hide resolved
private final Supplier<StreamObserver<RequestT>> requestObserverSupplier; | ||
|
||
@GuardedBy("this") | ||
private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need both, synchronization also ensures that memory changes are viewed by other threads. Volatile just ensures one fields's changes are visible to other threads.
This seems like a good overview: https://blogs.oracle.com/javamagazine/post/java-thread-synchronization-volatile-final-atomic-deadlocks
} | ||
|
||
@Override | ||
public synchronized void onCompleted() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm synchronized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still synchronized
} | ||
streamRegistry.stream() | ||
.sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken)) | ||
.collect( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed the multimap bit. I agree the grouping is nice. However isn't the sorting then unnecessary if we're putting it in a map right away?
@@ -264,24 +284,18 @@ protected void startThrottleTimer() { | |||
|
|||
@Override | |||
public void adjustBudget(long itemsDelta, long bytesDelta) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we name this setBudget?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} catch (AppendableInputStream.InvalidInputStreamStateException | ||
| VerifyException | ||
| CancellationException e) { | ||
handleShutdown(request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make shutdown handling consistent?
we throw an exception for shutdown here but if we don't run the loop due to isShutdown we just return from this function without doing anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do throw a WindmillStreamShutdownException at the bottom if we exit the loop
private void handleShutdown(QueuedRequest request) { | ||
if (isShutdown()) { | ||
throw new WindmillStreamShutdownException( | ||
"Cannot send request=[" + request + "] on closed stream."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we keep this, should we pass in the exception above to add as a suppressed exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
} | ||
|
||
ImmutableList<String> createStreamCancelledErrorMessage() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add parameter for the limit or just inline above? We're building up a possibly big list just to ignore most of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
d95d1b5
to
9bad2d7
Compare
@scwhittle ready for another look! I will resolve the merge conflicts and rebase |
9bad2d7
to
08a9c1e
Compare
4464453
to
d0a199d
Compare
@scwhittle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I didn't finish up reviewing and will be on vacation next week. Perhaps you could have Arun take a look.
In particular I need some more time to go over the stream changes as that affects the dispatcher path too.
? WindmillStreamPool.create( | ||
1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream) | ||
: getDataStreamPool); | ||
statusPagesBuilder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can the status page stuff be moved after this block so it can be shared for the SE paths? It seems like same stuff is duplicated for direct and not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the only similarity is the DebugCapture manager I can consolidate that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consolidated in builder construction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved creation to after harness intialization block
@@ -402,14 +455,15 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o | |||
options, | |||
new HotKeyLogger(), | |||
clock, | |||
workerStatusReporter, | |||
workerStatusReporterFactory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we just pass in the builder prepopulated instead of a factory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wanted to constrain the interface to not allow callers to modify the other members
but if passing in a builder is preferred i can do that
private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) { | ||
synchronized (metadataLock) { | ||
// Only process versions greater than what we currently have to prevent double processing of | ||
// metadata. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add comemnt: the consumer is single-threaded so we maintain ordering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// Only process versions greater than what we currently have to prevent double processing of | ||
// metadata. | ||
if (windmillEndpoints.version() > metadataVersion) { | ||
metadataVersion = windmillEndpoints.version(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe it woudl be good to have pendingMetadataVersion (updated here) and activeMetadataVersion (which you update after consuming). That could help debugging since it could show if we're stuck on some old version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
we could add that information to the status pages (probably separate PR).
we only check pendingMetadataVersion then since pending == active if there are no updates pending
@@ -155,9 +155,11 @@ void closeAllStreams() { | |||
|
|||
@Override | |||
public void adjustBudget(long itemsDelta, long bytesDelta) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name setBudget as well?
update variables to not be delta, adjustment if it is just a total budget request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -123,6 +124,8 @@ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address( | |||
directEndpointAddress.getHostAddress(), (int) endpointProto.getPort())); | |||
} | |||
|
|||
public abstract long version(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// The metadata version increases with every modification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// Keep trying to create the stream. | ||
} | ||
} | ||
} | ||
|
||
// We were never able to start the stream, remove it from the stream registry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Otherwise it is removed when closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public void shutdown() { | ||
public final void shutdown() { | ||
// Don't lock here as isShutdown checks are used in the stream to free blocked | ||
// threads or as exit conditions to loops. | ||
if (isShutdown.compareAndSet(false, true)) { | ||
requestObserver() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think requestObserver is thread-safe though? do we need to synchronize it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
requestObserver() is thread safe with its own internal synchronization.
} | ||
|
||
@Override | ||
public synchronized void onCompleted() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still synchronized
synchronized (this) { | ||
pending.put(id, pendingRequest); | ||
for (int i = 0; | ||
i < serializedCommit.size(); | ||
i < serializedCommit.size() && !isShutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd remove the shutdown check since it's not consistent with other paths and probably won't matter if the send just won't do anything anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
19b50d9
to
745d87c
Compare
@Nullable ChannelzServlet channelzServlet = null; | ||
Consumer<PrintWriter> getDataStatusProvider; | ||
Supplier<Long> currentActiveCommitBytesProvider; | ||
if (isDirectPathPipeline(options)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this have to be set during pipeline creation? Thoughts on supporting on/off based on job settings?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sync'd offline
eventually will be propogated in workerMetadata, but there are some immutable settings that the VM needs to have that supports this, so we check here or fail.
… Future proofs this logic for direct path.
… have it implement WorkProvider interface. Move class to windmill/work/provider directory, update visibility for dependent classes and move tests, add GetWorkBudgetOwnerInterface
… have it implement WorkProvider interface. Move class to windmill/work/provider directory, update visibility for dependent classes and move tests, add GetWorkBudgetOwnerInterface
… have it implement WorkProvider interface. Move class to windmill/work/provider directory, update visibility for dependent classes and move tests, add GetWorkBudgetOwnerInterface
… WindmillEndpoints and don't process any version that is older than the current version in FanOutStreamingEngineWorkerHarness
…a_redistributesBudget()
…ady 600 seconds anyway. Move DEFAULT_STREAM_RPC_DEADLINE_SECONDS to where it is being used and remove references in tests
a59733f
to
6df1adf
Compare
// IsolationChannel will create and manage separate RPC channels to the same | ||
// serviceAddress via calling the channelFactory, else just directly return the | ||
// RPC channel. | ||
workerOptions.getUseWindmillIsolatedChannels() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could choose to not support this for direct path and add it later if needed.
If we are keeping it, i think we need to use a different option to avoid confusing with the usage and rollout of the these flags in cloud path. In cloud path isolated channels are enabled based on this flag or job setting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking more, will it be better to default to isolated channels enabled? That'll be the closest to what we'll have in cloud path after #32782
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still need to look at the Streams classes, sending some more comments I've.
started = true; | ||
} | ||
|
||
public ImmutableSet<HostAndPort> currentWindmillEndpoints() { | ||
return connections.get().windmillConnections().keySet().stream() | ||
return connections.get().windmillStreams().keySet().stream() | ||
.map(Endpoint::directEndpoint) | ||
.filter(Optional::isPresent) | ||
.map(Optional::get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will filtering out IPV6 below prevent direct path endpoints from showing up in channelz?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need that filter i am going to remove the IPV6 since we don't use that anymore
} | ||
|
||
private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>> | ||
createAndStartNewStreams(Collection<Endpoint> newWindmillConnections) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
createAndStartNewStreams(Collection<Endpoint> newWindmillConnections) { | |
createAndStartNewStreams(ImmutableSet<Endpoint> newWindmillConnections) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
MoreFutures.allAsList( | ||
newWindmillConnections.stream() | ||
.map( | ||
connection -> | ||
MoreFutures.supplyAsync( | ||
() -> | ||
Pair.of( | ||
connection, | ||
Optional.ofNullable(currentStreams.get(connection)) | ||
.orElseGet( | ||
() -> createAndStartWindmillStreamSender(connection))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MoreFutures.allAsList( | |
newWindmillConnections.stream() | |
.map( | |
connection -> | |
MoreFutures.supplyAsync( | |
() -> | |
Pair.of( | |
connection, | |
Optional.ofNullable(currentStreams.get(connection)) | |
.orElseGet( | |
() -> createAndStartWindmillStreamSender(connection))), | |
MoreFutures.allAsList( | |
newWindmillEndpoints.stream() | |
.map( | |
endpoint -> | |
MoreFutures.supplyAsync( | |
() -> | |
Pair.of( | |
endpoint, | |
Optional.ofNullable(currentStreams.get(endpoint)) | |
.orElseGet( | |
() -> createAndStartWindmillStreamSender(endpoint))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
WindmillConnection connection) { | ||
// Initially create each stream with no budget. The budget will be eventually assigned by the | ||
// GetWorkBudgetDistributor. | ||
private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint connection) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint connection) { | |
private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint endpoint) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
previousMetadataVersion, | ||
activeMetadataVersion); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previousMetadataVersion and activeMetadataVersion are same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
closeStaleStreams( | ||
newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams()); | ||
ImmutableMap<Endpoint, WindmillStreamSender> newStreams = | ||
createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can defer the join()
to be after creating the globalDataStreams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
global data streams are created in createAndStartNewStreams
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well not really created
we pass a factory to thr StreamGetDataClient
and it will get created whenever the user code fetches side input
WindmillConnection connection) { | ||
// Initially create each stream with no budget. The budget will be eventually assigned by the | ||
// GetWorkBudgetDistributor. | ||
private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint connection) { | ||
WindmillStreamSender windmillStreamSender = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a dispatcher fallback in createWindmillStub
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for fallback
if we don't pass in direct endpoint we will just fallback to dispatcher stub
at some point we were thinking about supporting dispatcher as a passthrough proxy with the user worker telling it which backend to hit and this would also allow that
@@ -156,29 +162,47 @@ public void sendHealthCheck() { | |||
protected void onResponse(StreamingCommitResponse response) { | |||
commitWorkThrottleTimer.stop(); | |||
|
|||
RuntimeException finalException = null; | |||
@Nullable RuntimeException failure = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a new Exception here and attach all failures as suppressed?
Want to avoid marking failure from one request as suppressed of another.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
if (failure == null) { | ||
failure = e; | ||
} else { | ||
failure.addSuppressed(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There could be 1000s of requests in the queue, do we want to add all of them here? Maybe only the finalException or a subset is enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is important that we track it some how? maybe keep account of the error/exception type
it's possible that some might have a real issue but others won't
actually maybe a subset is good
this is an error consuming the commit since at this point windmill is acking that the commit was either successful or failed. so maybe we can just record the status + failure or have a map<<status, exception>, count> and just log that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
opted to record last 10 errors in detail and have a counter of <status, throwable.class> so we can keep track of any weird behavior
@@ -187,13 +211,14 @@ protected void startThrottleTimer() { | |||
commitWorkThrottleTimer.start(); | |||
} | |||
|
|||
private void flushInternal(Map<Long, PendingRequest> requests) { | |||
private void flushInternal(Map<Long, PendingRequest> requests) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need throws InterruptedException
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do since it is a checked exception thrown by one of the methods inside flushInternal
we handle it in flush()
wanted to make it clear the flushInternal is blocking()
if we get interrupted we interrupt the thread, but always clear the queue and queuedBytes (in the finally block)
…e StreamingEngineConnectionsState to StreamingEngineBackends
de3b016
to
4f5b381
Compare
R: @scwhittle
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.