diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index b9648fe4ab47..9bc7c4e7d97a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.client; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify; + import java.io.IOException; import java.io.PrintWriter; import java.util.Set; @@ -68,13 +70,14 @@ public abstract class AbstractWindmillStream implements Win // Indicates that the logical stream has been half-closed and is waiting for clean server // shutdown. private static final Status OK_STATUS = Status.fromCode(Status.Code.OK); + private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never received response"; protected final Sleeper sleeper; /** * Used to guard {@link #start()} and {@link #shutdown()} behavior. * - * @implNote Do not hold when performing IO. If also locking on {@code this} in the same context, - * should acquire shutdownLock first to prevent deadlocks. + * @implNote Do NOT hold when performing IO. If also locking on {@code this} in the same context, + * should acquire shutdownLock after {@code this} to prevent deadlocks. */ protected final Object shutdownLock = new Object(); @@ -94,11 +97,13 @@ public abstract class AbstractWindmillStream implements Win * #halfClose()}. Separate from {@link #clientClosed} as this is specific to the requestObserver * and is initially false on retry. */ - @GuardedBy("this") - private boolean streamClosed; + private volatile boolean streamClosed; + + @GuardedBy("shutdownLock") + private boolean isShutdown; - private volatile boolean isShutdown; - private volatile boolean started; + @GuardedBy("shutdownLock") + private boolean started; protected AbstractWindmillStream( Logger logger, @@ -132,7 +137,7 @@ protected AbstractWindmillStream( new AbstractWindmillStream.ResponseObserver())); this.sleeper = Sleeper.DEFAULT; this.logger = logger; - this.debugMetrics = new StreamDebugMetrics(); + this.debugMetrics = StreamDebugMetrics.create(); } private static String createThreadName(String streamType, String backendWorkerToken) { @@ -158,14 +163,16 @@ private static String createThreadName(String streamType, String backendWorkerTo protected abstract void startThrottleTimer(); /** Reflects that {@link #shutdown()} was explicitly called. */ - protected boolean isShutdown() { - return isShutdown; + protected boolean hasReceivedShutdownSignal() { + synchronized (shutdownLock) { + return isShutdown; + } } /** Send a request to the server. */ protected final void send(RequestT request) { synchronized (this) { - if (isShutdown) { + if (hasReceivedShutdownSignal()) { return; } @@ -175,10 +182,11 @@ protected final void send(RequestT request) { } try { + verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send."); debugMetrics.recordSend(); requestObserver.onNext(request); } catch (StreamObserverCancelledException e) { - if (isShutdown) { + if (hasReceivedShutdownSignal()) { logger.debug("Stream was shutdown during send.", e); return; } @@ -210,7 +218,7 @@ private void startStream() { while (true) { try { synchronized (this) { - if (isShutdown) { + if (hasReceivedShutdownSignal()) { break; } debugMetrics.recordStart(); @@ -260,7 +268,7 @@ protected final void executeSafely(Runnable runnable) { } public final void maybeSendHealthCheck(Instant lastSendThreshold) { - if (!clientClosed && debugMetrics.lastSendTimeMs() < lastSendThreshold.getMillis()) { + if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) { try { sendHealthCheck(); } catch (RuntimeException e) { @@ -276,7 +284,6 @@ public final void maybeSendHealthCheck(Instant lastSendThreshold) { * information. Blocking sends are made beneath this stream object's lock which could block * status page rendering. */ - @SuppressWarnings("GuardedBy") public final void appendSummaryHtml(PrintWriter writer) { appendSpecificHtml(writer); StreamDebugMetrics.Snapshot summaryMetrics = debugMetrics.getSummaryMetrics(); @@ -306,7 +313,7 @@ public final void appendSummaryHtml(PrintWriter writer) { summaryMetrics.timeSinceLastSend(), summaryMetrics.timeSinceLastResponse(), streamClosed, - isShutdown, + hasReceivedShutdownSignal(), summaryMetrics.shutdownTime().orElse(null)); } @@ -331,7 +338,7 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte @Override public final Instant startTime() { - return new Instant(debugMetrics.startTimeMs()); + return new Instant(debugMetrics.getStartTimeMs()); } @Override @@ -429,8 +436,10 @@ private void recordStreamStatus(Status status) { currentErrorCount, t, status, - nowMillis - debugMetrics.startTimeMs(), - debugMetrics.responseDebugString(nowMillis)); + nowMillis - debugMetrics.getStartTimeMs(), + debugMetrics + .responseDebugString(nowMillis) + .orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING)); } // If the stream was stopped due to a resource exhausted error then we are throttled. @@ -442,7 +451,7 @@ private void recordStreamStatus(Status status) { /** Returns true if the stream was torn down and should not be restarted internally. */ private synchronized boolean maybeTeardownStream() { - if (isShutdown || (clientClosed && !hasPendingRequests())) { + if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) { streamRegistry.remove(AbstractWindmillStream.this); finishLatch.countDown(); executor.shutdownNow(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java index cbe8d5122ee7..e5b99f2b6ae6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java @@ -40,10 +40,6 @@ final class ResettableStreamObserver implements StreamObserver { @GuardedBy("this") private @Nullable StreamObserver delegateStreamObserver; - /** - * Indicates that the request observer should no longer be used. Attempts to perform operations on - * the request observer will throw an {@link WindmillStreamShutdownException}. - */ @GuardedBy("this") private boolean isPoisoned; @@ -63,6 +59,7 @@ private synchronized StreamObserver delegate() { "requestObserver cannot be null. Missing a call to startStream() to initialize."); } + /** Creates a new delegate to use for future {@link StreamObserver} methods. */ synchronized void reset() { if (isPoisoned) { throw new WindmillStreamShutdownException("Explicit call to shutdown stream."); @@ -71,6 +68,10 @@ synchronized void reset() { delegateStreamObserver = streamObserverFactory.get(); } + /** + * Indicates that the request observer should no longer be used. Attempts to perform operations on + * the request observer will throw an {@link WindmillStreamShutdownException}. + */ synchronized void poison() { if (!isPoisoned) { isPoisoned = true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java index 48aaf9c5bf4b..d813b6d7c9c2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java @@ -19,18 +19,24 @@ import com.google.auto.value.AutoValue; import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.DateTime; import org.joda.time.Instant; /** Records stream metrics for debugging. */ @ThreadSafe final class StreamDebugMetrics { - private final AtomicInteger restartCount = new AtomicInteger(); - private final AtomicInteger errorCount = new AtomicInteger(); + private final Supplier clock; + + @GuardedBy("this") + private int errorCount = 0; + + @GuardedBy("this") + private int restartCount = 0; @GuardedBy("this") private long sleepUntil = 0; @@ -53,12 +59,25 @@ final class StreamDebugMetrics { @GuardedBy("this") private DateTime shutdownTime = null; + private StreamDebugMetrics(Supplier clock) { + this.clock = clock; + } + + static StreamDebugMetrics create() { + return new StreamDebugMetrics(Instant::now); + } + + @VisibleForTesting + static StreamDebugMetrics forTesting(Supplier fakeClock) { + return new StreamDebugMetrics(fakeClock); + } + private static long debugDuration(long nowMs, long startMs) { return startMs <= 0 ? -1 : Math.max(0, nowMs - startMs); } - private static long nowMs() { - return Instant.now().getMillis(); + private long nowMs() { + return clock.get().getMillis(); } synchronized void recordSend() { @@ -76,14 +95,14 @@ synchronized void recordResponse() { synchronized void recordRestartReason(String error) { lastRestartReason = error; - lastRestartTime = DateTime.now(); + lastRestartTime = clock.get().toDateTime(); } - synchronized long startTimeMs() { + synchronized long getStartTimeMs() { return startTimeMs; } - synchronized long lastSendTimeMs() { + synchronized long getLastSendTimeMs() { return lastSendTimeMs; } @@ -91,38 +110,35 @@ synchronized void recordSleep(long sleepMs) { sleepUntil = nowMs() + sleepMs; } - int incrementAndGetRestarts() { - return restartCount.incrementAndGet(); + synchronized int incrementAndGetRestarts() { + return restartCount++; } - int incrementAndGetErrors() { - return errorCount.incrementAndGet(); + synchronized int incrementAndGetErrors() { + return errorCount++; } synchronized void recordShutdown() { - shutdownTime = DateTime.now(); + shutdownTime = clock.get().toDateTime(); } - synchronized String responseDebugString(long nowMillis) { + synchronized Optional responseDebugString(long nowMillis) { return lastResponseTimeMs == 0 - ? "never received response" - : "received response " + (nowMillis - lastResponseTimeMs) + "ms ago"; + ? Optional.empty() + : Optional.of("received response " + (nowMillis - lastResponseTimeMs) + "ms ago"); } - private Optional getRestartMetrics() { - if (restartCount.get() > 0) { - synchronized (this) { - return Optional.of( - RestartMetrics.create( - restartCount.get(), lastRestartReason, lastRestartTime, errorCount.get())); - } + private synchronized Optional getRestartMetrics() { + if (restartCount > 0) { + return Optional.of( + RestartMetrics.create(restartCount, lastRestartReason, lastRestartTime, errorCount)); } return Optional.empty(); } synchronized Snapshot getSummaryMetrics() { - long nowMs = Instant.now().getMillis(); + long nowMs = clock.get().getMillis(); return Snapshot.create( debugDuration(nowMs, startTimeMs), debugDuration(nowMs, lastSendTimeMs), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java index be834bf03bbd..8711bf5850d0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java @@ -170,7 +170,7 @@ protected void onResponse(StreamingCommitResponse response) { CommitStatus commitStatus = i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK; if (pendingRequest == null) { - if (!isShutdown()) { + if (!hasReceivedShutdownSignal()) { // Skip responses when the stream is shutdown since they are now invalid. LOG.error("Got unknown commit request ID: {}", requestId); } @@ -225,11 +225,6 @@ private void flushInternal(Map requests) { } private void issueSingleRequest(long id, PendingRequest pendingRequest) { - if (!prepareForSend(id, pendingRequest)) { - pendingRequest.abort(); - return; - } - StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder(); requestBuilder .addCommitChunkBuilder() @@ -238,19 +233,20 @@ private void issueSingleRequest(long id, PendingRequest pendingRequest) { .setShardingKey(pendingRequest.shardingKey()) .setSerializedWorkItemCommit(pendingRequest.serializedCommit()); StreamingCommitWorkRequest chunk = requestBuilder.build(); - try { - send(chunk); - } catch (IllegalStateException e) { - // Stream was broken, request will be retried when stream is reopened. + synchronized (this) { + try { + if (!prepareForSend(id, pendingRequest)) { + pendingRequest.abort(); + return; + } + send(chunk); + } catch (IllegalStateException e) { + // Stream was broken, request will be retried when stream is reopened. + } } } private void issueBatchedRequest(Map requests) { - if (!prepareForSend(requests)) { - requests.forEach((ignored, pendingRequest) -> pendingRequest.abort()); - return; - } - StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder(); String lastComputation = null; for (Map.Entry entry : requests.entrySet()) { @@ -266,28 +262,33 @@ private void issueBatchedRequest(Map requests) { .setSerializedWorkItemCommit(request.serializedCommit()); } StreamingCommitWorkRequest request = requestBuilder.build(); - try { - send(request); - } catch (IllegalStateException e) { - // Stream was broken, request will be retried when stream is reopened. + synchronized (this) { + if (!prepareForSend(requests)) { + requests.forEach((ignored, pendingRequest) -> pendingRequest.abort()); + return; + } + try { + send(request); + } catch (IllegalStateException e) { + // Stream was broken, request will be retried when stream is reopened. + } } } private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) { - if (!prepareForSend(id, pendingRequest)) { - pendingRequest.abort(); - return; - } - checkNotNull(pendingRequest.computationId(), "Cannot commit WorkItem w/o a computationId."); ByteString serializedCommit = pendingRequest.serializedCommit(); synchronized (this) { + if (!prepareForSend(id, pendingRequest)) { + pendingRequest.abort(); + return; + } + for (int i = 0; i < serializedCommit.size(); i += AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE) { int end = i + AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE; ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size())); - StreamingCommitRequestChunk.Builder chunkBuilder = StreamingCommitRequestChunk.newBuilder() .setRequestId(id) @@ -298,7 +299,6 @@ private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) { if (remaining > 0) { chunkBuilder.setRemainingBytesForWorkItem(remaining); } - StreamingCommitWorkRequest requestChunk = StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build(); try { @@ -312,28 +312,24 @@ private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) { } /** Returns true if prepare for send succeeded. */ - private boolean prepareForSend(long id, PendingRequest request) { + private synchronized boolean prepareForSend(long id, PendingRequest request) { synchronized (shutdownLock) { - synchronized (this) { - if (!isShutdown()) { - pending.put(id, request); - return true; - } - return false; + if (!hasReceivedShutdownSignal()) { + pending.put(id, request); + return true; } + return false; } } /** Returns true if prepare for send succeeded. */ - private boolean prepareForSend(Map requests) { + private synchronized boolean prepareForSend(Map requests) { synchronized (shutdownLock) { - synchronized (this) { - if (!isShutdown()) { - pending.putAll(requests); - return true; - } - return false; + if (!hasReceivedShutdownSignal()) { + pending.putAll(requests); + return true; } + return false; } } @@ -418,12 +414,8 @@ private Batcher() { @Override public boolean commitWorkItem( String computation, WorkItemCommitRequest commitRequest, Consumer onDone) { - if (isShutdown()) { - onDone.accept(CommitStatus.ABORTED); - return false; - } - - if (!canAccept(commitRequest.getSerializedSize() + computation.length())) { + if (!canAccept(commitRequest.getSerializedSize() + computation.length()) + || hasReceivedShutdownSignal()) { return false; } @@ -436,7 +428,7 @@ public boolean commitWorkItem( @Override public void flush() { try { - if (!isShutdown()) { + if (!hasReceivedShutdownSignal()) { flushInternal(queue); } else { queue.forEach((ignored, request) -> request.abort()); @@ -448,13 +440,9 @@ public void flush() { } void add(long id, PendingRequest request) { - if (isShutdown()) { - request.abort(); - } else { - Preconditions.checkState(canAccept(request.getBytes())); - queuedBytes += request.getBytes(); - queue.put(id, request); - } + Preconditions.checkState(canAccept(request.getBytes())); + queuedBytes += request.getBytes(); + queue.put(id, request); } private boolean canAccept(long requestBytes) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java index be9d6c6d06d6..ec26dfacc255 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java @@ -198,7 +198,7 @@ private void maybeSendRequestExtension(GetWorkBudget extension) { @Override protected synchronized void onNewStream() { workItemAssemblers.clear(); - if (!isShutdown()) { + if (!hasReceivedShutdownSignal()) { budgetTracker.reset(); GetWorkBudget initialGetWorkBudget = budgetTracker.computeBudgetExtension(); StreamingGetWorkRequest request = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java index 6a809712bd9f..afc40ebabc17 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java @@ -156,12 +156,12 @@ private static WindmillStreamShutdownException shutdownException(QueuedRequest r @Override protected synchronized void onNewStream() { - if (isShutdown()) { + if (hasReceivedShutdownSignal()) { return; } send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build()); - if (clientClosed && !isShutdown()) { + if (clientClosed && !hasReceivedShutdownSignal()) { // We rely on close only occurring after all methods on the stream have returned. // Since the requestKeyedData and requestGlobalData methods are blocking this // means there should be no pending requests. @@ -218,7 +218,7 @@ public GlobalData requestGlobalData(GlobalDataRequest request) { @Override public void refreshActiveWork(Map> heartbeats) { - if (isShutdown()) { + if (hasReceivedShutdownSignal()) { throw new WindmillStreamShutdownException("Unable to refresh work for shutdown stream."); } @@ -334,7 +334,7 @@ public void appendSpecificHtml(PrintWriter writer) { } private ResponseT issueRequest(QueuedRequest request, ParseFn parseFn) { - while (!isShutdown()) { + while (!hasReceivedShutdownSignal()) { request.resetResponseStream(); try { queueRequestAndWait(request); @@ -360,7 +360,7 @@ private ResponseT issueRequest(QueuedRequest request, ParseFn fakeClock = () -> aLongTimeAgo; + StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics.forTesting(fakeClock); + streamDebugMetrics.recordSleep(sleepMs); + StreamDebugMetrics.Snapshot metricsSnapshot = streamDebugMetrics.getSummaryMetrics(); + assertEquals(sleepMs, metricsSnapshot.sleepLeft()); + } + + @Test + public void testSummaryMetrics_withRestarts() { + String restartReason = "something bad happened"; + StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics.forTesting(Instant::now); + streamDebugMetrics.incrementAndGetErrors(); + streamDebugMetrics.incrementAndGetRestarts(); + streamDebugMetrics.recordRestartReason(restartReason); + + StreamDebugMetrics.Snapshot metricsSnapshot = streamDebugMetrics.getSummaryMetrics(); + assertTrue(metricsSnapshot.restartMetrics().isPresent()); + StreamDebugMetrics.RestartMetrics restartMetrics = metricsSnapshot.restartMetrics().get(); + assertThat(restartMetrics.lastRestartReason()).isEqualTo(restartReason); + assertThat(restartMetrics.restartCount()).isEqualTo(1); + assertThat(restartMetrics.errorCount()).isEqualTo(1); + assertThat(restartMetrics.lastRestartTime()).isLessThan(DateTime.now()); + assertThat(restartMetrics.lastRestartTime().toInstant()).isGreaterThan(Instant.EPOCH); + } + + @Test + public void testResponseDebugString_neverReceivedResponse() { + StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics.forTesting(Instant::now); + assertFalse(streamDebugMetrics.responseDebugString(Instant.now().getMillis()).isPresent()); + } + + @Test + public void testResponseDebugString_withResponse() { + StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics.forTesting(Instant::now); + streamDebugMetrics.recordResponse(); + assertTrue(streamDebugMetrics.responseDebugString(Instant.now().getMillis()).isPresent()); + } + + @Test + public void testGetStartTime() { + Instant aLongTimeAgo = Instant.parse("1998-09-04T00:00:00Z"); + Supplier fakeClock = () -> aLongTimeAgo; + StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics.forTesting(fakeClock); + assertEquals(0, streamDebugMetrics.getStartTimeMs()); + streamDebugMetrics.recordStart(); + assertThat(streamDebugMetrics.getStartTimeMs()).isEqualTo(aLongTimeAgo.getMillis()); + } + + @Test + public void testGetLastSendTime() { + Instant aLongTimeAgo = Instant.parse("1998-09-04T00:00:00Z"); + Supplier fakeClock = () -> aLongTimeAgo; + StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics.forTesting(fakeClock); + assertEquals(0, streamDebugMetrics.getLastSendTimeMs()); + streamDebugMetrics.recordSend(); + assertThat(streamDebugMetrics.getLastSendTimeMs()).isEqualTo(aLongTimeAgo.getMillis()); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java index 3baa31585a09..df6946ea8763 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java @@ -165,7 +165,7 @@ public void testCommitWorkItem_afterShutdownFalse() { Set commitStatuses = new HashSet<>(); assertFalse( batcher.commitWorkItem(COMPUTATION_ID, workItemCommitRequest(i), commitStatuses::add)); - assertThat(commitStatuses).containsExactly(Windmill.CommitStatus.ABORTED); + assertThat(commitStatuses).isEmpty(); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java index 8a37958700c9..6584ed1c5ae6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java @@ -117,8 +117,8 @@ public void setUp() throws IOException { @After public void cleanUp() { - inProcessChannel.shutdownNow(); checkNotNull(stream).shutdown(); + inProcessChannel.shutdownNow(); } private GrpcDirectGetWorkStream createGetWorkStream(