Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Nov 13, 2024
1 parent 4a9a863 commit 95a19f4
Show file tree
Hide file tree
Showing 16 changed files with 166 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.RpcException;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.WindmillRpcException;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
Expand Down Expand Up @@ -199,7 +199,7 @@ private void applianceDispatchLoop(Supplier<Windmill.GetWorkResponse> getWorkFn)
if (workResponse.getWorkCount() > 0) {
break;
}
} catch (RpcException e) {
} catch (WindmillRpcException e) {
LOG.warn("GetWork failed, retrying:", e);
}
sleepUninterruptibly(backoff, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,16 @@ public abstract class WindmillServerStub
@Override
public void appendSummaryHtml(PrintWriter writer) {}

/** Generic Exception type for implementors to use to represent errors while making RPCs. */
public static final class RpcException extends RuntimeException {
public RpcException(Throwable cause) {
/**
* Generic Exception type for implementors to use to represent errors while making Windmill RPCs.
*/
public static final class WindmillRpcException extends RuntimeException {
public WindmillRpcException(Throwable cause) {
super(cause);
}

public WindmillRpcException(String message, Throwable cause) {
super(message, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ public final void shutdown() {
// Don't lock on "this" before poisoning the request observer since otherwise the observer may
// be blocking in send().
requestObserver.poison();
isShutdown = true;
synchronized (this) {
if (!isShutdown) {
isShutdown = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce

try {
delegate.onError(e);
} catch (RuntimeException ignored) {
} catch (IllegalStateException ignored) {
// If the delegate above was already terminated via onError or onComplete from another
// thread.
logger.warn("StreamObserver was previously cancelled.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
Expand Down Expand Up @@ -161,15 +162,19 @@ public void sendHealthCheck() throws WindmillStreamShutdownException {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();

CommitCompletionException failures = new CommitCompletionException();
CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler();
for (int i = 0; i < response.getRequestIdCount(); ++i) {
long requestId = response.getRequestId(i);
if (requestId == HEARTBEAT_REQUEST_ID) {
continue;
}
PendingRequest pendingRequest = pending.remove(requestId);

// From windmill.proto: Indices must line up with the request_id field, but trailing OKs may
// be omitted.
CommitStatus commitStatus =
i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK;

@Nullable PendingRequest pendingRequest = pending.remove(requestId);
if (pendingRequest == null) {
synchronized (this) {
if (!isShutdown) {
Expand All @@ -185,12 +190,12 @@ protected void onResponse(StreamingCommitResponse response) {
// other commits from being processed. Aggregate all the failures to throw after
// processing the response if they exist.
LOG.warn("Exception while processing commit response.", e);
failures.addError(commitStatus, e);
failureHandler.addError(commitStatus, e);
}
}
}

failures.throwIfNonEmpty();
failureHandler.throwIfNonEmpty();
}

@Override
Expand Down Expand Up @@ -362,12 +367,17 @@ private void abort() {
}

private static class CommitCompletionException extends RuntimeException {
private CommitCompletionException(String message) {
super(message);
}
}

private static class CommitCompletionFailureHandler {
private static final int MAX_PRINTABLE_ERRORS = 10;
private final Map<Pair<CommitStatus, Class<? extends Throwable>>, Integer> errorCounter;
private final EvictingQueue<Throwable> detailedErrors;

private CommitCompletionException() {
super("Exception while processing commit response.");
private CommitCompletionFailureHandler() {
this.errorCounter = new HashMap<>();
this.detailedErrors = EvictingQueue.create(MAX_PRINTABLE_ERRORS);
}
Expand All @@ -381,19 +391,13 @@ private void addError(CommitStatus commitStatus, Throwable error) {

private void throwIfNonEmpty() {
if (!errorCounter.isEmpty()) {
throw this;
String errorMessage =
String.format(
"Exception while processing commit response. ErrorCounter: %s; Details: %s",
errorCounter, detailedErrors);
throw new CommitCompletionException(errorMessage);
}
}

@Override
public final String getMessage() {
return "CommitCompletionException{"
+ "errorCounter="
+ errorCounter
+ ", detailedErrors="
+ detailedErrors
+ '}';
}
}

private class Batcher implements CommitWorkStream.RequestBatcher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,7 @@ public void sendHealthCheck() throws WindmillStreamShutdownException {
}

@Override
protected void shutdownInternal() {
workItemAssemblers.clear();
}
protected void shutdownInternal() {}

@Override
protected void onResponse(StreamingGetWorkResponseChunk chunk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -166,7 +166,7 @@ protected synchronized void onNewStream() throws WindmillStreamShutdownException
// We rely on close only occurring after all methods on the stream have returned.
// Since the requestKeyedData and requestGlobalData methods are blocking this
// means there should be no pending requests.
verify(!hasPendingRequests(), "Pending requests not expected on stream restart.");
verify(!hasPendingRequests(), "Pending requests not expected if we've half-closed.");
} else {
for (AppendableInputStream responseStream : pending.values()) {
responseStream.cancel();
Expand All @@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) {

for (int i = 0; i < chunk.getRequestIdCount(); ++i) {
AppendableInputStream responseStream = pending.get(chunk.getRequestId(i));
verify(responseStream != null, "No pending response stream");
synchronized (this) {
verify(responseStream != null || isShutdown, "No pending response stream");
}
responseStream.append(chunk.getSerializedResponse(i).newInput());
if (chunk.getRemainingBytesForResponse() == 0) {
responseStream.complete();
Expand Down Expand Up @@ -222,12 +224,6 @@ public GlobalData requestGlobalData(GlobalDataRequest request)
@Override
public void refreshActiveWork(Map<String, Collection<HeartbeatRequest>> heartbeats)
throws WindmillStreamShutdownException {
synchronized (this) {
if (isShutdown) {
throw new WindmillStreamShutdownException("Unable to refresh work for shutdown stream.");
}
}

StreamingGetDataRequest.Builder builder = StreamingGetDataRequest.newBuilder();
if (sendKeyedGetDataRequests) {
long builderBytes = 0;
Expand Down Expand Up @@ -302,7 +298,7 @@ public void sendHealthCheck() throws WindmillStreamShutdownException {
}

@Override
protected void shutdownInternal() {
protected synchronized void shutdownInternal() {
// Stream has been explicitly closed. Drain pending input streams and request batches.
// Future calls to send RPCs will fail.
pending.values().forEach(AppendableInputStream::cancel);
Expand Down Expand Up @@ -341,31 +337,29 @@ public void appendSpecificHtml(PrintWriter writer) {

private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn)
throws WindmillStreamShutdownException {
while (!isShutdownLocked()) {
while (true) {
request.resetResponseStream();
try {
queueRequestAndWait(request);
return parseFn.parse(request.getResponseStream());
} catch (AppendableInputStream.InvalidInputStreamStateException | CancellationException e) {
handleShutdown(request, e);
throwIfShutdown(request, e);
if (!(e instanceof CancellationException)) {
throw e;
}
} catch (IOException e) {
LOG.error("Parsing GetData response failed: ", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleShutdown(request, e);
throwIfShutdown(request, e);
throw new RuntimeException(e);
} finally {
pending.remove(request.id());
}
}

throw shutdownExceptionFor(request);
}

private synchronized void handleShutdown(QueuedRequest request, Throwable cause)
private synchronized void throwIfShutdown(QueuedRequest request, Throwable cause)
throws WindmillStreamShutdownException {
if (isShutdown) {
WindmillStreamShutdownException shutdownException = shutdownExceptionFor(request);
Expand Down Expand Up @@ -473,14 +467,6 @@ private void sendBatch(QueuedBatch batch) throws WindmillStreamShutdownException
}
}

private synchronized void verify(boolean condition, String message) {
Verify.verify(condition || isShutdown, message);
}

private synchronized boolean isShutdownLocked() {
return isShutdown;
}

@FunctionalInterface
private interface ParseFn<ResponseT> {
ResponseT parse(InputStream input) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,13 @@ private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
e.getStatus());
}
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
throw new RpcException(e);
throw new WindmillRpcException(e);
}
} catch (IOException | InterruptedException i) {
if (i instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
RpcException rpcException = new RpcException(e);
WindmillRpcException rpcException = new WindmillRpcException(e);
rpcException.addSuppressed(i);
throw rpcException;
}
Expand All @@ -310,7 +310,7 @@ public GetWorkResponse getWork(GetWorkRequest request) {
return callWithBackoff(() -> syncApplianceStub.getWork(request));
}

throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork"));
throw new WindmillRpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork"));
}

@Override
Expand All @@ -319,15 +319,15 @@ public GetDataResponse getData(GetDataRequest request) {
return callWithBackoff(() -> syncApplianceStub.getData(request));
}

throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("GetData"));
throw new WindmillRpcException(unsupportedUnaryRequestInStreamingEngineException("GetData"));
}

@Override
public CommitWorkResponse commitWork(CommitWorkRequest request) {
if (syncApplianceStub != null) {
return callWithBackoff(() -> syncApplianceStub.commitWork(request));
}
throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork"));
throw new WindmillRpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork"));
}

/**
Expand Down Expand Up @@ -382,7 +382,7 @@ public GetConfigResponse getConfig(GetConfigRequest request) {
return callWithBackoff(() -> syncApplianceStub.getConfig(request));
}

throw new RpcException(
throw new WindmillRpcException(
new UnsupportedOperationException("GetConfig not supported in Streaming Engine."));
}

Expand All @@ -392,7 +392,7 @@ public ReportStatsResponse reportStats(ReportStatsRequest request) {
return callWithBackoff(() -> syncApplianceStub.reportStats(request));
}

throw new RpcException(
throw new WindmillRpcException(
new UnsupportedOperationException("ReportStats not supported in Streaming Engine."));
}

Expand Down
Loading

0 comments on commit 95a19f4

Please sign in to comment.