From aa21e4a84b3594e9ddd6ddb436c9628cb6660552 Mon Sep 17 00:00:00 2001 From: martin trieu Date: Tue, 26 Nov 2024 02:51:57 -0600 Subject: [PATCH] Integrate direct path with StreamingDataflowWorker code path (#32778) --- .../DataflowStreamingPipelineOptions.java | 15 +- .../worker/StreamingDataflowWorker.java | 594 +++++++++++------- .../FanOutStreamingEngineWorkerHarness.java | 1 + .../harness/SingleSourceWorkerHarness.java | 13 +- .../harness/StreamingWorkerHarness.java | 3 +- .../harness/StreamingWorkerStatusPages.java | 2 +- .../StreamingWorkerStatusReporter.java | 131 ++-- .../client/throttling/ThrottleTimer.java | 3 +- .../throttling/ThrottledTimeTracker.java | 32 + .../refresh/StreamPoolHeartbeatSender.java | 4 +- .../worker/StreamingDataflowWorkerTest.java | 42 +- .../SingleSourceWorkerHarnessTest.java | 2 + .../StreamingWorkerStatusReporterTest.java | 53 +- .../StreamPoolHeartbeatSenderTest.java | 6 +- 14 files changed, 561 insertions(+), 340 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index 6a0208f1447f..61c38dde2b42 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.joda.time.Duration; @@ -219,10 +220,8 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setWindmillServiceStreamMaxBackoffMillis(int value); - @Description( - "If true, Dataflow streaming pipeline will be running in direct path mode." - + " VMs must have IPv6 enabled for this to work.") - @Default.Boolean(false) + @Description("Enables direct path mode for streaming engine.") + @Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class) boolean getIsWindmillServiceDirectPathEnabled(); void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled); @@ -300,4 +299,12 @@ public Integer create(PipelineOptions options) { return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1; } } + + /** EnableStreamingEngine defaults to false unless one of the experiment is set. */ + class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory { + @Override + public Boolean create(PipelineOptions options) { + return ExperimentalOptions.hasExperiment(options, "enable_windmill_service_direct_path"); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 6ce60283735f..088a28e9b2db 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -17,11 +17,11 @@ */ package org.apache.beam.runners.dataflow.worker; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; -import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.MapTask; import com.google.auto.value.AutoValue; +import java.io.PrintWriter; import java.util.List; import java.util.Map; import java.util.Optional; @@ -33,24 +33,28 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import javax.annotation.Nullable; import org.apache.beam.runners.core.metrics.MetricsLogger; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; -import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.status.DebugCapture; import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; +import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; @@ -59,12 +63,15 @@ import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; +import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commits; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter; @@ -78,8 +85,16 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingApplianceFailureTracker; @@ -89,6 +104,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ApplianceHeartbeatSender; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.StreamPoolHeartbeatSender; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; import org.apache.beam.sdk.fn.JvmInitializers; @@ -98,18 +114,25 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Implements a Streaming Dataflow worker. */ +/** + * For internal use only. + * + *

Implements a Streaming Dataflow worker. + */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) +@Internal public final class StreamingDataflowWorker { /** @@ -142,6 +165,10 @@ public final class StreamingDataflowWorker { private static final int DEFAULT_STATUS_PORT = 8081; private static final Random CLIENT_ID_GENERATOR = new Random(); private static final String CHANNELZ_PATH = "/channelz"; + private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api"; + private static final String ENABLE_IPV6_EXPERIMENT = "enable_private_ipv6_google_access"; + private static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT = + "streaming_engine_use_job_settings_for_heartbeat_pool"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -155,9 +182,8 @@ public final class StreamingDataflowWorker { private final ReaderCache readerCache; private final DataflowExecutionStateSampler sampler = DataflowExecutionStateSampler.instance(); private final ActiveWorkRefresher activeWorkRefresher; - private final WorkCommitter workCommitter; private final StreamingWorkerStatusReporter workerStatusReporter; - private final StreamingCounters streamingCounters; + private final int numCommitThreads; private StreamingDataflowWorker( WindmillServerStub windmillServer, @@ -170,17 +196,17 @@ private StreamingDataflowWorker( DataflowWorkerHarnessOptions options, HotKeyLogger hotKeyLogger, Supplier clock, - StreamingWorkerStatusReporter workerStatusReporter, + StreamingWorkerStatusReporterFactory streamingWorkerStatusReporterFactory, FailureTracker failureTracker, WorkFailureProcessor workFailureProcessor, StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, GrpcWindmillStreamFactory windmillStreamFactory, ScheduledExecutorService activeWorkRefreshExecutorFn, - ConcurrentMap stageInfoMap) { + ConcurrentMap stageInfoMap, + @Nullable GrpcDispatcherClient dispatcherClient) { // Register standard file systems. FileSystems.setDefaultPipelineOptions(options); - this.configFetcher = configFetcher; this.computationStateCache = computationStateCache; this.stateCache = windmillStateCache; @@ -189,35 +215,13 @@ private StreamingDataflowWorker( Duration.standardSeconds(options.getReaderCacheTimeoutSec()), Executors.newCachedThreadPool()); this.options = options; - - boolean windmillServiceEnabled = options.isEnableStreamingEngine(); - - int numCommitThreads = 1; - if (windmillServiceEnabled && options.getWindmillServiceCommitThreads() > 0) { - numCommitThreads = options.getWindmillServiceCommitThreads(); - } - - this.workCommitter = - windmillServiceEnabled - ? StreamingEngineWorkCommitter.builder() - .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) - .setCommitWorkStreamFactory( - WindmillStreamPool.create( - numCommitThreads, - COMMIT_STREAM_TIMEOUT, - windmillServer::commitWorkStream) - ::getCloseableStream) - .setNumCommitSenders(numCommitThreads) - .setOnCommitComplete(this::onCompleteCommit) - .build() - : StreamingApplianceWorkCommitter.create( - windmillServer::commitWork, this::onCompleteCommit); - this.workUnitExecutor = workUnitExecutor; - - this.workerStatusReporter = workerStatusReporter; - this.streamingCounters = streamingCounters; this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor); + this.numCommitThreads = + options.isEnableStreamingEngine() + ? Math.max(options.getWindmillServiceCommitThreads(), 1) + : 1; + StreamingWorkScheduler streamingWorkScheduler = StreamingWorkScheduler.create( options, @@ -234,107 +238,200 @@ private StreamingDataflowWorker( ID_GENERATOR, configFetcher.getGlobalConfigHandle(), stageInfoMap); - ThrottlingGetDataMetricTracker getDataMetricTracker = new ThrottlingGetDataMetricTracker(memoryMonitor); - WorkerStatusPages workerStatusPages = - WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); - StreamingWorkerStatusPages.Builder statusPagesBuilder = StreamingWorkerStatusPages.builder(); - int stuckCommitDurationMillis; - GetDataClient getDataClient; - HeartbeatSender heartbeatSender; - if (windmillServiceEnabled) { - WindmillStreamPool getDataStreamPool = - WindmillStreamPool.create( - Math.max(1, options.getWindmillGetDataStreamCount()), - GET_DATA_STREAM_TIMEOUT, - windmillServer::getDataStream); - getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); - if (options.getUseSeparateWindmillHeartbeatStreams() != null) { + // Status page members. Different implementations on whether the harness is streaming engine + // direct path, streaming engine cloud path, or streaming appliance. + @Nullable ChannelzServlet channelzServlet = null; + Consumer getDataStatusProvider; + Supplier currentActiveCommitBytesProvider; + if (isDirectPathPipeline(options)) { + WeightedSemaphore maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, watermarks, processingContext, getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + createFanOutStubFactory(options), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. + .setCommitByteSemaphore(maxCommitByteSemaphore) + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + getDataStatusProvider = getDataMetricTracker::printHtml; + currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + channelzServlet = + createChannelzServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness; + } else { + // Non-direct path pipelines. + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + GetDataClient getDataClient; + HeartbeatSender heartbeatSender; + WorkCommitter workCommitter; + GetWorkSender getWorkSender; + if (options.isEnableStreamingEngine()) { + WindmillStreamPool getDataStreamPool = + WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + windmillServer::getDataStream); + getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); heartbeatSender = - StreamPoolHeartbeatSender.Create( - Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) - ? separateHeartbeatPool(windmillServer) - : getDataStreamPool); - + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + channelzServlet = + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); + workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, + COMMIT_STREAM_TIMEOUT, + windmillServer::commitWorkStream) + ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) + .setNumCommitSenders(numCommitThreads) + .setOnCommitComplete(this::onCompleteCommit) + .build(); + getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); } else { - heartbeatSender = - StreamPoolHeartbeatSender.Create( - separateHeartbeatPool(windmillServer), - getDataStreamPool, - configFetcher.getGlobalConfigHandle()); + getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); + heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); + workCommitter = + StreamingApplianceWorkCommitter.create( + windmillServer::commitWork, this::onCompleteCommit); + getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request)); } - stuckCommitDurationMillis = - options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0; - statusPagesBuilder - .setDebugCapture( - new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) - .setChannelzServlet( - new ChannelzServlet( - CHANNELZ_PATH, options, windmillServer::getWindmillServiceEndpoints)) - .setWindmillStreamFactory(windmillStreamFactory); - } else { - getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); - heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); - stuckCommitDurationMillis = 0; + getDataStatusProvider = getDataClient::printHtml; + currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; + + this.streamingWorkerHarness = + SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setThrottledTimeTracker(windmillServer::getAndResetThrottleTime) + .setGetWorkSender(getWorkSender) + .build(); } + this.workerStatusReporter = + streamingWorkerStatusReporterFactory.createStatusReporter(streamingWorkerHarness); this.activeWorkRefresher = new ActiveWorkRefresher( clock, options.getActiveWorkRefreshPeriodMillis(), - stuckCommitDurationMillis, + options.isEnableStreamingEngine() + ? Math.max(options.getStuckCommitDurationMillis(), 0) + : 0, computationStateCache::getAllPresentComputations, sampler, activeWorkRefreshExecutorFn, getDataMetricTracker::trackHeartbeats); this.statusPages = - statusPagesBuilder + createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor) .setClock(clock) .setClientId(clientId) .setIsRunning(running) - .setStatusPages(workerStatusPages) .setStateCache(stateCache) .setComputationStateCache(this.computationStateCache) - .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes) - .setGetDataStatusProvider(getDataClient::printHtml) .setWorkUnitExecutor(workUnitExecutor) .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle()) + .setChannelzServlet(channelzServlet) + .setGetDataStatusProvider(getDataStatusProvider) + .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider) .build(); - Windmill.GetWorkRequest request = - Windmill.GetWorkRequest.newBuilder() - .setClientId(clientId) - .setMaxItems(chooseMaximumBundlesOutstanding()) - .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) - .build(); - - this.streamingWorkerHarness = - SingleSourceWorkerHarness.builder() - .setStreamingWorkScheduler(streamingWorkScheduler) - .setWorkCommitter(workCommitter) - .setGetDataClient(getDataClient) - .setComputationStateFetcher(this.computationStateCache::get) - .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) - .setHeartbeatSender(heartbeatSender) - .setGetWorkSender( - windmillServiceEnabled - ? GetWorkSender.forStreamingEngine( - receiver -> windmillServer.getWorkStream(request, receiver)) - : GetWorkSender.forAppliance(() -> windmillServer.getWork(request))) - .build(); - - LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled); + LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled()); + LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine()); LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint()); LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort()); LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } - private static WindmillStreamPool separateHeartbeatPool( - WindmillServerStub windmillServer) { - return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); + private static StreamingWorkerStatusPages.Builder createStatusPageBuilder( + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + MemoryMonitor memoryMonitor) { + WorkerStatusPages workerStatusPages = + WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); + + StreamingWorkerStatusPages.Builder streamingStatusPages = + StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages); + + return options.isEnableStreamingEngine() + ? streamingStatusPages + .setDebugCapture( + new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) + .setWindmillStreamFactory(windmillStreamFactory) + : streamingStatusPages; + } + + private static ChannelzServlet createChannelzServlet( + DataflowWorkerHarnessOptions options, + Supplier> windmillEndpointProvider) { + return new ChannelzServlet(CHANNELZ_PATH, options, windmillEndpointProvider); + } + + private static HeartbeatSender createStreamingEngineHeartbeatSender( + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillClient, + WindmillStreamPool getDataStreamPool, + StreamingGlobalConfigHandle globalConfigHandle) { + // Experiment gates the logic till backend changes are rollback safe + if (!DataflowRunner.hasExperiment( + options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT) + || options.getUseSeparateWindmillHeartbeatStreams() != null) { + return StreamPoolHeartbeatSender.create( + Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) + ? WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream) + : getDataStreamPool); + + } else { + return StreamPoolHeartbeatSender.create( + WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream), + getDataStreamPool, + globalConfigHandle); + } } public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { @@ -387,17 +484,21 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o failureTracker, () -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()), clock); - StreamingWorkerStatusReporter workerStatusReporter = - StreamingWorkerStatusReporter.create( - dataflowServiceClient, - windmillServer::getAndResetThrottleTime, - stageInfo::values, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - options.getWindmillHarnessUpdateReportingPeriod().getMillis(), - options.getPerWorkerMetricsUpdateReportingPeriodMillis()); + StreamingWorkerStatusReporterFactory workerStatusReporterFactory = + throttleTimeSupplier -> + StreamingWorkerStatusReporter.builder() + .setDataflowServiceClient(dataflowServiceClient) + .setWindmillQuotaThrottleTime(throttleTimeSupplier) + .setAllStageInfo(stageInfo::values) + .setFailureTracker(failureTracker) + .setStreamingCounters(streamingCounters) + .setMemoryMonitor(memoryMonitor) + .setWorkExecutor(workExecutor) + .setWindmillHarnessUpdateReportingPeriodMillis( + options.getWindmillHarnessUpdateReportingPeriod().getMillis()) + .setPerWorkerMetricsUpdateReportingPeriodMillis( + options.getPerWorkerMetricsUpdateReportingPeriodMillis()) + .build(); return new StreamingDataflowWorker( windmillServer, @@ -410,7 +511,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o options, new HotKeyLogger(), clock, - workerStatusReporter, + workerStatusReporterFactory, failureTracker, workFailureProcessor, streamingCounters, @@ -418,7 +519,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()), - stageInfo); + stageInfo, + configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient()); } /** @@ -433,53 +535,121 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WorkUnitClient dataflowServiceClient, GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, Function computationStateCacheFactory) { - ComputationConfig.Fetcher configFetcher; - WindmillServerStub windmillServer; - ComputationStateCache computationStateCache; - GrpcWindmillStreamFactory windmillStreamFactory; if (options.isEnableStreamingEngine()) { GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - configFetcher = + ComputationConfig.Fetcher configFetcher = StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient); configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig); - computationStateCache = computationStateCacheFactory.apply(configFetcher); - windmillStreamFactory = + ComputationStateCache computationStateCache = + computationStateCacheFactory.apply(configFetcher); + GrpcWindmillStreamFactory windmillStreamFactory = windmillStreamFactoryBuilder .setProcessHeartbeatResponses( new WorkHeartbeatResponseProcessor(computationStateCache::get)) .setHealthCheckIntervalMillis( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); - windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); - } else { - if (options.getWindmillServiceEndpoint() != null - || options.getLocalWindmillHostport().startsWith("grpc:")) { - windmillStreamFactory = - windmillStreamFactoryBuilder - .setHealthCheckIntervalMillis( - options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) - .build(); - windmillServer = - GrpcWindmillServer.create( - options, - windmillStreamFactory, - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options))); - } else { - windmillStreamFactory = windmillStreamFactoryBuilder.build(); - windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillDispatcherClient(dispatcherClient) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCache) + .setWindmillStreamFactory(windmillStreamFactory) + .setWindmillServer( + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient)) + .build(); + } + + // Build with local Windmill client. + if (options.getWindmillServiceEndpoint() != null + || options.getLocalWindmillHostport().startsWith("grpc:")) { + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); + GrpcWindmillStreamFactory windmillStreamFactory = + windmillStreamFactoryBuilder + .setHealthCheckIntervalMillis( + options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) + .build(); + GrpcWindmillServer windmillServer = + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); + ComputationConfig.Fetcher configFetcher = + createApplianceComputationConfigFetcher(windmillServer); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillDispatcherClient(dispatcherClient) + .setWindmillServer(windmillServer) + .setWindmillStreamFactory(windmillStreamFactory) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) + .build(); + } + + WindmillServerStub windmillServer = + new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + ComputationConfig.Fetcher configFetcher = + createApplianceComputationConfigFetcher(windmillServer); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillStreamFactory(windmillStreamFactoryBuilder.build()) + .setWindmillServer(windmillServer) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) + .build(); + } + + private static StreamingApplianceComputationConfigFetcher createApplianceComputationConfigFetcher( + ApplianceWindmillClient windmillClient) { + return new StreamingApplianceComputationConfigFetcher( + windmillClient::getConfig, + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); + } + + private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) { + if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) { + boolean isIpV6Enabled = + Optional.ofNullable(options.getDataflowServiceOptions()) + .map(serviceOptions -> serviceOptions.contains(ENABLE_IPV6_EXPERIMENT)) + .orElse(false); + + if (isIpV6Enabled) { + return true; } - configFetcher = - new StreamingApplianceComputationConfigFetcher( - windmillServer::getConfig, - new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); - computationStateCache = computationStateCacheFactory.apply(configFetcher); + LOG.warn( + "DirectPath is currently only supported with IPv6 networking stack. This requires setting " + + "\"enable_private_ipv6_google_access\" in experimental pipeline options. " + + "For information on how to set experimental pipeline options see " + + "https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#experimental. " + + "Defaulting to CloudPath."); } - return ConfigFetcherComputationStateCacheAndWindmillClient.create( - configFetcher, computationStateCache, windmillServer, windmillStreamFactory); + return false; + } + + private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) { + Preconditions.checkArgument( + options.isStreaming(), + "%s instantiated with options indicating batch use", + StreamingDataflowWorker.class.getName()); + + Preconditions.checkArgument( + !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT), + "%s cannot be main() class with beam_fn_api enabled", + StreamingDataflowWorker.class.getSimpleName()); + } + + private static ChannelCachingStubFactory createFanOutStubFactory( + DataflowWorkerHarnessOptions workerOptions) { + return ChannelCachingRemoteStubFactory.create( + workerOptions.getGcpCredential(), + ChannelCache.create( + serviceAddress -> + // IsolationChannel will create and manage separate RPC channels to the same + // serviceAddress. + IsolationChannel.create( + () -> + remoteChannel( + serviceAddress, + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec())))); } @VisibleForTesting @@ -495,7 +665,9 @@ static StreamingDataflowWorker forTesting( Supplier clock, Function executorSupplier, StreamingGlobalConfigHandleImpl globalConfigHandle, - int localRetryTimeoutMs) { + int localRetryTimeoutMs, + StreamingCounters streamingCounters, + WindmillStubFactoryFactory stubFactory) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = @@ -538,7 +710,6 @@ static StreamingDataflowWorker forTesting( stateNameMap, stateCache.forComputation(mapTask.getStageName()))); MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options); - StreamingCounters streamingCounters = StreamingCounters.create(); FailureTracker failureTracker = options.isEnableStreamingEngine() ? StreamingEngineFailureTracker.create( @@ -554,19 +725,23 @@ static StreamingDataflowWorker forTesting( () -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()), clock, localRetryTimeoutMs); - StreamingWorkerStatusReporter workerStatusReporter = - StreamingWorkerStatusReporter.forTesting( - publishCounters, - workUnitClient, - windmillServer::getAndResetThrottleTime, - stageInfo::values, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - executorSupplier, - options.getWindmillHarnessUpdateReportingPeriod().getMillis(), - options.getPerWorkerMetricsUpdateReportingPeriodMillis()); + StreamingWorkerStatusReporterFactory workerStatusReporterFactory = + throttleTimeSupplier -> + StreamingWorkerStatusReporter.builder() + .setPublishCounters(publishCounters) + .setDataflowServiceClient(workUnitClient) + .setWindmillQuotaThrottleTime(throttleTimeSupplier) + .setAllStageInfo(stageInfo::values) + .setFailureTracker(failureTracker) + .setStreamingCounters(streamingCounters) + .setMemoryMonitor(memoryMonitor) + .setWorkExecutor(workExecutor) + .setExecutorFactory(executorSupplier) + .setWindmillHarnessUpdateReportingPeriodMillis( + options.getWindmillHarnessUpdateReportingPeriod().getMillis()) + .setPerWorkerMetricsUpdateReportingPeriodMillis( + options.getPerWorkerMetricsUpdateReportingPeriodMillis()) + .build(); GrpcWindmillStreamFactory.Builder windmillStreamFactory = createGrpcwindmillStreamFactoryBuilder(options, 1) @@ -584,7 +759,7 @@ static StreamingDataflowWorker forTesting( options, hotKeyLogger, clock, - workerStatusReporter, + workerStatusReporterFactory, failureTracker, workFailureProcessor, streamingCounters, @@ -596,7 +771,8 @@ static StreamingDataflowWorker forTesting( .build() : windmillStreamFactory.build(), executorSupplier.apply("RefreshWork"), - stageInfo); + stageInfo, + GrpcDispatcherClient.create(options, stubFactory)); } private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactoryBuilder( @@ -605,13 +781,7 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory !options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null ? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF : Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis()); - return GrpcWindmillStreamFactory.of( - JobHeader.newBuilder() - .setJobId(options.getJobId()) - .setProjectId(options.getProject()) - .setWorkerId(options.getWorkerId()) - .setClientId(clientId) - .build()) + return GrpcWindmillStreamFactory.of(createJobHeader(options, clientId)) .setWindmillMessagesBetweenIsReadyChecks(options.getWindmillMessagesBetweenIsReadyChecks()) .setMaxBackOffSupplier(() -> maxBackoff) .setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures()) @@ -622,6 +792,15 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory options, "streaming_engine_disable_new_heartbeat_requests")); } + private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, long clientId) { + return JobHeader.newBuilder() + .setJobId(options.getJobId()) + .setProjectId(options.getProject()) + .setWorkerId(options.getWorkerId()) + .setClientId(clientId) + .build(); + } + private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) { return new BoundedQueueExecutor( chooseMaxThreads(options), @@ -640,15 +819,7 @@ public static void main(String[] args) throws Exception { DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions( StreamingDataflowWorker.class, DataflowWorkerHarnessOptions.class); DataflowWorkerHarnessHelper.configureLogging(options); - checkArgument( - options.isStreaming(), - "%s instantiated with options indicating batch use", - StreamingDataflowWorker.class.getName()); - - checkArgument( - !DataflowRunner.hasExperiment(options, "beam_fn_api"), - "%s cannot be main() class with beam_fn_api enabled", - StreamingDataflowWorker.class.getSimpleName()); + validateWorkerOptions(options); CoderTranslation.verifyModelCodersRegistered(); @@ -705,21 +876,6 @@ void reportPeriodicWorkerUpdatesForTest() { workerStatusReporter.reportPeriodicWorkerUpdates(); } - private int chooseMaximumNumberOfThreads() { - if (options.getNumberOfWorkerHarnessThreads() != 0) { - return options.getNumberOfWorkerHarnessThreads(); - } - return MAX_PROCESSING_THREADS; - } - - private int chooseMaximumBundlesOutstanding() { - int maxBundles = options.getMaxBundlesFromWindmillOutstanding(); - if (maxBundles > 0) { - return maxBundles; - } - return chooseMaximumNumberOfThreads() + 100; - } - @VisibleForTesting public boolean workExecutorIsEmpty() { return workUnitExecutor.executorQueueIsEmpty(); @@ -727,7 +883,7 @@ public boolean workExecutorIsEmpty() { @VisibleForTesting int numCommitThreads() { - return workCommitter.parallelism(); + return numCommitThreads; } @VisibleForTesting @@ -740,7 +896,6 @@ ComputationStateCache getComputationStateCache() { return computationStateCache; } - @SuppressWarnings("FutureReturnValueIgnored") public void start() { running.set(true); configFetcher.start(); @@ -791,27 +946,17 @@ private void onCompleteCommit(CompleteCommit completeCommit) { completeCommit.shardedKey(), completeCommit.workId())); } - @VisibleForTesting - public Iterable buildCounters() { - return Iterables.concat( - streamingCounters - .pendingDeltaCounters() - .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE), - streamingCounters - .pendingCumulativeCounters() - .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); + @FunctionalInterface + private interface StreamingWorkerStatusReporterFactory { + StreamingWorkerStatusReporter createStatusReporter(ThrottledTimeTracker throttledTimeTracker); } @AutoValue abstract static class ConfigFetcherComputationStateCacheAndWindmillClient { - private static ConfigFetcherComputationStateCacheAndWindmillClient create( - ComputationConfig.Fetcher configFetcher, - ComputationStateCache computationStateCache, - WindmillServerStub windmillServer, - GrpcWindmillStreamFactory windmillStreamFactory) { - return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient( - configFetcher, computationStateCache, windmillServer, windmillStreamFactory); + private static Builder builder() { + return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient + .Builder(); } abstract ComputationConfig.Fetcher configFetcher(); @@ -821,6 +966,23 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create( abstract WindmillServerStub windmillServer(); abstract GrpcWindmillStreamFactory windmillStreamFactory(); + + abstract @Nullable GrpcDispatcherClient windmillDispatcherClient(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConfigFetcher(ComputationConfig.Fetcher value); + + abstract Builder setComputationStateCache(ComputationStateCache value); + + abstract Builder setWindmillServer(WindmillServerStub value); + + abstract Builder setWindmillStreamFactory(GrpcWindmillStreamFactory value); + + abstract Builder setWindmillDispatcherClient(GrpcDispatcherClient value); + + abstract ConfigFetcherComputationStateCacheAndWindmillClient build(); + } } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index 1ef1691b0817..4c73e8c7f61c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -371,6 +371,7 @@ private void closeStreamSender(Endpoint endpoint, StreamSender sender) { } /** Add up all the throttle times of all streams including GetWorkerMetadataStream. */ + @Override public long getAndResetThrottleTime() { return backends.get().windmillStreams().values().stream() .map(WindmillStreamSender::getAndResetThrottleTime) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index 9716b834cac4..65203288e169 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; @@ -66,6 +67,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { private final Function> computationStateFetcher; private final ExecutorService workProviderExecutor; private final GetWorkSender getWorkSender; + private final ThrottledTimeTracker throttledTimeTracker; SingleSourceWorkerHarness( WorkCommitter workCommitter, @@ -74,7 +76,8 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { StreamingWorkScheduler streamingWorkScheduler, Runnable waitForResources, Function> computationStateFetcher, - GetWorkSender getWorkSender) { + GetWorkSender getWorkSender, + ThrottledTimeTracker throttledTimeTracker) { this.workCommitter = workCommitter; this.getDataClient = getDataClient; this.heartbeatSender = heartbeatSender; @@ -90,6 +93,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { .build()); this.isRunning = new AtomicBoolean(false); this.getWorkSender = getWorkSender; + this.throttledTimeTracker = throttledTimeTracker; } public static SingleSourceWorkerHarness.Builder builder() { @@ -144,6 +148,11 @@ public void shutdown() { workCommitter.stop(); } + @Override + public long getAndResetThrottleTime() { + return throttledTimeTracker.getAndResetThrottleTime(); + } + private void streamingEngineDispatchLoop( Function getWorkStreamFactory) { while (isRunning.get()) { @@ -254,6 +263,8 @@ Builder setComputationStateFetcher( Builder setGetWorkSender(GetWorkSender getWorkSender); + Builder setThrottledTimeTracker(ThrottledTimeTracker throttledTimeTracker); + SingleSourceWorkerHarness build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java index c1b4570e2260..731a5a4b1b51 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java @@ -17,11 +17,12 @@ */ package org.apache.beam.runners.dataflow.worker.streaming.harness; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.sdk.annotations.Internal; /** Provides an interface to start streaming worker processing. */ @Internal -public interface StreamingWorkerHarness { +public interface StreamingWorkerHarness extends ThrottledTimeTracker { void start(); void shutdown(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index 6981312eff1d..ddfc6809231a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -258,7 +258,7 @@ public interface Builder { Builder setDebugCapture(DebugCapture.Manager debugCapture); - Builder setChannelzServlet(ChannelzServlet channelzServlet); + Builder setChannelzServlet(@Nullable ChannelzServlet channelzServlet); Builder setStateCache(WindmillStateCache stateCache); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index ba77d8e1ce26..3557f0d193c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -27,6 +27,7 @@ import com.google.api.services.dataflow.model.WorkItemStatus; import com.google.api.services.dataflow.model.WorkerMessage; import com.google.api.services.dataflow.model.WorkerMessageResponse; +import com.google.auto.value.AutoBuilder; import java.io.IOException; import java.math.RoundingMode; import java.util.ArrayList; @@ -51,6 +52,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -78,7 +80,7 @@ public final class StreamingWorkerStatusReporter { private final int initialMaxThreadCount; private final int initialMaxBundlesOutstanding; private final WorkUnitClient dataflowServiceClient; - private final Supplier windmillQuotaThrottleTime; + private final ThrottledTimeTracker windmillQuotaThrottleTime; private final Supplier> allStageInfo; private final FailureTracker failureTracker; private final StreamingCounters streamingCounters; @@ -97,10 +99,10 @@ public final class StreamingWorkerStatusReporter { // Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics. private final AtomicLong workerMessagesIndex; - private StreamingWorkerStatusReporter( + StreamingWorkerStatusReporter( boolean publishCounters, WorkUnitClient dataflowServiceClient, - Supplier windmillQuotaThrottleTime, + ThrottledTimeTracker windmillQuotaThrottleTime, Supplier> allStageInfo, FailureTracker failureTracker, StreamingCounters streamingCounters, @@ -131,57 +133,13 @@ private StreamingWorkerStatusReporter( this.workerMessagesIndex = new AtomicLong(); } - public static StreamingWorkerStatusReporter create( - WorkUnitClient workUnitClient, - Supplier windmillQuotaThrottleTime, - Supplier> allStageInfo, - FailureTracker failureTracker, - StreamingCounters streamingCounters, - MemoryMonitor memoryMonitor, - BoundedQueueExecutor workExecutor, - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - return new StreamingWorkerStatusReporter( - /* publishCounters= */ true, - workUnitClient, - windmillQuotaThrottleTime, - allStageInfo, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - threadName -> - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(threadName).build()), - windmillHarnessUpdateReportingPeriodMillis, - perWorkerMetricsUpdateReportingPeriodMillis); - } - - @VisibleForTesting - public static StreamingWorkerStatusReporter forTesting( - boolean publishCounters, - WorkUnitClient workUnitClient, - Supplier windmillQuotaThrottleTime, - Supplier> allStageInfo, - FailureTracker failureTracker, - StreamingCounters streamingCounters, - MemoryMonitor memoryMonitor, - BoundedQueueExecutor workExecutor, - Function executorFactory, - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - return new StreamingWorkerStatusReporter( - publishCounters, - workUnitClient, - windmillQuotaThrottleTime, - allStageInfo, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - executorFactory, - windmillHarnessUpdateReportingPeriodMillis, - perWorkerMetricsUpdateReportingPeriodMillis); + public static Builder builder() { + return new AutoBuilder_StreamingWorkerStatusReporter_Builder() + .setPublishCounters(true) + .setExecutorFactory( + threadName -> + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(threadName).build())); } /** @@ -228,6 +186,22 @@ private static void shutdownExecutor(ScheduledExecutorService executor) { } } + // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the + // WorkerMessages RPC schedule. The desired reporting period + // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple + // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). + private static long getPerWorkerMetricsUpdateFrequency( + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { + if (windmillHarnessUpdateReportingPeriodMillis == 0) { + return 0; + } + return LongMath.divide( + perWorkerMetricsUpdateReportingPeriodMillis, + windmillHarnessUpdateReportingPeriodMillis, + RoundingMode.CEILING); + } + @SuppressWarnings("FutureReturnValueIgnored") public void start() { reportHarnessStartup(); @@ -276,27 +250,13 @@ private void reportHarnessStartup() { } } - // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the - // WorkerMessages RPC schedule. The desired reporting period - // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple - // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). - private static long getPerWorkerMetricsUpdateFrequency( - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - if (windmillHarnessUpdateReportingPeriodMillis == 0) { - return 0; - } - return LongMath.divide( - perWorkerMetricsUpdateReportingPeriodMillis, - windmillHarnessUpdateReportingPeriodMillis, - RoundingMode.CEILING); - } - /** Sends counter updates to Dataflow backend. */ private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { // Throttle time is tracked by the windmillServer but is reported to DFE here. - streamingCounters.windmillQuotaThrottling().addValue(windmillQuotaThrottleTime.get()); + streamingCounters + .windmillQuotaThrottling() + .addValue(windmillQuotaThrottleTime.getAndResetThrottleTime()); if (memoryMonitor.isThrashing()) { streamingCounters.memoryThrashing().addValue(1); } @@ -496,4 +456,33 @@ private void updateThreadMetrics() { .maxOutstandingBundles() .addValue((long) workExecutor.maximumElementsOutstanding()); } + + @AutoBuilder + public interface Builder { + Builder setPublishCounters(boolean publishCounters); + + Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient); + + Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker windmillQuotaThrottledTimeTracker); + + Builder setAllStageInfo(Supplier> allStageInfo); + + Builder setFailureTracker(FailureTracker failureTracker); + + Builder setStreamingCounters(StreamingCounters streamingCounters); + + Builder setMemoryMonitor(MemoryMonitor memoryMonitor); + + Builder setWorkExecutor(BoundedQueueExecutor workExecutor); + + Builder setExecutorFactory(Function executorFactory); + + Builder setWindmillHarnessUpdateReportingPeriodMillis( + long windmillHarnessUpdateReportingPeriodMillis); + + Builder setPerWorkerMetricsUpdateReportingPeriodMillis( + long perWorkerMetricsUpdateReportingPeriodMillis); + + StreamingWorkerStatusReporter build(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java index f660112721ba..fdcb0339d23d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java @@ -25,7 +25,7 @@ * CommitWork are both blocked for x, totalTime will be 2x. However, if 2 GetWork streams are both * blocked for x totalTime will be x. All methods are thread safe. */ -public final class ThrottleTimer { +public final class ThrottleTimer implements ThrottledTimeTracker { // This is -1 if not currently being throttled or the time in // milliseconds when throttling for this type started. private long startTime = -1; @@ -56,6 +56,7 @@ public synchronized boolean throttled() { } /** Returns the combined total of all throttle times and resets those times to 0. */ + @Override public synchronized long getAndResetThrottleTime() { if (throttled()) { stop(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java new file mode 100644 index 000000000000..9bb8fb0a7b5f --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.throttling; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Tracks time spent in a throttled state due to {@code Status.RESOURCE_EXHAUSTED} errors returned + * from gRPC calls. + */ +@Internal +@FunctionalInterface +public interface ThrottledTimeTracker { + + /** Returns the combined total of all throttle times and resets those times to 0. */ + long getAndResetThrottleTime(); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java index fa36b11ffe55..f54091dc2b95 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java @@ -42,7 +42,7 @@ private StreamPoolHeartbeatSender( this.heartbeatStreamPool.set(heartbeatStreamPool); } - public static StreamPoolHeartbeatSender Create( + public static StreamPoolHeartbeatSender create( @Nonnull WindmillStreamPool heartbeatStreamPool) { return new StreamPoolHeartbeatSender(heartbeatStreamPool); } @@ -55,7 +55,7 @@ public static StreamPoolHeartbeatSender Create( * enabled. * @param getDataPool stream to use when using separate streams for heartbeat is disabled. */ - public static StreamPoolHeartbeatSender Create( + public static StreamPoolHeartbeatSender create( @Nonnull WindmillStreamPool dedicatedHeartbeatPool, @Nonnull WindmillStreamPool getDataPool, @Nonnull StreamingGlobalConfigHandle configHandle) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index dadf02171235..6eeb7bd6bbfc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -96,6 +96,7 @@ import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.util.Structs; +import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; @@ -104,6 +105,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; +import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC; import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; @@ -129,6 +131,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory; +import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -178,6 +183,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedLong; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -285,6 +291,7 @@ public Long get() { private final FakeWindmillServer server = new FakeWindmillServer( errorCollector, computationId -> computationStateCache.get(computationId)); + private StreamingCounters streamingCounters; public StreamingDataflowWorkerTest(Boolean streamingEngine) { this.streamingEngine = streamingEngine; @@ -304,9 +311,20 @@ private static CounterUpdate getCounter(Iterable counters, String return null; } + private Iterable buildCounters() { + return Iterables.concat( + streamingCounters + .pendingDeltaCounters() + .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE), + streamingCounters + .pendingCumulativeCounters() + .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); + } + @Before public void setUp() { server.clearCommitsReceived(); + streamingCounters = StreamingCounters.create(); } @After @@ -856,7 +874,13 @@ private StreamingDataflowWorker makeWorker( streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), mockGlobalConfigHandle, - streamingDataflowWorkerTestParams.localRetryTimeoutMs()); + streamingDataflowWorkerTestParams.localRetryTimeoutMs(), + streamingCounters, + new FakeWindmillStubFactoryFactory( + new FakeWindmillStubFactory( + () -> + WindmillChannelFactory.inProcessChannel( + "StreamingDataflowWorkerTestChannel")))); this.computationStateCache = worker.getComputationStateCache(); return worker; } @@ -1715,7 +1739,7 @@ public void testMergeWindows() throws Exception { intervalWindowBytes(WINDOW_AT_ZERO))); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); // These tags and data are opaque strings and this is a change detector test. // The "/u" indicates the user's namespace, versus "/s" for system namespace @@ -1836,7 +1860,7 @@ public void testMergeWindows() throws Exception { expectedBytesRead += dataBuilder.build().getSerializedSize(); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); actualOutput = result.get(2L); assertEquals(1, actualOutput.getOutputMessagesCount()); @@ -2004,7 +2028,7 @@ public void testMergeWindowsCaching() throws Exception { intervalWindowBytes(WINDOW_AT_ZERO))); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); // These tags and data are opaque strings and this is a change detector test. // The "/u" indicates the user's namespace, versus "/s" for system namespace @@ -2125,7 +2149,7 @@ public void testMergeWindowsCaching() throws Exception { expectedBytesRead += dataBuilder.build().getSerializedSize(); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); actualOutput = result.get(2L); assertEquals(1, actualOutput.getOutputMessagesCount()); @@ -2430,7 +2454,7 @@ public void testUnboundedSources() throws Exception { null)); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); Windmill.WorkItemCommitRequest commit = result.get(1L); UnsignedLong finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2492,7 +2516,7 @@ public void testUnboundedSources() throws Exception { null)); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); commit = result.get(2L); finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2540,7 +2564,7 @@ public void testUnboundedSources() throws Exception { null)); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); commit = result.get(3L); finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2710,7 +2734,7 @@ public void testUnboundedSourceWorkRetry() throws Exception { server.whenGetWorkCalled().thenReturn(work); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); Windmill.WorkItemCommitRequest commit = result.get(1L); UnsignedLong finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java index 5a2df4baae61..4df3bf7cd823 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java @@ -60,6 +60,8 @@ private SingleSourceWorkerHarness createWorkerHarness( .setWaitForResources(waitForResources) .setStreamingWorkScheduler(streamingWorkScheduler) .setComputationStateFetcher(computationStateFetcher) + // no-op throttle time supplier. + .setThrottledTimeTracker(() -> 0L) .setGetWorkSender(getWorkSender) .build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java index 7e65a495638f..f348e4cf1bdb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java @@ -39,14 +39,15 @@ @RunWith(JUnit4.class) public class StreamingWorkerStatusReporterTest { - private final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000; - private final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000; - private final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000; + private static final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000; + private static final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000; + private static final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000; private BoundedQueueExecutor mockExecutor; private WorkUnitClient mockWorkUnitClient; private FailureTracker mockFailureTracker; private MemoryMonitor mockMemoryMonitor; + private StreamingWorkerStatusReporter reporter; @Before public void setUp() { @@ -54,23 +55,11 @@ public void setUp() { this.mockWorkUnitClient = mock(WorkUnitClient.class); this.mockFailureTracker = mock(FailureTracker.class); this.mockMemoryMonitor = mock(MemoryMonitor.class); + this.reporter = buildWorkerStatusReporterForTest(); } @Test public void testOverrideMaximumThreadCount() throws Exception { - StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.forTesting( - true, - mockWorkUnitClient, - () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME, - () -> Collections.emptyList(), - mockFailureTracker, - StreamingCounters.create(), - mockMemoryMonitor, - mockExecutor, - (threadName) -> Executors.newSingleThreadScheduledExecutor(), - DEFAULT_HARNESS_REPORTING_PERIOD, - DEFAULT_PER_WORKER_METRICS_PERIOD); StreamingScalingReportResponse streamingScalingReportResponse = new StreamingScalingReportResponse().setMaximumThreadCount(10); WorkerMessageResponse workerMessageResponse = @@ -84,23 +73,25 @@ public void testOverrideMaximumThreadCount() throws Exception { @Test public void testHandleEmptyWorkerMessageResponse() throws Exception { - StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.forTesting( - true, - mockWorkUnitClient, - () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME, - () -> Collections.emptyList(), - mockFailureTracker, - StreamingCounters.create(), - mockMemoryMonitor, - mockExecutor, - (threadName) -> Executors.newSingleThreadScheduledExecutor(), - DEFAULT_HARNESS_REPORTING_PERIOD, - DEFAULT_PER_WORKER_METRICS_PERIOD); - WorkerMessageResponse workerMessageResponse = new WorkerMessageResponse(); when(mockWorkUnitClient.reportWorkerMessage(any())) - .thenReturn(Collections.singletonList(workerMessageResponse)); + .thenReturn(Collections.singletonList(new WorkerMessageResponse())); reporter.reportPeriodicWorkerMessage(); verify(mockExecutor, Mockito.times(0)).setMaximumPoolSize(anyInt(), anyInt()); } + + private StreamingWorkerStatusReporter buildWorkerStatusReporterForTest() { + return StreamingWorkerStatusReporter.builder() + .setPublishCounters(true) + .setDataflowServiceClient(mockWorkUnitClient) + .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) + .setAllStageInfo(Collections::emptyList) + .setFailureTracker(mockFailureTracker) + .setStreamingCounters(StreamingCounters.create()) + .setMemoryMonitor(mockMemoryMonitor) + .setWorkExecutor(mockExecutor) + .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) + .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) + .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) + .build(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java index ed915088d0a6..acbb3aebbcf5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java @@ -39,7 +39,7 @@ public class StreamPoolHeartbeatSenderTest { public void sendsHeartbeatsOnStream() { FakeWindmillServer server = new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create(1, Duration.standardSeconds(10), server::getDataStream)); Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); heartbeatsBuilder @@ -59,7 +59,7 @@ public void sendsHeartbeatsOnDedicatedStream() { FakeGlobalConfigHandle configHandle = new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true)); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create( 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), WindmillStreamPool.create( @@ -104,7 +104,7 @@ public void sendsHeartbeatsOnGetDataStream() { FakeGlobalConfigHandle configHandle = new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false)); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create( 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), WindmillStreamPool.create(