Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add shutdown and start mechanics to windmill streams #32774

Merged
merged 23 commits into from
Nov 19, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address PR comments part 1
m-trieu committed Nov 12, 2024
commit 4a9a86311a112e4857c0ade7736df16410bcff8c
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.slf4j.Logger;

@@ -71,6 +72,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
// shutdown.
private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never received response";
private static final String NOT_SHUTDOWN = "not shutdown";
protected final Sleeper sleeper;

private final Logger logger;
@@ -262,7 +264,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
", %d restarts, last restart reason [ %s ] at [%s], %d errors",
metrics.restartCount(),
metrics.lastRestartReason(),
metrics.lastRestartTime(),
metrics.lastRestartTime().orElse(null),
metrics.errorCount()));

if (summaryMetrics.isClientClosed()) {
@@ -275,13 +277,12 @@ public final void appendSummaryHtml(PrintWriter writer) {

writer.format(
", current stream is %dms old, last send %dms, last response %dms, closed: %s, "
+ "isShutdown: %s, shutdown time: %s",
+ "shutdown time: %s",
summaryMetrics.streamAge(),
summaryMetrics.timeSinceLastSend(),
summaryMetrics.timeSinceLastResponse(),
requestObserver.isClosed(),
summaryMetrics.shutdownTime().isPresent(),
summaryMetrics.shutdownTime().orElse(null));
summaryMetrics.shutdownTime().map(DateTime::toString).orElse(NOT_SHUTDOWN));
}

/**
@@ -297,8 +298,10 @@ public final synchronized void halfClose() {
clientClosed = true;
try {
requestObserver.onCompleted();
} catch (StreamClosedException | WindmillStreamShutdownException e) {
logger.warn("Stream was previously closed or shutdown.");
} catch (StreamClosedException e) {
logger.warn("Stream was previously closed.");
} catch (WindmillStreamShutdownException e) {
logger.warn("Stream was previously shutdown.");
}
}

@@ -317,10 +320,13 @@ public String backendWorkerToken() {
return backendWorkerToken;
}

@SuppressWarnings("GuardedBy")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove suppression

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Override
public final void shutdown() {
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
// Don't lock on "this" before poisoning the request observer as allow IO to block shutdown.
// Don't lock on "this" before poisoning the request observer since otherwise the observer may
// be blocking in send().
requestObserver.poison();
isShutdown = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should remove this (and the suppress)

shouldn't the poison prevent the blocking beneath the mutex? and then the below lock will be acquired soon?

Setting it to true outside the mutex will break invariants that are easier to think about if it is strictly guarded by. (and it breaks logic below we'll never run shutdownInternal)

if we do need it for something it seems like we could have a separate volatile shutdownRequested boolean. But I'd prefer to figure out what gets stuck with the current code and fix it instead because it is confusing to have two.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleaneed up was supposed to stay within the sync block

synchronized (this) {
if (!isShutdown) {
isShutdown = true;
@@ -332,6 +338,18 @@ public final void shutdown() {

protected abstract void shutdownInternal();

/** Returns true if the stream was torn down and should not be restarted internally. */
private synchronized boolean maybeTeardownStream() {
if (isShutdown || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
return true;
}

return false;
}

private class ResponseObserver implements StreamObserver<ResponseT> {

@Override
@@ -351,7 +369,13 @@ public void onError(Throwable t) {
return;
}

recordStreamStatus(Status.fromThrowable(t));
Status errorStatus = Status.fromThrowable(t);
recordStreamStatus(errorStatus);

// If the stream was stopped due to a resource exhausted error then we are throttled.
if (errorStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
startThrottleTimer();
}

try {
long sleep = backoff.nextBackOffMillis();
@@ -411,25 +435,6 @@ private void recordStreamStatus(Status status) {
.responseDebugString(nowMillis)
.orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
}

// If the stream was stopped due to a resource exhausted error then we are throttled.
if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
startThrottleTimer();
}
}
}

