diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index 225b3d045d20..a761d38de1ab 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -158,6 +158,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setWindmillHarnessUpdateReportingPeriod(Duration value); + @Description( + "Specifies how often system defined per-worker metrics are reported. These metrics are " + + " reported on the worker updates path so this number will be rounded up to the " + + " nearest multiple of WindmillHarnessUpdateReportingPeriod. If that value is 0, then " + + " these metrics are never sent.") + @Default.Integer(30000) + int getPerWorkerMetricsUpdateReportingPeriodMillis(); + + void setPerWorkerMetricsUpdateReportingPeriodMillis(int value); + @Description("Limit on depth of user exception stack trace reported to cloud monitoring.") @Default.InstanceFactory(MaxStackTraceDepthToReportFactory.class) int getMaxStackTraceDepthToReport(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 93d795478a3d..9f822f80a3bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -431,8 +431,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o failureTracker, streamingCounters, memoryMonitor, - workExecutor); - + workExecutor, + options.getWindmillHarnessUpdateReportingPeriod().getMillis(), + options.getPerWorkerMetricsUpdateReportingPeriodMillis()); return new StreamingDataflowWorker( windmillServer, clientId, @@ -500,7 +501,9 @@ static StreamingDataflowWorker forTesting( streamingCounters, memoryMonitor, workExecutor, - executorSupplier); + executorSupplier, + options.getWindmillHarnessUpdateReportingPeriod().getMillis(), + options.getPerWorkerMetricsUpdateReportingPeriodMillis()); return new StreamingDataflowWorker( windmillServer, 1L, @@ -760,7 +763,7 @@ public void start() { scheduledExecutors.add(statusPageTimer); } workCommitter.start(); - workerStatusReporter.start(options.getWindmillHarnessUpdateReportingPeriod().getMillis()); + workerStatusReporter.start(); activeWorkRefresher.start(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index 8e950546ae68..ba77d8e1ce26 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -28,6 +28,7 @@ import com.google.api.services.dataflow.model.WorkerMessage; import com.google.api.services.dataflow.model.WorkerMessageResponse; import java.io.IOException; +import java.math.RoundingMode; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -55,6 +56,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ListMultimap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MultimapBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +89,14 @@ public final class StreamingWorkerStatusReporter { private final ScheduledExecutorService globalWorkerUpdateReporter; private final ScheduledExecutorService workerMessageReporter; + // Reporting period for periodic status updates. + private final long windmillHarnessUpdateReportingPeriodMillis; + // PerWorkerMetrics are sent on the WorkerMessages channel, and are sent one in every + // perWorkerMetricsUpdateFrequency RPC call. If 0, PerWorkerMetrics are not reported. + private final long perWorkerMetricsUpdateFrequency; + // Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics. + private final AtomicLong workerMessagesIndex; + private StreamingWorkerStatusReporter( boolean publishCounters, WorkUnitClient dataflowServiceClient, @@ -96,7 +106,9 @@ private StreamingWorkerStatusReporter( StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, BoundedQueueExecutor workExecutor, - Function executorFactory) { + Function executorFactory, + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { this.publishCounters = publishCounters; this.dataflowServiceClient = dataflowServiceClient; this.windmillQuotaThrottleTime = windmillQuotaThrottleTime; @@ -111,6 +123,12 @@ private StreamingWorkerStatusReporter( this.maxThreadCountOverride = new AtomicInteger(); this.globalWorkerUpdateReporter = executorFactory.apply(GLOBAL_WORKER_UPDATE_REPORTER_THREAD); this.workerMessageReporter = executorFactory.apply(WORKER_MESSAGE_REPORTER_THREAD); + this.windmillHarnessUpdateReportingPeriodMillis = windmillHarnessUpdateReportingPeriodMillis; + this.perWorkerMetricsUpdateFrequency = + getPerWorkerMetricsUpdateFrequency( + windmillHarnessUpdateReportingPeriodMillis, + perWorkerMetricsUpdateReportingPeriodMillis); + this.workerMessagesIndex = new AtomicLong(); } public static StreamingWorkerStatusReporter create( @@ -120,7 +138,9 @@ public static StreamingWorkerStatusReporter create( FailureTracker failureTracker, StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, - BoundedQueueExecutor workExecutor) { + BoundedQueueExecutor workExecutor, + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { return new StreamingWorkerStatusReporter( /* publishCounters= */ true, workUnitClient, @@ -132,7 +152,9 @@ public static StreamingWorkerStatusReporter create( workExecutor, threadName -> Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(threadName).build())); + new ThreadFactoryBuilder().setNameFormat(threadName).build()), + windmillHarnessUpdateReportingPeriodMillis, + perWorkerMetricsUpdateReportingPeriodMillis); } @VisibleForTesting @@ -145,7 +167,9 @@ public static StreamingWorkerStatusReporter forTesting( StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, BoundedQueueExecutor workExecutor, - Function executorFactory) { + Function executorFactory, + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { return new StreamingWorkerStatusReporter( publishCounters, workUnitClient, @@ -155,7 +179,9 @@ public static StreamingWorkerStatusReporter forTesting( streamingCounters, memoryMonitor, workExecutor, - executorFactory); + executorFactory, + windmillHarnessUpdateReportingPeriodMillis, + perWorkerMetricsUpdateReportingPeriodMillis); } /** @@ -203,7 +229,7 @@ private static void shutdownExecutor(ScheduledExecutorService executor) { } @SuppressWarnings("FutureReturnValueIgnored") - public void start(long windmillHarnessUpdateReportingPeriodMillis) { + public void start() { reportHarnessStartup(); if (windmillHarnessUpdateReportingPeriodMillis > 0) { LOG.info( @@ -231,6 +257,7 @@ public void stop() { shutdownExecutor(workerMessageReporter); // one last send reportPeriodicWorkerUpdates(); + this.workerMessagesIndex.set(this.perWorkerMetricsUpdateFrequency); reportPeriodicWorkerMessage(); } @@ -249,6 +276,22 @@ private void reportHarnessStartup() { } } + // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the + // WorkerMessages RPC schedule. The desired reporting period + // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple + // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). + private static long getPerWorkerMetricsUpdateFrequency( + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { + if (windmillHarnessUpdateReportingPeriodMillis == 0) { + return 0; + } + return LongMath.divide( + perWorkerMetricsUpdateReportingPeriodMillis, + windmillHarnessUpdateReportingPeriodMillis, + RoundingMode.CEILING); + } + /** Sends counter updates to Dataflow backend. */ private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { @@ -325,11 +368,7 @@ private List createWorkerMessage() { List workerMessages = new ArrayList<>(2); workerMessages.add(createWorkerMessageForStreamingScalingReport()); - if (StreamingStepMetricsContainer.getEnablePerWorkerMetrics()) { - Optional metricsMsg = createWorkerMessageForPerWorkerMetrics(); - metricsMsg.ifPresent(workerMessages::add); - } - + createWorkerMessageForPerWorkerMetrics().ifPresent(metrics -> workerMessages.add(metrics)); return workerMessages; } @@ -346,6 +385,17 @@ private WorkerMessage createWorkerMessageForStreamingScalingReport() { } private Optional createWorkerMessageForPerWorkerMetrics() { + if (!StreamingStepMetricsContainer.getEnablePerWorkerMetrics() + || perWorkerMetricsUpdateFrequency == 0) { + return Optional.empty(); + } + + if (workerMessagesIndex.incrementAndGet() < perWorkerMetricsUpdateFrequency) { + return Optional.empty(); + } else { + workerMessagesIndex.set(0L); + } + List metrics = new ArrayList<>(); allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues())); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java index bdf0f0031d69..7e65a495638f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java @@ -40,6 +40,8 @@ @RunWith(JUnit4.class) public class StreamingWorkerStatusReporterTest { private final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000; + private final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000; + private final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000; private BoundedQueueExecutor mockExecutor; private WorkUnitClient mockWorkUnitClient; @@ -66,7 +68,9 @@ public void testOverrideMaximumThreadCount() throws Exception { StreamingCounters.create(), mockMemoryMonitor, mockExecutor, - (threadName) -> Executors.newSingleThreadScheduledExecutor()); + (threadName) -> Executors.newSingleThreadScheduledExecutor(), + DEFAULT_HARNESS_REPORTING_PERIOD, + DEFAULT_PER_WORKER_METRICS_PERIOD); StreamingScalingReportResponse streamingScalingReportResponse = new StreamingScalingReportResponse().setMaximumThreadCount(10); WorkerMessageResponse workerMessageResponse = @@ -90,7 +94,9 @@ public void testHandleEmptyWorkerMessageResponse() throws Exception { StreamingCounters.create(), mockMemoryMonitor, mockExecutor, - (threadName) -> Executors.newSingleThreadScheduledExecutor()); + (threadName) -> Executors.newSingleThreadScheduledExecutor(), + DEFAULT_HARNESS_REPORTING_PERIOD, + DEFAULT_PER_WORKER_METRICS_PERIOD); WorkerMessageResponse workerMessageResponse = new WorkerMessageResponse(); when(mockWorkUnitClient.reportWorkerMessage(any())) .thenReturn(Collections.singletonList(workerMessageResponse));