Skip to content

Commit

Permalink
Remove experiments guarding isolated channels enablement based on job…
Browse files Browse the repository at this point in the history
… settings
  • Loading branch information
arunpandianp committed Oct 15, 2024
1 parent 80cba56 commit 438b52d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ public final class StreamingDataflowWorker {
private static final int DEFAULT_STATUS_PORT = 8081;
private static final Random CLIENT_ID_GENERATOR = new Random();
private static final String CHANNELZ_PATH = "/channelz";
public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
"streaming_engine_use_job_settings_for_heartbeat_pool";

private final WindmillStateCache stateCache;
private final StreamingWorkerStatusPages statusPages;
Expand Down Expand Up @@ -249,10 +247,7 @@ private StreamingDataflowWorker(
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
// Experiment gates the logic till backend changes are rollback safe
if (!DataflowRunner.hasExperiment(
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
|| options.getUseSeparateWindmillHeartbeatStreams() != null) {
if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
heartbeatSender =
StreamPoolHeartbeatSender.Create(
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
Expand All @@ -53,8 +52,6 @@
public class GrpcDispatcherClient {

private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class);
static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS =
"streaming_engine_use_job_settings_for_isolated_channels";
private final CountDownLatch onInitializedEndpoints;

/**
Expand All @@ -80,18 +77,12 @@ private GrpcDispatcherClient(
DispatcherStubs initialDispatcherStubs,
Random rand) {
this.windmillStubFactoryFactory = windmillStubFactoryFactory;
if (DataflowRunner.hasExperiment(
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)) {
if (options.getUseWindmillIsolatedChannels() != null) {
this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
this.reactToIsolatedChannelsJobSetting = false;
} else {
this.useIsolatedChannels.set(false);
this.reactToIsolatedChannelsJobSetting = true;
}
} else {
this.useIsolatedChannels.set(Boolean.TRUE.equals(options.getUseWindmillIsolatedChannels()));
if (options.getUseWindmillIsolatedChannels() != null) {
this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
this.reactToIsolatedChannelsJobSetting = false;
} else {
this.useIsolatedChannels.set(false);
this.reactToIsolatedChannelsJobSetting = true;
}
this.windmillStubFactory.set(
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
Expand Down

0 comments on commit 438b52d

Please sign in to comment.