From 93c5b31c65fb6fd84682654098a108a675ece9bc Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Fri, 25 Oct 2024 10:30:41 -0700 Subject: [PATCH] address PR comments --- .../client/grpc/GrpcGetDataStream.java | 42 ++++++++----------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java index cda246065ab9..a5d5b1882fd7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java @@ -144,6 +144,16 @@ static GrpcGetDataStream create( processHeartbeatResponses); } + private static WindmillStreamShutdownException shutdownException(QueuedBatch batch) { + return new WindmillStreamShutdownException( + "Stream was closed when attempting to send " + batch.requestsCount() + " requests."); + } + + private static WindmillStreamShutdownException shutdownException(QueuedRequest request) { + return new WindmillStreamShutdownException( + "Cannot send request=[" + request + "] on closed stream."); + } + @Override protected synchronized void onNewStream() { if (isShutdown()) { @@ -349,34 +359,21 @@ private ResponseT issueRequest(QueuedRequest request, ParseFn