From b552e0fd026113b2270b1cafc2e39b2978fc009d Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 22 Aug 2024 20:47:01 +0100 Subject: [PATCH] Check experiment for hotkey logging (#32285) (#32290) * Check experiment for hotkey logging * Spotless --- .../beam/runners/dataflow/DataflowPipelineTranslator.java | 2 +- .../runners/dataflow/worker/DataflowWorkProgressUpdater.java | 3 ++- .../windmill/work/processing/StreamingWorkScheduler.java | 5 ++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 1fedcd8f3a29..c01096716c97 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -423,7 +423,7 @@ public Job translate(List packages) { if (options.getDataflowKmsKey() != null) { environment.setServiceKmsKeyName(options.getDataflowKmsKey()); } - if (options.isHotKeyLoggingEnabled()) { + if (options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) { DebugOptions debugOptions = new DebugOptions(); debugOptions.setEnableHotKeyLogging(true); environment.setDebugOptions(debugOptions); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java index f901b01ed566..b94759c239a3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java @@ -19,6 +19,7 @@ import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudDuration; import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; import com.google.api.client.util.Clock; import com.google.api.services.dataflow.model.ApproximateSplitRequest; @@ -132,7 +133,7 @@ protected void reportProgressHelper() throws Exception { // The key set the in BatchModeExecutionContext is only set in the GroupingShuffleReader // which is the correct key. The key is also translated into a Java object in the reader. - if (options.isHotKeyLoggingEnabled()) { + if (options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) { hotKeyLogger.logHotKeyDetection( hotKeyDetection.getUserStepName(), TimeUtil.fromCloudDuration(hotKeyDetection.getHotKeyAge()), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index b0b6377dd8b1..86f2cffe604c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.processing; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; + import com.google.api.services.dataflow.model.MapTask; import com.google.auto.value.AutoValue; import java.util.Collection; @@ -368,7 +370,8 @@ private ExecuteWorkResult executeWork( Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 1000); String stepName = getShuffleTaskStepName(computationState.getMapTask()); - if (options.isHotKeyLoggingEnabled() && keyCoder.isPresent()) { + if ((options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) + && keyCoder.isPresent()) { hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey); } else { hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);