From 48a048e3903a0de834a3506dd886658a56c27428 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 21 Nov 2024 12:59:53 -0800 Subject: [PATCH] use direct executor to deflake tests --- .../FanOutStreamingEngineWorkerHarness.java | 19 ++- .../worker/windmill/WindmillEndpoints.java | 23 ++-- .../client/grpc/GrpcDispatcherClient.java | 4 +- .../client/grpc/stubs/ChannelCache.java | 22 ++-- ...anOutStreamingEngineWorkerHarnessTest.java | 123 ++++++++---------- .../client/grpc/stubs/ChannelCacheTest.java | 12 +- .../testing/FakeWindmillStubFactory.java | 2 +- 7 files changed, 95 insertions(+), 110 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index 1ef1691b0817..571c39d34bf8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -68,6 +68,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,7 +126,8 @@ private FanOutStreamingEngineWorkerHarness( GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, Function workCommitterFactory, - ThrottlingGetDataMetricTracker getDataMetricTracker) { + ThrottlingGetDataMetricTracker getDataMetricTracker, + ExecutorService workerMetadataConsumer) { this.jobHeader = jobHeader; this.getDataMetricTracker = getDataMetricTracker; this.started = false; @@ -138,9 +140,7 @@ private FanOutStreamingEngineWorkerHarness( this.windmillStreamManager = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build()); - this.workerMetadataConsumer = - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build()); + this.workerMetadataConsumer = workerMetadataConsumer; this.getWorkBudgetDistributor = getWorkBudgetDistributor; this.totalGetWorkBudget = totalGetWorkBudget; this.activeMetadataVersion = Long.MIN_VALUE; @@ -171,7 +171,11 @@ public static FanOutStreamingEngineWorkerHarness create( getWorkBudgetDistributor, dispatcherClient, workCommitterFactory, - getDataMetricTracker); + getDataMetricTracker, + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME) + .build())); } @VisibleForTesting @@ -195,7 +199,10 @@ static FanOutStreamingEngineWorkerHarness forTesting( getWorkBudgetDistributor, dispatcherClient, workCommitterFactory, - getDataMetricTracker); + getDataMetricTracker, + // Run the workerMetadataConsumer on the direct calling thread to make testing more + // deterministic. + MoreExecutors.newDirectExecutorService()); fanOutStreamingEngineWorkProvider.start(); return fanOutStreamingEngineWorkProvider; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java index 13b3ea954198..dd7fdd45ab08 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java @@ -88,17 +88,23 @@ private static Optional parseDirectEndpoint( .map(address -> AuthenticatedGcpServiceAddress.create(authenticatingService, address)) .map(WindmillServiceAddress::create); - return directEndpointIpV6Address.isPresent() - ? directEndpointIpV6Address - : tryParseEndpointIntoHostAndPort(endpointProto.getDirectEndpoint()) - .map(WindmillServiceAddress::create); + Optional windmillServiceAddress = + directEndpointIpV6Address.isPresent() + ? directEndpointIpV6Address + : tryParseEndpointIntoHostAndPort(endpointProto.getDirectEndpoint()) + .map(WindmillServiceAddress::create); + + if (!windmillServiceAddress.isPresent()) { + LOG.warn("Endpoint {} could not be parsed into a WindmillServiceAddress.", endpointProto); + } + + return windmillServiceAddress; } private static Optional tryParseEndpointIntoHostAndPort(String directEndpoint) { try { return Optional.of(HostAndPort.fromString(directEndpoint)); } catch (IllegalArgumentException e) { - LOG.warn("{} cannot be parsed into a gcpServiceAddress", directEndpoint); return Optional.empty(); } } @@ -113,19 +119,12 @@ private static Optional tryParseDirectEndpointIntoIpV6Address( try { directEndpointAddress = Inet6Address.getByName(endpointProto.getDirectEndpoint()); } catch (UnknownHostException e) { - LOG.warn( - "Error occurred trying to parse direct_endpoint={} into IPv6 address. Exception={}", - endpointProto.getDirectEndpoint(), - e.toString()); return Optional.empty(); } // Inet6Address.getByAddress returns either an IPv4 or an IPv6 address depending on the format // of the direct_endpoint string. if (!(directEndpointAddress instanceof Inet6Address)) { - LOG.warn( - "{} is not an IPv6 address. Direct endpoints are expected to be in IPv6 format.", - endpointProto.getDirectEndpoint()); return Optional.empty(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index 6bae84483d16..234888831779 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -150,6 +150,8 @@ public CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStubBl } } + LOG.info("Windmill Service endpoint initialized after {} seconds.", secondsWaited); + ImmutableList windmillMetadataServiceStubs = dispatcherStubs.get().windmillMetadataServiceStubs(); @@ -190,7 +192,7 @@ public void onJobConfig(StreamingGlobalConfig config) { public synchronized void consumeWindmillDispatcherEndpoints( ImmutableSet dispatcherEndpoints) { - consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /*forceRecreateStubs=*/ false); + consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /* forceRecreateStubs= */ false); } private synchronized void consumeWindmillDispatcherEndpoints( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java index db012c6bb412..714dd6ee2941 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; import java.io.PrintWriter; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -31,6 +32,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListener; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListeners; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,14 +52,11 @@ public final class ChannelCache implements StatusDataProvider { private ChannelCache( Function channelFactory, - RemovalListener onChannelRemoved) { + RemovalListener onChannelRemoved, + Executor channelCloser) { this.channelCache = CacheBuilder.newBuilder() - .removalListener( - RemovalListeners.asynchronous( - onChannelRemoved, - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser").build()))) + .removalListener(RemovalListeners.asynchronous(onChannelRemoved, channelCloser)) .build( new CacheLoader() { @Override @@ -72,11 +71,13 @@ public static ChannelCache create( return new ChannelCache( channelFactory, // Shutdown the channels as they get removed from the cache, so they do not leak. - notification -> shutdownChannel(notification.getValue())); + notification -> shutdownChannel(notification.getValue()), + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser").build())); } @VisibleForTesting - static ChannelCache forTesting( + public static ChannelCache forTesting( Function channelFactory, Runnable onChannelShutdown) { return new ChannelCache( channelFactory, @@ -85,7 +86,9 @@ static ChannelCache forTesting( notification -> { shutdownChannel(notification.getValue()); onChannelShutdown.run(); - }); + }, + // Run the removal on the calling thread for better determinism in tests. + MoreExecutors.directExecutor()); } private static void shutdownChannel(ManagedChannel channel) { @@ -108,6 +111,7 @@ public void remove(WindmillServiceAddress windmillServiceAddress) { public void clear() { channelCache.invalidateAll(); + channelCache.cleanUp(); } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java index bba6cad5529a..606d2b9dbdbc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java @@ -33,8 +33,6 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; @@ -65,7 +63,6 @@ import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessSocketAddress; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.util.MutableHandlerRegistry; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -103,7 +100,6 @@ public class FanOutStreamingEngineWorkerHarnessTest { .build(); @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); private final GrpcWindmillStreamFactory streamFactory = spy(GrpcWindmillStreamFactory.of(JOB_HEADER).build()); private final ChannelCachingStubFactory stubFactory = @@ -146,22 +142,21 @@ private static WorkerMetadataResponse.Endpoint metadataResponseEndpoint(String w @Before public void setUp() throws IOException { - stubFactory.shutdown(); + getWorkerMetadataReady = new CountDownLatch(1); + fakeGetWorkerMetadataStub = new GetWorkerMetadataTestStub(getWorkerMetadataReady); fakeStreamingEngineServer = - grpcCleanup.register( - InProcessServerBuilder.forName(CHANNEL_NAME) - .fallbackHandlerRegistry(serviceRegistry) - .executor(Executors.newFixedThreadPool(1)) - .build()); + grpcCleanup + .register( + InProcessServerBuilder.forName(CHANNEL_NAME) + .directExecutor() + .addService(fakeGetWorkerMetadataStub) + .addService(new WindmillServiceFakeStub()) + .build()) + .start(); - fakeStreamingEngineServer.start(); dispatcherClient.consumeWindmillDispatcherEndpoints( ImmutableSet.of( HostAndPort.fromString(new InProcessSocketAddress(CHANNEL_NAME).toString()))); - getWorkerMetadataReady = new CountDownLatch(1); - fakeGetWorkerMetadataStub = new GetWorkerMetadataTestStub(getWorkerMetadataReady); - serviceRegistry.addService(fakeGetWorkerMetadataStub); - serviceRegistry.addService(new WindmillServiceFakeStub()); } @After @@ -174,27 +169,29 @@ public void cleanUp() { private FanOutStreamingEngineWorkerHarness newFanOutStreamingEngineWorkerHarness( GetWorkBudget getWorkBudget, GetWorkBudgetDistributor getWorkBudgetDistributor, - WorkItemScheduler workItemScheduler) { - return FanOutStreamingEngineWorkerHarness.forTesting( - JOB_HEADER, - getWorkBudget, - streamFactory, - workItemScheduler, - stubFactory, - getWorkBudgetDistributor, - dispatcherClient, - ignored -> mock(WorkCommitter.class), - new ThrottlingGetDataMetricTracker(mock(MemoryMonitor.class))); + WorkItemScheduler workItemScheduler) + throws InterruptedException { + FanOutStreamingEngineWorkerHarness harness = + FanOutStreamingEngineWorkerHarness.forTesting( + JOB_HEADER, + getWorkBudget, + streamFactory, + workItemScheduler, + stubFactory, + getWorkBudgetDistributor, + dispatcherClient, + ignored -> mock(WorkCommitter.class), + new ThrottlingGetDataMetricTracker(mock(MemoryMonitor.class))); + getWorkerMetadataReady.await(); + return harness; } @Test public void testStreamsStartCorrectly() throws InterruptedException { long items = 10L; long bytes = 10L; - int numBudgetDistributionsExpected = 1; - TestGetWorkBudgetDistributor getWorkBudgetDistributor = - spy(new TestGetWorkBudgetDistributor(numBudgetDistributionsExpected)); + TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor()); fanOutStreamingEngineWorkProvider = newFanOutStreamingEngineWorkerHarness( @@ -205,17 +202,13 @@ public void testStreamsStartCorrectly() throws InterruptedException { String workerToken = "workerToken1"; String workerToken2 = "workerToken2"; - WorkerMetadataResponse firstWorkerMetadata = + fakeGetWorkerMetadataStub.injectWorkerMetadata( WorkerMetadataResponse.newBuilder() .setMetadataVersion(1) .addWorkEndpoints(metadataResponseEndpoint(workerToken)) .addWorkEndpoints(metadataResponseEndpoint(workerToken2)) .putAllGlobalDataEndpoints(DEFAULT) - .build(); - - getWorkerMetadataReady.await(); - fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata); - assertTrue(getWorkBudgetDistributor.waitForBudgetDistribution()); + .build()); StreamingEngineBackends currentBackends = fanOutStreamingEngineWorkProvider.currentBackends(); @@ -249,8 +242,7 @@ public void testStreamsStartCorrectly() throws InterruptedException { @Test public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers() throws InterruptedException { - TestGetWorkBudgetDistributor getWorkBudgetDistributor = - spy(new TestGetWorkBudgetDistributor(1)); + TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor()); fanOutStreamingEngineWorkProvider = newFanOutStreamingEngineWorkerHarness( GetWorkBudget.builder().setItems(1).setBytes(1).build(), @@ -283,12 +275,8 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers() .build()) .build(); - getWorkerMetadataReady.await(); fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata); - assertTrue(getWorkBudgetDistributor.waitForBudgetDistribution()); - getWorkBudgetDistributor.expectNumDistributions(1); fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata); - assertTrue(getWorkBudgetDistributor.waitForBudgetDistribution()); StreamingEngineBackends currentBackends = fanOutStreamingEngineWorkProvider.currentBackends(); assertEquals(1, currentBackends.windmillStreams().size()); Set workerTokens = @@ -325,21 +313,15 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce .putAllGlobalDataEndpoints(DEFAULT) .build(); - TestGetWorkBudgetDistributor getWorkBudgetDistributor = - spy(new TestGetWorkBudgetDistributor(1)); + TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor()); fanOutStreamingEngineWorkProvider = newFanOutStreamingEngineWorkerHarness( GetWorkBudget.builder().setItems(1).setBytes(1).build(), getWorkBudgetDistributor, noOpProcessWorkItemFn()); - getWorkerMetadataReady.await(); - fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata); - assertTrue(getWorkBudgetDistributor.waitForBudgetDistribution()); - getWorkBudgetDistributor.expectNumDistributions(1); fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata); - assertTrue(getWorkBudgetDistributor.waitForBudgetDistribution()); verify(getWorkBudgetDistributor, times(2)).distributeBudget(any(), any()); } @@ -354,10 +336,14 @@ public StreamObserver getDataStream( public void onNext(Windmill.StreamingGetDataRequest getDataRequest) {} @Override - public void onError(Throwable throwable) {} + public void onError(Throwable throwable) { + responseObserver.onError(throwable); + } @Override - public void onCompleted() {} + public void onCompleted() { + responseObserver.onCompleted(); + } }; } @@ -369,10 +355,14 @@ public StreamObserver getWorkStream( public void onNext(Windmill.StreamingGetWorkRequest getWorkRequest) {} @Override - public void onError(Throwable throwable) {} + public void onError(Throwable throwable) { + responseObserver.onError(throwable); + } @Override - public void onCompleted() {} + public void onCompleted() { + responseObserver.onCompleted(); + } }; } @@ -384,10 +374,14 @@ public StreamObserver commitWorkStream( public void onNext(Windmill.StreamingCommitWorkRequest streamingCommitWorkRequest) {} @Override - public void onError(Throwable throwable) {} + public void onError(Throwable throwable) { + responseObserver.onError(throwable); + } @Override - public void onCompleted() {} + public void onCompleted() { + responseObserver.onCompleted(); + } }; } } @@ -422,7 +416,11 @@ public void onError(Throwable throwable) { } @Override - public void onCompleted() {} + public void onCompleted() { + if (responseObserver != null) { + responseObserver.onCompleted(); + } + } }; } @@ -434,25 +432,10 @@ private void injectWorkerMetadata(WorkerMetadataResponse response) { } private static class TestGetWorkBudgetDistributor implements GetWorkBudgetDistributor { - private CountDownLatch getWorkBudgetDistributorTriggered; - - private TestGetWorkBudgetDistributor(int numBudgetDistributionsExpected) { - this.getWorkBudgetDistributorTriggered = new CountDownLatch(numBudgetDistributionsExpected); - } - - private boolean waitForBudgetDistribution() throws InterruptedException { - return getWorkBudgetDistributorTriggered.await(5, TimeUnit.SECONDS); - } - - private void expectNumDistributions(int numBudgetDistributionsExpected) { - this.getWorkBudgetDistributorTriggered = new CountDownLatch(numBudgetDistributionsExpected); - } - @Override public void distributeBudget( ImmutableCollection streams, GetWorkBudget getWorkBudget) { streams.forEach(stream -> stream.setBudget(getWorkBudget.items(), getWorkBudget.bytes())); - getWorkBudgetDistributorTriggered.countDown(); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java index 9f8a901cb629..1781261e3400 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCacheTest.java @@ -105,19 +105,10 @@ public ManagedChannel apply(WindmillServiceAddress windmillServiceAddress) { @Test public void testRemoveAndClose() throws InterruptedException { String channelName = "existingChannel"; - CountDownLatch verifyRemovalListenerAsync = new CountDownLatch(1); CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1); cache = ChannelCache.forTesting( - ignored -> newChannel(channelName), - () -> { - try { - verifyRemovalListenerAsync.await(); - notifyWhenChannelClosed.countDown(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); + ignored -> newChannel(channelName), notifyWhenChannelClosed::countDown); WindmillServiceAddress someAddress = mock(WindmillServiceAddress.class); ManagedChannel cachedChannel = cache.get(someAddress); @@ -125,7 +116,6 @@ public void testRemoveAndClose() throws InterruptedException { // Assert that the removal happened before we check to see if the shutdowns happen to confirm // that removals are async. assertTrue(cache.isEmpty()); - verifyRemovalListenerAsync.countDown(); // Assert that the channel gets shutdown. notifyWhenChannelClosed.await(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java index af3a3e8295bb..19e05efb50c6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java @@ -31,7 +31,7 @@ public final class FakeWindmillStubFactory implements ChannelCachingStubFactory private final ChannelCache channelCache; public FakeWindmillStubFactory(Supplier channelFactory) { - this.channelCache = ChannelCache.create(ignored -> channelFactory.get()); + this.channelCache = ChannelCache.forTesting(ignored -> channelFactory.get(), () -> {}); } @Override