diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index d9c6c5f16764..15da9ea85202 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -158,6 +158,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setWindmillHarnessUpdateReportingPeriod(Duration value); + @Description( + "Specifies how often system defined per-worker metrics are reported. These metrics are " + + "reported on the worker updates path so this number will be rounded up to the " + + "nearest multiple of WindmillHarnessUpdateReportingPeriod. If either period is 0, then " + + "these metrics are never sent.") + @Default.Integer(30000) + int getPerWorkerMetricsUpdateReportingPeriodMillis(); + + void setPerWorkerMetricsUpdateReportingPeriodMillis(int value); + @Description("Limit on depth of user exception stack trace reported to cloud monitoring.") @Default.InstanceFactory(MaxStackTraceDepthToReportFactory.class) int getMaxStackTraceDepthToReport(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 22d3b105c2b3..a40ab514176c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -753,7 +753,9 @@ public 
void start() { scheduledExecutors.add(statusPageTimer); } workCommitter.start(); - workerStatusReporter.start(options.getWindmillHarnessUpdateReportingPeriod().getMillis()); + workerStatusReporter.start( + options.getWindmillHarnessUpdateReportingPeriod().getMillis(), + options.getPerWorkerMetricsUpdateReportingPeriodMillis()); activeWorkRefresher.start(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index 409f0337eebd..90cc9c411c44 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -26,6 +26,7 @@ import com.google.api.services.dataflow.model.WorkItemStatus; import com.google.api.services.dataflow.model.WorkerMessage; import java.io.IOException; +import java.math.RoundingMode; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -52,6 +53,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ListMultimap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MultimapBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +83,12 @@ public final class StreamingWorkerStatusReporter { private final ScheduledExecutorService globalWorkerUpdateReporter; private final 
ScheduledExecutorService workerMessageReporter; + // PerWorkerMetrics are sent on the WorkerMessages channel, and are sent once in every + // perWorkerMetricsUpdateFrequency RPC calls. If 0, PerWorkerMetrics are not reported. + long perWorkerMetricsUpdateFrequency = 0L; + // Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics. + long workerMessagesIndex = 0L; + private StreamingWorkerStatusReporter( boolean publishCounters, WorkUnitClient dataflowServiceClient, @@ -194,8 +202,12 @@ private static void shutdownExecutor(ScheduledExecutorService executor) { } @SuppressWarnings("FutureReturnValueIgnored") - public void start(long windmillHarnessUpdateReportingPeriodMillis) { + public void start( + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { reportHarnessStartup(); + setPerWorkerMetricsUpdateFrequency( + windmillHarnessUpdateReportingPeriodMillis, perWorkerMetricsUpdateReportingPeriodMillis); if (windmillHarnessUpdateReportingPeriodMillis > 0) { LOG.info( "Starting periodic worker status reporters. Reporting period is every {} millis.", @@ -222,6 +234,7 @@ public void stop() { shutdownExecutor(workerMessageReporter); // one last send reportPeriodicWorkerUpdates(); + this.workerMessagesIndex = this.perWorkerMetricsUpdateFrequency; // Force a final metrics send. reportPeriodicWorkerMessage(); } @@ -240,6 +253,24 @@ private void reportHarnessStartup() { } } + // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the + // WorkerMessages RPC schedule. The desired reporting period + // (perWorkerMetricsUpdateReportingPeriodMillis) is rounded up to the nearest multiple + // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). 
+ private void setPerWorkerMetricsUpdateFrequency( + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { + if (windmillHarnessUpdateReportingPeriodMillis <= 0) { // start() schedules nothing if <= 0. + this.perWorkerMetricsUpdateFrequency = 0; + return; + } + this.perWorkerMetricsUpdateFrequency = + LongMath.divide( + perWorkerMetricsUpdateReportingPeriodMillis, + windmillHarnessUpdateReportingPeriodMillis, + RoundingMode.CEILING); + } + /** Sends counter updates to Dataflow backend. */ private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { @@ -313,11 +344,7 @@ private List createWorkerMessage() { List workerMessages = new ArrayList<>(2); workerMessages.add(createWorkerMessageForStreamingScalingReport()); - if (StreamingStepMetricsContainer.getEnablePerWorkerMetrics()) { - Optional metricsMsg = createWorkerMessageForPerWorkerMetrics(); - metricsMsg.ifPresent(workerMessages::add); - } - + createWorkerMessageForPerWorkerMetrics().ifPresent(workerMessages::add); return workerMessages; } @@ -334,6 +361,18 @@ private WorkerMessage createWorkerMessageForStreamingScalingReport() { } private Optional createWorkerMessageForPerWorkerMetrics() { + if (!StreamingStepMetricsContainer.getEnablePerWorkerMetrics() + || perWorkerMetricsUpdateFrequency <= 0) { // Non-positive means disabled. + return Optional.empty(); + } + + workerMessagesIndex += 1; + if (workerMessagesIndex < perWorkerMetricsUpdateFrequency) { + return Optional.empty(); + } else { + workerMessagesIndex = 0; + } + List metrics = new ArrayList<>(); allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues()));