Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JayajP committed Apr 11, 2024
1 parent 0dc2c4b commit dbceef7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
failureTracker,
streamingCounters,
memoryMonitor,
workExecutor);
workExecutor,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
return new StreamingDataflowWorker(
windmillServer,
clientId,
Expand Down Expand Up @@ -494,7 +496,9 @@ static StreamingDataflowWorker forTesting(
streamingCounters,
memoryMonitor,
workExecutor,
executorSupplier);
executorSupplier,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
return new StreamingDataflowWorker(
windmillServer,
1L,
Expand Down Expand Up @@ -753,9 +757,7 @@ public void start() {
scheduledExecutors.add(statusPageTimer);
}
workCommitter.start();
workerStatusReporter.start(
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
workerStatusReporter.start();
activeWorkRefresher.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ public final class StreamingWorkerStatusReporter {
private final ScheduledExecutorService globalWorkerUpdateReporter;
private final ScheduledExecutorService workerMessageReporter;

// Reporting period for periodic status updates.
private final long windmillHarnessUpdateReportingPeriodMillis;
// PerWorkerMetrics are sent on the WorkerMessages channel, and are sent one in every
// perWorkerMetricsUpdateFrequency RPC call. If 0, PerWorkerMetrics are not reported.
long perWorkerMetricsUpdateFrequency = 0L;
private final long perWorkerMetricsUpdateFrequency;
// Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics.
long workerMessagesIndex = 0L;
private final AtomicLong workerMessagesIndex;

private StreamingWorkerStatusReporter(
boolean publishCounters,
Expand All @@ -98,7 +100,9 @@ private StreamingWorkerStatusReporter(
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory) {
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
this.publishCounters = publishCounters;
this.dataflowServiceClient = dataflowServiceClient;
this.windmillQuotaThrottleTime = windmillQuotaThrottleTime;
Expand All @@ -110,6 +114,12 @@ private StreamingWorkerStatusReporter(
this.previousTimeAtMaxThreads = new AtomicLong();
this.globalWorkerUpdateReporter = executorFactory.apply(GLOBAL_WORKER_UPDATE_REPORTER_THREAD);
this.workerMessageReporter = executorFactory.apply(WORKER_MESSAGE_REPORTER_THREAD);
this.windmillHarnessUpdateReportingPeriodMillis = windmillHarnessUpdateReportingPeriodMillis;
this.perWorkerMetricsUpdateFrequency =
getPerWorkerMetricsUpdateFrequency(
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
this.workerMessagesIndex = new AtomicLong();
}

public static StreamingWorkerStatusReporter create(
Expand All @@ -119,7 +129,9 @@ public static StreamingWorkerStatusReporter create(
FailureTracker failureTracker,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor) {
BoundedQueueExecutor workExecutor,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
/* publishCounters= */ true,
workUnitClient,
Expand All @@ -131,7 +143,9 @@ public static StreamingWorkerStatusReporter create(
workExecutor,
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build()));
new ThreadFactoryBuilder().setNameFormat(threadName).build()),
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
}

@VisibleForTesting
Expand All @@ -144,7 +158,9 @@ public static StreamingWorkerStatusReporter forTesting(
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory) {
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
publishCounters,
workUnitClient,
Expand All @@ -154,7 +170,9 @@ public static StreamingWorkerStatusReporter forTesting(
streamingCounters,
memoryMonitor,
workExecutor,
executorFactory);
executorFactory,
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
}

/**
Expand Down Expand Up @@ -202,12 +220,8 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
public void start() {
reportHarnessStartup();
setPerWorkerMetricsUpdateFrequency(
windmillHarnessUpdateReportingPeriodMillis, perWorkerMetricsUpdateReportingPeriodMillis);
if (windmillHarnessUpdateReportingPeriodMillis > 0) {
LOG.info(
"Starting periodic worker status reporters. Reporting period is every {} millis.",
Expand All @@ -234,7 +248,7 @@ public void stop() {
shutdownExecutor(workerMessageReporter);
// one last send
reportPeriodicWorkerUpdates();
this.workerMessagesIndex = this.perWorkerMetricsUpdateFrequency;
this.workerMessagesIndex.set(this.perWorkerMetricsUpdateFrequency);
reportPeriodicWorkerMessage();
}

Expand All @@ -257,18 +271,16 @@ private void reportHarnessStartup() {
// WorkerMessages RPC schedule. The desired reporting period
// (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple
// of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
private void setPerWorkerMetricsUpdateFrequency(
private static long getPerWorkerMetricsUpdateFrequency(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
if (windmillHarnessUpdateReportingPeriodMillis == 0) {
this.perWorkerMetricsUpdateFrequency = 0;
return;
return 0;
}
this.perWorkerMetricsUpdateFrequency =
LongMath.divide(
perWorkerMetricsUpdateReportingPeriodMillis,
windmillHarnessUpdateReportingPeriodMillis,
RoundingMode.CEILING);
return LongMath.divide(
perWorkerMetricsUpdateReportingPeriodMillis,
windmillHarnessUpdateReportingPeriodMillis,
RoundingMode.CEILING);
}

/** Sends counter updates to Dataflow backend. */
Expand Down Expand Up @@ -366,11 +378,10 @@ private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
return Optional.empty();
}

workerMessagesIndex += 1;
if (workerMessagesIndex < perWorkerMetricsUpdateFrequency) {
if (workerMessagesIndex.incrementAndGet() < perWorkerMetricsUpdateFrequency) {
return Optional.empty();
} else {
workerMessagesIndex = 0;
workerMessagesIndex.set(0L);
}

List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
Expand Down

0 comments on commit dbceef7

Please sign in to comment.