Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Nov 1, 2024
1 parent 5a68e0f commit 3fb76ac
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Set;
Expand Down Expand Up @@ -68,13 +70,14 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
// Indicates that the logical stream has been half-closed and is waiting for clean server
// shutdown.
private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never received response";
protected final Sleeper sleeper;

/**
* Used to guard {@link #start()} and {@link #shutdown()} behavior.
*
* @implNote Do not hold when performing IO. If also locking on {@code this} in the same context,
* should acquire shutdownLock first to prevent deadlocks.
* @implNote Do NOT hold when performing IO. If also locking on {@code this} in the same context,
* acquire {@code this} first and shutdownLock second to prevent deadlocks.
*/
protected final Object shutdownLock = new Object();

Expand All @@ -94,11 +97,13 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
* #halfClose()}. Separate from {@link #clientClosed} as this is specific to the requestObserver
* and is initially false on retry.
*/
@GuardedBy("this")
private boolean streamClosed;
private volatile boolean streamClosed;

@GuardedBy("shutdownLock")
private boolean isShutdown;

private volatile boolean isShutdown;
private volatile boolean started;
@GuardedBy("shutdownLock")
private boolean started;

protected AbstractWindmillStream(
Logger logger,
Expand Down Expand Up @@ -132,7 +137,7 @@ protected AbstractWindmillStream(
new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver()));
this.sleeper = Sleeper.DEFAULT;
this.logger = logger;
this.debugMetrics = new StreamDebugMetrics();
this.debugMetrics = StreamDebugMetrics.create();
}

