diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index 6a0208f1447f..61c38dde2b42 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.joda.time.Duration; @@ -219,10 +220,8 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setWindmillServiceStreamMaxBackoffMillis(int value); - @Description( - "If true, Dataflow streaming pipeline will be running in direct path mode." - + " VMs must have IPv6 enabled for this to work.") - @Default.Boolean(false) + @Description("Enables direct path mode for streaming engine.") + @Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class) boolean getIsWindmillServiceDirectPathEnabled(); void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled); @@ -300,4 +299,12 @@ public Integer create(PipelineOptions options) { return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1; } } + + /** EnableStreamingEngine defaults to false unless one of the experiment is set. 
*/ + class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory { + @Override + public Boolean create(PipelineOptions options) { + return ExperimentalOptions.hasExperiment(options, "enable_windmill_service_direct_path"); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 524906023722..e47f74d17bc0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -17,11 +17,11 @@ */ package org.apache.beam.runners.dataflow.worker; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; -import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.MapTask; import com.google.auto.value.AutoValue; +import java.io.PrintWriter; import java.util.List; import java.util.Map; import java.util.Optional; @@ -33,12 +33,13 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import javax.annotation.Nullable; import org.apache.beam.runners.core.metrics.MetricsLogger; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; -import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import 
org.apache.beam.runners.dataflow.worker.status.DebugCapture; import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; @@ -50,7 +51,9 @@ import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; +import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; @@ -63,6 +66,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; +import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit; @@ -77,8 +81,14 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer; import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingApplianceFailureTracker; @@ -96,8 +106,10 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.joda.time.Instant; @@ 
-118,6 +130,8 @@ public final class StreamingDataflowWorker { */ public static final int MAX_SINK_BYTES = 10_000_000; + public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = + "streaming_engine_use_job_settings_for_heartbeat_pool"; private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class); /** @@ -140,6 +154,8 @@ public final class StreamingDataflowWorker { private static final int DEFAULT_STATUS_PORT = 8081; private static final Random CLIENT_ID_GENERATOR = new Random(); private static final String CHANNELZ_PATH = "/channelz"; + private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api"; + private static final String ENABLE_IPV6_EXPERIMENT = "enable_private_ipv6_google_access"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -153,9 +169,8 @@ public final class StreamingDataflowWorker { private final ReaderCache readerCache; private final DataflowExecutionStateSampler sampler = DataflowExecutionStateSampler.instance(); private final ActiveWorkRefresher activeWorkRefresher; - private final WorkCommitter workCommitter; private final StreamingWorkerStatusReporter workerStatusReporter; - private final StreamingCounters streamingCounters; + private final int numCommitThreads; private StreamingDataflowWorker( WindmillServerStub windmillServer, @@ -168,17 +183,17 @@ private StreamingDataflowWorker( DataflowWorkerHarnessOptions options, HotKeyLogger hotKeyLogger, Supplier clock, - StreamingWorkerStatusReporter workerStatusReporter, + StreamingWorkerStatusReporterFactory streamingWorkerStatusReporterFactory, FailureTracker failureTracker, WorkFailureProcessor workFailureProcessor, StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, GrpcWindmillStreamFactory windmillStreamFactory, Function executorSupplier, - ConcurrentMap stageInfoMap) { + ConcurrentMap stageInfoMap, + @Nullable GrpcDispatcherClient dispatcherClient) { // Register standard 
file systems. FileSystems.setDefaultPipelineOptions(options); - this.configFetcher = configFetcher; this.computationStateCache = computationStateCache; this.stateCache = windmillStateCache; @@ -187,34 +202,13 @@ private StreamingDataflowWorker( Duration.standardSeconds(options.getReaderCacheTimeoutSec()), Executors.newCachedThreadPool()); this.options = options; - - boolean windmillServiceEnabled = options.isEnableStreamingEngine(); - - int numCommitThreads = 1; - if (windmillServiceEnabled && options.getWindmillServiceCommitThreads() > 0) { - numCommitThreads = options.getWindmillServiceCommitThreads(); - } - - this.workCommitter = - windmillServiceEnabled - ? StreamingEngineWorkCommitter.builder() - .setCommitWorkStreamFactory( - WindmillStreamPool.create( - numCommitThreads, - COMMIT_STREAM_TIMEOUT, - windmillServer::commitWorkStream) - ::getCloseableStream) - .setNumCommitSenders(numCommitThreads) - .setOnCommitComplete(this::onCompleteCommit) - .build() - : StreamingApplianceWorkCommitter.create( - windmillServer::commitWork, this::onCompleteCommit); - this.workUnitExecutor = workUnitExecutor; - - this.workerStatusReporter = workerStatusReporter; - this.streamingCounters = streamingCounters; this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor); + this.numCommitThreads = + options.isEnableStreamingEngine() + ? 
Math.max(options.getWindmillServiceCommitThreads(), 1) + : 1; + StreamingWorkScheduler streamingWorkScheduler = StreamingWorkScheduler.create( options, @@ -231,107 +225,190 @@ private StreamingDataflowWorker( ID_GENERATOR, configFetcher.getGlobalConfigHandle(), stageInfoMap); - ThrottlingGetDataMetricTracker getDataMetricTracker = new ThrottlingGetDataMetricTracker(memoryMonitor); - WorkerStatusPages workerStatusPages = - WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); - StreamingWorkerStatusPages.Builder statusPagesBuilder = StreamingWorkerStatusPages.builder(); - int stuckCommitDurationMillis; - GetDataClient getDataClient; - HeartbeatSender heartbeatSender; - if (windmillServiceEnabled) { - WindmillStreamPool getDataStreamPool = - WindmillStreamPool.create( - Math.max(1, options.getWindmillGetDataStreamCount()), - GET_DATA_STREAM_TIMEOUT, - windmillServer::getDataStream); - getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); - if (options.getUseSeparateWindmillHeartbeatStreams() != null) { + // Status page members. Different implementations on whether the harness is streaming engine + // direct path, streaming engine cloud path, or streaming appliance. 
+ @Nullable ChannelzServlet channelzServlet = null; + Consumer getDataStatusProvider; + Supplier currentActiveCommitBytesProvider; + if (isDirectPathPipeline(options)) { + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, watermarks, processingContext, getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + createStubFactory(options), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + getDataStatusProvider = getDataMetricTracker::printHtml; + currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + channelzServlet = + createChannelZServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness; + } else { + Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options); + GetDataClient getDataClient; + HeartbeatSender heartbeatSender; + WorkCommitter workCommitter; + GetWorkSender getWorkSender; + if (options.isEnableStreamingEngine()) { + WindmillStreamPool getDataStreamPool = + 
WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + windmillServer::getDataStream); + getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); heartbeatSender = - StreamPoolHeartbeatSender.Create( - Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) - ? separateHeartbeatPool(windmillServer) - : getDataStreamPool); - + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + channelzServlet = + createChannelZServlet(options, windmillServer::getWindmillServiceEndpoints); + workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, + COMMIT_STREAM_TIMEOUT, + windmillServer::commitWorkStream) + ::getCloseableStream) + .setNumCommitSenders(numCommitThreads) + .setOnCommitComplete(this::onCompleteCommit) + .build(); + getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); } else { - heartbeatSender = - StreamPoolHeartbeatSender.Create( - separateHeartbeatPool(windmillServer), - getDataStreamPool, - configFetcher.getGlobalConfigHandle()); + getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); + heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); + workCommitter = + StreamingApplianceWorkCommitter.create( + windmillServer::commitWork, this::onCompleteCommit); + getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request)); } - stuckCommitDurationMillis = - options.getStuckCommitDurationMillis() > 0 ? 
options.getStuckCommitDurationMillis() : 0; - statusPagesBuilder - .setDebugCapture( - new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) - .setChannelzServlet( - new ChannelzServlet( - CHANNELZ_PATH, options, windmillServer::getWindmillServiceEndpoints)) - .setWindmillStreamFactory(windmillStreamFactory); - } else { - getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); - heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); - stuckCommitDurationMillis = 0; + getDataStatusProvider = getDataClient::printHtml; + currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; + + this.streamingWorkerHarness = + SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime) + .setGetWorkSender(getWorkSender) + .build(); } + this.workerStatusReporter = + streamingWorkerStatusReporterFactory.createStatusReporter( + streamingWorkerHarness::getAndResetThrottleTime); this.activeWorkRefresher = new ActiveWorkRefresher( clock, options.getActiveWorkRefreshPeriodMillis(), - stuckCommitDurationMillis, + options.isEnableStreamingEngine() + ? 
Math.max(options.getStuckCommitDurationMillis(), 0) + : 0, computationStateCache::getAllPresentComputations, sampler, executorSupplier.apply("RefreshWork"), getDataMetricTracker::trackHeartbeats); this.statusPages = - statusPagesBuilder + createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor) .setClock(clock) .setClientId(clientId) .setIsRunning(running) - .setStatusPages(workerStatusPages) .setStateCache(stateCache) .setComputationStateCache(this.computationStateCache) - .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes) - .setGetDataStatusProvider(getDataClient::printHtml) .setWorkUnitExecutor(workUnitExecutor) .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle()) + .setChannelzServlet(channelzServlet) + .setGetDataStatusProvider(getDataStatusProvider) + .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider) .build(); - Windmill.GetWorkRequest request = - Windmill.GetWorkRequest.newBuilder() - .setClientId(clientId) - .setMaxItems(chooseMaximumBundlesOutstanding()) - .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) - .build(); - - this.streamingWorkerHarness = - SingleSourceWorkerHarness.builder() - .setStreamingWorkScheduler(streamingWorkScheduler) - .setWorkCommitter(workCommitter) - .setGetDataClient(getDataClient) - .setComputationStateFetcher(this.computationStateCache::get) - .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) - .setHeartbeatSender(heartbeatSender) - .setGetWorkSender( - windmillServiceEnabled - ? 
GetWorkSender.forStreamingEngine( - receiver -> windmillServer.getWorkStream(request, receiver)) - : GetWorkSender.forAppliance(() -> windmillServer.getWork(request))) - .build(); - - LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled); + LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled()); + LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine()); LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint()); LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort()); LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } - private static WindmillStreamPool separateHeartbeatPool( - WindmillServerStub windmillServer) { - return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); + private static StreamingWorkerStatusPages.Builder createStatusPageBuilder( + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + MemoryMonitor memoryMonitor) { + WorkerStatusPages workerStatusPages = + WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor); + + StreamingWorkerStatusPages.Builder streamingStatusPages = + StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages); + + return options.isEnableStreamingEngine() + ? 
streamingStatusPages + .setDebugCapture( + new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages())) + .setWindmillStreamFactory(windmillStreamFactory) + : streamingStatusPages; + } + + private static ChannelzServlet createChannelZServlet( + DataflowWorkerHarnessOptions options, + Supplier> windmillEndpointProvider) { + return new ChannelzServlet(CHANNELZ_PATH, options, windmillEndpointProvider); + } + + private static HeartbeatSender createStreamingEngineHeartbeatSender( + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillClient, + WindmillStreamPool getDataStreamPool, + StreamingGlobalConfigHandle globalConfigHandle) { + // Experiment gates the logic till backend changes are rollback safe + if (!DataflowRunner.hasExperiment(options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL) + || options.getUseSeparateWindmillHeartbeatStreams() != null) { + return StreamPoolHeartbeatSender.Create( + Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) + ? 
WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream) + : getDataStreamPool); + + } else { + return StreamPoolHeartbeatSender.Create( + WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream), + getDataStreamPool, + globalConfigHandle); + } } public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { @@ -387,17 +464,21 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o failureTracker, () -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()), clock); - StreamingWorkerStatusReporter workerStatusReporter = - StreamingWorkerStatusReporter.create( - dataflowServiceClient, - windmillServer::getAndResetThrottleTime, - stageInfo::values, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - options.getWindmillHarnessUpdateReportingPeriod().getMillis(), - options.getPerWorkerMetricsUpdateReportingPeriodMillis()); + StreamingWorkerStatusReporterFactory workerStatusReporterFactory = + throttleTimeSupplier -> + StreamingWorkerStatusReporter.builder() + .setDataflowServiceClient(dataflowServiceClient) + .setWindmillQuotaThrottleTime(throttleTimeSupplier) + .setAllStageInfo(stageInfo::values) + .setFailureTracker(failureTracker) + .setStreamingCounters(streamingCounters) + .setMemoryMonitor(memoryMonitor) + .setWorkExecutor(workExecutor) + .setWindmillHarnessUpdateReportingPeriodMillis( + options.getWindmillHarnessUpdateReportingPeriod().getMillis()) + .setPerWorkerMetricsUpdateReportingPeriodMillis( + options.getPerWorkerMetricsUpdateReportingPeriodMillis()) + .build(); return new StreamingDataflowWorker( windmillServer, @@ -410,14 +491,15 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o options, new HotKeyLogger(), clock, - workerStatusReporter, + workerStatusReporterFactory, failureTracker, workFailureProcessor, streamingCounters, memoryMonitor, 
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), executorSupplier, - stageInfo); + stageInfo, + configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient()); } /** @@ -436,6 +518,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WindmillServerStub windmillServer; ComputationStateCache computationStateCache; GrpcWindmillStreamFactory windmillStreamFactory; + ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder = + ConfigFetcherComputationStateCacheAndWindmillClient.builder(); if (options.isEnableStreamingEngine()) { GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); @@ -452,19 +536,20 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); + builder.setWindmillDispatcherClient(dispatcherClient); } else { if (options.getWindmillServiceEndpoint() != null || options.getLocalWindmillHostport().startsWith("grpc:")) { + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); windmillStreamFactory = windmillStreamFactoryBuilder .setHealthCheckIntervalMillis( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); windmillServer = - GrpcWindmillServer.create( - options, - windmillStreamFactory, - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options))); + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); + builder.setWindmillDispatcherClient(dispatcherClient); } else { windmillStreamFactory = windmillStreamFactoryBuilder.build(); windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); @@ -477,8 +562,50 @@ public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions o computationStateCache = computationStateCacheFactory.apply(configFetcher); } - return ConfigFetcherComputationStateCacheAndWindmillClient.create( - configFetcher, computationStateCache, windmillServer, windmillStreamFactory); + return builder + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCache) + .setWindmillServer(windmillServer) + .setWindmillStreamFactory(windmillStreamFactory) + .build(); + } + + private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) { + if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) { + boolean isIpV6Enabled = + Optional.ofNullable(options.getDataflowServiceOptions()) + .map(serviceOptions -> serviceOptions.contains(ENABLE_IPV6_EXPERIMENT)) + .orElse(false); + if (isIpV6Enabled) { + return true; + } + LOG.warn( + "DirectPath is currently only supported with IPv6 networking stack. Defaulting to" + + " CloudPath."); + } + return false; + } + + private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) { + Preconditions.checkArgument( + options.isStreaming(), + "%s instantiated with options indicating batch use", + StreamingDataflowWorker.class.getName()); + + Preconditions.checkArgument( + !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT), + "%s cannot be main() class with beam_fn_api enabled", + StreamingDataflowWorker.class.getSimpleName()); + } + + private static ChannelCachingStubFactory createStubFactory( + DataflowWorkerHarnessOptions workerOptions) { + return ChannelCachingRemoteStubFactory.create( + workerOptions.getGcpCredential(), + ChannelCache.create( + serviceAddress -> + remoteChannel( + serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))); } @VisibleForTesting @@ -494,7 +621,9 @@ static StreamingDataflowWorker forTesting( Supplier clock, Function executorSupplier, StreamingGlobalConfigHandleImpl globalConfigHandle, - int 
localRetryTimeoutMs) { + int localRetryTimeoutMs, + StreamingCounters streamingCounters, + WindmillStubFactoryFactory stubFactory) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = @@ -537,7 +666,6 @@ static StreamingDataflowWorker forTesting( stateNameMap, stateCache.forComputation(mapTask.getStageName()))); MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options); - StreamingCounters streamingCounters = StreamingCounters.create(); FailureTracker failureTracker = options.isEnableStreamingEngine() ? StreamingEngineFailureTracker.create( @@ -553,19 +681,23 @@ static StreamingDataflowWorker forTesting( () -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()), clock, localRetryTimeoutMs); - StreamingWorkerStatusReporter workerStatusReporter = - StreamingWorkerStatusReporter.forTesting( - publishCounters, - workUnitClient, - windmillServer::getAndResetThrottleTime, - stageInfo::values, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - executorSupplier, - options.getWindmillHarnessUpdateReportingPeriod().getMillis(), - options.getPerWorkerMetricsUpdateReportingPeriodMillis()); + StreamingWorkerStatusReporterFactory workerStatusReporterFactory = + throttleTimeSupplier -> + StreamingWorkerStatusReporter.builder() + .setPublishCounters(publishCounters) + .setDataflowServiceClient(workUnitClient) + .setWindmillQuotaThrottleTime(throttleTimeSupplier) + .setAllStageInfo(stageInfo::values) + .setFailureTracker(failureTracker) + .setStreamingCounters(streamingCounters) + .setMemoryMonitor(memoryMonitor) + .setWorkExecutor(workExecutor) + .setExecutorFactory(executorSupplier) + .setWindmillHarnessUpdateReportingPeriodMillis( + options.getWindmillHarnessUpdateReportingPeriod().getMillis()) + .setPerWorkerMetricsUpdateReportingPeriodMillis( + options.getPerWorkerMetricsUpdateReportingPeriodMillis()) + .build(); GrpcWindmillStreamFactory.Builder 
windmillStreamFactory = createGrpcwindmillStreamFactoryBuilder(options, 1) @@ -583,7 +715,7 @@ static StreamingDataflowWorker forTesting( options, hotKeyLogger, clock, - workerStatusReporter, + workerStatusReporterFactory, failureTracker, workFailureProcessor, streamingCounters, @@ -595,7 +727,8 @@ static StreamingDataflowWorker forTesting( .build() : windmillStreamFactory.build(), executorSupplier, - stageInfo); + stageInfo, + GrpcDispatcherClient.create(options, stubFactory)); } private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactoryBuilder( @@ -604,13 +737,7 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory !options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null ? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF : Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis()); - return GrpcWindmillStreamFactory.of( - JobHeader.newBuilder() - .setJobId(options.getJobId()) - .setProjectId(options.getProject()) - .setWorkerId(options.getWorkerId()) - .setClientId(clientId) - .build()) + return GrpcWindmillStreamFactory.of(createJobHeader(options, clientId)) .setWindmillMessagesBetweenIsReadyChecks(options.getWindmillMessagesBetweenIsReadyChecks()) .setMaxBackOffSupplier(() -> maxBackoff) .setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures()) @@ -621,6 +748,15 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory options, "streaming_engine_disable_new_heartbeat_requests")); } + private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, long clientId) { + return JobHeader.newBuilder() + .setJobId(options.getJobId()) + .setProjectId(options.getProject()) + .setWorkerId(options.getWorkerId()) + .setClientId(clientId) + .build(); + } + private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) { return new BoundedQueueExecutor( chooseMaxThreads(options), @@ -639,15 
+775,7 @@ public static void main(String[] args) throws Exception { DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions( StreamingDataflowWorker.class, DataflowWorkerHarnessOptions.class); DataflowWorkerHarnessHelper.configureLogging(options); - checkArgument( - options.isStreaming(), - "%s instantiated with options indicating batch use", - StreamingDataflowWorker.class.getName()); - - checkArgument( - !DataflowRunner.hasExperiment(options, "beam_fn_api"), - "%s cannot be main() class with beam_fn_api enabled", - StreamingDataflowWorker.class.getSimpleName()); + validateWorkerOptions(options); CoderTranslation.verifyModelCodersRegistered(); @@ -695,26 +823,20 @@ private static void enableBigQueryMetrics() { BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true); } + private static Windmill.GetWorkRequest createGetWorkRequest( + long clientId, DataflowWorkerHarnessOptions options) { + return Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + } + @VisibleForTesting void reportPeriodicWorkerUpdatesForTest() { workerStatusReporter.reportPeriodicWorkerUpdates(); } - private int chooseMaximumNumberOfThreads() { - if (options.getNumberOfWorkerHarnessThreads() != 0) { - return options.getNumberOfWorkerHarnessThreads(); - } - return MAX_PROCESSING_THREADS; - } - - private int chooseMaximumBundlesOutstanding() { - int maxBundles = options.getMaxBundlesFromWindmillOutstanding(); - if (maxBundles > 0) { - return maxBundles; - } - return chooseMaximumNumberOfThreads() + 100; - } - @VisibleForTesting public boolean workExecutorIsEmpty() { return workUnitExecutor.executorQueueIsEmpty(); @@ -722,7 +844,7 @@ public boolean workExecutorIsEmpty() { @VisibleForTesting int numCommitThreads() { - return workCommitter.parallelism(); + return numCommitThreads; } @VisibleForTesting @@ -786,27 +908,17 @@ private void onCompleteCommit(CompleteCommit 
completeCommit) { completeCommit.shardedKey(), completeCommit.workId())); } - @VisibleForTesting - public Iterable buildCounters() { - return Iterables.concat( - streamingCounters - .pendingDeltaCounters() - .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE), - streamingCounters - .pendingCumulativeCounters() - .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); + @FunctionalInterface + private interface StreamingWorkerStatusReporterFactory { + StreamingWorkerStatusReporter createStatusReporter(Supplier throttleTimeSupplier); } @AutoValue abstract static class ConfigFetcherComputationStateCacheAndWindmillClient { - private static ConfigFetcherComputationStateCacheAndWindmillClient create( - ComputationConfig.Fetcher configFetcher, - ComputationStateCache computationStateCache, - WindmillServerStub windmillServer, - GrpcWindmillStreamFactory windmillStreamFactory) { - return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient( - configFetcher, computationStateCache, windmillServer, windmillStreamFactory); + private static Builder builder() { + return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient + .Builder(); } abstract ComputationConfig.Fetcher configFetcher(); @@ -816,6 +928,23 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create( abstract WindmillServerStub windmillServer(); abstract GrpcWindmillStreamFactory windmillStreamFactory(); + + abstract @Nullable GrpcDispatcherClient windmillDispatcherClient(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConfigFetcher(ComputationConfig.Fetcher value); + + abstract Builder setComputationStateCache(ComputationStateCache value); + + abstract Builder setWindmillServer(WindmillServerStub value); + + abstract Builder setWindmillStreamFactory(GrpcWindmillStreamFactory value); + + abstract Builder setWindmillDispatcherClient(GrpcDispatcherClient value); + + 
abstract ConfigFetcherComputationStateCacheAndWindmillClient build(); + } } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index bc93e6d89c41..d71811e2d75d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -66,6 +66,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { private final Function> computationStateFetcher; private final ExecutorService workProviderExecutor; private final GetWorkSender getWorkSender; + private final Supplier throttleTimeSupplier; SingleSourceWorkerHarness( WorkCommitter workCommitter, @@ -74,7 +75,8 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { StreamingWorkScheduler streamingWorkScheduler, Runnable waitForResources, Function> computationStateFetcher, - GetWorkSender getWorkSender) { + GetWorkSender getWorkSender, + Supplier throttleTimeSupplier) { this.workCommitter = workCommitter; this.getDataClient = getDataClient; this.heartbeatSender = heartbeatSender; @@ -90,6 +92,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness { .build()); this.isRunning = new AtomicBoolean(false); this.getWorkSender = getWorkSender; + this.throttleTimeSupplier = throttleTimeSupplier; } public static SingleSourceWorkerHarness.Builder builder() { @@ -144,6 +147,11 @@ public void shutdown() { workCommitter.stop(); } + @Override + public long getAndResetThrottleTime() { + return throttleTimeSupplier.get(); + } + private void 
streamingEngineDispatchLoop( Function getWorkStreamFactory) { while (isRunning.get()) { @@ -254,6 +262,8 @@ Builder setComputationStateFetcher( Builder setGetWorkSender(GetWorkSender getWorkSender); + Builder setThrottleTimeSupplier(Supplier throttleTimeSupplier); + SingleSourceWorkerHarness build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java index c1b4570e2260..02b101def753 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java @@ -25,4 +25,6 @@ public interface StreamingWorkerHarness { void start(); void shutdown(); + + long getAndResetThrottleTime(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java index ba77d8e1ce26..4c06d12d50a3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java @@ -27,6 +27,7 @@ import com.google.api.services.dataflow.model.WorkItemStatus; import com.google.api.services.dataflow.model.WorkerMessage; import com.google.api.services.dataflow.model.WorkerMessageResponse; +import com.google.auto.value.AutoBuilder; import 
java.io.IOException; import java.math.RoundingMode; import java.util.ArrayList; @@ -97,7 +98,7 @@ public final class StreamingWorkerStatusReporter { // Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics. private final AtomicLong workerMessagesIndex; - private StreamingWorkerStatusReporter( + StreamingWorkerStatusReporter( boolean publishCounters, WorkUnitClient dataflowServiceClient, Supplier windmillQuotaThrottleTime, @@ -131,57 +132,13 @@ private StreamingWorkerStatusReporter( this.workerMessagesIndex = new AtomicLong(); } - public static StreamingWorkerStatusReporter create( - WorkUnitClient workUnitClient, - Supplier windmillQuotaThrottleTime, - Supplier> allStageInfo, - FailureTracker failureTracker, - StreamingCounters streamingCounters, - MemoryMonitor memoryMonitor, - BoundedQueueExecutor workExecutor, - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - return new StreamingWorkerStatusReporter( - /* publishCounters= */ true, - workUnitClient, - windmillQuotaThrottleTime, - allStageInfo, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - threadName -> - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(threadName).build()), - windmillHarnessUpdateReportingPeriodMillis, - perWorkerMetricsUpdateReportingPeriodMillis); - } - - @VisibleForTesting - public static StreamingWorkerStatusReporter forTesting( - boolean publishCounters, - WorkUnitClient workUnitClient, - Supplier windmillQuotaThrottleTime, - Supplier> allStageInfo, - FailureTracker failureTracker, - StreamingCounters streamingCounters, - MemoryMonitor memoryMonitor, - BoundedQueueExecutor workExecutor, - Function executorFactory, - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - return new StreamingWorkerStatusReporter( - publishCounters, - workUnitClient, - windmillQuotaThrottleTime, - 
allStageInfo, - failureTracker, - streamingCounters, - memoryMonitor, - workExecutor, - executorFactory, - windmillHarnessUpdateReportingPeriodMillis, - perWorkerMetricsUpdateReportingPeriodMillis); + public static Builder builder() { + return new AutoBuilder_StreamingWorkerStatusReporter_Builder() + .setPublishCounters(true) + .setExecutorFactory( + threadName -> + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(threadName).build())); } /** @@ -228,6 +185,22 @@ private static void shutdownExecutor(ScheduledExecutorService executor) { } } + // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the + // WorkerMessages RPC schedule. The desired reporting period + // (perWorkerMetricsUpdateReportingPeriodMillis) is rounded up to the nearest multiple + // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). + private static long getPerWorkerMetricsUpdateFrequency( + long windmillHarnessUpdateReportingPeriodMillis, + long perWorkerMetricsUpdateReportingPeriodMillis) { + if (windmillHarnessUpdateReportingPeriodMillis == 0) { + return 0; + } + return LongMath.divide( + perWorkerMetricsUpdateReportingPeriodMillis, + windmillHarnessUpdateReportingPeriodMillis, + RoundingMode.CEILING); + } + @SuppressWarnings("FutureReturnValueIgnored") public void start() { reportHarnessStartup(); @@ -276,22 +249,6 @@ private void reportHarnessStartup() { } } - // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the - // WorkerMessages RPC schedule. The desired reporting period - // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple - // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis). 
- private static long getPerWorkerMetricsUpdateFrequency( - long windmillHarnessUpdateReportingPeriodMillis, - long perWorkerMetricsUpdateReportingPeriodMillis) { - if (windmillHarnessUpdateReportingPeriodMillis == 0) { - return 0; - } - return LongMath.divide( - perWorkerMetricsUpdateReportingPeriodMillis, - windmillHarnessUpdateReportingPeriodMillis, - RoundingMode.CEILING); - } - /** Sends counter updates to Dataflow backend. */ private void sendWorkerUpdatesToDataflowService( CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException { @@ -496,4 +453,33 @@ private void updateThreadMetrics() { .maxOutstandingBundles() .addValue((long) workExecutor.maximumElementsOutstanding()); } + + @AutoBuilder + public interface Builder { + Builder setPublishCounters(boolean publishCounters); + + Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient); + + Builder setWindmillQuotaThrottleTime(Supplier windmillQuotaThrottleTime); + + Builder setAllStageInfo(Supplier> allStageInfo); + + Builder setFailureTracker(FailureTracker failureTracker); + + Builder setStreamingCounters(StreamingCounters streamingCounters); + + Builder setMemoryMonitor(MemoryMonitor memoryMonitor); + + Builder setWorkExecutor(BoundedQueueExecutor workExecutor); + + Builder setExecutorFactory(Function executorFactory); + + Builder setWindmillHarnessUpdateReportingPeriodMillis( + long windmillHarnessUpdateReportingPeriodMillis); + + Builder setPerWorkerMetricsUpdateReportingPeriodMillis( + long perWorkerMetricsUpdateReportingPeriodMillis); + + StreamingWorkerStatusReporter build(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index dadf02171235..6eeb7bd6bbfc 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -96,6 +96,7 @@ import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.util.Structs; +import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; @@ -104,6 +105,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; +import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC; import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; @@ -129,6 +131,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory; +import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory; import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -178,6 +183,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedLong; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -285,6 +291,7 @@ public Long get() { private final FakeWindmillServer server = new FakeWindmillServer( errorCollector, computationId -> computationStateCache.get(computationId)); + private StreamingCounters streamingCounters; public StreamingDataflowWorkerTest(Boolean streamingEngine) { this.streamingEngine = streamingEngine; @@ -304,9 +311,20 @@ private static CounterUpdate getCounter(Iterable counters, String return null; } + private Iterable buildCounters() { + return Iterables.concat( + streamingCounters + .pendingDeltaCounters() + .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE), + streamingCounters + .pendingCumulativeCounters() + .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); + } + @Before public void setUp() { server.clearCommitsReceived(); + streamingCounters = StreamingCounters.create(); } @After @@ -856,7 +874,13 @@ private StreamingDataflowWorker makeWorker( streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), mockGlobalConfigHandle, - streamingDataflowWorkerTestParams.localRetryTimeoutMs()); + streamingDataflowWorkerTestParams.localRetryTimeoutMs(), + streamingCounters, 
+ new FakeWindmillStubFactoryFactory( + new FakeWindmillStubFactory( + () -> + WindmillChannelFactory.inProcessChannel( + "StreamingDataflowWorkerTestChannel")))); this.computationStateCache = worker.getComputationStateCache(); return worker; } @@ -1715,7 +1739,7 @@ public void testMergeWindows() throws Exception { intervalWindowBytes(WINDOW_AT_ZERO))); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); // These tags and data are opaque strings and this is a change detector test. // The "/u" indicates the user's namespace, versus "/s" for system namespace @@ -1836,7 +1860,7 @@ public void testMergeWindows() throws Exception { expectedBytesRead += dataBuilder.build().getSerializedSize(); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); actualOutput = result.get(2L); assertEquals(1, actualOutput.getOutputMessagesCount()); @@ -2004,7 +2028,7 @@ public void testMergeWindowsCaching() throws Exception { intervalWindowBytes(WINDOW_AT_ZERO))); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); // These tags and data are opaque strings and this is a change detector test. 
// The "/u" indicates the user's namespace, versus "/s" for system namespace @@ -2125,7 +2149,7 @@ public void testMergeWindowsCaching() throws Exception { expectedBytesRead += dataBuilder.build().getSerializedSize(); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); actualOutput = result.get(2L); assertEquals(1, actualOutput.getOutputMessagesCount()); @@ -2430,7 +2454,7 @@ public void testUnboundedSources() throws Exception { null)); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); Windmill.WorkItemCommitRequest commit = result.get(1L); UnsignedLong finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2492,7 +2516,7 @@ public void testUnboundedSources() throws Exception { null)); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); commit = result.get(2L); finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2540,7 +2564,7 @@ public void testUnboundedSources() throws Exception { null)); result = server.waitForAndGetCommits(1); - counters = worker.buildCounters(); + counters = buildCounters(); commit = result.get(3L); finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); @@ -2710,7 +2734,7 @@ public void testUnboundedSourceWorkRetry() throws Exception { server.whenGetWorkCalled().thenReturn(work); Map result = server.waitForAndGetCommits(1); - Iterable counters = worker.buildCounters(); + Iterable counters = buildCounters(); Windmill.WorkItemCommitRequest commit = result.get(1L); UnsignedLong finalizeId = UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java index 7e65a495638f..8726b1242f9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java @@ -59,18 +59,19 @@ public void setUp() { @Test public void testOverrideMaximumThreadCount() throws Exception { StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.forTesting( - true, - mockWorkUnitClient, - () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME, - () -> Collections.emptyList(), - mockFailureTracker, - StreamingCounters.create(), - mockMemoryMonitor, - mockExecutor, - (threadName) -> Executors.newSingleThreadScheduledExecutor(), - DEFAULT_HARNESS_REPORTING_PERIOD, - DEFAULT_PER_WORKER_METRICS_PERIOD); + StreamingWorkerStatusReporter.builder() + .setPublishCounters(true) + .setDataflowServiceClient(mockWorkUnitClient) + .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) + .setAllStageInfo(Collections::emptyList) + .setFailureTracker(mockFailureTracker) + .setStreamingCounters(StreamingCounters.create()) + .setMemoryMonitor(mockMemoryMonitor) + .setWorkExecutor(mockExecutor) + .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) + .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) + .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) + .build(); StreamingScalingReportResponse streamingScalingReportResponse = new StreamingScalingReportResponse().setMaximumThreadCount(10); WorkerMessageResponse workerMessageResponse = @@ -85,21 +86,21 @@ public void testOverrideMaximumThreadCount() throws Exception { @Test public void 
testHandleEmptyWorkerMessageResponse() throws Exception { StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.forTesting( - true, - mockWorkUnitClient, - () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME, - () -> Collections.emptyList(), - mockFailureTracker, - StreamingCounters.create(), - mockMemoryMonitor, - mockExecutor, - (threadName) -> Executors.newSingleThreadScheduledExecutor(), - DEFAULT_HARNESS_REPORTING_PERIOD, - DEFAULT_PER_WORKER_METRICS_PERIOD); - WorkerMessageResponse workerMessageResponse = new WorkerMessageResponse(); + StreamingWorkerStatusReporter.builder() + .setPublishCounters(true) + .setDataflowServiceClient(mockWorkUnitClient) + .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) + .setAllStageInfo(Collections::emptyList) + .setFailureTracker(mockFailureTracker) + .setStreamingCounters(StreamingCounters.create()) + .setMemoryMonitor(mockMemoryMonitor) + .setWorkExecutor(mockExecutor) + .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) + .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) + .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) + .build(); when(mockWorkUnitClient.reportWorkerMessage(any())) - .thenReturn(Collections.singletonList(workerMessageResponse)); + .thenReturn(Collections.singletonList(new WorkerMessageResponse())); reporter.reportPeriodicWorkerMessage(); verify(mockExecutor, Mockito.times(0)).setMaximumPoolSize(anyInt(), anyInt()); }