diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index bc93e6d89c41..22fba91e170a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -33,7 +33,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.RpcException; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.WindmillRpcException; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; @@ -199,7 +199,7 @@ private void applianceDispatchLoop(Supplier getWorkFn) if (workResponse.getWorkCount() > 0) { break; } - } catch (RpcException e) { + } catch (WindmillRpcException e) { LOG.warn("GetWork failed, retrying:", e); } sleepUninterruptibly(backoff, TimeUnit.MILLISECONDS); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java index cd753cb8ec91..2ae97087fec7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java @@ -30,10 +30,16 @@ public abstract class WindmillServerStub @Override public void appendSummaryHtml(PrintWriter writer) {} - /** Generic Exception type for implementors to use to represent errors while making RPCs. */ - public static final class RpcException extends RuntimeException { - public RpcException(Throwable cause) { + /** + * Generic Exception type for implementors to use to represent errors while making Windmill RPCs. + */ + public static final class WindmillRpcException extends RuntimeException { + public WindmillRpcException(Throwable cause) { super(cause); } + + public WindmillRpcException(String message, Throwable cause) { + super(message, cause); + } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index 98566e0a9d39..df34797b647a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -326,7 +326,6 @@ public final void shutdown() { // Don't lock on "this" before poisoning the request observer since otherwise the observer may // be blocking in send(). requestObserver.poison(); - isShutdown = true; synchronized (this) { if (!isShutdown) { isShutdown = true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java index 17f65f56c984..5eb691cbf55a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java @@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce try { delegate.onError(e); - } catch (RuntimeException ignored) { + } catch (IllegalStateException ignored) { // If the delegate above was already terminated via onError or onComplete from another // thread. logger.warn("StreamObserver was previously cancelled.", e); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java index 6951a6cdf772..2dd069b9c443 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; +import javax.annotation.Nullable; import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; @@ -161,15 +162,19 @@ public void sendHealthCheck() throws WindmillStreamShutdownException { protected void onResponse(StreamingCommitResponse response) { commitWorkThrottleTimer.stop(); - CommitCompletionException failures = new CommitCompletionException(); + CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler(); for (int i = 0; i < response.getRequestIdCount(); ++i) { long requestId = response.getRequestId(i); if (requestId == HEARTBEAT_REQUEST_ID) { continue; } - PendingRequest pendingRequest = pending.remove(requestId); + + // From windmill.proto: Indices must line up with the request_id field, but trailing OKs may + // be omitted. CommitStatus commitStatus = i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK; + + @Nullable PendingRequest pendingRequest = pending.remove(requestId); if (pendingRequest == null) { synchronized (this) { if (!isShutdown) { @@ -185,12 +190,12 @@ protected void onResponse(StreamingCommitResponse response) { // other commits from being processed. Aggregate all the failures to throw after // processing the response if they exist. LOG.warn("Exception while processing commit response.", e); - failures.addError(commitStatus, e); + failureHandler.addError(commitStatus, e); } } } - failures.throwIfNonEmpty(); + failureHandler.throwIfNonEmpty(); } @Override @@ -362,12 +367,17 @@ private void abort() { } private static class CommitCompletionException extends RuntimeException { + private CommitCompletionException(String message) { + super(message); + } + } + + private static class CommitCompletionFailureHandler { private static final int MAX_PRINTABLE_ERRORS = 10; private final Map>, Integer> errorCounter; private final EvictingQueue detailedErrors; - private CommitCompletionException() { - super("Exception while processing commit response."); + private CommitCompletionFailureHandler() { this.errorCounter = new HashMap<>(); this.detailedErrors = EvictingQueue.create(MAX_PRINTABLE_ERRORS); } @@ -381,19 +391,13 @@ private void addError(CommitStatus commitStatus, Throwable error) { private void throwIfNonEmpty() { if (!errorCounter.isEmpty()) { - throw this; + String errorMessage = + String.format( + "Exception while processing commit response. ErrorCounter: %s; Details: %s", + errorCounter, detailedErrors); + throw new CommitCompletionException(errorMessage); } } - - @Override - public final String getMessage() { - return "CommitCompletionException{" - + "errorCounter=" - + errorCounter - + ", detailedErrors=" - + detailedErrors - + '}'; - } } private class Batcher implements CommitWorkStream.RequestBatcher { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java index f848489f51c6..27f457900e6c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java @@ -235,9 +235,7 @@ public void sendHealthCheck() throws WindmillStreamShutdownException { } @Override - protected void shutdownInternal() { - workItemAssemblers.clear(); - } + protected void shutdownInternal() {} @Override protected void onResponse(StreamingGetWorkResponseChunk chunk) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java index 26a87e1c805b..19eb6dd4915a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify; import java.io.IOException; import java.io.InputStream; @@ -56,7 +57,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,7 +166,7 @@ protected synchronized void onNewStream() throws WindmillStreamShutdownException // We rely on close only occurring after all methods on the stream have returned. // Since the requestKeyedData and requestGlobalData methods are blocking this // means there should be no pending requests. - verify(!hasPendingRequests(), "Pending requests not expected on stream restart."); + verify(!hasPendingRequests(), "Pending requests not expected if we've half-closed."); } else { for (AppendableInputStream responseStream : pending.values()) { responseStream.cancel(); @@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) { for (int i = 0; i < chunk.getRequestIdCount(); ++i) { AppendableInputStream responseStream = pending.get(chunk.getRequestId(i)); - verify(responseStream != null, "No pending response stream"); + synchronized (this) { + verify(responseStream != null || isShutdown, "No pending response stream"); + } responseStream.append(chunk.getSerializedResponse(i).newInput()); if (chunk.getRemainingBytesForResponse() == 0) { responseStream.complete(); @@ -222,12 +224,6 @@ public GlobalData requestGlobalData(GlobalDataRequest request) @Override public void refreshActiveWork(Map> heartbeats) throws WindmillStreamShutdownException { - synchronized (this) { - if (isShutdown) { - throw new WindmillStreamShutdownException("Unable to refresh work for shutdown stream."); - } - } - StreamingGetDataRequest.Builder builder = StreamingGetDataRequest.newBuilder(); if (sendKeyedGetDataRequests) { long builderBytes = 0; @@ -302,7 +298,7 @@ public void sendHealthCheck() throws WindmillStreamShutdownException { } @Override - protected void shutdownInternal() { + protected synchronized void shutdownInternal() { // Stream has been explicitly closed. Drain pending input streams and request batches. // Future calls to send RPCs will fail. pending.values().forEach(AppendableInputStream::cancel); @@ -341,13 +337,13 @@ public void appendSpecificHtml(PrintWriter writer) { private ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) throws WindmillStreamShutdownException { - while (!isShutdownLocked()) { + while (true) { request.resetResponseStream(); try { queueRequestAndWait(request); return parseFn.parse(request.getResponseStream()); } catch (AppendableInputStream.InvalidInputStreamStateException | CancellationException e) { - handleShutdown(request, e); + throwIfShutdown(request, e); if (!(e instanceof CancellationException)) { throw e; } @@ -355,17 +351,15 @@ private ResponseT issueRequest(QueuedRequest request, ParseFn { ResponseT parse(InputStream input) throws IOException; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 770cd616a5ac..f35b9b23d091 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -290,13 +290,13 @@ private ResponseT callWithBackoff(Supplier function) { e.getStatus()); } if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { - throw new RpcException(e); + throw new WindmillRpcException(e); } } catch (IOException | InterruptedException i) { if (i instanceof InterruptedException) { Thread.currentThread().interrupt(); } - RpcException rpcException = new RpcException(e); + WindmillRpcException rpcException = new WindmillRpcException(e); rpcException.addSuppressed(i); throw rpcException; } @@ -310,7 +310,7 @@ public GetWorkResponse getWork(GetWorkRequest request) { return callWithBackoff(() -> syncApplianceStub.getWork(request)); } - throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork")); + throw new WindmillRpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork")); } @Override @@ -319,7 +319,7 @@ public GetDataResponse getData(GetDataRequest request) { return callWithBackoff(() -> syncApplianceStub.getData(request)); } - throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("GetData")); + throw new WindmillRpcException(unsupportedUnaryRequestInStreamingEngineException("GetData")); } @Override @@ -327,7 +327,7 @@ public CommitWorkResponse commitWork(CommitWorkRequest request) { if (syncApplianceStub != null) { return callWithBackoff(() -> syncApplianceStub.commitWork(request)); } - throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork")); + throw new WindmillRpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork")); } /** @@ -382,7 +382,7 @@ public GetConfigResponse getConfig(GetConfigRequest request) { return callWithBackoff(() -> syncApplianceStub.getConfig(request)); } - throw new RpcException( + throw new WindmillRpcException( new UnsupportedOperationException("GetConfig not supported in Streaming Engine.")); } @@ -392,7 +392,7 @@ public ReportStatsResponse reportStats(ReportStatsRequest request) { return callWithBackoff(() -> syncApplianceStub.reportStats(request)); } - throw new RpcException( + throw new WindmillRpcException( new UnsupportedOperationException("ReportStats not supported in Streaming Engine.")); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java index 582acce17aae..3a289e4dd48b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java @@ -22,8 +22,10 @@ import java.util.concurrent.TimeoutException; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.WindmillRpcException; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,16 +85,14 @@ public void onNext(T value) throws StreamObserverCancelledException { // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted // are synchronized after terminating the phaser if we observe that the phaser is not // terminated the onNext calls below are guaranteed to not be called on a closed observer. - if (currentPhase < 0) return; + if (currentPhase < 0) { + throw new StreamObserverCancelledException("StreamObserver was terminated."); + } // If we awaited previously and timed out, wait for the same phase. Otherwise we're // careful to observe the phase before observing isReady. if (awaitPhase < 0) { - awaitPhase = isReadyNotifier.getPhase(); - // If getPhase() returns a value less than 0, the phaser has been terminated. - if (awaitPhase < 0) { - return; - } + awaitPhase = currentPhase; } // We only check isReady periodically to effectively allow for increasing the outbound @@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException { // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted // are synchronized after terminating the phaser if we observe that the phaser is not // terminated the onNext calls below are guaranteed to not be called on a closed observer. - if (currentPhase < 0) return; + if (currentPhase < 0) { + throw new StreamObserverCancelledException("StreamObserver was terminated."); + } messagesSinceReady = 0; outboundObserver.onNext(value); return; @@ -138,7 +140,7 @@ public void onNext(T value) throws StreamObserverCancelledException { if (totalSecondsWaited > deadlineSeconds) { String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited); LOG.error(errorMessage); - throw new StreamObserverCancelledException(errorMessage, e); + throw new WindmillRpcException(errorMessage, e); } if (totalSecondsWaited > OUTPUT_CHANNEL_CONSIDERED_STALLED_SECONDS) { @@ -146,7 +148,6 @@ public void onNext(T value) throws StreamObserverCancelledException { "Output channel stalled for {}s, outbound thread {}.", totalSecondsWaited, Thread.currentThread().getName()); - Thread.dumpStack(); } waitSeconds = waitSeconds * 2; @@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException { public void onError(Throwable t) { isReadyNotifier.forceTermination(); synchronized (lock) { - isClosed = true; + markClosedOrThrow(); outboundObserver.onError(t); } } @Override public void onCompleted() { + isReadyNotifier.forceTermination(); synchronized (lock) { - isClosed = true; + markClosedOrThrow(); outboundObserver.onCompleted(); } } + private void markClosedOrThrow() { + synchronized (lock) { + Preconditions.checkState(!isClosed); + isClosed = true; + } + } + @Override public void terminate(Throwable terminationException) { // Free the blocked threads in onNext(). diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/TerminatingStreamObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/TerminatingStreamObserver.java index fb2555c8454f..5fb4f95e3e1e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/TerminatingStreamObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/TerminatingStreamObserver.java @@ -23,6 +23,13 @@ @Internal public interface TerminatingStreamObserver extends StreamObserver { - /** Terminates the StreamObserver. */ + /** + * Terminates the StreamObserver. + * + * @implSpec Different then {@link #onError(Throwable)} and {@link #onCompleted()} which can only + * be called once during the lifetime of each {@link StreamObserver}, terminate() + * implementations are meant to be idempotent and can be called multiple times as well as + * being interleaved with other stream operations. + */ void terminate(Throwable terminationException); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java index aada071416de..036158f5289e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java @@ -28,12 +28,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -104,6 +106,10 @@ public void setMessageCompression(boolean b) {} () -> assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1))); testStream.shutdown(); + + // Sleep a bit to give sendExecutor time to execute the send(). + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); + sendBlocker.countDown(); assertThat(sendFuture.get()).isInstanceOf(WindmillStreamShutdownException.class); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java index b6454d319f9f..316ff76eb929 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java @@ -20,6 +20,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -33,6 +34,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; @@ -51,9 +53,12 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.InOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(JUnit4.class) public class GrpcCommitWorkStreamTest { + private static final Logger LOG = LoggerFactory.getLogger(GrpcCommitWorkStreamTest.class); private static final String FAKE_SERVER_NAME = "Fake server for GrpcCommitWorkStreamTest"; private static final Windmill.JobHeader TEST_JOB_HEADER = Windmill.JobHeader.newBuilder() @@ -131,13 +136,27 @@ public void testShutdown_abortsQueuedCommits() throws InterruptedException { commitProcessed.countDown(); }); } + } catch (StreamObserverCancelledException ignored) { } // Verify that we sent the commits above in a request + the initial header. - verify(requestObserver, times(2)).onNext(any(Windmill.StreamingCommitWorkRequest.class)); + verify(requestObserver, times(2)) + .onNext( + argThat( + request -> { + if (request.getHeader().equals(TEST_JOB_HEADER)) { + LOG.info("Header received."); + return true; + } else if (!request.getCommitChunkList().isEmpty()) { + LOG.info("Chunk received."); + return true; + } else { + LOG.error("Incorrect request."); + return false; + } + })); // We won't get responses so we will have some pending requests. assertTrue(commitWorkStream.hasPendingRequests()); - commitWorkStream.shutdown(); commitProcessed.await(); @@ -198,7 +217,7 @@ public void testSend_notCalledAfterShutdown() { // the header, which happens before we shutdown. requestObserverVerifier .verify(requestObserver) - .onNext(any(Windmill.StreamingCommitWorkRequest.class)); + .onNext(argThat(request -> request.getHeader().equals(TEST_JOB_HEADER))); requestObserverVerifier.verify(requestObserver).onError(any()); requestObserverVerifier.verifyNoMoreInteractions(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java index d8b787fe1020..a6120f4052b3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java @@ -24,9 +24,11 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -142,7 +144,8 @@ public void testQueuedBatch_notifyFailed_throwsWindmillStreamShutdownExceptionOn assertThrows( WindmillStreamShutdownException.class, queuedBatch::waitForSendOrFailNotification)); - + // Wait a few seconds for the above future to get scheduled and run. + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); queuedBatch.notifyFailed(); waitFuture.join(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java index 0ce455ac1270..252a73c92319 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.annotation.Nullable; @@ -44,6 +46,7 @@ import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -102,6 +105,48 @@ private GrpcGetDataStream createGetDataStream(GetDataStreamTestStub testStub) { return getDataStream; } + @Test + public void testRequestKeyedData() { + GetDataStreamTestStub testStub = + new GetDataStreamTestStub(new TestGetDataStreamRequestObserver()); + GrpcGetDataStream getDataStream = createGetDataStream(testStub); + // These will block until they are successfully sent. + CompletableFuture sendFuture = + CompletableFuture.supplyAsync( + () -> { + try { + return getDataStream.requestKeyedData( + "computationId", + Windmill.KeyedGetDataRequest.newBuilder() + .setKey(ByteString.EMPTY) + .setShardingKey(1) + .setCacheToken(1) + .setWorkToken(1) + .build()); + } catch (WindmillStreamShutdownException e) { + throw new RuntimeException(e); + } + }); + + // Sleep a bit to allow future to run. + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); + + Windmill.KeyedGetDataResponse response = + Windmill.KeyedGetDataResponse.newBuilder() + .setShardingKey(1) + .setKey(ByteString.EMPTY) + .build(); + + testStub.injectResponse( + Windmill.StreamingGetDataResponse.newBuilder() + .addRequestId(1) + .addSerializedResponse(response.toByteString()) + .setRemainingBytesForResponse(0) + .build()); + + assertThat(sendFuture.join()).isEqualTo(response); + } + @Test public void testRequestKeyedData_sendOnShutdownStreamThrowsWindmillStreamShutdownException() { GetDataStreamTestStub testStub = @@ -206,5 +251,9 @@ public StreamObserver getDataStream( return requestObserver; } + + private void injectResponse(Windmill.StreamingGetDataResponse getDataResponse) { + checkNotNull(responseObserver).onNext(getDataResponse); + } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java index bc978a72a8d3..4f0552959ee1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java @@ -473,7 +473,6 @@ private void flushResponse() { responseObserver.onNext(responseBuilder.build()); } catch (Exception e) { // Stream is already closed. - LOG.warn("trieu: ", e); LOG.warn(Arrays.toString(e.getStackTrace())); } responseBuilder.clear(); @@ -514,9 +513,7 @@ private void flushResponse() { done.countDown(); }); } - while (done.await(5, TimeUnit.SECONDS)) { - LOG.info("trieu: {}", done.getCount()); - } + while (done.await(5, TimeUnit.SECONDS)) {} stream.halfClose(); assertTrue(stream.awaitTermination(60, TimeUnit.SECONDS)); executor.shutdown(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserverTest.java index ee0a9280610a..374c5aec3b5b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserverTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserverTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; import org.apache.beam.sdk.fn.stream.AdvancingPhaser; import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.VerifyException; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver; @@ -122,15 +123,16 @@ public void testOnNext_timeOut() throws ExecutionException, InterruptedException new DirectStreamObserver<>(new AdvancingPhaser(1), delegate, 1, 1); ExecutorService onNextExecutor = Executors.newSingleThreadExecutor(); CountDownLatch streamObserverExitLatch = new CountDownLatch(1); - Future onNextFuture = + Future onNextFuture = onNextExecutor.submit( () -> { // Won't block on the first one. streamObserver.onNext(1); // We will check isReady on the next message, will block here. - StreamObserverCancelledException e = + WindmillServerStub.WindmillRpcException e = assertThrows( - StreamObserverCancelledException.class, () -> streamObserver.onNext(1)); + WindmillServerStub.WindmillRpcException.class, + () -> streamObserver.onNext(1)); streamObserverExitLatch.countDown(); return e; });