From 438b52d5214b5c5d7c89749c16db4add7d8ff690 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 15 Oct 2024 05:55:52 -0700 Subject: [PATCH 1/2] Remove experiments guarding isolated channels enablement based on job settings --- .../worker/StreamingDataflowWorker.java | 7 +------ .../client/grpc/GrpcDispatcherClient.java | 19 +++++-------------- 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index ecdba404151e..524906023722 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -140,8 +140,6 @@ public final class StreamingDataflowWorker { private static final int DEFAULT_STATUS_PORT = 8081; private static final Random CLIENT_ID_GENERATOR = new Random(); private static final String CHANNELZ_PATH = "/channelz"; - public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = - "streaming_engine_use_job_settings_for_heartbeat_pool"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -249,10 +247,7 @@ private StreamingDataflowWorker( GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); - // Experiment gates the logic till backend changes are rollback safe - if (!DataflowRunner.hasExperiment( - options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL) - || options.getUseSeparateWindmillHeartbeatStreams() != null) { + if (options.getUseSeparateWindmillHeartbeatStreams() != null) { heartbeatSender = StreamPoolHeartbeatSender.Create( Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index f96464150d4a..6bae84483d16 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; @@ -53,8 +52,6 @@ public class GrpcDispatcherClient { private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class); - static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS = - "streaming_engine_use_job_settings_for_isolated_channels"; private final CountDownLatch onInitializedEndpoints; /** @@ -80,18 +77,12 @@ private GrpcDispatcherClient( DispatcherStubs initialDispatcherStubs, Random rand) { this.windmillStubFactoryFactory = windmillStubFactoryFactory; - if (DataflowRunner.hasExperiment( - options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)) { - if (options.getUseWindmillIsolatedChannels() != null) { - this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels()); - this.reactToIsolatedChannelsJobSetting = false; - } else { - this.useIsolatedChannels.set(false); - this.reactToIsolatedChannelsJobSetting = true; - } - } else { - this.useIsolatedChannels.set(Boolean.TRUE.equals(options.getUseWindmillIsolatedChannels())); + if (options.getUseWindmillIsolatedChannels() != null) { + this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels()); this.reactToIsolatedChannelsJobSetting = false; + } else { + this.useIsolatedChannels.set(false); + this.reactToIsolatedChannelsJobSetting = true; } this.windmillStubFactory.set( windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get())); From 598b92ee59f43b974a36cb9f14912615c1d07163 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 15 Oct 2024 06:14:40 -0700 Subject: [PATCH 2/2] fix tests --- .../client/grpc/GrpcDispatcherClientTest.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java index 3f746d91a868..c04456906ea2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java @@ -34,7 +34,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.hamcrest.Matcher; import org.junit.Test; @@ -55,9 +54,6 @@ public static class RespectsJobSettingTest { public void createsNewStubWhenIsolatedChannelsConfigIsChanged() { DataflowWorkerHarnessOptions options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); - options.setExperiments( - Lists.newArrayList( - GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)); GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); // Create first time with Isolated channels disabled @@ -91,27 +87,18 @@ public static class RespectsPipelineOptionsTest { public static Collection data() { List list = new ArrayList<>(); for (Boolean pipelineOption : new Boolean[] {true, false}) { - list.add(new Object[] {/*experimentEnabled=*/ false, pipelineOption}); - list.add(new Object[] {/*experimentEnabled=*/ true, pipelineOption}); + list.add(new Object[] {pipelineOption}); } return list; } @Parameter(0) - public Boolean experimentEnabled; - - @Parameter(1) public Boolean pipelineOption; @Test public void ignoresIsolatedChannelsConfigWithPipelineOption() { DataflowWorkerHarnessOptions options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); - if (experimentEnabled) { - options.setExperiments( - Lists.newArrayList( - GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)); - } options.setUseWindmillIsolatedChannels(pipelineOption); GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));