private static String createThreadName(String streamType, String backendWorkerToken) {
Expand All @@ -158,14 +163,16 @@ private static String createThreadName(String streamType, String backendWorkerTo
protected abstract void startThrottleTimer();

/** Reflects that {@link #shutdown()} was explicitly called. */
protected boolean isShutdown() {
return isShutdown;
protected boolean hasReceivedShutdownSignal() {
synchronized (shutdownLock) {
return isShutdown;
}
}

/** Send a request to the server. */
protected final void send(RequestT request) {
synchronized (this) {
if (isShutdown) {
if (hasReceivedShutdownSignal()) {
return;
}

Expand All @@ -175,10 +182,11 @@ protected final void send(RequestT request) {
}

try {
verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send.");
debugMetrics.recordSend();
requestObserver.onNext(request);
} catch (StreamObserverCancelledException e) {
if (isShutdown) {
if (hasReceivedShutdownSignal()) {
logger.debug("Stream was shutdown during send.", e);
return;
}
Expand Down Expand Up @@ -210,7 +218,7 @@ private void startStream() {
while (true) {
try {
synchronized (this) {
if (isShutdown) {
if (hasReceivedShutdownSignal()) {
break;
}
debugMetrics.recordStart();
Expand Down Expand Up @@ -260,7 +268,7 @@ protected final void executeSafely(Runnable runnable) {
}

public final void maybeSendHealthCheck(Instant lastSendThreshold) {
if (!clientClosed && debugMetrics.lastSendTimeMs() < lastSendThreshold.getMillis()) {
if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
try {
sendHealthCheck();
} catch (RuntimeException e) {
Expand All @@ -276,7 +284,6 @@ public final void maybeSendHealthCheck(Instant lastSendThreshold) {
* information. Blocking sends are made beneath this stream object's lock which could block
* status page rendering.
*/
@SuppressWarnings("GuardedBy")
public final void appendSummaryHtml(PrintWriter writer) {
appendSpecificHtml(writer);
StreamDebugMetrics.Snapshot summaryMetrics = debugMetrics.getSummaryMetrics();
Expand Down Expand Up @@ -306,7 +313,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
summaryMetrics.timeSinceLastSend(),
summaryMetrics.timeSinceLastResponse(),
streamClosed,
isShutdown,
hasReceivedShutdownSignal(),
summaryMetrics.shutdownTime().orElse(null));
}

Expand All @@ -331,7 +338,7 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte

@Override
public final Instant startTime() {
return new Instant(debugMetrics.startTimeMs());
return new Instant(debugMetrics.getStartTimeMs());
}

@Override
Expand Down Expand Up @@ -429,8 +436,10 @@ private void recordStreamStatus(Status status) {
currentErrorCount,
t,
status,
nowMillis - debugMetrics.startTimeMs(),
debugMetrics.responseDebugString(nowMillis));
nowMillis - debugMetrics.getStartTimeMs(),
debugMetrics
.responseDebugString(nowMillis)
.orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
}

// If the stream was stopped due to a resource exhausted error then we are throttled.
Expand All @@ -442,7 +451,7 @@ private void recordStreamStatus(Status status) {

/** Returns true if the stream was torn down and should not be restarted internally. */
private synchronized boolean maybeTeardownStream() {
if (isShutdown || (clientClosed && !hasPendingRequests())) {
if (hasReceivedShutdownSignal() || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ final class ResettableStreamObserver<T> implements StreamObserver<T> {
@GuardedBy("this")
private @Nullable StreamObserver<T> delegateStreamObserver;

/**
* Indicates that the request observer should no longer be used. Attempts to perform operations on
* the request observer will throw an {@link WindmillStreamShutdownException}.
*/
@GuardedBy("this")
private boolean isPoisoned;

Expand All @@ -63,6 +59,7 @@ private synchronized StreamObserver<T> delegate() {
"requestObserver cannot be null. Missing a call to startStream() to initialize.");
}

/** Creates a new delegate to use for future {@link StreamObserver} methods. */
synchronized void reset() {
if (isPoisoned) {
throw new WindmillStreamShutdownException("Explicit call to shutdown stream.");
Expand All @@ -71,6 +68,10 @@ synchronized void reset() {
delegateStreamObserver = streamObserverFactory.get();
}

/**
* Marks the request observer as no longer usable. Subsequent attempts to perform operations on
* the request observer will throw a {@link WindmillStreamShutdownException}.
*/
synchronized void poison() {
if (!isPoisoned) {
isPoisoned = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@

import com.google.auto.value.AutoValue;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.DateTime;
import org.joda.time.Instant;

/** Records stream metrics for debugging. */
@ThreadSafe
final class StreamDebugMetrics {
private final AtomicInteger restartCount = new AtomicInteger();
private final AtomicInteger errorCount = new AtomicInteger();
private final Supplier<Instant> clock;

@GuardedBy("this")
private int errorCount = 0;

@GuardedBy("this")
private int restartCount = 0;

@GuardedBy("this")
private long sleepUntil = 0;
Expand All @@ -53,12 +59,25 @@ final class StreamDebugMetrics {
@GuardedBy("this")
private DateTime shutdownTime = null;

/** Creates metrics that read the current time from the supplied {@code clock}. */
private StreamDebugMetrics(Supplier<Instant> clock) {
this.clock = clock;
}

/** Creates an instance whose clock is {@link Instant#now()}. */
static StreamDebugMetrics create() {
  Supplier<Instant> wallClock = Instant::now;
  return new StreamDebugMetrics(wallClock);
}

/**
 * Creates an instance that reads the current time from {@code fakeClock} rather than from
 * {@link Instant#now()}.
 */
@VisibleForTesting
static StreamDebugMetrics forTesting(Supplier<Instant> fakeClock) {
return new StreamDebugMetrics(fakeClock);
}

/**
 * Returns the milliseconds elapsed from {@code startMs} to {@code nowMs}, clamped to be
 * non-negative, or -1 when {@code startMs} is zero or negative.
 */
private static long debugDuration(long nowMs, long startMs) {
  if (startMs <= 0) {
    return -1;
  }
  long elapsedMs = nowMs - startMs;
  return elapsedMs < 0 ? 0 : elapsedMs;
}

private static long nowMs() {
return Instant.now().getMillis();
private long nowMs() {
return clock.get().getMillis();
}

synchronized void recordSend() {
Expand All @@ -76,53 +95,50 @@ synchronized void recordResponse() {

synchronized void recordRestartReason(String error) {
lastRestartReason = error;
lastRestartTime = DateTime.now();
lastRestartTime = clock.get().toDateTime();
}

synchronized long startTimeMs() {
synchronized long getStartTimeMs() {
return startTimeMs;
}

synchronized long lastSendTimeMs() {
synchronized long getLastSendTimeMs() {
return lastSendTimeMs;
}

/**
 * Stores the timestamp at which the current sleep ends: the clock's current time plus {@code
 * sleepMs} milliseconds.
 */
synchronized void recordSleep(long sleepMs) {
  long wakeUpAtMs = nowMs() + sleepMs;
  sleepUntil = wakeUpAtMs;
}

int incrementAndGetRestarts() {
return restartCount.incrementAndGet();
/**
 * Increments the restart counter and returns the updated value.
 *
 * <p>Pre-increment is required here: as the method name states, callers receive the count after
 * the increment. A post-increment would return the stale value and be off by one.
 */
synchronized int incrementAndGetRestarts() {
  return ++restartCount;
}

int incrementAndGetErrors() {
return errorCount.incrementAndGet();
/**
 * Increments the error counter and returns the updated value.
 *
 * <p>Pre-increment is required here: as the method name states, callers receive the count after
 * the increment. A post-increment would return the stale value and be off by one.
 */
synchronized int incrementAndGetErrors() {
  return ++errorCount;
}

synchronized void recordShutdown() {
shutdownTime = DateTime.now();
shutdownTime = clock.get().toDateTime();
}

synchronized String responseDebugString(long nowMillis) {
synchronized Optional<String> responseDebugString(long nowMillis) {
return lastResponseTimeMs == 0
? "never received response"
: "received response " + (nowMillis - lastResponseTimeMs) + "ms ago";
? Optional.empty()
: Optional.of("received response " + (nowMillis - lastResponseTimeMs) + "ms ago");
}

private Optional<RestartMetrics> getRestartMetrics() {
if (restartCount.get() > 0) {
synchronized (this) {
return Optional.of(
RestartMetrics.create(
restartCount.get(), lastRestartReason, lastRestartTime, errorCount.get()));
}
/**
 * Returns the restart metrics (restart count, last restart reason and time, and error count), or
 * {@link Optional#empty()} when the restart count is not positive.
 */
private synchronized Optional<RestartMetrics> getRestartMetrics() {
  if (restartCount <= 0) {
    return Optional.empty();
  }

  RestartMetrics restartMetrics =
      RestartMetrics.create(restartCount, lastRestartReason, lastRestartTime, errorCount);
  return Optional.of(restartMetrics);
}

synchronized Snapshot getSummaryMetrics() {
long nowMs = Instant.now().getMillis();
long nowMs = clock.get().getMillis();
return Snapshot.create(
debugDuration(nowMs, startTimeMs),
debugDuration(nowMs, lastSendTimeMs),
Expand Down
Loading

0 comments on commit 3fb76ac

Please sign in to comment.