Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload PerWorkerMetrics every 30 seconds instead of every 10 seconds #30795

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillHarnessUpdateReportingPeriod(Duration value);

/**
 * Returns the requested reporting period, in milliseconds, for system defined per-worker metrics.
 * Defaults to 30000.
 */
@Description(
    "Specifies how often system defined per-worker metrics are reported. These metrics are "
        + "reported on the worker updates path so this number will be rounded up to the "
        + "nearest multiple of WindmillHarnessUpdateReportingPeriod. If that value is 0, then "
        + "these metrics are never sent.")
@Default.Integer(30000)
int getPerWorkerMetricsUpdateReportingPeriodMillis();

/** Sets the requested per-worker metrics reporting period, in milliseconds. */
void setPerWorkerMetricsUpdateReportingPeriodMillis(int value);

@Description("Limit on depth of user exception stack trace reported to cloud monitoring.")
@Default.InstanceFactory(MaxStackTraceDepthToReportFactory.class)
int getMaxStackTraceDepthToReport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,9 @@ public void start() {
scheduledExecutors.add(statusPageTimer);
}
workCommitter.start();
workerStatusReporter.start(options.getWindmillHarnessUpdateReportingPeriod().getMillis());
workerStatusReporter.start(
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
activeWorkRefresher.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand All @@ -52,6 +53,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MultimapBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,6 +83,12 @@ public final class StreamingWorkerStatusReporter {
private final ScheduledExecutorService globalWorkerUpdateReporter;
private final ScheduledExecutorService workerMessageReporter;

// PerWorkerMetrics are sent on the WorkerMessages channel, once in every
// perWorkerMetricsUpdateFrequency RPC calls. If 0, PerWorkerMetrics are not reported.
// Assigned by setPerWorkerMetricsUpdateFrequency, which start() calls with the two reporting
// periods it receives.
long perWorkerMetricsUpdateFrequency = 0L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this never changes, let's make it private and final and initialize it in the constructor.

You can modify the constructor to take this in instead of passing it into start(), and have start() take no params.

// Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics.
// createWorkerMessageForPerWorkerMetrics increments it on each call that gets past the
// enabled/frequency checks and resets it to 0 once it reaches perWorkerMetricsUpdateFrequency.
// stop() sets it to perWorkerMetricsUpdateFrequency so that the final send passes that threshold.
// NOTE(review): plain long with no synchronization visible here — confirm all reads and writes
// happen on a single thread, or guard it.
long workerMessagesIndex = 0L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not be thread-safe; how about using an AtomicLong?


private StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
Expand Down Expand Up @@ -194,8 +202,12 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start(long windmillHarnessUpdateReportingPeriodMillis) {
public void start(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
reportHarnessStartup();
setPerWorkerMetricsUpdateFrequency(
windmillHarnessUpdateReportingPeriodMillis, perWorkerMetricsUpdateReportingPeriodMillis);
if (windmillHarnessUpdateReportingPeriodMillis > 0) {
LOG.info(
"Starting periodic worker status reporters. Reporting period is every {} millis.",
Expand All @@ -222,6 +234,7 @@ public void stop() {
shutdownExecutor(workerMessageReporter);
// one last send
reportPeriodicWorkerUpdates();
this.workerMessagesIndex = this.perWorkerMetricsUpdateFrequency;
reportPeriodicWorkerMessage();
}

Expand All @@ -240,6 +253,24 @@ private void reportHarnessStartup() {
}
}

// Calculates the PerWorkerMetrics reporting frequency, i.e. how many WorkerMessages RPCs elapse
// per PerWorkerMetrics report, so that reporting stays aligned with the WorkerMessages RPC
// schedule. The desired reporting period (perWorkerMetricsUpdateReportingPeriodMillis) is
// divided by the RPC interval (windmillHarnessUpdateReportingPeriodMillis) and rounded UP, so
// the effective period is the smallest multiple of the RPC interval that is not shorter than
// the requested period.
//
// A non-positive value for either period disables PerWorkerMetrics reporting (frequency 0).
// This matches start(), which only schedules the periodic reporters when the RPC interval is
// > 0, and avoids a negative frequency, which would make the "workerMessagesIndex < frequency"
// throttle pass on every RPC.
private void setPerWorkerMetricsUpdateFrequency(
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  if (windmillHarnessUpdateReportingPeriodMillis <= 0
      || perWorkerMetricsUpdateReportingPeriodMillis <= 0) {
    // 0 is the sentinel for "never report PerWorkerMetrics".
    this.perWorkerMetricsUpdateFrequency = 0;
    return;
  }
  // Both operands are strictly positive here, so the ceiling quotient is at least 1.
  this.perWorkerMetricsUpdateFrequency =
      LongMath.divide(
          perWorkerMetricsUpdateReportingPeriodMillis,
          windmillHarnessUpdateReportingPeriodMillis,
          RoundingMode.CEILING);
}

/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
Expand Down Expand Up @@ -313,11 +344,7 @@ private List<WorkerMessage> createWorkerMessage() {
List<WorkerMessage> workerMessages = new ArrayList<>(2);
workerMessages.add(createWorkerMessageForStreamingScalingReport());

if (StreamingStepMetricsContainer.getEnablePerWorkerMetrics()) {
Optional<WorkerMessage> metricsMsg = createWorkerMessageForPerWorkerMetrics();
metricsMsg.ifPresent(workerMessages::add);
}

createWorkerMessageForPerWorkerMetrics().ifPresent(metrics -> workerMessages.add(metrics));
return workerMessages;
}

Expand All @@ -334,6 +361,18 @@ private WorkerMessage createWorkerMessageForStreamingScalingReport() {
}

private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
if (!StreamingStepMetricsContainer.getEnablePerWorkerMetrics()
|| perWorkerMetricsUpdateFrequency == 0) {
return Optional.empty();
}

workerMessagesIndex += 1;
if (workerMessagesIndex < perWorkerMetricsUpdateFrequency) {
return Optional.empty();
} else {
workerMessagesIndex = 0;
}

List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues()));

Expand Down
Loading