Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use direct executor to deflake tests #33187

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -125,7 +126,8 @@ private FanOutStreamingEngineWorkerHarness(
GetWorkBudgetDistributor getWorkBudgetDistributor,
GrpcDispatcherClient dispatcherClient,
Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
ThrottlingGetDataMetricTracker getDataMetricTracker) {
ThrottlingGetDataMetricTracker getDataMetricTracker,
ExecutorService workerMetadataConsumer) {
this.jobHeader = jobHeader;
this.getDataMetricTracker = getDataMetricTracker;
this.started = false;
Expand All @@ -138,9 +140,7 @@ private FanOutStreamingEngineWorkerHarness(
this.windmillStreamManager =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
this.workerMetadataConsumer =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
this.workerMetadataConsumer = workerMetadataConsumer;
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
this.totalGetWorkBudget = totalGetWorkBudget;
this.activeMetadataVersion = Long.MIN_VALUE;
Expand Down Expand Up @@ -171,7 +171,11 @@ public static FanOutStreamingEngineWorkerHarness create(
getWorkBudgetDistributor,
dispatcherClient,
workCommitterFactory,
getDataMetricTracker);
getDataMetricTracker,
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME)
.build()));
}

@VisibleForTesting
Expand All @@ -195,7 +199,10 @@ static FanOutStreamingEngineWorkerHarness forTesting(
getWorkBudgetDistributor,
dispatcherClient,
workCommitterFactory,
getDataMetricTracker);
getDataMetricTracker,
// Run the workerMetadataConsumer on the direct calling thread to make testing more
// deterministic.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"to make testing more deterministic" gives an impression that the change just fix tests, however the test code path then diverts from the real one.

Please provide more information in this comment why the race observed in the test does not affect production, for future reference.

If this indeed could happen in production then we should fix the code.

Copy link
Contributor Author

@m-trieu m-trieu Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In prod, this is to hand off the task from a thread that (may) perform network IO and we do not want the task to block that since it acquires a lock to do its work. Not needed in testing and can logically be called in line

Added comment.

MoreExecutors.newDirectExecutorService());
fanOutStreamingEngineWorkProvider.start();
return fanOutStreamingEngineWorkProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,23 @@ private static Optional<WindmillServiceAddress> parseDirectEndpoint(
.map(address -> AuthenticatedGcpServiceAddress.create(authenticatingService, address))
.map(WindmillServiceAddress::create);

return directEndpointIpV6Address.isPresent()
? directEndpointIpV6Address
: tryParseEndpointIntoHostAndPort(endpointProto.getDirectEndpoint())
.map(WindmillServiceAddress::create);
Optional<WindmillServiceAddress> windmillServiceAddress =
directEndpointIpV6Address.isPresent()
? directEndpointIpV6Address
: tryParseEndpointIntoHostAndPort(endpointProto.getDirectEndpoint())
.map(WindmillServiceAddress::create);

if (!windmillServiceAddress.isPresent()) {
LOG.warn("Endpoint {} could not be parsed into a WindmillServiceAddress.", endpointProto);
}

return windmillServiceAddress;
}

private static Optional<HostAndPort> tryParseEndpointIntoHostAndPort(String directEndpoint) {
try {
return Optional.of(HostAndPort.fromString(directEndpoint));
} catch (IllegalArgumentException e) {
LOG.warn("{} cannot be parsed into a gcpServiceAddress", directEndpoint);
return Optional.empty();
}
}
Expand All @@ -113,19 +119,12 @@ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
try {
directEndpointAddress = Inet6Address.getByName(endpointProto.getDirectEndpoint());
} catch (UnknownHostException e) {
LOG.warn(
"Error occurred trying to parse direct_endpoint={} into IPv6 address. Exception={}",
endpointProto.getDirectEndpoint(),
e.toString());
return Optional.empty();
}

// Inet6Address.getByAddress returns either an IPv4 or an IPv6 address depending on the format
// of the direct_endpoint string.
if (!(directEndpointAddress instanceof Inet6Address)) {
LOG.warn(
"{} is not an IPv6 address. Direct endpoints are expected to be in IPv6 format.",
endpointProto.getDirectEndpoint());
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStubBl
}
}

LOG.info("Windmill Service endpoint initialized after {} seconds.", secondsWaited);

ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs =
dispatcherStubs.get().windmillMetadataServiceStubs();

Expand Down Expand Up @@ -190,7 +192,7 @@ public void onJobConfig(StreamingGlobalConfig config) {

public synchronized void consumeWindmillDispatcherEndpoints(
ImmutableSet<HostAndPort> dispatcherEndpoints) {
consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /*forceRecreateStubs=*/ false);
consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /* forceRecreateStubs= */ false);
}

private synchronized void consumeWindmillDispatcherEndpoints(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;

import java.io.PrintWriter;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -31,6 +32,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListener;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListeners;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,14 +52,11 @@ public final class ChannelCache implements StatusDataProvider {

private ChannelCache(
Function<WindmillServiceAddress, ManagedChannel> channelFactory,
RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved) {
RemovalListener<WindmillServiceAddress, ManagedChannel> onChannelRemoved,
Executor channelCloser) {
this.channelCache =
CacheBuilder.newBuilder()
.removalListener(
RemovalListeners.asynchronous(
onChannelRemoved,
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser").build())))
.removalListener(RemovalListeners.asynchronous(onChannelRemoved, channelCloser))
.build(
new CacheLoader<WindmillServiceAddress, ManagedChannel>() {
@Override
Expand All @@ -72,11 +71,13 @@ public static ChannelCache create(
return new ChannelCache(
channelFactory,
// Shutdown the channels as they get removed from the cache, so they do not leak.
notification -> shutdownChannel(notification.getValue()));
notification -> shutdownChannel(notification.getValue()),
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser").build()));
}

@VisibleForTesting
static ChannelCache forTesting(
public static ChannelCache forTesting(
Function<WindmillServiceAddress, ManagedChannel> channelFactory, Runnable onChannelShutdown) {
return new ChannelCache(
channelFactory,
Expand All @@ -85,7 +86,9 @@ static ChannelCache forTesting(
notification -> {
shutdownChannel(notification.getValue());
onChannelShutdown.run();
});
},
// Run the removal on the calling thread for better determinism in tests.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added, this doesn't change any behavior we just want the removal to run synchronously so we don't have to rely on waiting in tests

MoreExecutors.directExecutor());
}

private static void shutdownChannel(ManagedChannel channel) {
Expand All @@ -108,6 +111,7 @@ public void remove(WindmillServiceAddress windmillServiceAddress) {

public void clear() {
channelCache.invalidateAll();
channelCache.cleanUp();
}

/**
Expand Down
Loading
Loading