Skip to content

Commit

Permalink
Upload PerWorkerMetrics every 30 second instead of every 10 seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
JayajP committed Mar 28, 2024
1 parent e894d8c commit 0dc2c4b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillHarnessUpdateReportingPeriod(Duration value);

@Description(
"Specifies how often system defined per-worker metrics are reported. These metrics are "
+ " reported on the worker updates path so this number will be rounded up to the "
+ " nearest multiple of WindmillHarnessUpdateReportingPeriod. If that value is 0, then "
+ " these metrics are never sent.")
@Default.Integer(30000)
int getPerWorkerMetricsUpdateReportingPeriodMillis();

void setPerWorkerMetricsUpdateReportingPeriodMillis(int value);

@Description("Limit on depth of user exception stack trace reported to cloud monitoring.")
@Default.InstanceFactory(MaxStackTraceDepthToReportFactory.class)
int getMaxStackTraceDepthToReport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,9 @@ public void start() {
scheduledExecutors.add(statusPageTimer);
}
workCommitter.start();
workerStatusReporter.start(options.getWindmillHarnessUpdateReportingPeriod().getMillis());
workerStatusReporter.start(
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
activeWorkRefresher.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand All @@ -52,6 +53,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MultimapBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,6 +83,12 @@ public final class StreamingWorkerStatusReporter {
private final ScheduledExecutorService globalWorkerUpdateReporter;
private final ScheduledExecutorService workerMessageReporter;

// PerWorkerMetrics are sent on the WorkerMessages channel, and are sent one in every
// perWorkerMetricsUpdateFrequency RPC call. If 0, PerWorkerMetrics are not reported.
long perWorkerMetricsUpdateFrequency = 0L;
// Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics.
long workerMessagesIndex = 0L;

private StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
Expand Down Expand Up @@ -194,8 +202,12 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start(long windmillHarnessUpdateReportingPeriodMillis) {
public void start(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
reportHarnessStartup();
setPerWorkerMetricsUpdateFrequency(
windmillHarnessUpdateReportingPeriodMillis, perWorkerMetricsUpdateReportingPeriodMillis);
if (windmillHarnessUpdateReportingPeriodMillis > 0) {
LOG.info(
"Starting periodic worker status reporters. Reporting period is every {} millis.",
Expand All @@ -222,6 +234,7 @@ public void stop() {
shutdownExecutor(workerMessageReporter);
// one last send
reportPeriodicWorkerUpdates();
this.workerMessagesIndex = this.perWorkerMetricsUpdateFrequency;
reportPeriodicWorkerMessage();
}

Expand All @@ -240,6 +253,24 @@ private void reportHarnessStartup() {
}
}

// Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the
// WorkerMessages RPC schedule. The desired reporting period
// (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple
// of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
private void setPerWorkerMetricsUpdateFrequency(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
if (windmillHarnessUpdateReportingPeriodMillis == 0) {
this.perWorkerMetricsUpdateFrequency = 0;
return;
}
this.perWorkerMetricsUpdateFrequency =
LongMath.divide(
perWorkerMetricsUpdateReportingPeriodMillis,
windmillHarnessUpdateReportingPeriodMillis,
RoundingMode.CEILING);
}

/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
Expand Down Expand Up @@ -313,11 +344,7 @@ private List<WorkerMessage> createWorkerMessage() {
List<WorkerMessage> workerMessages = new ArrayList<>(2);
workerMessages.add(createWorkerMessageForStreamingScalingReport());

if (StreamingStepMetricsContainer.getEnablePerWorkerMetrics()) {
Optional<WorkerMessage> metricsMsg = createWorkerMessageForPerWorkerMetrics();
metricsMsg.ifPresent(workerMessages::add);
}

createWorkerMessageForPerWorkerMetrics().ifPresent(metrics -> workerMessages.add(metrics));
return workerMessages;
}

Expand All @@ -334,6 +361,18 @@ private WorkerMessage createWorkerMessageForStreamingScalingReport() {
}

private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
if (!StreamingStepMetricsContainer.getEnablePerWorkerMetrics()
|| perWorkerMetricsUpdateFrequency == 0) {
return Optional.empty();
}

workerMessagesIndex += 1;
if (workerMessagesIndex < perWorkerMetricsUpdateFrequency) {
return Optional.empty();
} else {
workerMessagesIndex = 0;
}

List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues()));

Expand Down

0 comments on commit 0dc2c4b

Please sign in to comment.