Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Nov 19, 2024
1 parent 50d94e0 commit b7dba9e
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
Expand Down Expand Up @@ -603,8 +604,13 @@ private static ChannelCachingStubFactory createStubFactory(
workerOptions.getGcpCredential(),
ChannelCache.create(
serviceAddress ->
remoteChannel(
serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec())));
// IsolationChannel will create and manage separate RPC channels to the same
// serviceAddress.
IsolationChannel.create(
() ->
remoteChannel(
serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
}

@VisibleForTesting
Expand Down Expand Up @@ -860,7 +866,6 @@ ComputationStateCache getComputationStateCache() {
return computationStateCache;
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
running.set(true);
configFetcher.start();
Expand Down

0 comments on commit b7dba9e

Please sign in to comment.