From d17e6a69dc89fb821a8ce52f50926693c20bc023 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Mon, 25 Nov 2024 10:11:52 -0800 Subject: [PATCH] address PR comments --- .../worker/StreamingDataflowWorker.java | 60 +++++++++---------- .../harness/SingleSourceWorkerHarness.java | 10 ++-- .../harness/StreamingWorkerHarness.java | 4 +- .../StreamingWorkerStatusReporter.java | 10 ++-- .../harness/ThrottledTimeTracker.java | 29 +++++++++ .../SingleSourceWorkerHarnessTest.java | 2 +- 6 files changed, 72 insertions(+), 43 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f26b1aa74852..07e8cfb4d359 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -61,6 +61,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter; +import org.apache.beam.runners.dataflow.worker.streaming.harness.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; @@ -347,14 +348,13 @@ private StreamingDataflowWorker( .setComputationStateFetcher(this.computationStateCache::get) 
.setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) .setHeartbeatSender(heartbeatSender) - .setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime) + .setThrottleTimeTracker(windmillServer::getAndResetThrottleTime) .setGetWorkSender(getWorkSender) .build(); } this.workerStatusReporter = - streamingWorkerStatusReporterFactory.createStatusReporter( - streamingWorkerHarness::getAndResetThrottleTime); + streamingWorkerStatusReporterFactory.createStatusReporter(streamingWorkerHarness); this.activeWorkRefresher = new ActiveWorkRefresher( clock, @@ -559,41 +559,41 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o .setWindmillServer( GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient)) .build(); - } else { - // Build with local Windmill client. - if (options.getWindmillServiceEndpoint() != null - || options.getLocalWindmillHostport().startsWith("grpc:")) { - GrpcDispatcherClient dispatcherClient = - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - GrpcWindmillStreamFactory windmillStreamFactory = - windmillStreamFactoryBuilder - .setHealthCheckIntervalMillis( - options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) - .build(); - GrpcWindmillServer windmillServer = - GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); - ComputationConfig.Fetcher configFetcher = - createApplianceComputationConfigFetcher(windmillServer); - return ConfigFetcherComputationStateCacheAndWindmillClient.builder() - .setWindmillDispatcherClient(dispatcherClient) - .setWindmillServer(windmillServer) - .setWindmillStreamFactory(windmillStreamFactory) - .setConfigFetcher(configFetcher) - .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) - .build(); - } + } - WindmillServerStub windmillServer = - new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + // Build with local Windmill client. 
+ if (options.getWindmillServiceEndpoint() != null + || options.getLocalWindmillHostport().startsWith("grpc:")) { + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); + GrpcWindmillStreamFactory windmillStreamFactory = + windmillStreamFactoryBuilder + .setHealthCheckIntervalMillis( + options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) + .build(); + GrpcWindmillServer windmillServer = + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); ComputationConfig.Fetcher configFetcher = createApplianceComputationConfigFetcher(windmillServer); return ConfigFetcherComputationStateCacheAndWindmillClient.builder() - .setWindmillStreamFactory(windmillStreamFactoryBuilder.build()) + .setWindmillDispatcherClient(dispatcherClient) .setWindmillServer(windmillServer) + .setWindmillStreamFactory(windmillStreamFactory) .setConfigFetcher(configFetcher) .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) .build(); } + + WindmillServerStub windmillServer = + new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + ComputationConfig.Fetcher configFetcher = + createApplianceComputationConfigFetcher(windmillServer); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillStreamFactory(windmillStreamFactoryBuilder.build()) + .setWindmillServer(windmillServer) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) + .build(); } private static StreamingApplianceComputationConfigFetcher createApplianceComputationConfigFetcher( @@ -948,7 +948,7 @@ private void onCompleteCommit(CompleteCommit completeCommit) { @FunctionalInterface private interface StreamingWorkerStatusReporterFactory { - StreamingWorkerStatusReporter createStatusReporter(Supplier<Long> throttleTimeSupplier); + StreamingWorkerStatusReporter createStatusReporter(ThrottledTimeTracker throttledTimeTracker); } 
@AutoValue diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index e89e83eafe5b..0ec232723ef8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -66,7 +66,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { private final Function<String, Optional<ComputationState>> computationStateFetcher; private final ExecutorService workProviderExecutor; private final GetWorkSender getWorkSender; - private final Supplier<Long> throttleTimeSupplier; + private final ThrottledTimeTracker throttleTimeTracker; SingleSourceWorkerHarness( WorkCommitter workCommitter, @@ -76,7 +76,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { Runnable waitForResources, Function<String, Optional<ComputationState>> computationStateFetcher, GetWorkSender getWorkSender, - Supplier<Long> throttleTimeSupplier) { + ThrottledTimeTracker throttleTimeTracker) { this.workCommitter = workCommitter; this.getDataClient = getDataClient; this.heartbeatSender = heartbeatSender; @@ -92,7 +92,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { .build()); this.isRunning = new AtomicBoolean(false); this.getWorkSender = getWorkSender; - this.throttleTimeSupplier = throttleTimeSupplier; + this.throttleTimeTracker = throttleTimeTracker; } public static SingleSourceWorkerHarness.Builder builder() { @@ -149,7 +149,7 @@ public void shutdown() { @Override public long getAndResetThrottleTime() { - return throttleTimeSupplier.get(); + return throttleTimeTracker.getAndResetThrottleTime(); } private 
void streamingEngineDispatchLoop( @@ -262,7 +262,7 @@ Builder setComputationStateFetcher( Builder setGetWorkSender(GetWorkSender getWorkSender); - Builder setThrottleTimeSupplier(Supplier<Long> throttleTimeSupplier); + Builder setThrottleTimeTracker(ThrottledTimeTracker throttleTimeTracker); SingleSourceWorkerHarness build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java index 02b101def753..779dd50d2ba9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java @@ -21,10 +21,8 @@ /** Provides an interface to start streaming worker processing. 
*/ @Internal -public interface StreamingWorkerHarness { +public interface StreamingWorkerHarness extends ThrottledTimeTracker { void start(); void shutdown(); - - long getAndResetThrottleTime(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index 4c06d12d50a3..8a3719a0270b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -79,7 +79,7 @@ public final class StreamingWorkerStatusReporter { private final int initialMaxThreadCount; private final int initialMaxBundlesOutstanding; private final WorkUnitClient dataflowServiceClient; - private final Supplier<Long> windmillQuotaThrottleTime; + private final ThrottledTimeTracker windmillQuotaThrottleTime; private final Supplier<Collection<StageInfo>> allStageInfo; private final FailureTracker failureTracker; private final StreamingCounters streamingCounters; @@ -101,7 +101,7 @@ public final class StreamingWorkerStatusReporter { StreamingWorkerStatusReporter( boolean publishCounters, WorkUnitClient dataflowServiceClient, - Supplier<Long> windmillQuotaThrottleTime, + ThrottledTimeTracker windmillQuotaThrottleTime, Supplier<Collection<StageInfo>> allStageInfo, FailureTracker failureTracker, StreamingCounters streamingCounters, @@ -253,7 +253,9 @@ private void reportHarnessStartup() { private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { // Throttle time is tracked by the windmillServer but is reported to DFE here. 
- streamingCounters.windmillQuotaThrottling().addValue(windmillQuotaThrottleTime.get()); + streamingCounters + .windmillQuotaThrottling() + .addValue(windmillQuotaThrottleTime.getAndResetThrottleTime()); if (memoryMonitor.isThrashing()) { streamingCounters.memoryThrashing().addValue(1); } @@ -460,7 +462,7 @@ public interface Builder { Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient); - Builder setWindmillQuotaThrottleTime(Supplier<Long> windmillQuotaThrottleTime); + Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker windmillQuotaThrottleTime); Builder setAllStageInfo(Supplier<Collection<StageInfo>> allStageInfo); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java new file mode 100644 index 000000000000..52623449cb83 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +/** + * Tracks time spent in a throttled state due to {@code Status.RESOURCE_EXHAUSTED} errors returned + * from gRPC calls. + */ +@FunctionalInterface +public interface ThrottledTimeTracker { + + /** Returns the combined total of all throttle times and resets those times to 0. */ + long getAndResetThrottleTime(); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java index a65985b91f4e..39bd98f6e8f4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java @@ -61,7 +61,7 @@ private SingleSourceWorkerHarness createWorkerHarness( .setStreamingWorkScheduler(streamingWorkScheduler) .setComputationStateFetcher(computationStateFetcher) // no-op throttle time supplier. - .setThrottleTimeSupplier(() -> 0L) + .setThrottleTimeTracker(() -> 0L) .setGetWorkSender(getWorkSender) .build(); }