From 9679382e488d6b952436bb7edd7735193d7e5fc3 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Tue, 22 Oct 2024 11:59:05 -0700 Subject: [PATCH] address PR comments around deadlocking, move WindmillStreamShutdownException to its own top level class --- .../client/AbstractWindmillStream.java | 31 ++---- .../WindmillStreamShutdownException.java | 25 +++++ .../client/getdata/StreamGetDataClient.java | 6 +- .../client/grpc/GrpcCommitWorkStream.java | 95 ++++++++++++------- .../client/grpc/GrpcGetDataStream.java | 51 ++++++---- .../refresh/FixedStreamHeartbeatSender.java | 4 +- .../client/grpc/GrpcCommitWorkStreamTest.java | 4 +- 7 files changed, 135 insertions(+), 81 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index 57331d749267..63d02104b136 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -79,7 +79,8 @@ public abstract class AbstractWindmillStream implements Win /** * Used to guard {@link #start()} and {@link #shutdown()} behavior. * - * @implNote Should not be held when performing IO. + * @implNote Do not hold when performing IO. If also locking on {@code this} in the same context, + * should acquire shutdownLock first to prevent deadlocks. */ protected final Object shutdownLock = new Object(); @@ -184,15 +185,6 @@ protected boolean isShutdown() { return isShutdown; } - private StreamObserver requestObserver() { - if (requestObserver == null) { - throw new NullPointerException( - "requestObserver cannot be null. Missing a call to start() to initialize stream."); - } - - return requestObserver; - } - /** Send a request to the server. */ protected final void send(RequestT request) { synchronized (this) { @@ -221,14 +213,17 @@ protected final void send(RequestT request) { @Override public final void start() { + boolean shouldStartStream = false; synchronized (shutdownLock) { if (!isShutdown && !started) { - // start() should only be executed once during the lifetime of the stream for idempotency - // and when shutdown() has not been called. - startStream(); started = true; + shouldStartStream = true; } } + + if (shouldStartStream) { + startStream(); + } } /** Starts the underlying stream. */ @@ -366,8 +361,8 @@ public final void shutdown() { if (!isShutdown) { isShutdown = true; shutdownTime.set(DateTime.now()); - requestObserver() - .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream.")); + requestObserver.onError( + new WindmillStreamShutdownException("Explicit call to shutdown stream.")); shutdownInternal(); } } @@ -380,12 +375,6 @@ private void recordRestartReason(String error) { protected abstract void shutdownInternal(); - public static class WindmillStreamShutdownException extends RuntimeException { - public WindmillStreamShutdownException(String message) { - super(message); - } - } - /** * Request observer that allows resetting its internal delegate using the given {@link * #requestObserverSupplier}. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java new file mode 100644 index 000000000000..5f4387d6111f --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client; + +/** Thrown when operations are requested on a {@link WindmillStream} has been shutdown/closed. */ +public final class WindmillStreamShutdownException extends RuntimeException { + public WindmillStreamShutdownException(String message) { + super(message); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/StreamGetDataClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/StreamGetDataClient.java index c8e058e7e230..ab12946ad18b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/StreamGetDataClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/StreamGetDataClient.java @@ -21,8 +21,8 @@ import java.util.function.Function; import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; -import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException; import org.apache.beam.sdk.annotations.Internal; /** {@link GetDataClient} that fetches data directly from a specific {@link GetDataStream}. */ @@ -61,7 +61,7 @@ public Windmill.KeyedGetDataResponse getStateData( String computationId, Windmill.KeyedGetDataRequest request) throws GetDataException { try (AutoCloseable ignored = getDataMetricTracker.trackStateDataFetchWithThrottling()) { return getDataStream.requestKeyedData(computationId, request); - } catch (AbstractWindmillStream.WindmillStreamShutdownException e) { + } catch (WindmillStreamShutdownException e) { throw new WorkItemCancelledException(request.getShardingKey()); } catch (Exception e) { throw new GetDataException( @@ -86,7 +86,7 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) sideInputGetDataStreamFactory.apply(request.getDataId().getTag()); try (AutoCloseable ignored = getDataMetricTracker.trackSideInputFetchWithThrottling()) { return sideInputGetDataStream.requestGlobalData(request); - } catch (AbstractWindmillStream.WindmillStreamShutdownException e) { + } catch (WindmillStreamShutdownException e) { throw new WorkItemCancelledException(e); } catch (Exception e) { throw new GetDataException( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java index 826fb13b4db9..e4c8947b38fd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java @@ -233,19 +233,16 @@ private void issueSingleRequest(long id, PendingRequest pendingRequest) { .setShardingKey(pendingRequest.shardingKey()) .setSerializedWorkItemCommit(pendingRequest.serializedCommit()); StreamingCommitWorkRequest chunk = requestBuilder.build(); - synchronized (this) { - synchronized (shutdownLock) { - if (!isShutdown()) { - pending.put(id, pendingRequest); - } else { - return; - } - } - try { - send(chunk); - } catch (IllegalStateException e) { - // Stream was broken, request will be retried when stream is reopened. - } + if (shouldCancelRequest(id, pendingRequest)) { + pendingRequest.abort(); + return; + } + + try { + send(chunk); + } catch (IllegalStateException e) { + // Stream was broken, request will be retried when stream is reopened. + } } @@ -265,33 +262,28 @@ private void issueBatchedRequest(Map requests) { .setSerializedWorkItemCommit(request.serializedCommit()); } StreamingCommitWorkRequest request = requestBuilder.build(); - synchronized (this) { - synchronized (shutdownLock) { - if (!isShutdown()) { - pending.putAll(requests); - } else { - return; - } - } - try { - send(request); - } catch (IllegalStateException e) { - // Stream was broken, request will be retried when stream is reopened. - } + + if (shouldCancelRequest(requests)) { + requests.forEach((ignored, pendingRequest) -> pendingRequest.abort()); + return; + } + + try { + send(request); + } catch (IllegalStateException e) { + // Stream was broken, request will be retried when stream is reopened. } } - private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) { + private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) { checkNotNull(pendingRequest.computationId()); - final ByteString serializedCommit = pendingRequest.serializedCommit(); + ByteString serializedCommit = pendingRequest.serializedCommit(); + if (shouldCancelRequest(id, pendingRequest)) { + pendingRequest.abort(); + return; + } + synchronized (this) { - synchronized (shutdownLock) { - if (!isShutdown()) { - pending.put(id, pendingRequest); - } else { - return; - } - } for (int i = 0; i < serializedCommit.size(); i += AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE) { @@ -321,6 +313,32 @@ private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest } } + private boolean shouldCancelRequest(long id, PendingRequest request) { + synchronized (shutdownLock) { + synchronized (this) { + if (!isShutdown()) { + pending.put(id, request); + return false; + } + + return true; + } + } + } + + private boolean shouldCancelRequest(Map requests) { + synchronized (shutdownLock) { + synchronized (this) { + if (!isShutdown()) { + pending.putAll(requests); + return false; + } + + return true; + } + } + } + @AutoValue abstract static class PendingRequest { @@ -402,6 +420,11 @@ private Batcher() { @Override public boolean commitWorkItem( String computation, WorkItemCommitRequest commitRequest, Consumer onDone) { + if (isShutdown()) { + onDone.accept(CommitStatus.ABORTED); + return false; + } + if (!canAccept(commitRequest.getSerializedSize() + computation.length()) || isShutdown()) { return false; } @@ -418,7 +441,7 @@ public void flush() { if (!isShutdown()) { flushInternal(queue); } else { - queue.forEach((ignored, request) -> request.onDone().accept(CommitStatus.ABORTED)); + queue.forEach((ignored, request) -> request.abort()); } } finally { queuedBytes = 0; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java index 3b0bea713466..c2aa965de7c3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java @@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataResponse; import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; @@ -198,7 +199,8 @@ public KeyedGetDataResponse requestKeyedData(String computation, KeyedGetDataReq return issueRequest( QueuedRequest.forComputation(uniqueId(), computation, request), KeyedGetDataResponse::parseFrom); - } catch (WindmillStreamShutdownException e) { + } catch ( + org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException e) { throw new WorkItemCancelledException(request.getShardingKey()); } } @@ -207,7 +209,8 @@ public KeyedGetDataResponse requestKeyedData(String computation, KeyedGetDataReq public GlobalData requestGlobalData(GlobalDataRequest request) { try { return issueRequest(QueuedRequest.global(uniqueId(), request), GlobalData::parseFrom); - } catch (WindmillStreamShutdownException e) { + } catch ( + org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException e) { throw new WorkItemCancelledException( "SideInput fetch failed for request due to stream shutdown: " + request, e); } @@ -216,7 +219,8 @@ public GlobalData requestGlobalData(GlobalDataRequest request) { @Override public void refreshActiveWork(Map> heartbeats) { if (isShutdown()) { - throw new WindmillStreamShutdownException("Unable to refresh work for shutdown stream."); + throw new org.apache.beam.runners.dataflow.worker.windmill.client + .WindmillStreamShutdownException("Unable to refresh work for shutdown stream."); } StreamingGetDataRequest.Builder builder = StreamingGetDataRequest.newBuilder(); @@ -354,18 +358,24 @@ private ResponseT issueRequest(QueuedRequest request, ParseFn requests) { + if (requests.isEmpty()) { + return; + } + StreamingGetDataRequest batchedRequest = flushToBatch(requests); - synchronized (this) { + synchronized (shutdownLock) { // Synchronization of pending inserts is necessary with send to ensure duplicates are not // sent on stream reconnect. - synchronized (shutdownLock) { + synchronized (this) { // shutdown() clears pending, once the stream is shutdown, prevent values from being added // to it. if (isShutdown()) { @@ -448,12 +462,13 @@ private void sendBatch(List requests) { verify(pending.put(request.id(), request.getResponseStream()) == null); } } - try { - send(batchedRequest); - } catch (IllegalStateException e) { - // The stream broke before this call went through; onNewStream will retry the fetch. - LOG.warn("GetData stream broke before call started.", e); - } + } + + try { + send(batchedRequest); + } catch (IllegalStateException e) { + // The stream broke before this call went through; onNewStream will retry the fetch. + LOG.warn("GetData stream broke before call started.", e); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/FixedStreamHeartbeatSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/FixedStreamHeartbeatSender.java index 33a55d1927f8..ed5f2db7f480 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/FixedStreamHeartbeatSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/FixedStreamHeartbeatSender.java @@ -20,8 +20,8 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.worker.streaming.RefreshableWork; -import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException; import org.apache.beam.sdk.annotations.Internal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +61,7 @@ public void sendHeartbeats(Heartbeats heartbeats) { Thread.currentThread().setName(originalThreadName + "-" + backendWorkerToken); } getDataStream.refreshActiveWork(heartbeats.heartbeatRequests().asMap()); - } catch (AbstractWindmillStream.WindmillStreamShutdownException e) { + } catch (WindmillStreamShutdownException e) { LOG.warn( "Trying to refresh work w/ {} heartbeats on stream={} after work has moved off of worker." + " heartbeats", diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java index 5a281a06dc28..1ee16d774b0d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java @@ -158,10 +158,12 @@ public void testCommitWorkItem_afterShutdownFalse() { } commitWorkStream.shutdown(); + Set commitStatuses = new HashSet<>(); try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { for (int i = 0; i < numCommits; i++) { assertFalse( - batcher.commitWorkItem(COMPUTATION_ID, workItemCommitRequest(i), ignored -> {})); + batcher.commitWorkItem(COMPUTATION_ID, workItemCommitRequest(i), commitStatuses::add)); + assertThat(commitStatuses).containsExactly(Windmill.CommitStatus.ABORTED); } } }