Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload PerWorkerMetrics every 30 second instead of every 10 seconds #30795

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillHarnessUpdateReportingPeriod(Duration value);

@Description(
"Specifies how often system defined per-worker metrics are reported. These metrics are "
+ " reported on the worker updates path so this number will be rounded up to the "
+ " nearest multiple of WindmillHarnessUpdateReportingPeriod. If that value is 0, then "
+ " these metrics are never sent.")
@Default.Integer(30000)
int getPerWorkerMetricsUpdateReportingPeriodMillis();

void setPerWorkerMetricsUpdateReportingPeriodMillis(int value);

@Description("Limit on depth of user exception stack trace reported to cloud monitoring.")
@Default.InstanceFactory(MaxStackTraceDepthToReportFactory.class)
int getMaxStackTraceDepthToReport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
failureTracker,
streamingCounters,
memoryMonitor,
workExecutor);

workExecutor,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
return new StreamingDataflowWorker(
windmillServer,
clientId,
Expand Down Expand Up @@ -500,7 +501,9 @@ static StreamingDataflowWorker forTesting(
streamingCounters,
memoryMonitor,
workExecutor,
executorSupplier);
executorSupplier,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
return new StreamingDataflowWorker(
windmillServer,
1L,
Expand Down Expand Up @@ -760,7 +763,7 @@ public void start() {
scheduledExecutors.add(statusPageTimer);
}
workCommitter.start();
workerStatusReporter.start(options.getWindmillHarnessUpdateReportingPeriod().getMillis());
workerStatusReporter.start();
activeWorkRefresher.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.api.services.dataflow.model.WorkerMessage;
import com.google.api.services.dataflow.model.WorkerMessageResponse;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MultimapBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -87,6 +89,14 @@ public final class StreamingWorkerStatusReporter {
private final ScheduledExecutorService globalWorkerUpdateReporter;
private final ScheduledExecutorService workerMessageReporter;

// Reporting period for periodic status updates.
private final long windmillHarnessUpdateReportingPeriodMillis;
// PerWorkerMetrics are sent on the WorkerMessages channel, and are sent one in every
// perWorkerMetricsUpdateFrequency RPC call. If 0, PerWorkerMetrics are not reported.
private final long perWorkerMetricsUpdateFrequency;
// Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics.
private final AtomicLong workerMessagesIndex;

private StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
Expand All @@ -96,7 +106,9 @@ private StreamingWorkerStatusReporter(
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory) {
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
this.publishCounters = publishCounters;
this.dataflowServiceClient = dataflowServiceClient;
this.windmillQuotaThrottleTime = windmillQuotaThrottleTime;
Expand All @@ -111,6 +123,12 @@ private StreamingWorkerStatusReporter(
this.maxThreadCountOverride = new AtomicInteger();
this.globalWorkerUpdateReporter = executorFactory.apply(GLOBAL_WORKER_UPDATE_REPORTER_THREAD);
this.workerMessageReporter = executorFactory.apply(WORKER_MESSAGE_REPORTER_THREAD);
this.windmillHarnessUpdateReportingPeriodMillis = windmillHarnessUpdateReportingPeriodMillis;
this.perWorkerMetricsUpdateFrequency =
getPerWorkerMetricsUpdateFrequency(
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
this.workerMessagesIndex = new AtomicLong();
}

public static StreamingWorkerStatusReporter create(
Expand All @@ -120,7 +138,9 @@ public static StreamingWorkerStatusReporter create(
FailureTracker failureTracker,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor) {
BoundedQueueExecutor workExecutor,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
/* publishCounters= */ true,
workUnitClient,
Expand All @@ -132,7 +152,9 @@ public static StreamingWorkerStatusReporter create(
workExecutor,
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build()));
new ThreadFactoryBuilder().setNameFormat(threadName).build()),
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
}

@VisibleForTesting
Expand All @@ -145,7 +167,9 @@ public static StreamingWorkerStatusReporter forTesting(
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory) {
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
publishCounters,
workUnitClient,
Expand All @@ -155,7 +179,9 @@ public static StreamingWorkerStatusReporter forTesting(
streamingCounters,
memoryMonitor,
workExecutor,
executorFactory);
executorFactory,
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
}

/**
Expand Down Expand Up @@ -203,7 +229,7 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start(long windmillHarnessUpdateReportingPeriodMillis) {
public void start() {
reportHarnessStartup();
if (windmillHarnessUpdateReportingPeriodMillis > 0) {
LOG.info(
Expand Down Expand Up @@ -231,6 +257,7 @@ public void stop() {
shutdownExecutor(workerMessageReporter);
// one last send
reportPeriodicWorkerUpdates();
this.workerMessagesIndex.set(this.perWorkerMetricsUpdateFrequency);
reportPeriodicWorkerMessage();
}

Expand All @@ -249,6 +276,22 @@ private void reportHarnessStartup() {
}
}

// Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the
// WorkerMessages RPC schedule. The desired reporting period
// (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple
// of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
private static long getPerWorkerMetricsUpdateFrequency(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
if (windmillHarnessUpdateReportingPeriodMillis == 0) {
return 0;
}
return LongMath.divide(
perWorkerMetricsUpdateReportingPeriodMillis,
windmillHarnessUpdateReportingPeriodMillis,
RoundingMode.CEILING);
}

/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
Expand Down Expand Up @@ -325,11 +368,7 @@ private List<WorkerMessage> createWorkerMessage() {
List<WorkerMessage> workerMessages = new ArrayList<>(2);
workerMessages.add(createWorkerMessageForStreamingScalingReport());

if (StreamingStepMetricsContainer.getEnablePerWorkerMetrics()) {
Optional<WorkerMessage> metricsMsg = createWorkerMessageForPerWorkerMetrics();
metricsMsg.ifPresent(workerMessages::add);
}

createWorkerMessageForPerWorkerMetrics().ifPresent(metrics -> workerMessages.add(metrics));
return workerMessages;
}

Expand All @@ -346,6 +385,17 @@ private WorkerMessage createWorkerMessageForStreamingScalingReport() {
}

private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
if (!StreamingStepMetricsContainer.getEnablePerWorkerMetrics()
|| perWorkerMetricsUpdateFrequency == 0) {
return Optional.empty();
}

if (workerMessagesIndex.incrementAndGet() < perWorkerMetricsUpdateFrequency) {
return Optional.empty();
} else {
workerMessagesIndex.set(0L);
}

List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
@RunWith(JUnit4.class)
public class StreamingWorkerStatusReporterTest {
private final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000;
private final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000;
private final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000;

private BoundedQueueExecutor mockExecutor;
private WorkUnitClient mockWorkUnitClient;
Expand All @@ -66,7 +68,9 @@ public void testOverrideMaximumThreadCount() throws Exception {
StreamingCounters.create(),
mockMemoryMonitor,
mockExecutor,
(threadName) -> Executors.newSingleThreadScheduledExecutor());
(threadName) -> Executors.newSingleThreadScheduledExecutor(),
DEFAULT_HARNESS_REPORTING_PERIOD,
DEFAULT_PER_WORKER_METRICS_PERIOD);
StreamingScalingReportResponse streamingScalingReportResponse =
new StreamingScalingReportResponse().setMaximumThreadCount(10);
WorkerMessageResponse workerMessageResponse =
Expand All @@ -90,7 +94,9 @@ public void testHandleEmptyWorkerMessageResponse() throws Exception {
StreamingCounters.create(),
mockMemoryMonitor,
mockExecutor,
(threadName) -> Executors.newSingleThreadScheduledExecutor());
(threadName) -> Executors.newSingleThreadScheduledExecutor(),
DEFAULT_HARNESS_REPORTING_PERIOD,
DEFAULT_PER_WORKER_METRICS_PERIOD);
WorkerMessageResponse workerMessageResponse = new WorkerMessageResponse();
when(mockWorkUnitClient.reportWorkerMessage(any()))
.thenReturn(Collections.singletonList(workerMessageResponse));
Expand Down
Loading