From 50d94e033a9a5833403e639b0d6df4f311082272 Mon Sep 17 00:00:00 2001 From: m-trieu Date: Mon, 14 Oct 2024 16:51:02 -0700 Subject: [PATCH 1/6] Integrate direct path with StreamingDataflowWorker code path --- .../DataflowStreamingPipelineOptions.java | 15 +- .../worker/StreamingDataflowWorker.java | 507 +++++++++++------- .../FanOutStreamingEngineWorkerHarness.java | 1 + .../harness/SingleSourceWorkerHarness.java | 12 +- .../harness/StreamingWorkerHarness.java | 2 + .../StreamingWorkerStatusReporter.java | 122 ++--- .../worker/StreamingDataflowWorkerTest.java | 42 +- .../StreamingWorkerStatusReporterTest.java | 53 +- 8 files changed, 456 insertions(+), 298 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index 6a0208f1447f..61c38dde2b42 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.joda.time.Duration; @@ -219,10 +220,8 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setWindmillServiceStreamMaxBackoffMillis(int value); - @Description( - "If true, Dataflow streaming pipeline will be running in direct path mode." - + " VMs must have IPv6 enabled for this to work.") - @Default.Boolean(false) + @Description("Enables direct path mode for streaming engine.") + @Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class) boolean getIsWindmillServiceDirectPathEnabled(); void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled); @@ -300,4 +299,12 @@ public Integer create(PipelineOptions options) { return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1; } } + + /** EnableStreamingEngine defaults to false unless one of the experiment is set. */ + class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory { + @Override + public Boolean create(PipelineOptions options) { + return ExperimentalOptions.hasExperiment(options, "enable_windmill_service_direct_path"); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 6ce60283735f..333f674e94f3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -17,11 +17,11 @@ */ package org.apache.beam.runners.dataflow.worker; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; -import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.MapTask; import com.google.auto.value.AutoValue; +import java.io.PrintWriter; import java.util.List; import java.util.Map; import java.util.Optional; @@ -33,12 +33,13 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import javax.annotation.Nullable; import org.apache.beam.runners.core.metrics.MetricsLogger; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; -import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.status.DebugCapture; import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; @@ -50,7 +51,9 @@ import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; +import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; @@ -63,9 +66,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; +import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; -import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commits; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter; @@ -78,8 +81,14 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingApplianceFailureTracker; @@ -98,8 +107,10 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.joda.time.Instant; @@ -120,6 +131,8 @@ public final class StreamingDataflowWorker { */ public static final int MAX_SINK_BYTES = 10_000_000; + public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = + "streaming_engine_use_job_settings_for_heartbeat_pool"; private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class); /** @@ -142,6 +155,8 @@ public final class StreamingDataflowWorker { private static final int DEFAULT_STATUS_PORT = 8081; private static final Random CLIENT_ID_GENERATOR = new Random(); private static final String CHANNELZ_PATH = "/channelz"; + private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api"; + private static final String ENABLE_IPV6_EXPERIMENT = "enable_private_ipv6_google_access"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -155,9 +170,8 @@ public final class StreamingDataflowWorker { private final ReaderCache readerCache; private final DataflowExecutionStateSampler sampler = DataflowExecutionStateSampler.instance(); private final ActiveWorkRefresher activeWorkRefresher; - private final WorkCommitter workCommitter; private final StreamingWorkerStatusReporter workerStatusReporter; - private final StreamingCounters streamingCounters; + private final int numCommitThreads; private StreamingDataflowWorker( WindmillServerStub windmillServer, @@ -170,17 +184,17 @@ private StreamingDataflowWorker( DataflowWorkerHarnessOptions options, HotKeyLogger hotKeyLogger, Supplier clock, - StreamingWorkerStatusReporter workerStatusReporter, + StreamingWorkerStatusReporterFactory streamingWorkerStatusReporterFactory, FailureTracker failureTracker, WorkFailureProcessor workFailureProcessor, StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, GrpcWindmillStreamFactory windmillStreamFactory, ScheduledExecutorService activeWorkRefreshExecutorFn, - ConcurrentMap stageInfoMap) { + ConcurrentMap stageInfoMap, + @Nullable GrpcDispatcherClient dispatcherClient) { // Register standard file systems. FileSystems.setDefaultPipelineOptions(options); - this.configFetcher = configFetcher; this.computationStateCache = computationStateCache; this.stateCache = windmillStateCache; @@ -189,35 +203,13 @@ private StreamingDataflowWorker( Duration.standardSeconds(options.getReaderCacheTimeoutSec()), Executors.newCachedThreadPool()); this.options = options; - - boolean windmillServiceEnabled = options.isEnableStreamingEngine(); - - int numCommitThreads = 1; - if (windmillServiceEnabled && options.getWindmillServiceCommitThreads() > 0) { - numCommitThreads = options.getWindmillServiceCommitThreads(); - } - - this.workCommitter = - windmillServiceEnabled - ? StreamingEngineWorkCommitter.builder() - .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) - .setCommitWorkStreamFactory( - WindmillStreamPool.create( - numCommitThreads, - COMMIT_STREAM_TIMEOUT, - windmillServer::commitWorkStream) - ::getCloseableStream) - .setNumCommitSenders(numCommitThreads) - .setOnCommitComplete(this::onCompleteCommit) - .build() - : StreamingApplianceWorkCommitter.create( - windmillServer::commitWork, this::onCompleteCommit); - this.workUnitExecutor = workUnitExecutor; - - this.workerStatusReporter = workerStatusReporter; - this.streamingCounters = streamingCounters; this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor); + this.numCommitThreads = + options.isEnableStreamingEngine() + ? Math.max(options.getWindmillServiceCommitThreads(), 1) + : 1; + StreamingWorkScheduler streamingWorkScheduler = StreamingWorkScheduler.create( options, @@ -234,107 +226,190 @@ private StreamingDataflowWorker( ID_GENERATOR, configFetcher.getGlobalConfigHandle(), stageInfoMap); - ThrottlingGetDataMetricTracker getDataMetricTracker = new ThrottlingGetDataMetricTracker(memoryMonitor); - WorkerStatusPages workerStatusPages = - WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); - StreamingWorkerStatusPages.Builder statusPagesBuilder = StreamingWorkerStatusPages.builder(); - int stuckCommitDurationMillis; - GetDataClient getDataClient; - HeartbeatSender heartbeatSender; - if (windmillServiceEnabled) { - WindmillStreamPool getDataStreamPool = - WindmillStreamPool.create( - Math.max(1, options.getWindmillGetDataStreamCount()), - GET_DATA_STREAM_TIMEOUT, - windmillServer::getDataStream); - getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); - if (options.getUseSeparateWindmillHeartbeatStreams() != null) { + // Status page members. Different implementations on whether the harness is streaming engine + // direct path, streaming engine cloud path, or streaming appliance. + @Nullable ChannelzServlet channelzServlet = null; + Consumer getDataStatusProvider; + Supplier currentActiveCommitBytesProvider; + if (isDirectPathPipeline(options)) { + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, watermarks, processingContext, getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + createStubFactory(options), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + getDataStatusProvider = getDataMetricTracker::printHtml; + currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + channelzServlet = + createChannelZServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness; + } else { + Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options); + GetDataClient getDataClient; + HeartbeatSender heartbeatSender; + WorkCommitter workCommitter; + GetWorkSender getWorkSender; + if (options.isEnableStreamingEngine()) { + WindmillStreamPool getDataStreamPool = + WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + windmillServer::getDataStream); + getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); heartbeatSender = - StreamPoolHeartbeatSender.Create( - Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) - ? separateHeartbeatPool(windmillServer) - : getDataStreamPool); - + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + channelzServlet = + createChannelZServlet(options, windmillServer::getWindmillServiceEndpoints); + workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, + COMMIT_STREAM_TIMEOUT, + windmillServer::commitWorkStream) + ::getCloseableStream) + .setNumCommitSenders(numCommitThreads) + .setOnCommitComplete(this::onCompleteCommit) + .build(); + getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); } else { - heartbeatSender = - StreamPoolHeartbeatSender.Create( - separateHeartbeatPool(windmillServer), - getDataStreamPool, - configFetcher.getGlobalConfigHandle()); + getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); + heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); + workCommitter = + StreamingApplianceWorkCommitter.create( + windmillServer::commitWork, this::onCompleteCommit); + getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request)); } - stuckCommitDurationMillis = - options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0; - statusPagesBuilder - .setDebugCapture( - new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) - .setChannelzServlet( - new ChannelzServlet( - CHANNELZ_PATH, options, windmillServer::getWindmillServiceEndpoints)) - .setWindmillStreamFactory(windmillStreamFactory); - } else { - getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); - heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); - stuckCommitDurationMillis = 0; + getDataStatusProvider = getDataClient::printHtml; + currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; + + this.streamingWorkerHarness = + SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime) + .setGetWorkSender(getWorkSender) + .build(); } + this.workerStatusReporter = + streamingWorkerStatusReporterFactory.createStatusReporter( + streamingWorkerHarness::getAndResetThrottleTime); this.activeWorkRefresher = new ActiveWorkRefresher( clock, options.getActiveWorkRefreshPeriodMillis(), - stuckCommitDurationMillis, + options.isEnableStreamingEngine() + ? Math.max(options.getStuckCommitDurationMillis(), 0) + : 0, computationStateCache::getAllPresentComputations, sampler, activeWorkRefreshExecutorFn, getDataMetricTracker::trackHeartbeats); this.statusPages = - statusPagesBuilder + createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor) .setClock(clock) .setClientId(clientId) .setIsRunning(running) - .setStatusPages(workerStatusPages) .setStateCache(stateCache) .setComputationStateCache(this.computationStateCache) - .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes) - .setGetDataStatusProvider(getDataClient::printHtml) .setWorkUnitExecutor(workUnitExecutor) .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle()) + .setChannelzServlet(channelzServlet) + .setGetDataStatusProvider(getDataStatusProvider) + .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider) .build(); - Windmill.GetWorkRequest request = - Windmill.GetWorkRequest.newBuilder() - .setClientId(clientId) - .setMaxItems(chooseMaximumBundlesOutstanding()) - .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) - .build(); - - this.streamingWorkerHarness = - SingleSourceWorkerHarness.builder() - .setStreamingWorkScheduler(streamingWorkScheduler) - .setWorkCommitter(workCommitter) - .setGetDataClient(getDataClient) - .setComputationStateFetcher(this.computationStateCache::get) - .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) - .setHeartbeatSender(heartbeatSender) - .setGetWorkSender( - windmillServiceEnabled - ? GetWorkSender.forStreamingEngine( - receiver -> windmillServer.getWorkStream(request, receiver)) - : GetWorkSender.forAppliance(() -> windmillServer.getWork(request))) - .build(); - - LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled); + LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled()); + LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine()); LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint()); LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort()); LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } - private static WindmillStreamPool separateHeartbeatPool( - WindmillServerStub windmillServer) { - return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); + private static StreamingWorkerStatusPages.Builder createStatusPageBuilder( + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + MemoryMonitor memoryMonitor) { + WorkerStatusPages workerStatusPages = + WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); + + StreamingWorkerStatusPages.Builder streamingStatusPages = + StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages); + + return options.isEnableStreamingEngine() + ? streamingStatusPages + .setDebugCapture( + new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) + .setWindmillStreamFactory(windmillStreamFactory) + : streamingStatusPages; + } + + private static ChannelzServlet createChannelZServlet( + DataflowWorkerHarnessOptions options, + Supplier> windmillEndpointProvider) { + return new ChannelzServlet(CHANNELZ_PATH, options, windmillEndpointProvider); + } + + private static HeartbeatSender createStreamingEngineHeartbeatSender( + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillClient, + WindmillStreamPool getDataStreamPool, + StreamingGlobalConfigHandle globalConfigHandle) { + // Experiment gates the logic till backend changes are rollback safe + if (!DataflowRunner.hasExperiment(options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL) + || options.getUseSeparateWindmillHeartbeatStreams() != null) { + return StreamPoolHeartbeatSender.Create( + Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) + ? WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream) + : getDataStreamPool); + + } else { + return StreamPoolHeartbeatSender.Create( + WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream), + getDataStreamPool, + globalConfigHandle); + } } public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { @@ -387,17 +462,21 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o failureTracker, () -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()), clock); - StreamingWorkerStatusReporter workerStatusReporter = - StreamingWorkerStatusReporter.create( - dataflowServiceClient, - windmillServer::getAndResetThrottleTime, - stageInfo::values, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - options.getWindmillHarnessUpdateReportingPeriod().getMillis(), - options.getPerWorkerMetricsUpdateReportingPeriodMillis()); + StreamingWorkerStatusReporterFactory workerStatusReporterFactory = + throttleTimeSupplier -> + StreamingWorkerStatusReporter.builder() + .setDataflowServiceClient(dataflowServiceClient) + .setWindmillQuotaThrottleTime(throttleTimeSupplier) + .setAllStageInfo(stageInfo::values) + .setFailureTracker(failureTracker) + .setStreamingCounters(streamingCounters) + .setMemoryMonitor(memoryMonitor) + .setWorkExecutor(workExecutor) + .setWindmillHarnessUpdateReportingPeriodMillis( + options.getWindmillHarnessUpdateReportingPeriod().getMillis()) + .setPerWorkerMetricsUpdateReportingPeriodMillis( + options.getPerWorkerMetricsUpdateReportingPeriodMillis()) + .build(); return new StreamingDataflowWorker( windmillServer, @@ -410,7 +489,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o options, new HotKeyLogger(), clock, - workerStatusReporter, + workerStatusReporterFactory, failureTracker, workFailureProcessor, streamingCounters, @@ -418,7 +497,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()), - stageInfo); + stageInfo, + configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient()); } /** @@ -437,6 +517,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WindmillServerStub windmillServer; ComputationStateCache computationStateCache; GrpcWindmillStreamFactory windmillStreamFactory; + ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder = + ConfigFetcherComputationStateCacheAndWindmillClient.builder(); if (options.isEnableStreamingEngine()) { GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); @@ -453,19 +535,20 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); + builder.setWindmillDispatcherClient(dispatcherClient); } else { if (options.getWindmillServiceEndpoint() != null || options.getLocalWindmillHostport().startsWith("grpc:")) { + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); windmillStreamFactory = windmillStreamFactoryBuilder .setHealthCheckIntervalMillis( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); windmillServer = - GrpcWindmillServer.create( - options, - windmillStreamFactory, - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options))); + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); + builder.setWindmillDispatcherClient(dispatcherClient); } else { windmillStreamFactory = windmillStreamFactoryBuilder.build(); windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); @@ -478,8 +561,50 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o computationStateCache = computationStateCacheFactory.apply(configFetcher); } - return ConfigFetcherComputationStateCacheAndWindmillClient.create( - configFetcher, computationStateCache, windmillServer, windmillStreamFactory); + return builder + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCache) + .setWindmillServer(windmillServer) + .setWindmillStreamFactory(windmillStreamFactory) + .build(); + } + + private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) { + if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) { + boolean isIpV6Enabled = + Optional.ofNullable(options.getDataflowServiceOptions()) + .map(serviceOptions -> serviceOptions.contains(ENABLE_IPV6_EXPERIMENT)) + .orElse(false); + if (isIpV6Enabled) { + return true; + } + LOG.warn( + "DirectPath is currently only supported with IPv6 networking stack. Defaulting to" + + " CloudPath."); + } + return false; + } + + private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) { + Preconditions.checkArgument( + options.isStreaming(), + "%s instantiated with options indicating batch use", + StreamingDataflowWorker.class.getName()); + + Preconditions.checkArgument( + !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT), + "%s cannot be main() class with beam_fn_api enabled", + StreamingDataflowWorker.class.getSimpleName()); + } + + private static ChannelCachingStubFactory createStubFactory( + DataflowWorkerHarnessOptions workerOptions) { + return ChannelCachingRemoteStubFactory.create( + workerOptions.getGcpCredential(), + ChannelCache.create( + serviceAddress -> + remoteChannel( + serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))); } @VisibleForTesting @@ -495,7 +620,9 @@ static StreamingDataflowWorker forTesting( Supplier clock, Function executorSupplier, StreamingGlobalConfigHandleImpl globalConfigHandle, - int localRetryTimeoutMs) { + int localRetryTimeoutMs, + StreamingCounters streamingCounters, + WindmillStubFactoryFactory stubFactory) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = @@ -538,7 +665,6 @@ static StreamingDataflowWorker forTesting( stateNameMap, stateCache.forComputation(mapTask.getStageName()))); MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options); - StreamingCounters streamingCounters = StreamingCounters.create(); FailureTracker failureTracker = options.isEnableStreamingEngine() ? StreamingEngineFailureTracker.create( @@ -554,19 +680,23 @@ static StreamingDataflowWorker forTesting( () -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()), clock, localRetryTimeoutMs); - StreamingWorkerStatusReporter workerStatusReporter = - StreamingWorkerStatusReporter.forTesting( - publishCounters, - workUnitClient, - windmillServer::getAndResetThrottleTime, - stageInfo::values, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - executorSupplier, - options.getWindmillHarnessUpdateReportingPeriod().getMillis(), - options.getPerWorkerMetricsUpdateReportingPeriodMillis()); + StreamingWorkerStatusReporterFactory workerStatusReporterFactory = + throttleTimeSupplier -> + StreamingWorkerStatusReporter.builder() + .setPublishCounters(publishCounters) + .setDataflowServiceClient(workUnitClient) + .setWindmillQuotaThrottleTime(throttleTimeSupplier) + .setAllStageInfo(stageInfo::values) + .setFailureTracker(failureTracker) + .setStreamingCounters(streamingCounters) + .setMemoryMonitor(memoryMonitor) + .setWorkExecutor(workExecutor) + .setExecutorFactory(executorSupplier) + .setWindmillHarnessUpdateReportingPeriodMillis( + options.getWindmillHarnessUpdateReportingPeriod().getMillis()) + .setPerWorkerMetricsUpdateReportingPeriodMillis( + options.getPerWorkerMetricsUpdateReportingPeriodMillis()) + .build(); GrpcWindmillStreamFactory.Builder windmillStreamFactory = createGrpcwindmillStreamFactoryBuilder(options, 1) @@ -584,7 +714,7 @@ static StreamingDataflowWorker forTesting( options, hotKeyLogger, clock, - workerStatusReporter, + workerStatusReporterFactory, failureTracker, workFailureProcessor, streamingCounters, @@ -596,7 +726,8 @@ static StreamingDataflowWorker forTesting( .build() : windmillStreamFactory.build(), executorSupplier.apply("RefreshWork"), - stageInfo); + stageInfo, + GrpcDispatcherClient.create(options, stubFactory)); } private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactoryBuilder( @@ -605,13 +736,7 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory !options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null ? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF : Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis()); - return GrpcWindmillStreamFactory.of( - JobHeader.newBuilder() - .setJobId(options.getJobId()) - .setProjectId(options.getProject()) - .setWorkerId(options.getWorkerId()) - .setClientId(clientId) - .build()) + return GrpcWindmillStreamFactory.of(createJobHeader(options, clientId)) .setWindmillMessagesBetweenIsReadyChecks(options.getWindmillMessagesBetweenIsReadyChecks()) .setMaxBackOffSupplier(() -> maxBackoff) .setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures()) @@ -622,6 +747,15 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory options, "streaming_engine_disable_new_heartbeat_requests")); } + private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, long clientId) { + return JobHeader.newBuilder() + .setJobId(options.getJobId()) + .setProjectId(options.getProject()) + .setWorkerId(options.getWorkerId()) + .setClientId(clientId) + .build(); + } + private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) { return new BoundedQueueExecutor( chooseMaxThreads(options), @@ -640,15 +774,7 @@ public static void main(String[] args) throws Exception { DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions( StreamingDataflowWorker.class, DataflowWorkerHarnessOptions.class); DataflowWorkerHarnessHelper.configureLogging(options); - checkArgument( - options.isStreaming(), - "%s instantiated with options indicating batch use", - StreamingDataflowWorker.class.getName()); - - checkArgument( - !DataflowRunner.hasExperiment(options, "beam_fn_api"), - "%s cannot be main() class with beam_fn_api enabled", - StreamingDataflowWorker.class.getSimpleName()); + validateWorkerOptions(options); CoderTranslation.verifyModelCodersRegistered(); @@ -700,26 +826,20 @@ private static void enableBigQueryMetrics() { BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true); } + private static Windmill.GetWorkRequest createGetWorkRequest( + long clientId, DataflowWorkerHarnessOptions options) { + return Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + } + @VisibleForTesting void reportPeriodicWorkerUpdatesForTest() { workerStatusReporter.reportPeriodicWorkerUpdates(); } - private int chooseMaximumNumberOfThreads() { - if (options.getNumberOfWorkerHarnessThreads() != 0) { - return options.getNumberOfWorkerHarnessThreads(); - } - return MAX_PROCESSING_THREADS; - } - - private int chooseMaximumBundlesOutstanding() { - int maxBundles = options.getMaxBundlesFromWindmillOutstanding(); - if (maxBundles > 0) { - return maxBundles; - } - return chooseMaximumNumberOfThreads() + 100; - } - @VisibleForTesting public boolean workExecutorIsEmpty() { return workUnitExecutor.executorQueueIsEmpty(); @@ -727,7 +847,7 @@ public boolean workExecutorIsEmpty() { @VisibleForTesting int numCommitThreads() { - return workCommitter.parallelism(); + return numCommitThreads; } @VisibleForTesting @@ -791,27 +911,17 @@ private void onCompleteCommit(CompleteCommit completeCommit) { completeCommit.shardedKey(), completeCommit.workId())); } - @VisibleForTesting - public Iterable buildCounters() { - return Iterables.concat( - streamingCounters - .pendingDeltaCounters() - .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE), - streamingCounters - .pendingCumulativeCounters() - .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); + @FunctionalInterface + private interface StreamingWorkerStatusReporterFactory { + StreamingWorkerStatusReporter createStatusReporter(Supplier throttleTimeSupplier); } @AutoValue abstract static class ConfigFetcherComputationStateCacheAndWindmillClient { - private static ConfigFetcherComputationStateCacheAndWindmillClient create( - ComputationConfig.Fetcher configFetcher, - ComputationStateCache computationStateCache, - WindmillServerStub windmillServer, - GrpcWindmillStreamFactory windmillStreamFactory) { - return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient( - configFetcher, computationStateCache, windmillServer, windmillStreamFactory); + private static Builder builder() { + return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient + .Builder(); } abstract ComputationConfig.Fetcher configFetcher(); @@ -821,6 +931,23 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create( abstract WindmillServerStub windmillServer(); abstract GrpcWindmillStreamFactory windmillStreamFactory(); + + abstract @Nullable GrpcDispatcherClient windmillDispatcherClient(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConfigFetcher(ComputationConfig.Fetcher value); + + abstract Builder setComputationStateCache(ComputationStateCache value); + + abstract Builder setWindmillServer(WindmillServerStub value); + + abstract Builder setWindmillStreamFactory(GrpcWindmillStreamFactory value); + + abstract Builder setWindmillDispatcherClient(GrpcDispatcherClient value); + + abstract ConfigFetcherComputationStateCacheAndWindmillClient build(); + } } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index 3eed4ee6d835..086392f5673d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -356,6 +356,7 @@ private void closeStreamSender(Endpoint endpoint, Closeable sender) { } /** Add up all the throttle times of all streams including GetWorkerMetadataStream. */ + @Override public long getAndResetThrottleTime() { return backends.get().windmillStreams().values().stream() .map(WindmillStreamSender::getAndResetThrottleTime) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index 06598b61c458..e89e83eafe5b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -66,6 +66,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { private final Function> computationStateFetcher; private final ExecutorService workProviderExecutor; private final GetWorkSender getWorkSender; + private final Supplier throttleTimeSupplier; SingleSourceWorkerHarness( WorkCommitter workCommitter, @@ -74,7 +75,8 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { StreamingWorkScheduler streamingWorkScheduler, Runnable waitForResources, Function> computationStateFetcher, - GetWorkSender getWorkSender) { + GetWorkSender getWorkSender, + Supplier throttleTimeSupplier) { this.workCommitter = workCommitter; this.getDataClient = getDataClient; this.heartbeatSender = heartbeatSender; @@ -90,6 +92,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { .build()); this.isRunning = new AtomicBoolean(false); this.getWorkSender = getWorkSender; + this.throttleTimeSupplier = throttleTimeSupplier; } public static SingleSourceWorkerHarness.Builder builder() { @@ -144,6 +147,11 @@ public void shutdown() { workCommitter.stop(); } + @Override + public long getAndResetThrottleTime() { + return throttleTimeSupplier.get(); + } + private void streamingEngineDispatchLoop( Function getWorkStreamFactory) { while (isRunning.get()) { @@ -254,6 +262,8 @@ Builder setComputationStateFetcher( Builder setGetWorkSender(GetWorkSender getWorkSender); + Builder setThrottleTimeSupplier(Supplier throttleTimeSupplier); + SingleSourceWorkerHarness build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java index c1b4570e2260..02b101def753 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java @@ -25,4 +25,6 @@ public interface StreamingWorkerHarness { void start(); void shutdown(); + + long getAndResetThrottleTime(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index ba77d8e1ce26..4c06d12d50a3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -27,6 +27,7 @@ import com.google.api.services.dataflow.model.WorkItemStatus; import com.google.api.services.dataflow.model.WorkerMessage; import com.google.api.services.dataflow.model.WorkerMessageResponse; +import com.google.auto.value.AutoBuilder; import java.io.IOException; import java.math.RoundingMode; import java.util.ArrayList; @@ -97,7 +98,7 @@ public final class StreamingWorkerStatusReporter { // Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics. private final AtomicLong workerMessagesIndex; - private StreamingWorkerStatusReporter( + StreamingWorkerStatusReporter( boolean publishCounters, WorkUnitClient dataflowServiceClient, Supplier windmillQuotaThrottleTime, @@ -131,57 +132,13 @@ private StreamingWorkerStatusReporter( this.workerMessagesIndex = new AtomicLong(); } - public static StreamingWorkerStatusReporter create( - WorkUnitClient workUnitClient, - Supplier windmillQuotaThrottleTime, - Supplier> allStageInfo, - FailureTracker failureTracker, - StreamingCounters streamingCounters, - MemoryMonitor memoryMonitor, - BoundedQueueExecutor workExecutor, - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - return new StreamingWorkerStatusReporter( - /* publishCounters= */ true, - workUnitClient, - windmillQuotaThrottleTime, - allStageInfo, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - threadName -> - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(threadName).build()), - windmillHarnessUpdateReportingPeriodMillis, - perWorkerMetricsUpdateReportingPeriodMillis); - } - - @VisibleForTesting - public static StreamingWorkerStatusReporter forTesting( - boolean publishCounters, - WorkUnitClient workUnitClient, - Supplier windmillQuotaThrottleTime, - Supplier> allStageInfo, - FailureTracker failureTracker, - StreamingCounters streamingCounters, - MemoryMonitor memoryMonitor, - BoundedQueueExecutor workExecutor, - Function executorFactory, - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - return new StreamingWorkerStatusReporter( - publishCounters, - workUnitClient, - windmillQuotaThrottleTime, - allStageInfo, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - executorFactory, - windmillHarnessUpdateReportingPeriodMillis, - perWorkerMetricsUpdateReportingPeriodMillis); + public static Builder builder() { + return new AutoBuilder_StreamingWorkerStatusReporter_Builder() + .setPublishCounters(true) + .setExecutorFactory( + threadName -> + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(threadName).build())); } /** @@ -228,6 +185,22 @@ private static void shutdownExecutor(ScheduledExecutorService executor) { } } + // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the + // WorkerMessages RPC schedule. The desired reporting period + // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple + // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). + private static long getPerWorkerMetricsUpdateFrequency( + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { + if (windmillHarnessUpdateReportingPeriodMillis == 0) { + return 0; + } + return LongMath.divide( + perWorkerMetricsUpdateReportingPeriodMillis, + windmillHarnessUpdateReportingPeriodMillis, + RoundingMode.CEILING); + } + @SuppressWarnings("FutureReturnValueIgnored") public void start() { reportHarnessStartup(); @@ -276,22 +249,6 @@ private void reportHarnessStartup() { } } - // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the - // WorkerMessages RPC schedule. The desired reporting period - // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple - // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). - private static long getPerWorkerMetricsUpdateFrequency( - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - if (windmillHarnessUpdateReportingPeriodMillis == 0) { - return 0; - } - return LongMath.divide( - perWorkerMetricsUpdateReportingPeriodMillis, - windmillHarnessUpdateReportingPeriodMillis, - RoundingMode.CEILING); - } - /** Sends counter updates to Dataflow backend. */ private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { @@ -496,4 +453,33 @@ private void updateThreadMetrics() { .maxOutstandingBundles() .addValue((long) workExecutor.maximumElementsOutstanding()); } + + @AutoBuilder + public interface Builder { + Builder setPublishCounters(boolean publishCounters); + + Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient); + + Builder setWindmillQuotaThrottleTime(Supplier windmillQuotaThrottleTime); + + Builder setAllStageInfo(Supplier> allStageInfo); + + Builder setFailureTracker(FailureTracker failureTracker); + + Builder setStreamingCounters(StreamingCounters streamingCounters); + + Builder setMemoryMonitor(MemoryMonitor memoryMonitor); + + Builder setWorkExecutor(BoundedQueueExecutor workExecutor); + + Builder setExecutorFactory(Function executorFactory); + + Builder setWindmillHarnessUpdateReportingPeriodMillis( + long windmillHarnessUpdateReportingPeriodMillis); + + Builder setPerWorkerMetricsUpdateReportingPeriodMillis( + long perWorkerMetricsUpdateReportingPeriodMillis); + + StreamingWorkerStatusReporter build(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index dadf02171235..6eeb7bd6bbfc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -96,6 +96,7 @@ import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.util.Structs; +import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; @@ -104,6 +105,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; +import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC; import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; @@ -129,6 +131,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory; +import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -178,6 +183,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedLong; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -285,6 +291,7 @@ public Long get() { private final FakeWindmillServer server = new FakeWindmillServer( errorCollector, computationId -> computationStateCache.get(computationId)); + private StreamingCounters streamingCounters; public StreamingDataflowWorkerTest(Boolean streamingEngine) { this.streamingEngine = streamingEngine; @@ -304,9 +311,20 @@ private static CounterUpdate getCounter(Iterable counters, String return null; } + private Iterable buildCounters() { + return Iterables.concat( + streamingCounters + .pendingDeltaCounters() + .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE), + streamingCounters + .pendingCumulativeCounters() + .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); + } + @Before public void setUp() { server.clearCommitsReceived(); + streamingCounters = StreamingCounters.create(); } @After @@ -856,7 +874,13 @@ private StreamingDataflowWorker makeWorker( streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), mockGlobalConfigHandle, - streamingDataflowWorkerTestParams.localRetryTimeoutMs()); + streamingDataflowWorkerTestParams.localRetryTimeoutMs(), + streamingCounters, + new FakeWindmillStubFactoryFactory( + new FakeWindmillStubFactory( + () -> + WindmillChannelFactory.inProcessChannel( + "StreamingDataflowWorkerTestChannel")))); this.computationStateCache = worker.getComputationStateCache(); return worker; } @@ -1715,7 +1739,7 @@ public void testMergeWindows() throws Exception { intervalWindowBytes(WINDOW_AT_ZERO))); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); // These tags and data are opaque strings and this is a change detector test. // The "/u" indicates the user's namespace, versus "/s" for system namespace @@ -1836,7 +1860,7 @@ public void testMergeWindows() throws Exception { expectedBytesRead += dataBuilder.build().getSerializedSize(); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); actualOutput = result.get(2L); assertEquals(1, actualOutput.getOutputMessagesCount()); @@ -2004,7 +2028,7 @@ public void testMergeWindowsCaching() throws Exception { intervalWindowBytes(WINDOW_AT_ZERO))); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); // These tags and data are opaque strings and this is a change detector test. // The "/u" indicates the user's namespace, versus "/s" for system namespace @@ -2125,7 +2149,7 @@ public void testMergeWindowsCaching() throws Exception { expectedBytesRead += dataBuilder.build().getSerializedSize(); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); actualOutput = result.get(2L); assertEquals(1, actualOutput.getOutputMessagesCount()); @@ -2430,7 +2454,7 @@ public void testUnboundedSources() throws Exception { null)); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); Windmill.WorkItemCommitRequest commit = result.get(1L); UnsignedLong finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2492,7 +2516,7 @@ public void testUnboundedSources() throws Exception { null)); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); commit = result.get(2L); finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2540,7 +2564,7 @@ public void testUnboundedSources() throws Exception { null)); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); commit = result.get(3L); finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2710,7 +2734,7 @@ public void testUnboundedSourceWorkRetry() throws Exception { server.whenGetWorkCalled().thenReturn(work); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); Windmill.WorkItemCommitRequest commit = result.get(1L); UnsignedLong finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java index 7e65a495638f..8726b1242f9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java @@ -59,18 +59,19 @@ public void setUp() { @Test public void testOverrideMaximumThreadCount() throws Exception { StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.forTesting( - true, - mockWorkUnitClient, - () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME, - () -> Collections.emptyList(), - mockFailureTracker, - StreamingCounters.create(), - mockMemoryMonitor, - mockExecutor, - (threadName) -> Executors.newSingleThreadScheduledExecutor(), - DEFAULT_HARNESS_REPORTING_PERIOD, - DEFAULT_PER_WORKER_METRICS_PERIOD); + StreamingWorkerStatusReporter.builder() + .setPublishCounters(true) + .setDataflowServiceClient(mockWorkUnitClient) + .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) + .setAllStageInfo(Collections::emptyList) + .setFailureTracker(mockFailureTracker) + .setStreamingCounters(StreamingCounters.create()) + .setMemoryMonitor(mockMemoryMonitor) + .setWorkExecutor(mockExecutor) + .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) + .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) + .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) + .build(); StreamingScalingReportResponse streamingScalingReportResponse = new StreamingScalingReportResponse().setMaximumThreadCount(10); WorkerMessageResponse workerMessageResponse = @@ -85,21 +86,21 @@ public void testOverrideMaximumThreadCount() throws Exception { @Test public void testHandleEmptyWorkerMessageResponse() throws Exception { StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.forTesting( - true, - mockWorkUnitClient, - () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME, - () -> Collections.emptyList(), - mockFailureTracker, - StreamingCounters.create(), - mockMemoryMonitor, - mockExecutor, - (threadName) -> Executors.newSingleThreadScheduledExecutor(), - DEFAULT_HARNESS_REPORTING_PERIOD, - DEFAULT_PER_WORKER_METRICS_PERIOD); - WorkerMessageResponse workerMessageResponse = new WorkerMessageResponse(); + StreamingWorkerStatusReporter.builder() + .setPublishCounters(true) + .setDataflowServiceClient(mockWorkUnitClient) + .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) + .setAllStageInfo(Collections::emptyList) + .setFailureTracker(mockFailureTracker) + .setStreamingCounters(StreamingCounters.create()) + .setMemoryMonitor(mockMemoryMonitor) + .setWorkExecutor(mockExecutor) + .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) + .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) + .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) + .build(); when(mockWorkUnitClient.reportWorkerMessage(any())) - .thenReturn(Collections.singletonList(workerMessageResponse)); + .thenReturn(Collections.singletonList(new WorkerMessageResponse())); reporter.reportPeriodicWorkerMessage(); verify(mockExecutor, Mockito.times(0)).setMaximumPoolSize(anyInt(), anyInt()); } From b7dba9e8b752d8a45653508592019ed6850d7e73 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Tue, 22 Oct 2024 16:09:17 -0700 Subject: [PATCH 2/6] address PR comments --- .../dataflow/worker/StreamingDataflowWorker.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 333f674e94f3..5ee81b2ee362 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -84,6 +84,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -603,8 +604,13 @@ private static ChannelCachingStubFactory createStubFactory( workerOptions.getGcpCredential(), ChannelCache.create( serviceAddress -> - remoteChannel( - serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))); + // IsolationChannel will create and manage separate RPC channels to the same + // serviceAddress. + IsolationChannel.create( + () -> + remoteChannel( + serviceAddress, + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec())))); } @VisibleForTesting @@ -860,7 +866,6 @@ ComputationStateCache getComputationStateCache() { return computationStateCache; } - @SuppressWarnings("FutureReturnValueIgnored") public void start() { running.set(true); configFetcher.start(); From 395abf10299d1bdce909f53211259eb477c0f23a Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Tue, 19 Nov 2024 00:47:09 -0800 Subject: [PATCH 3/6] rebase and fix tests --- .../worker/StreamingDataflowWorker.java | 23 +++++++++++-------- .../SingleSourceWorkerHarnessTest.java | 2 ++ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 5ee81b2ee362..0ad9728b3063 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -45,6 +45,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; @@ -69,6 +70,8 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commits; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter; @@ -235,6 +238,7 @@ private StreamingDataflowWorker( Consumer getDataStatusProvider; Supplier currentActiveCommitBytesProvider; if (isDirectPathPipeline(options)) { + WeightedSemaphore maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = FanOutStreamingEngineWorkerHarness.create( createJobHeader(options, clientId), @@ -261,6 +265,8 @@ private StreamingDataflowWorker( Preconditions.checkNotNull(dispatcherClient), commitWorkStream -> StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. + .setCommitByteSemaphore(maxCommitByteSemaphore) .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) .setOnCommitComplete(this::onCompleteCommit) .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) @@ -276,7 +282,12 @@ private StreamingDataflowWorker( options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness; } else { - Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options); + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); GetDataClient getDataClient; HeartbeatSender heartbeatSender; WorkCommitter workCommitter; @@ -301,6 +312,7 @@ private StreamingDataflowWorker( COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) .setNumCommitSenders(numCommitThreads) .setOnCommitComplete(this::onCompleteCommit) .build(); @@ -832,15 +844,6 @@ private static void enableBigQueryMetrics() { BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true); } - private static Windmill.GetWorkRequest createGetWorkRequest( - long clientId, DataflowWorkerHarnessOptions options) { - return Windmill.GetWorkRequest.newBuilder() - .setClientId(clientId) - .setMaxItems(chooseMaxBundlesOutstanding(options)) - .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) - .build(); - } - @VisibleForTesting void reportPeriodicWorkerUpdatesForTest() { workerStatusReporter.reportPeriodicWorkerUpdates(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java index 5a2df4baae61..a65985b91f4e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java @@ -60,6 +60,8 @@ private SingleSourceWorkerHarness createWorkerHarness( .setWaitForResources(waitForResources) .setStreamingWorkScheduler(streamingWorkScheduler) .setComputationStateFetcher(computationStateFetcher) + // no-op throttle time supplier. + .setThrottleTimeSupplier(() -> 0L) .setGetWorkSender(getWorkSender) .build(); } From 1c895f5e5f1e1a3c45c8fc9c751070934e63d3f3 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Tue, 19 Nov 2024 14:00:22 -0800 Subject: [PATCH 4/6] address PR comments --- .../worker/StreamingDataflowWorker.java | 109 +++++++++++------- .../harness/StreamingWorkerStatusPages.java | 2 +- .../refresh/StreamPoolHeartbeatSender.java | 4 +- .../StreamingWorkerStatusReporterTest.java | 52 ++++----- .../StreamPoolHeartbeatSenderTest.java | 6 +- 5 files changed, 95 insertions(+), 78 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 0ad9728b3063..f26b1aa74852 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -63,6 +63,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; @@ -102,6 +103,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ApplianceHeartbeatSender; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.StreamPoolHeartbeatSender; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; import org.apache.beam.sdk.fn.JvmInitializers; @@ -121,10 +123,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Implements a Streaming Dataflow worker. */ +/** + * For internal use only. + * + *

Implements a Streaming Dataflow worker. + */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) +@Internal public final class StreamingDataflowWorker { /** @@ -135,8 +142,6 @@ public final class StreamingDataflowWorker { */ public static final int MAX_SINK_BYTES = 10_000_000; - public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = - "streaming_engine_use_job_settings_for_heartbeat_pool"; private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class); /** @@ -161,6 +166,8 @@ public final class StreamingDataflowWorker { private static final String CHANNELZ_PATH = "/channelz"; private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api"; private static final String ENABLE_IPV6_EXPERIMENT = "enable_private_ipv6_google_access"; + private static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT = + "streaming_engine_use_job_settings_for_heartbeat_pool"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -260,7 +267,7 @@ private StreamingDataflowWorker( processingContext, getWorkStreamLatencies); }), - createStubFactory(options), + createFanOutStubFactory(options), GetWorkBudgetDistributors.distributeEvenly(), Preconditions.checkNotNull(dispatcherClient), commitWorkStream -> @@ -278,10 +285,11 @@ private StreamingDataflowWorker( currentActiveCommitBytesProvider = fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; channelzServlet = - createChannelZServlet( + createChannelzServlet( options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness; } else { + // Non-direct path pipelines. Windmill.GetWorkRequest request = Windmill.GetWorkRequest.newBuilder() .setClientId(clientId) @@ -303,7 +311,7 @@ private StreamingDataflowWorker( createStreamingEngineHeartbeatSender( options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); channelzServlet = - createChannelZServlet(options, windmillServer::getWindmillServiceEndpoints); + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); workCommitter = StreamingEngineWorkCommitter.builder() .setCommitWorkStreamFactory( @@ -398,7 +406,7 @@ private static StreamingWorkerStatusPages.Builder createStatusPageBuilder( : streamingStatusPages; } - private static ChannelzServlet createChannelZServlet( + private static ChannelzServlet createChannelzServlet( DataflowWorkerHarnessOptions options, Supplier> windmillEndpointProvider) { return new ChannelzServlet(CHANNELZ_PATH, options, windmillEndpointProvider); @@ -410,15 +418,16 @@ private static HeartbeatSender createStreamingEngineHeartbeatSender( WindmillStreamPool getDataStreamPool, StreamingGlobalConfigHandle globalConfigHandle) { // Experiment gates the logic till backend changes are rollback safe - if (!DataflowRunner.hasExperiment(options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL) + if (!DataflowRunner.hasExperiment( + options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT) || options.getUseSeparateWindmillHeartbeatStreams() != null) { - return StreamPoolHeartbeatSender.Create( + return StreamPoolHeartbeatSender.create( Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) ? WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream) : getDataStreamPool); } else { - return StreamPoolHeartbeatSender.Create( + return StreamPoolHeartbeatSender.create( WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream), getDataStreamPool, globalConfigHandle); @@ -526,60 +535,72 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WorkUnitClient dataflowServiceClient, GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, Function computationStateCacheFactory) { - ComputationConfig.Fetcher configFetcher; - WindmillServerStub windmillServer; - ComputationStateCache computationStateCache; - GrpcWindmillStreamFactory windmillStreamFactory; - ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder = - ConfigFetcherComputationStateCacheAndWindmillClient.builder(); if (options.isEnableStreamingEngine()) { GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - configFetcher = + ComputationConfig.Fetcher configFetcher = StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient); configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig); - computationStateCache = computationStateCacheFactory.apply(configFetcher); - windmillStreamFactory = + ComputationStateCache computationStateCache = + computationStateCacheFactory.apply(configFetcher); + GrpcWindmillStreamFactory windmillStreamFactory = windmillStreamFactoryBuilder .setProcessHeartbeatResponses( new WorkHeartbeatResponseProcessor(computationStateCache::get)) .setHealthCheckIntervalMillis( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); - windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); - builder.setWindmillDispatcherClient(dispatcherClient); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillDispatcherClient(dispatcherClient) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCache) + .setWindmillStreamFactory(windmillStreamFactory) + .setWindmillServer( + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient)) + .build(); } else { + // Build with local Windmill client. if (options.getWindmillServiceEndpoint() != null || options.getLocalWindmillHostport().startsWith("grpc:")) { GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - windmillStreamFactory = + GrpcWindmillStreamFactory windmillStreamFactory = windmillStreamFactoryBuilder .setHealthCheckIntervalMillis( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); - windmillServer = + GrpcWindmillServer windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); - builder.setWindmillDispatcherClient(dispatcherClient); - } else { - windmillStreamFactory = windmillStreamFactoryBuilder.build(); - windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + ComputationConfig.Fetcher configFetcher = + createApplianceComputationConfigFetcher(windmillServer); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillDispatcherClient(dispatcherClient) + .setWindmillServer(windmillServer) + .setWindmillStreamFactory(windmillStreamFactory) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) + .build(); } - configFetcher = - new StreamingApplianceComputationConfigFetcher( - windmillServer::getConfig, - new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); - computationStateCache = computationStateCacheFactory.apply(configFetcher); + WindmillServerStub windmillServer = + new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + ComputationConfig.Fetcher configFetcher = + createApplianceComputationConfigFetcher(windmillServer); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillStreamFactory(windmillStreamFactoryBuilder.build()) + .setWindmillServer(windmillServer) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) + .build(); } + } - return builder - .setConfigFetcher(configFetcher) - .setComputationStateCache(computationStateCache) - .setWindmillServer(windmillServer) - .setWindmillStreamFactory(windmillStreamFactory) - .build(); + private static StreamingApplianceComputationConfigFetcher createApplianceComputationConfigFetcher( + ApplianceWindmillClient windmillClient) { + return new StreamingApplianceComputationConfigFetcher( + windmillClient::getConfig, + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); } private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) { @@ -588,13 +609,19 @@ private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options Optional.ofNullable(options.getDataflowServiceOptions()) .map(serviceOptions -> serviceOptions.contains(ENABLE_IPV6_EXPERIMENT)) .orElse(false); + if (isIpV6Enabled) { return true; } + LOG.warn( - "DirectPath is currently only supported with IPv6 networking stack. Defaulting to" - + " CloudPath."); + "DirectPath is currently only supported with IPv6 networking stack. This requires setting " + + "\"enable_private_ipv6_google_access\" in experimental pipeline options. " + + "For information on how to set experimental pipeline options see " + + "https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#experimental. " + + "Defaulting to CloudPath."); } + return false; } @@ -610,7 +637,7 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) StreamingDataflowWorker.class.getSimpleName()); } - private static ChannelCachingStubFactory createStubFactory( + private static ChannelCachingStubFactory createFanOutStubFactory( DataflowWorkerHarnessOptions workerOptions) { return ChannelCachingRemoteStubFactory.create( workerOptions.getGcpCredential(), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index 6981312eff1d..ddfc6809231a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -258,7 +258,7 @@ public interface Builder { Builder setDebugCapture(DebugCapture.Manager debugCapture); - Builder setChannelzServlet(ChannelzServlet channelzServlet); + Builder setChannelzServlet(@Nullable ChannelzServlet channelzServlet); Builder setStateCache(WindmillStateCache stateCache); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java index fa36b11ffe55..f54091dc2b95 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java @@ -42,7 +42,7 @@ private StreamPoolHeartbeatSender( this.heartbeatStreamPool.set(heartbeatStreamPool); } - public static StreamPoolHeartbeatSender Create( + public static StreamPoolHeartbeatSender create( @Nonnull WindmillStreamPool heartbeatStreamPool) { return new StreamPoolHeartbeatSender(heartbeatStreamPool); } @@ -55,7 +55,7 @@ public static StreamPoolHeartbeatSender Create( * enabled. * @param getDataPool stream to use when using separate streams for heartbeat is disabled. */ - public static StreamPoolHeartbeatSender Create( + public static StreamPoolHeartbeatSender create( @Nonnull WindmillStreamPool dedicatedHeartbeatPool, @Nonnull WindmillStreamPool getDataPool, @Nonnull StreamingGlobalConfigHandle configHandle) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java index 8726b1242f9b..f348e4cf1bdb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java @@ -39,14 +39,15 @@ @RunWith(JUnit4.class) public class StreamingWorkerStatusReporterTest { - private final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000; - private final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000; - private final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000; + private static final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000; + private static final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000; + private static final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000; private BoundedQueueExecutor mockExecutor; private WorkUnitClient mockWorkUnitClient; private FailureTracker mockFailureTracker; private MemoryMonitor mockMemoryMonitor; + private StreamingWorkerStatusReporter reporter; @Before public void setUp() { @@ -54,24 +55,11 @@ public void setUp() { this.mockWorkUnitClient = mock(WorkUnitClient.class); this.mockFailureTracker = mock(FailureTracker.class); this.mockMemoryMonitor = mock(MemoryMonitor.class); + this.reporter = buildWorkerStatusReporterForTest(); } @Test public void testOverrideMaximumThreadCount() throws Exception { - StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.builder() - .setPublishCounters(true) - .setDataflowServiceClient(mockWorkUnitClient) - .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) - .setAllStageInfo(Collections::emptyList) - .setFailureTracker(mockFailureTracker) - .setStreamingCounters(StreamingCounters.create()) - .setMemoryMonitor(mockMemoryMonitor) - .setWorkExecutor(mockExecutor) - .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) - .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) - .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) - .build(); StreamingScalingReportResponse streamingScalingReportResponse = new StreamingScalingReportResponse().setMaximumThreadCount(10); WorkerMessageResponse workerMessageResponse = @@ -85,23 +73,25 @@ public void testOverrideMaximumThreadCount() throws Exception { @Test public void testHandleEmptyWorkerMessageResponse() throws Exception { - StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.builder() - .setPublishCounters(true) - .setDataflowServiceClient(mockWorkUnitClient) - .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) - .setAllStageInfo(Collections::emptyList) - .setFailureTracker(mockFailureTracker) - .setStreamingCounters(StreamingCounters.create()) - .setMemoryMonitor(mockMemoryMonitor) - .setWorkExecutor(mockExecutor) - .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) - .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) - .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) - .build(); when(mockWorkUnitClient.reportWorkerMessage(any())) .thenReturn(Collections.singletonList(new WorkerMessageResponse())); reporter.reportPeriodicWorkerMessage(); verify(mockExecutor, Mockito.times(0)).setMaximumPoolSize(anyInt(), anyInt()); } + + private StreamingWorkerStatusReporter buildWorkerStatusReporterForTest() { + return StreamingWorkerStatusReporter.builder() + .setPublishCounters(true) + .setDataflowServiceClient(mockWorkUnitClient) + .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) + .setAllStageInfo(Collections::emptyList) + .setFailureTracker(mockFailureTracker) + .setStreamingCounters(StreamingCounters.create()) + .setMemoryMonitor(mockMemoryMonitor) + .setWorkExecutor(mockExecutor) + .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) + .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) + .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) + .build(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java index ed915088d0a6..acbb3aebbcf5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java @@ -39,7 +39,7 @@ public class StreamPoolHeartbeatSenderTest { public void sendsHeartbeatsOnStream() { FakeWindmillServer server = new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create(1, Duration.standardSeconds(10), server::getDataStream)); Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); heartbeatsBuilder @@ -59,7 +59,7 @@ public void sendsHeartbeatsOnDedicatedStream() { FakeGlobalConfigHandle configHandle = new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true)); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create( 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), WindmillStreamPool.create( @@ -104,7 +104,7 @@ public void sendsHeartbeatsOnGetDataStream() { FakeGlobalConfigHandle configHandle = new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false)); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create( 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), WindmillStreamPool.create( From a93bae57118195fe997284fb570c1fe4b3ebd8bd Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Mon, 25 Nov 2024 10:11:52 -0800 Subject: [PATCH 5/6] address PR comments --- .../worker/StreamingDataflowWorker.java | 60 +++++++++---------- .../harness/SingleSourceWorkerHarness.java | 10 ++-- .../harness/StreamingWorkerHarness.java | 4 +- .../StreamingWorkerStatusReporter.java | 10 ++-- .../harness/ThrottledTimeTracker.java | 32 ++++++++++ .../SingleSourceWorkerHarnessTest.java | 2 +- 6 files changed, 75 insertions(+), 43 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f26b1aa74852..07e8cfb4d359 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -61,6 +61,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter; +import org.apache.beam.runners.dataflow.worker.streaming.harness.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; @@ -347,14 +348,13 @@ private StreamingDataflowWorker( .setComputationStateFetcher(this.computationStateCache::get) .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) .setHeartbeatSender(heartbeatSender) - .setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime) + .setThrottleTimeTracker(windmillServer::getAndResetThrottleTime) .setGetWorkSender(getWorkSender) .build(); } this.workerStatusReporter = - streamingWorkerStatusReporterFactory.createStatusReporter( - streamingWorkerHarness::getAndResetThrottleTime); + streamingWorkerStatusReporterFactory.createStatusReporter(streamingWorkerHarness); this.activeWorkRefresher = new ActiveWorkRefresher( clock, @@ -559,41 +559,41 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o .setWindmillServer( GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient)) .build(); - } else { - // Build with local Windmill client. - if (options.getWindmillServiceEndpoint() != null - || options.getLocalWindmillHostport().startsWith("grpc:")) { - GrpcDispatcherClient dispatcherClient = - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - GrpcWindmillStreamFactory windmillStreamFactory = - windmillStreamFactoryBuilder - .setHealthCheckIntervalMillis( - options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) - .build(); - GrpcWindmillServer windmillServer = - GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); - ComputationConfig.Fetcher configFetcher = - createApplianceComputationConfigFetcher(windmillServer); - return ConfigFetcherComputationStateCacheAndWindmillClient.builder() - .setWindmillDispatcherClient(dispatcherClient) - .setWindmillServer(windmillServer) - .setWindmillStreamFactory(windmillStreamFactory) - .setConfigFetcher(configFetcher) - .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) - .build(); - } + } - WindmillServerStub windmillServer = - new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + // Build with local Windmill client. + if (options.getWindmillServiceEndpoint() != null + || options.getLocalWindmillHostport().startsWith("grpc:")) { + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); + GrpcWindmillStreamFactory windmillStreamFactory = + windmillStreamFactoryBuilder + .setHealthCheckIntervalMillis( + options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) + .build(); + GrpcWindmillServer windmillServer = + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); ComputationConfig.Fetcher configFetcher = createApplianceComputationConfigFetcher(windmillServer); return ConfigFetcherComputationStateCacheAndWindmillClient.builder() - .setWindmillStreamFactory(windmillStreamFactoryBuilder.build()) + .setWindmillDispatcherClient(dispatcherClient) .setWindmillServer(windmillServer) + .setWindmillStreamFactory(windmillStreamFactory) .setConfigFetcher(configFetcher) .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) .build(); } + + WindmillServerStub windmillServer = + new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + ComputationConfig.Fetcher configFetcher = + createApplianceComputationConfigFetcher(windmillServer); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillStreamFactory(windmillStreamFactoryBuilder.build()) + .setWindmillServer(windmillServer) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) + .build(); } private static StreamingApplianceComputationConfigFetcher createApplianceComputationConfigFetcher( @@ -948,7 +948,7 @@ private void onCompleteCommit(CompleteCommit completeCommit) { @FunctionalInterface private interface StreamingWorkerStatusReporterFactory { - StreamingWorkerStatusReporter createStatusReporter(Supplier throttleTimeSupplier); + StreamingWorkerStatusReporter createStatusReporter(ThrottledTimeTracker throttledTimeTracker); } @AutoValue diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index e89e83eafe5b..0ec232723ef8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -66,7 +66,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { private final Function> computationStateFetcher; private final ExecutorService workProviderExecutor; private final GetWorkSender getWorkSender; - private final Supplier throttleTimeSupplier; + private final ThrottledTimeTracker throttleTimeTracker; SingleSourceWorkerHarness( WorkCommitter workCommitter, @@ -76,7 +76,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { Runnable waitForResources, Function> computationStateFetcher, GetWorkSender getWorkSender, - Supplier throttleTimeSupplier) { + ThrottledTimeTracker throttleTimeTracker) { this.workCommitter = workCommitter; this.getDataClient = getDataClient; this.heartbeatSender = heartbeatSender; @@ -92,7 +92,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { .build()); this.isRunning = new AtomicBoolean(false); this.getWorkSender = getWorkSender; - this.throttleTimeSupplier = throttleTimeSupplier; + this.throttleTimeTracker = throttleTimeTracker; } public static SingleSourceWorkerHarness.Builder builder() { @@ -149,7 +149,7 @@ public void shutdown() { @Override public long getAndResetThrottleTime() { - return throttleTimeSupplier.get(); + return throttleTimeTracker.getAndResetThrottleTime(); } private void streamingEngineDispatchLoop( @@ -262,7 +262,7 @@ Builder setComputationStateFetcher( Builder setGetWorkSender(GetWorkSender getWorkSender); - Builder setThrottleTimeSupplier(Supplier throttleTimeSupplier); + Builder setThrottleTimeTracker(ThrottledTimeTracker throttleTimeTracker); SingleSourceWorkerHarness build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java index 02b101def753..779dd50d2ba9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java @@ -21,10 +21,8 @@ /** Provides an interface to start streaming worker processing. */ @Internal -public interface StreamingWorkerHarness { +public interface StreamingWorkerHarness extends ThrottledTimeTracker { void start(); void shutdown(); - - long getAndResetThrottleTime(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index 4c06d12d50a3..8a3719a0270b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -79,7 +79,7 @@ public final class StreamingWorkerStatusReporter { private final int initialMaxThreadCount; private final int initialMaxBundlesOutstanding; private final WorkUnitClient dataflowServiceClient; - private final Supplier windmillQuotaThrottleTime; + private final ThrottledTimeTracker windmillQuotaThrottleTime; private final Supplier> allStageInfo; private final FailureTracker failureTracker; private final StreamingCounters streamingCounters; @@ -101,7 +101,7 @@ public final class StreamingWorkerStatusReporter { StreamingWorkerStatusReporter( boolean publishCounters, WorkUnitClient dataflowServiceClient, - Supplier windmillQuotaThrottleTime, + ThrottledTimeTracker windmillQuotaThrottleTime, Supplier> allStageInfo, FailureTracker failureTracker, StreamingCounters streamingCounters, @@ -253,7 +253,9 @@ private void reportHarnessStartup() { private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { // Throttle time is tracked by the windmillServer but is reported to DFE here. - streamingCounters.windmillQuotaThrottling().addValue(windmillQuotaThrottleTime.get()); + streamingCounters + .windmillQuotaThrottling() + .addValue(windmillQuotaThrottleTime.getAndResetThrottleTime()); if (memoryMonitor.isThrashing()) { streamingCounters.memoryThrashing().addValue(1); } @@ -460,7 +462,7 @@ public interface Builder { Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient); - Builder setWindmillQuotaThrottleTime(Supplier windmillQuotaThrottleTime); + Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker windmillQuotaThrottleTime); Builder setAllStageInfo(Supplier> allStageInfo); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java new file mode 100644 index 000000000000..9abd88a5cd6d --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Tracks time spent in a throttled state due to {@code Status.RESOURCE_EXHAUSTED} errors returned + * from gRPC calls. + */ +@Internal +@FunctionalInterface +public interface ThrottledTimeTracker { + + /** Returns the combined total of all throttle times and resets those times to 0. */ + long getAndResetThrottleTime(); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java index a65985b91f4e..39bd98f6e8f4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java @@ -61,7 +61,7 @@ private SingleSourceWorkerHarness createWorkerHarness( .setStreamingWorkScheduler(streamingWorkScheduler) .setComputationStateFetcher(computationStateFetcher) // no-op throttle time supplier. - .setThrottleTimeSupplier(() -> 0L) + .setThrottleTimeTracker(() -> 0L) .setGetWorkSender(getWorkSender) .build(); } From 978b660891aa4f266ee88807ae3df91e0b079f16 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Mon, 25 Nov 2024 17:28:48 -0800 Subject: [PATCH 6/6] address PR comments --- .../dataflow/worker/StreamingDataflowWorker.java | 4 ++-- .../streaming/harness/SingleSourceWorkerHarness.java | 11 ++++++----- .../streaming/harness/StreamingWorkerHarness.java | 1 + .../harness/StreamingWorkerStatusReporter.java | 3 ++- .../windmill/client/throttling/ThrottleTimer.java | 3 ++- .../client/throttling}/ThrottledTimeTracker.java | 2 +- .../harness/SingleSourceWorkerHarnessTest.java | 2 +- 7 files changed, 15 insertions(+), 11 deletions(-) rename runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/{streaming/harness => windmill/client/throttling}/ThrottledTimeTracker.java (93%) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 07e8cfb4d359..088a28e9b2db 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -61,7 +61,6 @@ import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter; -import org.apache.beam.runners.dataflow.worker.streaming.harness.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; @@ -92,6 +91,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors; @@ -348,7 +348,7 @@ private StreamingDataflowWorker( .setComputationStateFetcher(this.computationStateCache::get) .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) .setHeartbeatSender(heartbeatSender) - .setThrottleTimeTracker(windmillServer::getAndResetThrottleTime) + .setThrottledTimeTracker(windmillServer::getAndResetThrottleTime) .setGetWorkSender(getWorkSender) .build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index 0ec232723ef8..089541da8fc8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; @@ -66,7 +67,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { private final Function> computationStateFetcher; private final ExecutorService workProviderExecutor; private final GetWorkSender getWorkSender; - private final ThrottledTimeTracker throttleTimeTracker; + private final ThrottledTimeTracker throttledTimeTracker; SingleSourceWorkerHarness( WorkCommitter workCommitter, @@ -76,7 +77,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { Runnable waitForResources, Function> computationStateFetcher, GetWorkSender getWorkSender, - ThrottledTimeTracker throttleTimeTracker) { + ThrottledTimeTracker throttledTimeTracker) { this.workCommitter = workCommitter; this.getDataClient = getDataClient; this.heartbeatSender = heartbeatSender; @@ -92,7 +93,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { .build()); this.isRunning = new AtomicBoolean(false); this.getWorkSender = getWorkSender; - this.throttleTimeTracker = throttleTimeTracker; + this.throttledTimeTracker = throttledTimeTracker; } public static SingleSourceWorkerHarness.Builder builder() { @@ -149,7 +150,7 @@ public void shutdown() { @Override public long getAndResetThrottleTime() { - return throttleTimeTracker.getAndResetThrottleTime(); + return throttledTimeTracker.getAndResetThrottleTime(); } private void streamingEngineDispatchLoop( @@ -262,7 +263,7 @@ Builder setComputationStateFetcher( Builder setGetWorkSender(GetWorkSender getWorkSender); - Builder setThrottleTimeTracker(ThrottledTimeTracker throttleTimeTracker); + Builder setThrottledTimeTracker(ThrottledTimeTracker throttledTimeTracker); SingleSourceWorkerHarness build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java index 779dd50d2ba9..731a5a4b1b51 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.streaming.harness; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.sdk.annotations.Internal; /** Provides an interface to start streaming worker processing. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index 8a3719a0270b..3557f0d193c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -52,6 +52,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -462,7 +463,7 @@ public interface Builder { Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient); - Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker windmillQuotaThrottleTime); + Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker windmillQuotaThrottledTimeTracker); Builder setAllStageInfo(Supplier> allStageInfo); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java index f660112721ba..fdcb0339d23d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java @@ -25,7 +25,7 @@ * CommitWork are both blocked for x, totalTime will be 2x. However, if 2 GetWork streams are both * blocked for x totalTime will be x. All methods are thread safe. */ -public final class ThrottleTimer { +public final class ThrottleTimer implements ThrottledTimeTracker { // This is -1 if not currently being throttled or the time in // milliseconds when throttling for this type started. private long startTime = -1; @@ -56,6 +56,7 @@ public synchronized boolean throttled() { } /** Returns the combined total of all throttle times and resets those times to 0. */ + @Override public synchronized long getAndResetThrottleTime() { if (throttled()) { stop(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java similarity index 93% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java index 9abd88a5cd6d..9bb8fb0a7b5f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ThrottledTimeTracker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.streaming.harness; +package org.apache.beam.runners.dataflow.worker.windmill.client.throttling; import org.apache.beam.sdk.annotations.Internal; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java index 39bd98f6e8f4..4df3bf7cd823 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java @@ -61,7 +61,7 @@ private SingleSourceWorkerHarness createWorkerHarness( .setStreamingWorkScheduler(streamingWorkScheduler) .setComputationStateFetcher(computationStateFetcher) // no-op throttle time supplier. - .setThrottleTimeTracker(() -> 0L) + .setThrottledTimeTracker(() -> 0L) .setGetWorkSender(getWorkSender) .build(); }