Skip to content

Commit

Permalink
Upload PerWorkerMetrics every 30 seconds instead of every 10 seconds (#…
Browse files Browse the repository at this point in the history
…30795)

* Upload PerWorkerMetrics every 30 seconds instead of every 10 seconds

* Address comments

* Fix merge conflicts
  • Loading branch information
JayajP authored Apr 15, 2024
1 parent 285b20d commit a00e947
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillHarnessUpdateReportingPeriod(Duration value);

/**
 * Desired interval, in milliseconds, between reports of system defined per-worker metrics.
 * Defaults to 30000 (30 seconds).
 */
@Description(
    "Specifies how often system defined per-worker metrics are reported. These metrics are "
        + "reported on the worker updates path so this number will be rounded up to the "
        + "nearest multiple of WindmillHarnessUpdateReportingPeriod. If that value is 0, then "
        + "these metrics are never sent.")
@Default.Integer(30000)
int getPerWorkerMetricsUpdateReportingPeriodMillis();

/** Sets the desired per-worker metrics reporting interval, in milliseconds. */
void setPerWorkerMetricsUpdateReportingPeriodMillis(int value);

@Description("Limit on depth of user exception stack trace reported to cloud monitoring.")
@Default.InstanceFactory(MaxStackTraceDepthToReportFactory.class)
int getMaxStackTraceDepthToReport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
failureTracker,
streamingCounters,
memoryMonitor,
workExecutor);

