From 92dc8ff6f3b87e2bb3ffedf9a26ead174e912243 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 20 Sep 2024 02:45:47 -0700 Subject: [PATCH] review comments --- .../worker/StreamingDataflowWorker.java | 25 ++++++++----------- .../refresh/StreamPoolHeartbeatSender.java | 9 ++++--- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 81868729d851..8b440c306f0e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -256,26 +256,21 @@ private StreamingDataflowWorker( windmillServer::getDataStream); getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); // Experiment gates the logic till backend changes are rollback safe - if (DataflowRunner.hasExperiment( - options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)) { - heartbeatSender = - // If the setting is explicitly passed in via PipelineOptions use it, - // else rely on the global config - options.getUseSeparateWindmillHeartbeatStreams() != null - ? StreamPoolHeartbeatSender.Create( - options.getUseSeparateWindmillHeartbeatStreams() - ? separateHeartbeatPool(windmillServer) - : getDataStreamPool) - : StreamPoolHeartbeatSender.Create( - separateHeartbeatPool(windmillServer), - getDataStreamPool, - configFetcher.getGlobalConfigHandle()); - } else { + if (!DataflowRunner.hasExperiment( + options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL) + || options.getUseSeparateWindmillHeartbeatStreams() != null) { heartbeatSender = StreamPoolHeartbeatSender.Create( Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) ? separateHeartbeatPool(windmillServer) : getDataStreamPool); + + } else { + heartbeatSender = + StreamPoolHeartbeatSender.Create( + separateHeartbeatPool(windmillServer), + getDataStreamPool, + configFetcher.getGlobalConfigHandle()); } stuckCommitDurationMillis = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java index b632db3c6d47..fa36b11ffe55 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java @@ -51,23 +51,24 @@ public static StreamPoolHeartbeatSender Create( * Creates StreamPoolHeartbeatSender that switches between the passed in stream pools depending on * global config. * - * @param heartbeatStreamPool stream to use when using separate streams for heartbeat is enabled. + * @param dedicatedHeartbeatPool stream to use when using separate streams for heartbeat is + * enabled. * @param getDataPool stream to use when using separate streams for heartbeat is disabled. */ public static StreamPoolHeartbeatSender Create( - @Nonnull WindmillStreamPool heartbeatStreamPool, + @Nonnull WindmillStreamPool dedicatedHeartbeatPool, @Nonnull WindmillStreamPool getDataPool, @Nonnull StreamingGlobalConfigHandle configHandle) { // Use getDataPool as the default, settings callback will // switch to the separate pool if enabled before processing any elements are processed. - StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(heartbeatStreamPool); + StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(getDataPool); configHandle.registerConfigObserver( streamingGlobalConfig -> heartbeatSender.heartbeatStreamPool.set( streamingGlobalConfig .userWorkerJobSettings() .getUseSeparateWindmillHeartbeatStreams() - ? heartbeatStreamPool + ? dedicatedHeartbeatPool : getDataPool)); return heartbeatSender; }