From 1c895f5e5f1e1a3c45c8fc9c751070934e63d3f3 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Tue, 19 Nov 2024 14:00:22 -0800 Subject: [PATCH] address PR comments --- .../worker/StreamingDataflowWorker.java | 109 +++++++++++------- .../harness/StreamingWorkerStatusPages.java | 2 +- .../refresh/StreamPoolHeartbeatSender.java | 4 +- .../StreamingWorkerStatusReporterTest.java | 52 ++++----- .../StreamPoolHeartbeatSenderTest.java | 6 +- 5 files changed, 95 insertions(+), 78 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 0ad9728b3063..f26b1aa74852 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -63,6 +63,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; @@ -102,6 +103,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ApplianceHeartbeatSender; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.StreamPoolHeartbeatSender; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; import org.apache.beam.sdk.fn.JvmInitializers; @@ -121,10 +123,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Implements a Streaming Dataflow worker. */ +/** + * For internal use only. + * + *

Implements a Streaming Dataflow worker. + */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) +@Internal public final class StreamingDataflowWorker { /** @@ -135,8 +142,6 @@ public final class StreamingDataflowWorker { */ public static final int MAX_SINK_BYTES = 10_000_000; - public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = - "streaming_engine_use_job_settings_for_heartbeat_pool"; private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class); /** @@ -161,6 +166,8 @@ public final class StreamingDataflowWorker { private static final String CHANNELZ_PATH = "/channelz"; private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api"; private static final String ENABLE_IPV6_EXPERIMENT = "enable_private_ipv6_google_access"; + private static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT = + "streaming_engine_use_job_settings_for_heartbeat_pool"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -260,7 +267,7 @@ private StreamingDataflowWorker( processingContext, getWorkStreamLatencies); }), - createStubFactory(options), + createFanOutStubFactory(options), GetWorkBudgetDistributors.distributeEvenly(), Preconditions.checkNotNull(dispatcherClient), commitWorkStream -> @@ -278,10 +285,11 @@ private StreamingDataflowWorker( currentActiveCommitBytesProvider = fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; channelzServlet = - createChannelZServlet( + createChannelzServlet( options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness; } else { + // Non-direct path pipelines. Windmill.GetWorkRequest request = Windmill.GetWorkRequest.newBuilder() .setClientId(clientId) @@ -303,7 +311,7 @@ private StreamingDataflowWorker( createStreamingEngineHeartbeatSender( options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); channelzServlet = - createChannelZServlet(options, windmillServer::getWindmillServiceEndpoints); + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); workCommitter = StreamingEngineWorkCommitter.builder() .setCommitWorkStreamFactory( @@ -398,7 +406,7 @@ private static StreamingWorkerStatusPages.Builder createStatusPageBuilder( : streamingStatusPages; } - private static ChannelzServlet createChannelZServlet( + private static ChannelzServlet createChannelzServlet( DataflowWorkerHarnessOptions options, Supplier> windmillEndpointProvider) { return new ChannelzServlet(CHANNELZ_PATH, options, windmillEndpointProvider); @@ -410,15 +418,16 @@ private static HeartbeatSender createStreamingEngineHeartbeatSender( WindmillStreamPool getDataStreamPool, StreamingGlobalConfigHandle globalConfigHandle) { // Experiment gates the logic till backend changes are rollback safe - if (!DataflowRunner.hasExperiment(options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL) + if (!DataflowRunner.hasExperiment( + options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT) || options.getUseSeparateWindmillHeartbeatStreams() != null) { - return StreamPoolHeartbeatSender.Create( + return StreamPoolHeartbeatSender.create( Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) ? WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream) : getDataStreamPool); } else { - return StreamPoolHeartbeatSender.Create( + return StreamPoolHeartbeatSender.create( WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream), getDataStreamPool, globalConfigHandle); @@ -526,60 +535,72 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WorkUnitClient dataflowServiceClient, GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, Function computationStateCacheFactory) { - ComputationConfig.Fetcher configFetcher; - WindmillServerStub windmillServer; - ComputationStateCache computationStateCache; - GrpcWindmillStreamFactory windmillStreamFactory; - ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder = - ConfigFetcherComputationStateCacheAndWindmillClient.builder(); if (options.isEnableStreamingEngine()) { GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - configFetcher = + ComputationConfig.Fetcher configFetcher = StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient); configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig); - computationStateCache = computationStateCacheFactory.apply(configFetcher); - windmillStreamFactory = + ComputationStateCache computationStateCache = + computationStateCacheFactory.apply(configFetcher); + GrpcWindmillStreamFactory windmillStreamFactory = windmillStreamFactoryBuilder .setProcessHeartbeatResponses( new WorkHeartbeatResponseProcessor(computationStateCache::get)) .setHealthCheckIntervalMillis( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); - windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); - builder.setWindmillDispatcherClient(dispatcherClient); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillDispatcherClient(dispatcherClient) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCache) + .setWindmillStreamFactory(windmillStreamFactory) + .setWindmillServer( + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient)) + .build(); } else { + // Build with local Windmill client. if (options.getWindmillServiceEndpoint() != null || options.getLocalWindmillHostport().startsWith("grpc:")) { GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - windmillStreamFactory = + GrpcWindmillStreamFactory windmillStreamFactory = windmillStreamFactoryBuilder .setHealthCheckIntervalMillis( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); - windmillServer = + GrpcWindmillServer windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); - builder.setWindmillDispatcherClient(dispatcherClient); - } else { - windmillStreamFactory = windmillStreamFactoryBuilder.build(); - windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + ComputationConfig.Fetcher configFetcher = + createApplianceComputationConfigFetcher(windmillServer); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillDispatcherClient(dispatcherClient) + .setWindmillServer(windmillServer) + .setWindmillStreamFactory(windmillStreamFactory) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) + .build(); } - configFetcher = - new StreamingApplianceComputationConfigFetcher( - windmillServer::getConfig, - new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); - computationStateCache = computationStateCacheFactory.apply(configFetcher); + WindmillServerStub windmillServer = + new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + ComputationConfig.Fetcher configFetcher = + createApplianceComputationConfigFetcher(windmillServer); + return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + .setWindmillStreamFactory(windmillStreamFactoryBuilder.build()) + .setWindmillServer(windmillServer) + .setConfigFetcher(configFetcher) + .setComputationStateCache(computationStateCacheFactory.apply(configFetcher)) + .build(); } + } - return builder - .setConfigFetcher(configFetcher) - .setComputationStateCache(computationStateCache) - .setWindmillServer(windmillServer) - .setWindmillStreamFactory(windmillStreamFactory) - .build(); + private static StreamingApplianceComputationConfigFetcher createApplianceComputationConfigFetcher( + ApplianceWindmillClient windmillClient) { + return new StreamingApplianceComputationConfigFetcher( + windmillClient::getConfig, + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); } private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) { @@ -588,13 +609,19 @@ private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options Optional.ofNullable(options.getDataflowServiceOptions()) .map(serviceOptions -> serviceOptions.contains(ENABLE_IPV6_EXPERIMENT)) .orElse(false); + if (isIpV6Enabled) { return true; } + LOG.warn( - "DirectPath is currently only supported with IPv6 networking stack. Defaulting to" - + " CloudPath."); + "DirectPath is currently only supported with IPv6 networking stack. This requires setting " + + "\"enable_private_ipv6_google_access\" in experimental pipeline options. " + + "For information on how to set experimental pipeline options see " + + "https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#experimental. " + + "Defaulting to CloudPath."); } + return false; } @@ -610,7 +637,7 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) StreamingDataflowWorker.class.getSimpleName()); } - private static ChannelCachingStubFactory createStubFactory( + private static ChannelCachingStubFactory createFanOutStubFactory( DataflowWorkerHarnessOptions workerOptions) { return ChannelCachingRemoteStubFactory.create( workerOptions.getGcpCredential(), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index 6981312eff1d..ddfc6809231a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -258,7 +258,7 @@ public interface Builder { Builder setDebugCapture(DebugCapture.Manager debugCapture); - Builder setChannelzServlet(ChannelzServlet channelzServlet); + Builder setChannelzServlet(@Nullable ChannelzServlet channelzServlet); Builder setStateCache(WindmillStateCache stateCache); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java index fa36b11ffe55..f54091dc2b95 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java @@ -42,7 +42,7 @@ private StreamPoolHeartbeatSender( this.heartbeatStreamPool.set(heartbeatStreamPool); } - public static StreamPoolHeartbeatSender Create( + public static StreamPoolHeartbeatSender create( @Nonnull WindmillStreamPool heartbeatStreamPool) { return new StreamPoolHeartbeatSender(heartbeatStreamPool); } @@ -55,7 +55,7 @@ public static StreamPoolHeartbeatSender Create( * enabled. * @param getDataPool stream to use when using separate streams for heartbeat is disabled. */ - public static StreamPoolHeartbeatSender Create( + public static StreamPoolHeartbeatSender create( @Nonnull WindmillStreamPool dedicatedHeartbeatPool, @Nonnull WindmillStreamPool getDataPool, @Nonnull StreamingGlobalConfigHandle configHandle) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java index 8726b1242f9b..f348e4cf1bdb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java @@ -39,14 +39,15 @@ @RunWith(JUnit4.class) public class StreamingWorkerStatusReporterTest { - private final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000; - private final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000; - private final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000; + private static final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000; + private static final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000; + private static final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000; private BoundedQueueExecutor mockExecutor; private WorkUnitClient mockWorkUnitClient; private FailureTracker mockFailureTracker; private MemoryMonitor mockMemoryMonitor; + private StreamingWorkerStatusReporter reporter; @Before public void setUp() { @@ -54,24 +55,11 @@ public void setUp() { this.mockWorkUnitClient = mock(WorkUnitClient.class); this.mockFailureTracker = mock(FailureTracker.class); this.mockMemoryMonitor = mock(MemoryMonitor.class); + this.reporter = buildWorkerStatusReporterForTest(); } @Test public void testOverrideMaximumThreadCount() throws Exception { - StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.builder() - .setPublishCounters(true) - .setDataflowServiceClient(mockWorkUnitClient) - .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) - .setAllStageInfo(Collections::emptyList) - .setFailureTracker(mockFailureTracker) - .setStreamingCounters(StreamingCounters.create()) - .setMemoryMonitor(mockMemoryMonitor) - .setWorkExecutor(mockExecutor) - .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) - .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) - .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) - .build(); StreamingScalingReportResponse streamingScalingReportResponse = new StreamingScalingReportResponse().setMaximumThreadCount(10); WorkerMessageResponse workerMessageResponse = @@ -85,23 +73,25 @@ public void testOverrideMaximumThreadCount() throws Exception { @Test public void testHandleEmptyWorkerMessageResponse() throws Exception { - StreamingWorkerStatusReporter reporter = - StreamingWorkerStatusReporter.builder() - .setPublishCounters(true) - .setDataflowServiceClient(mockWorkUnitClient) - .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) - .setAllStageInfo(Collections::emptyList) - .setFailureTracker(mockFailureTracker) - .setStreamingCounters(StreamingCounters.create()) - .setMemoryMonitor(mockMemoryMonitor) - .setWorkExecutor(mockExecutor) - .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) - .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) - .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) - .build(); when(mockWorkUnitClient.reportWorkerMessage(any())) .thenReturn(Collections.singletonList(new WorkerMessageResponse())); reporter.reportPeriodicWorkerMessage(); verify(mockExecutor, Mockito.times(0)).setMaximumPoolSize(anyInt(), anyInt()); } + + private StreamingWorkerStatusReporter buildWorkerStatusReporterForTest() { + return StreamingWorkerStatusReporter.builder() + .setPublishCounters(true) + .setDataflowServiceClient(mockWorkUnitClient) + .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME) + .setAllStageInfo(Collections::emptyList) + .setFailureTracker(mockFailureTracker) + .setStreamingCounters(StreamingCounters.create()) + .setMemoryMonitor(mockMemoryMonitor) + .setWorkExecutor(mockExecutor) + .setExecutorFactory((threadName) -> Executors.newSingleThreadScheduledExecutor()) + .setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD) + .setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD) + .build(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java index ed915088d0a6..acbb3aebbcf5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java @@ -39,7 +39,7 @@ public class StreamPoolHeartbeatSenderTest { public void sendsHeartbeatsOnStream() { FakeWindmillServer server = new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create(1, Duration.standardSeconds(10), server::getDataStream)); Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); heartbeatsBuilder @@ -59,7 +59,7 @@ public void sendsHeartbeatsOnDedicatedStream() { FakeGlobalConfigHandle configHandle = new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true)); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create( 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), WindmillStreamPool.create( @@ -104,7 +104,7 @@ public void sendsHeartbeatsOnGetDataStream() { FakeGlobalConfigHandle configHandle = new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false)); StreamPoolHeartbeatSender heartbeatSender = - StreamPoolHeartbeatSender.Create( + StreamPoolHeartbeatSender.create( WindmillStreamPool.create( 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), WindmillStreamPool.create(