diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index 37476546b6cc..b9648fe4ab47 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -276,21 +276,38 @@ public final void maybeSendHealthCheck(Instant lastSendThreshold) { * information. Blocking sends are made beneath this stream object's lock which could block * status page rendering. */ + @SuppressWarnings("GuardedBy") public final void appendSummaryHtml(PrintWriter writer) { appendSpecificHtml(writer); - debugMetrics.printRestartsHtml(writer); + StreamDebugMetrics.Snapshot summaryMetrics = debugMetrics.getSummaryMetrics(); + summaryMetrics + .restartMetrics() + .ifPresent( + metrics -> + writer.format( + ", %d restarts, last restart reason [ %s ] at [%s], %d errors", + metrics.restartCount(), + metrics.lastRestartReason(), + metrics.lastRestartTime(), + metrics.errorCount())); + if (clientClosed) { writer.write(", client closed"); } - long nowMs = Instant.now().getMillis(); - long sleepLeft = debugMetrics.sleepLeft(); - if (sleepLeft > 0) { - writer.format(", %dms backoff remaining", sleepLeft); + + if (summaryMetrics.sleepLeft() > 0) { + writer.format(", %dms backoff remaining", summaryMetrics.sleepLeft()); } - debugMetrics.printSummaryHtml(writer, nowMs); + writer.format( - ", closed: %s, " + "isShutdown: %s, shutdown time: %s", - streamClosed, isShutdown, debugMetrics.shutdownTime()); + ", current stream is %dms old, last send %dms, last response %dms, closed: %s, " + + "isShutdown: %s, shutdown time: %s", + summaryMetrics.streamAge(), + summaryMetrics.timeSinceLastSend(), + summaryMetrics.timeSinceLastResponse(), + streamClosed, + isShutdown, + summaryMetrics.shutdownTime().orElse(null)); } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java index 9f5da3b417ff..48aaf9c5bf4b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java @@ -17,8 +17,10 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.client; -import java.io.PrintWriter; +import com.google.auto.value.AutoValue; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.joda.time.DateTime; @@ -89,10 +91,6 @@ synchronized void recordSleep(long sleepMs) { sleepUntil = nowMs() + sleepMs; } - synchronized long sleepLeft() { - return sleepUntil - nowMs(); - } - int incrementAndGetRestarts() { return restartCount.incrementAndGet(); } @@ -111,25 +109,74 @@ synchronized String responseDebugString(long nowMillis) { : "received response " + (nowMillis - lastResponseTimeMs) + "ms ago"; } - void printRestartsHtml(PrintWriter writer) { + private Optional getRestartMetrics() { if (restartCount.get() > 0) { synchronized (this) { - writer.format( - ", %d restarts, last restart reason [ %s ] at [%s], %d errors", - restartCount.get(), lastRestartReason, lastRestartTime, errorCount.get()); + return Optional.of( + RestartMetrics.create( + restartCount.get(), lastRestartReason, lastRestartTime, errorCount.get())); } } - } - synchronized DateTime shutdownTime() { - return shutdownTime; + return Optional.empty(); } - synchronized void printSummaryHtml(PrintWriter writer, long nowMs) { - writer.format( - ", current stream is %dms old, last send %dms, last response %dms", + synchronized Snapshot getSummaryMetrics() { + long nowMs = Instant.now().getMillis(); + return Snapshot.create( debugDuration(nowMs, startTimeMs), debugDuration(nowMs, lastSendTimeMs), - debugDuration(nowMs, lastResponseTimeMs)); + debugDuration(nowMs, lastResponseTimeMs), + getRestartMetrics(), + sleepUntil - nowMs(), + shutdownTime); + } + + @AutoValue + abstract static class Snapshot { + private static Snapshot create( + long streamAge, + long timeSinceLastSend, + long timeSinceLastResponse, + Optional restartMetrics, + long sleepLeft, + @Nullable DateTime shutdownTime) { + return new AutoValue_StreamDebugMetrics_Snapshot( + streamAge, + timeSinceLastSend, + timeSinceLastResponse, + restartMetrics, + sleepLeft, + Optional.ofNullable(shutdownTime)); + } + + abstract long streamAge(); + + abstract long timeSinceLastSend(); + + abstract long timeSinceLastResponse(); + + abstract Optional restartMetrics(); + + abstract long sleepLeft(); + + abstract Optional shutdownTime(); + } + + @AutoValue + abstract static class RestartMetrics { + private static RestartMetrics create( + int restartCount, String restartReason, DateTime lastRestartTime, int errorCount) { + return new AutoValue_StreamDebugMetrics_RestartMetrics( + restartCount, restartReason, lastRestartTime, errorCount); + } + + abstract int restartCount(); + + abstract String lastRestartReason(); + + abstract DateTime lastRestartTime(); + + abstract int errorCount(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserverTest.java index 538da9607f8b..189a244c822e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserverTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserverTest.java @@ -18,11 +18,14 @@ package org.apache.beam.runners.dataflow.worker.windmill.client; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; import org.junit.Test; import org.junit.runner.RunWith; @@ -30,18 +33,21 @@ @RunWith(JUnit4.class) public class ResettableStreamObserverTest { - private final StreamObserver delegate = - spy( - new StreamObserver() { - @Override - public void onNext(Integer integer) {} + private final StreamObserver delegate = newDelegate(); - @Override - public void onError(Throwable throwable) {} + private static StreamObserver newDelegate() { + return spy( + new StreamObserver() { + @Override + public void onNext(Integer integer) {} - @Override - public void onCompleted() {} - }); + @Override + public void onError(Throwable throwable) {} + + @Override + public void onCompleted() {} + }); + } @Test public void testPoison_beforeDelegateSet() { @@ -66,14 +72,14 @@ public void testReset_afterPoisonedThrows() { } @Test - public void onNext_afterPoisonedThrows() { + public void testOnNext_afterPoisonedThrows() { ResettableStreamObserver observer = new ResettableStreamObserver<>(() -> delegate); observer.poison(); assertThrows(WindmillStreamShutdownException.class, () -> observer.onNext(1)); } @Test - public void onError_afterPoisonedThrows() { + public void testOnError_afterPoisonedThrows() { ResettableStreamObserver observer = new ResettableStreamObserver<>(() -> delegate); observer.poison(); assertThrows( @@ -82,9 +88,31 @@ public void onError_afterPoisonedThrows() { } @Test - public void onCompleted_afterPoisonedThrows() { + public void testOnCompleted_afterPoisonedThrows() { ResettableStreamObserver observer = new ResettableStreamObserver<>(() -> delegate); observer.poison(); assertThrows(WindmillStreamShutdownException.class, observer::onCompleted); } + + @Test + public void testReset_usesNewDelegate() { + List> delegates = new ArrayList<>(); + ResettableStreamObserver observer = + new ResettableStreamObserver<>( + () -> { + StreamObserver delegate = newDelegate(); + delegates.add(delegate); + return delegate; + }); + observer.reset(); + observer.onNext(1); + observer.reset(); + observer.onNext(2); + + StreamObserver firstObserver = delegates.get(0); + StreamObserver secondObserver = delegates.get(1); + + verify(firstObserver).onNext(eq(1)); + verify(secondObserver).onNext(eq(2)); + } }