Skip to content

Commit

Permalink
Add last error time to stream error message (#30476)
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp authored Mar 4, 2024
1 parent 770c6fe commit 4ad8d53
Showing 1 changed file with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -61,6 +62,7 @@
* synchronizing on this.
*/
public abstract class AbstractWindmillStream<RequestT, ResponseT> implements WindmillStream {

public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
// Default gRPC streams to 2MB chunks, which has been shown to be a large enough chunk size to
// reduce per-chunk overhead, and small enough that we can still perform granular flow-control.
Expand All @@ -74,6 +76,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final AtomicLong lastResponseTimeMs;
private final AtomicInteger errorCount;
private final AtomicReference<String> lastError;
private final AtomicReference<DateTime> lastErrorTime;
private final AtomicLong sleepUntil;
private final CountDownLatch finishLatch;
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
Expand Down Expand Up @@ -105,6 +108,7 @@ protected AbstractWindmillStream(
this.lastResponseTimeMs = new AtomicLong();
this.errorCount = new AtomicInteger();
this.lastError = new AtomicReference<>();
this.lastErrorTime = new AtomicReference<>();
this.sleepUntil = new AtomicLong();
this.finishLatch = new CountDownLatch(1);
this.requestObserverSupplier =
Expand Down Expand Up @@ -210,7 +214,9 @@ public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
public final void appendSummaryHtml(PrintWriter writer) {
appendSpecificHtml(writer);
if (errorCount.get() > 0) {
writer.format(", %d errors, last error [ %s ]", errorCount.get(), lastError.get());
writer.format(
", %d errors, last error [ %s ] at [%s]",
errorCount.get(), lastError.get(), lastErrorTime.get());
}
if (clientClosed.get()) {
writer.write(", client closed");
Expand Down Expand Up @@ -250,6 +256,7 @@ public final Instant startTime() {
}

private class ResponseObserver implements StreamObserver<ResponseT> {

@Override
public void onNext(ResponseT response) {
try {
Expand Down Expand Up @@ -285,7 +292,7 @@ private void onStreamFinished(@Nullable Throwable t) {
status = ((StatusRuntimeException) t).getStatus();
}
String statusError = status == null ? "" : status.toString();
lastError.set(statusError);
setLastError(statusError);
if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
long nowMillis = Instant.now().getMillis();
String responseDebug;
Expand Down Expand Up @@ -325,9 +332,14 @@ private void onStreamFinished(@Nullable Throwable t) {
"Stream completed successfully but did not complete requested operations, "
+ "recreating";
LOG.warn(error);
lastError.set(error);
setLastError(error);
}
executor.execute(AbstractWindmillStream.this::startStream);
}
}

/**
 * Records {@code error} as the most recent stream error and stamps it with the current time.
 *
 * <p>The message and its timestamp are held in two separate atomic references and are written
 * one after the other (message first), so they are not updated as a single atomic pair.
 *
 * @param error the text to store as the last error
 */
private void setLastError(String error) {
  lastError.set(error);
  // Take the timestamp only after the message has been published, matching the write order
  // readers of these two fields have always seen.
  final DateTime recordedAt = DateTime.now();
  lastErrorTime.set(recordedAt);
}
}

0 comments on commit 4ad8d53

Please sign in to comment.