/** Returns true if the stream was torn down and should not be restarted internally. */
private boolean maybeTeardownStream() {
synchronized (AbstractWindmillStream.this) {
if (isShutdown || (clientClosed && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
return true;
}

return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ final class ResettableThrowingStreamObserver<T> {
* StreamObserver.
*/
@GuardedBy("this")
private boolean isCurrentStreamClosed = false;
private boolean isCurrentStreamClosed = true;

ResettableThrowingStreamObserver(
Supplier<TerminatingStreamObserver<T>> streamObserverFactory, Logger logger) {
@@ -72,12 +72,10 @@ private synchronized StreamObserver<T> delegate()

if (isCurrentStreamClosed) {
throw new StreamClosedException(
"Current stream is closed, requires reset for future stream operations.");
"Current stream is closed, requires reset() for future stream operations.");
}

return Preconditions.checkNotNull(
delegateStreamObserver,
"requestObserver cannot be null. Missing a call to startStream() to initialize.");
return Preconditions.checkNotNull(delegateStreamObserver, "requestObserver cannot be null.");
}

/** Creates a new delegate to use for future {@link StreamObserver} methods. */
@@ -131,9 +129,10 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
}
}

public void onError(Throwable throwable)
public synchronized void onError(Throwable throwable)
throws StreamClosedException, WindmillStreamShutdownException {
delegate().onError(throwable);
isCurrentStreamClosed = true;
}

public synchronized void onCompleted()
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ final class StreamDebugMetrics {
private String lastRestartReason = "";

@GuardedBy("this")
private DateTime lastRestartTime = null;
private @Nullable DateTime lastRestartTime = null;

@GuardedBy("this")
private long lastResponseTimeMs = 0;
@@ -57,7 +57,7 @@ final class StreamDebugMetrics {
private long startTimeMs = 0;

@GuardedBy("this")
private DateTime shutdownTime = null;
private @Nullable DateTime shutdownTime = null;

@GuardedBy("this")
private boolean clientClosed = false;
@@ -194,16 +194,19 @@ private static Snapshot create(
@AutoValue
abstract static class RestartMetrics {
private static RestartMetrics create(
int restartCount, String restartReason, DateTime lastRestartTime, int errorCount) {
int restartCount,
String restartReason,
@Nullable DateTime lastRestartTime,
int errorCount) {
return new AutoValue_StreamDebugMetrics_RestartMetrics(
restartCount, restartReason, lastRestartTime, errorCount);
restartCount, restartReason, Optional.ofNullable(lastRestartTime), errorCount);
}

abstract int restartCount();

abstract String lastRestartReason();

abstract DateTime lastRestartTime();
abstract Optional<DateTime> lastRestartTime();

abstract int errorCount();
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.dataflow.worker.windmill.client;

/**
Original file line number Diff line number Diff line change
@@ -204,7 +204,8 @@ protected synchronized void onNewStream() throws WindmillStreamShutdownException
StreamingGetWorkRequest request =
StreamingGetWorkRequest.newBuilder()
.setRequest(
requestHeader.toBuilder()
requestHeader
.toBuilder()
.setMaxItems(initialGetWorkBudget.items())
.setMaxBytes(initialGetWorkBudget.bytes())
.build())
Original file line number Diff line number Diff line change
@@ -69,9 +69,7 @@ final class GrpcGetDataStream
private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST =
StreamingGetDataRequest.newBuilder().build();

/**
* @implNote {@link QueuedBatch} objects in the queue are is guarded by {@code this}
*/
/** @implNote {@link QueuedBatch} objects in the queue are is guarded by {@code this} */
private final Deque<QueuedBatch> batches;
scwhittle marked this conversation as resolved.
Show resolved Hide resolved

private final Map<Long, AppendableInputStream> pending;
Original file line number Diff line number Diff line change
@@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException {
"Output channel stalled for {}s, outbound thread {}.",
totalSecondsWaited,
Thread.currentThread().getName());
Thread.dumpStack();
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
}

waitSeconds = waitSeconds * 2;
Original file line number Diff line number Diff line change
@@ -142,7 +142,9 @@ protected boolean hasPendingRequests() {
@Override
protected void startThrottleTimer() {}

public void testSend(Integer i) throws ResettableThrowingStreamObserver.StreamClosedException, WindmillStreamShutdownException {
public void testSend(Integer i)
throws ResettableThrowingStreamObserver.StreamClosedException,
WindmillStreamShutdownException {
trySend(i);
}

Original file line number Diff line number Diff line change
@@ -102,7 +102,8 @@ public void testOnCompleted_afterPoisonedThrows() {

@Test
public void testReset_usesNewDelegate()
throws WindmillStreamShutdownException, ResettableThrowingStreamObserver.StreamClosedException {
throws WindmillStreamShutdownException,
ResettableThrowingStreamObserver.StreamClosedException {
List<StreamObserver<Integer>> delegates = new ArrayList<>();
ResettableThrowingStreamObserver<Integer> observer =
newStreamObserver(
Original file line number Diff line number Diff line change
@@ -78,8 +78,9 @@ public void testSummaryMetrics_withRestarts() {
assertThat(restartMetrics.lastRestartReason()).isEqualTo(restartReason);
assertThat(restartMetrics.restartCount()).isEqualTo(1);
assertThat(restartMetrics.errorCount()).isEqualTo(1);
assertThat(restartMetrics.lastRestartTime()).isLessThan(DateTime.now());
assertThat(restartMetrics.lastRestartTime().toInstant()).isGreaterThan(Instant.EPOCH);
assertTrue(restartMetrics.lastRestartTime().isPresent());
assertThat(restartMetrics.lastRestartTime().get()).isLessThan(DateTime.now());
assertThat(restartMetrics.lastRestartTime().get().toInstant()).isGreaterThan(Instant.EPOCH);
}

@Test
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -472,7 +473,8 @@ private void flushResponse() {
responseObserver.onNext(responseBuilder.build());
} catch (Exception e) {
// Stream is already closed.
System.out.println("trieu: " + e);
LOG.warn("trieu: ", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm debug logs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this waas for debugging

LOG.warn(Arrays.toString(e.getStackTrace()));
}
responseBuilder.clear();
}
@@ -512,7 +514,9 @@ private void flushResponse() {
done.countDown();
});
}
done.await();
while (done.await(5, TimeUnit.SECONDS)) {
LOG.info("trieu: {}", done.getCount());
}
stream.halfClose();
assertTrue(stream.awaitTermination(60, TimeUnit.SECONDS));
executor.shutdown();