diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index e47f74d17bc..a57f4c43639 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -84,6 +84,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -604,8 +605,13 @@ private static ChannelCachingStubFactory createStubFactory( workerOptions.getGcpCredential(), ChannelCache.create( serviceAddress -> - remoteChannel( - serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))); + // IsolationChannel will create and manage separate RPC channels to the same + // serviceAddress. + IsolationChannel.create( + () -> + remoteChannel( + serviceAddress, + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec())))); } @VisibleForTesting @@ -857,7 +863,6 @@ ComputationStateCache getComputationStateCache() { return computationStateCache; } - @SuppressWarnings("FutureReturnValueIgnored") public void start() { running.set(true); configFetcher.start(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index 458cf57ca8e..35cfdbca4a2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -356,6 +356,7 @@ private void closeStreamSender(Endpoint endpoint, Closeable sender) { } /** Add up all the throttle times of all streams including GetWorkerMetadataStream. */ + @Override public long getAndResetThrottleTime() { return backends.get().windmillStreams().values().stream() .map(WindmillStreamSender::getAndResetThrottleTime)