workExecutor,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
return new StreamingDataflowWorker(
windmillServer,
clientId,
Expand Down Expand Up @@ -500,7 +501,9 @@ static StreamingDataflowWorker forTesting(
streamingCounters,
memoryMonitor,
workExecutor,
executorSupplier);
executorSupplier,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
return new StreamingDataflowWorker(
windmillServer,
1L,
Expand Down Expand Up @@ -760,7 +763,7 @@ public void start() {
scheduledExecutors.add(statusPageTimer);
}
workCommitter.start();
workerStatusReporter.start(options.getWindmillHarnessUpdateReportingPeriod().getMillis());
workerStatusReporter.start();
activeWorkRefresher.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.api.services.dataflow.model.WorkerMessage;
import com.google.api.services.dataflow.model.WorkerMessageResponse;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MultimapBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -87,6 +89,14 @@ public final class StreamingWorkerStatusReporter {
private final ScheduledExecutorService globalWorkerUpdateReporter;
private final ScheduledExecutorService workerMessageReporter;

// Reporting period for periodic status updates.
private final long windmillHarnessUpdateReportingPeriodMillis;
// PerWorkerMetrics are sent on the WorkerMessages channel, once in every
// perWorkerMetricsUpdateFrequency RPC calls. If 0, PerWorkerMetrics are not reported.
private final long perWorkerMetricsUpdateFrequency;
// Counts the WorkerMessages built since PerWorkerMetrics were last attached. It is incremented
// each time a WorkerMessage is assembled and reset to 0 once it reaches
// perWorkerMetricsUpdateFrequency, at which point the metrics are included.
private final AtomicLong workerMessagesIndex;

private StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
Expand All @@ -96,7 +106,9 @@ private StreamingWorkerStatusReporter(
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory) {
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
this.publishCounters = publishCounters;
this.dataflowServiceClient = dataflowServiceClient;
this.windmillQuotaThrottleTime = windmillQuotaThrottleTime;
Expand All @@ -111,6 +123,12 @@ private StreamingWorkerStatusReporter(
this.maxThreadCountOverride = new AtomicInteger();
this.globalWorkerUpdateReporter = executorFactory.apply(GLOBAL_WORKER_UPDATE_REPORTER_THREAD);
this.workerMessageReporter = executorFactory.apply(WORKER_MESSAGE_REPORTER_THREAD);
this.windmillHarnessUpdateReportingPeriodMillis = windmillHarnessUpdateReportingPeriodMillis;
this.perWorkerMetricsUpdateFrequency =
getPerWorkerMetricsUpdateFrequency(
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
this.workerMessagesIndex = new AtomicLong();
}

public static StreamingWorkerStatusReporter create(
Expand All @@ -120,7 +138,9 @@ public static StreamingWorkerStatusReporter create(
FailureTracker failureTracker,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor) {
BoundedQueueExecutor workExecutor,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
/* publishCounters= */ true,
workUnitClient,
Expand All @@ -132,7 +152,9 @@ public static StreamingWorkerStatusReporter create(
workExecutor,
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build()));
new ThreadFactoryBuilder().setNameFormat(threadName).build()),
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
}

@VisibleForTesting
Expand All @@ -145,7 +167,9 @@ public static StreamingWorkerStatusReporter forTesting(
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory) {
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
publishCounters,
workUnitClient,
Expand All @@ -155,7 +179,9 @@ public static StreamingWorkerStatusReporter forTesting(
streamingCounters,
memoryMonitor,
workExecutor,
executorFactory);
executorFactory,
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
}

/**
Expand Down Expand Up @@ -203,7 +229,7 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start(long windmillHarnessUpdateReportingPeriodMillis) {
public void start() {
reportHarnessStartup();
if (windmillHarnessUpdateReportingPeriodMillis > 0) {
LOG.info(
Expand Down Expand Up @@ -231,6 +257,7 @@ public void stop() {
shutdownExecutor(workerMessageReporter);
// one last send
reportPeriodicWorkerUpdates();
this.workerMessagesIndex.set(this.perWorkerMetricsUpdateFrequency);
reportPeriodicWorkerMessage();
}

Expand All @@ -249,6 +276,22 @@ private void reportHarnessStartup() {
}
}

/**
 * Computes how many WorkerMessages RPCs make up one PerWorkerMetrics reporting interval.
 *
 * <p>PerWorkerMetrics are piggybacked on the WorkerMessages RPC, so they can only be sent on a
 * whole number of RPC intervals. The desired period is therefore rounded UP to the next multiple
 * of the RPC interval (for example, a 10000 ms RPC interval and a 25000 ms desired period yield
 * a frequency of 3, i.e. one report every 30000 ms).
 *
 * @param windmillHarnessUpdateReportingPeriodMillis interval between WorkerMessages RPCs
 * @param perWorkerMetricsUpdateReportingPeriodMillis desired interval between metric reports
 * @return the number of RPCs per PerWorkerMetrics report, or 0 (meaning "never report") when
 *     either period is zero or negative
 */
private static long getPerWorkerMetricsUpdateFrequency(
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  // A non-positive period means reporting is disabled. Guarding both operands also keeps the
  // result from going negative, which would make the "index < frequency" throttle check always
  // fail and attach metrics to every single RPC.
  if (windmillHarnessUpdateReportingPeriodMillis <= 0
      || perWorkerMetricsUpdateReportingPeriodMillis <= 0) {
    return 0;
  }
  return LongMath.divide(
      perWorkerMetricsUpdateReportingPeriodMillis,
      windmillHarnessUpdateReportingPeriodMillis,
      RoundingMode.CEILING);
}

/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
Expand Down Expand Up @@ -325,11 +368,7 @@ private List<WorkerMessage> createWorkerMessage() {
List<WorkerMessage> workerMessages = new ArrayList<>(2);
workerMessages.add(createWorkerMessageForStreamingScalingReport());

if (StreamingStepMetricsContainer.getEnablePerWorkerMetrics()) {
Optional<WorkerMessage> metricsMsg = createWorkerMessageForPerWorkerMetrics();
metricsMsg.ifPresent(workerMessages::add);
}

createWorkerMessageForPerWorkerMetrics().ifPresent(metrics -> workerMessages.add(metrics));
return workerMessages;
}

Expand All @@ -346,6 +385,17 @@ private WorkerMessage createWorkerMessageForStreamingScalingReport() {
}

private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
if (!StreamingStepMetricsContainer.getEnablePerWorkerMetrics()
|| perWorkerMetricsUpdateFrequency == 0) {
return Optional.empty();
}

if (workerMessagesIndex.incrementAndGet() < perWorkerMetricsUpdateFrequency) {
return Optional.empty();
} else {
workerMessagesIndex.set(0L);
}

List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
allStageInfo.get().forEach(s -> metrics.addAll(s.extractPerWorkerMetricValues()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
@RunWith(JUnit4.class)
public class StreamingWorkerStatusReporterTest {
private final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000;
private final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000;
private final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000;

private BoundedQueueExecutor mockExecutor;
private WorkUnitClient mockWorkUnitClient;
Expand All @@ -66,7 +68,9 @@ public void testOverrideMaximumThreadCount() throws Exception {
StreamingCounters.create(),
mockMemoryMonitor,
mockExecutor,
(threadName) -> Executors.newSingleThreadScheduledExecutor());
(threadName) -> Executors.newSingleThreadScheduledExecutor(),
DEFAULT_HARNESS_REPORTING_PERIOD,
DEFAULT_PER_WORKER_METRICS_PERIOD);
StreamingScalingReportResponse streamingScalingReportResponse =
new StreamingScalingReportResponse().setMaximumThreadCount(10);
WorkerMessageResponse workerMessageResponse =
Expand All @@ -90,7 +94,9 @@ public void testHandleEmptyWorkerMessageResponse() throws Exception {
StreamingCounters.create(),
mockMemoryMonitor,
mockExecutor,
(threadName) -> Executors.newSingleThreadScheduledExecutor());
(threadName) -> Executors.newSingleThreadScheduledExecutor(),
DEFAULT_HARNESS_REPORTING_PERIOD,
DEFAULT_PER_WORKER_METRICS_PERIOD);
WorkerMessageResponse workerMessageResponse = new WorkerMessageResponse();
when(mockWorkUnitClient.reportWorkerMessage(any()))
.thenReturn(Collections.singletonList(workerMessageResponse));
Expand Down

0 comments on commit a00e947

Please sign in to comment.