From a8bb92f2206f65e1cca7ab8334f3a576364a53c6 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 9 Sep 2024 12:41:49 -0700 Subject: [PATCH] fix class hierarchy --- .../worker/StreamingDataflowWorker.java | 50 +++++++---------- .../worker/StreamingModeExecutionContext.java | 6 +- .../streaming/config/ComputationConfig.java | 3 +- ...Impl.java => FixedGlobalConfigHandle.java} | 14 +++-- ...mingApplianceComputationConfigFetcher.java | 10 +++- ...reamingEngineComputationConfigFetcher.java | 41 +++++++------- ...Config.java => StreamingGlobalConfig.java} | 8 +-- ....java => StreamingGlobalConfigHandle.java} | 6 +- ...a => StreamingGlobalConfigHandleImpl.java} | 19 ++++--- .../harness/StreamingWorkerStatusPages.java | 12 ++-- .../client/grpc/GrpcDispatcherClient.java | 4 +- .../ComputationWorkExecutorFactory.java | 6 +- .../processing/StreamingWorkScheduler.java | 8 +-- .../worker/StreamingDataflowWorkerTest.java | 27 +++++---- .../StreamingModeExecutionContextTest.java | 10 ++-- .../worker/WorkerCustomSourcesTest.java | 14 ++--- ....java => FixedGlobalConfigHandleTest.java} | 40 +++++++++++-- ...ApplianceComputationConfigFetcherTest.java | 4 +- ...ingEngineComputationConfigFetcherTest.java | 53 ++++++++---------- ... StreamingGlobalConfigHandleImplTest.java} | 56 +++++++++---------- 20 files changed, 211 insertions(+), 180 deletions(-) rename runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/{FixedPipelineConfigManagerImpl.java => FixedGlobalConfigHandle.java} (77%) rename runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/{StreamingEnginePipelineConfig.java => StreamingGlobalConfig.java} (89%) rename runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/{StreamingEnginePipelineConfigManager.java => StreamingGlobalConfigHandle.java} (88%) rename runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/{StreamingEnginePipelineConfigManagerImpl.java => StreamingGlobalConfigHandleImpl.java} (79%) rename runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/{FixedPipelineConfigManagerImplTest.java => FixedGlobalConfigHandleTest.java} (56%) rename runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/{StreamingEnginePipelineConfigManagerImplTest.java => StreamingGlobalConfigHandleImplTest.java} (75%) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index aceedb95665d..092e43bf3a6f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -47,12 +47,11 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; -import org.apache.beam.runners.dataflow.worker.streaming.config.FixedPipelineConfigManagerImpl; +import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManagerImpl; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; @@ -180,7 +179,6 @@ private StreamingDataflowWorker( WorkFailureProcessor workFailureProcessor, StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, - StreamingEnginePipelineConfigManager configManager, GrpcWindmillStreamFactory windmillStreamFactory, Function executorSupplier, ConcurrentMap stageInfoMap) { @@ -237,7 +235,7 @@ private StreamingDataflowWorker( hotKeyLogger, sampler, ID_GENERATOR, - configManager, + configFetcher.getGlobalConfigHandle(), stageInfoMap); ThrottlingGetDataMetricTracker getDataMetricTracker = @@ -297,7 +295,7 @@ private StreamingDataflowWorker( .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes) .setGetDataStatusProvider(getDataClient::printHtml) .setWorkUnitExecutor(workUnitExecutor) - .setConfigManager(configManager) + .setglobalConfigHandle(configFetcher.getGlobalConfigHandle()) .build(); Windmill.GetWorkRequest request = @@ -346,12 +344,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o new ThreadFactoryBuilder().setNameFormat(threadName).build()); GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = createGrpcwindmillStreamFactoryBuilder(options, clientId); - StreamingEnginePipelineConfigManager configManager = - options.isEnableStreamingEngine() - ? new StreamingEnginePipelineConfigManagerImpl() - : new FixedPipelineConfigManagerImpl( - StreamingEnginePipelineConfig.builder() - .build()); // appliance is initialized with default settings ConfigFetcherComputationStateCacheAndWindmillClient configFetcherComputationStateCacheAndWindmillClient = @@ -359,7 +351,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o options, dataflowServiceClient, windmillStreamFactoryBuilder, - configManager, configFetcher -> ComputationStateCache.create( configFetcher, @@ -416,7 +407,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o workFailureProcessor, streamingCounters, memoryMonitor, - configManager, configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), executorSupplier, stageInfo); @@ -433,7 +423,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o DataflowWorkerHarnessOptions options, WorkUnitClient dataflowServiceClient, GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, - StreamingEnginePipelineConfigManager configManager, Function computationStateCacheFactory) { ComputationConfig.Fetcher configFetcher; WindmillServerStub windmillServer; @@ -442,12 +431,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o if (options.isEnableStreamingEngine()) { GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options)); - configManager.onConfig(dispatcherClient::onJobConfig); configFetcher = StreamingEngineComputationConfigFetcher.create( - options.getGlobalConfigRefreshPeriod().getMillis(), - dataflowServiceClient, - configManager); + options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient); + configFetcher.getGlobalConfigHandle().onConfig(dispatcherClient::onJobConfig); computationStateCache = computationStateCacheFactory.apply(configFetcher); windmillStreamFactory = windmillStreamFactoryBuilder @@ -475,7 +462,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); } - configFetcher = new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); + configFetcher = + new StreamingApplianceComputationConfigFetcher( + windmillServer::getConfig, + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); computationStateCache = computationStateCacheFactory.apply(configFetcher); } @@ -495,7 +485,7 @@ static StreamingDataflowWorker forTesting( HotKeyLogger hotKeyLogger, Supplier clock, Function executorSupplier, - StreamingEnginePipelineConfigManager configManager, + StreamingGlobalConfigHandleImpl globalConfigHandle, int localRetryTimeoutMs) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); @@ -504,19 +494,20 @@ static StreamingDataflowWorker forTesting( .setSizeMb(options.getWorkerCacheMb()) .setSupportMapViaMultimap(options.isEnableStreamingEngine()) .build(); - StreamingEnginePipelineConfig config = configManager.getConfig(); - if (!config.windmillServiceEndpoints().isEmpty()) { - windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints()); - } ComputationConfig.Fetcher configFetcher = options.isEnableStreamingEngine() ? StreamingEngineComputationConfigFetcher.forTesting( /* hasReceivedGlobalConfig= */ true, options.getGlobalConfigRefreshPeriod().getMillis(), workUnitClient, - executorSupplier, - configManager) - : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); + globalConfigHandle, + executorSupplier) + : new StreamingApplianceComputationConfigFetcher( + windmillServer::getConfig, globalConfigHandle); + StreamingGlobalConfig config = configFetcher.getGlobalConfigHandle().getConfig(); + if (!config.windmillServiceEndpoints().isEmpty()) { + windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints()); + } ConcurrentMap stateNameMap = new ConcurrentHashMap<>(prePopulatedStateNameMappings); ComputationStateCache computationStateCache = @@ -583,7 +574,6 @@ static StreamingDataflowWorker forTesting( workFailureProcessor, streamingCounters, memoryMonitor, - configManager, options.isEnableStreamingEngine() ? windmillStreamFactory .setHealthCheckIntervalMillis( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 35426e3b0ab0..85af41577358 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -50,7 +50,7 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; @@ -108,7 +108,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext stateNameMap; private final WindmillStateCache.ForComputation stateCache; private final ReaderCache readerCache; - private final StreamingEnginePipelineConfigManager configManager; + private final StreamingGlobalConfigHandle configManager; private final boolean throwExceptionOnLargeOutput; private volatile long backlogBytes; @@ -155,7 +155,7 @@ public StreamingModeExecutionContext( MetricsContainerRegistry metricsContainerRegistry, DataflowExecutionStateTracker executionStateTracker, StreamingModeExecutionStateRegistry executionStateRegistry, - StreamingEnginePipelineConfigManager configManager, + StreamingGlobalConfigHandle configManager, long sinkByteLimit, boolean throwExceptionOnLargeOutput) { super( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java index fb8bcf7edbfb..9702751aeb98 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java @@ -48,12 +48,13 @@ public static ComputationConfig create( public abstract ImmutableMap stateNameMap(); /** Interface to fetch configurations for a specific computation. */ - @FunctionalInterface public interface Fetcher { default void start() {} default void stop() {} Optional fetchConfig(String computationId); + + StreamingGlobalConfigHandle getGlobalConfigHandle(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedPipelineConfigManagerImpl.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java similarity index 77% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedPipelineConfigManagerImpl.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java index a2026c5d1632..e0485ae77b71 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedPipelineConfigManagerImpl.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java @@ -28,17 +28,21 @@ * StreamingEnginePipelineConfigManager returning a fixed config * initialized during construction. Used for Appliance and Tests. */ -public class FixedPipelineConfigManagerImpl implements StreamingEnginePipelineConfigManager { +public class FixedGlobalConfigHandle implements StreamingGlobalConfigHandle { - private final StreamingEnginePipelineConfig config; + private final StreamingGlobalConfig config; - public FixedPipelineConfigManagerImpl(StreamingEnginePipelineConfig config) { + public FixedGlobalConfigHandle(StreamingGlobalConfig config) { this.config = config; } - public StreamingEnginePipelineConfig getConfig() { + @Override + public StreamingGlobalConfig getConfig() { return config; } - public void onConfig(@Nonnull Consumer callback) {} + @Override + public void onConfig(@Nonnull Consumer callback) { + callback.accept(config); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java index 786ded09498a..025e66be79c1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java @@ -48,11 +48,14 @@ public final class StreamingApplianceComputationConfigFetcher implements Computa private final ApplianceComputationConfigFetcher applianceComputationConfigFetcher; private final ConcurrentHashMap systemNameToComputationIdMap; + private final StreamingGlobalConfigHandle globalConfigHandle; public StreamingApplianceComputationConfigFetcher( - ApplianceComputationConfigFetcher applianceComputationConfigFetcher) { + ApplianceComputationConfigFetcher applianceComputationConfigFetcher, + StreamingGlobalConfigHandle globalConfigHandle) { this.applianceComputationConfigFetcher = applianceComputationConfigFetcher; this.systemNameToComputationIdMap = new ConcurrentHashMap<>(); + this.globalConfigHandle = globalConfigHandle; } /** Returns a {@code Table} */ @@ -112,6 +115,11 @@ public Optional fetchConfig(String computationId) { .collect(toImmutableMap(NameMapEntry::getUserName, NameMapEntry::getSystemName))); } + @Override + public StreamingGlobalConfigHandle getGlobalConfigHandle() { + return globalConfigHandle; + } + private Optional createComputationConfig( String serializedMapTask, Table transformUserNameToStateFamilyByComputationId, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index d9377a258dc7..22b0dac6eb22 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -74,33 +74,31 @@ public final class StreamingEngineComputationConfigFetcher implements Computatio private final long globalConfigRefreshPeriodMillis; private final WorkUnitClient dataflowServiceClient; private final ScheduledExecutorService globalConfigRefresher; - private final StreamingEnginePipelineConfigManager configManager; + private final StreamingGlobalConfigHandleImpl globalConfigHandle; private final AtomicBoolean hasReceivedGlobalConfig; private StreamingEngineComputationConfigFetcher( boolean hasReceivedGlobalConfig, long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, - ScheduledExecutorService globalConfigRefresher, - StreamingEnginePipelineConfigManager configManager) { + StreamingGlobalConfigHandleImpl globalConfigHandle, + ScheduledExecutorService globalConfigRefresher) { this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis; this.dataflowServiceClient = dataflowServiceClient; this.globalConfigRefresher = globalConfigRefresher; - this.configManager = configManager; + this.globalConfigHandle = globalConfigHandle; this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig); } public static StreamingEngineComputationConfigFetcher create( - long globalConfigRefreshPeriodMillis, - WorkUnitClient dataflowServiceClient, - StreamingEnginePipelineConfigManager configManager) { + long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient) { return new StreamingEngineComputationConfigFetcher( /* hasReceivedGlobalConfig= */ false, globalConfigRefreshPeriodMillis, dataflowServiceClient, + new StreamingGlobalConfigHandleImpl(), Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()), - configManager); + new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build())); } @VisibleForTesting @@ -108,14 +106,14 @@ public static StreamingEngineComputationConfigFetcher forTesting( boolean hasReceivedGlobalConfig, long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, - Function executorSupplier, - StreamingEnginePipelineConfigManager configManager) { + StreamingGlobalConfigHandleImpl globalConfigHandle, + Function executorSupplier) { return new StreamingEngineComputationConfigFetcher( hasReceivedGlobalConfig, globalConfigRefreshPeriodMillis, dataflowServiceClient, - executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME), - configManager); + globalConfigHandle, + executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME)); } @VisibleForTesting @@ -159,8 +157,8 @@ private static Optional fetchConfigWithRetry( } } - private StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) { - StreamingEnginePipelineConfig.Builder pipelineConfig = StreamingEnginePipelineConfig.builder(); + private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) { + StreamingGlobalConfig.Builder pipelineConfig = StreamingGlobalConfig.builder(); OperationalLimits.Builder operationalLimits = OperationalLimits.builder(); if (config.getWindmillServiceEndpoint() != null @@ -246,6 +244,11 @@ public Optional fetchConfig(String computationId) { .flatMap(StreamingEngineComputationConfigFetcher::createComputationConfig); } + @Override + public StreamingGlobalConfigHandle getGlobalConfigHandle() { + return globalConfigHandle; + } + @Override public void stop() { // We have already shutdown or start has not been called. @@ -272,7 +275,7 @@ public void stop() { @SuppressWarnings("FutureReturnValueIgnored") private void schedulePeriodicGlobalConfigRequests() { globalConfigRefresher.scheduleWithFixedDelay( - () -> fetchGlobalConfig().ifPresent(configManager::setConfig), + () -> fetchGlobalConfig().ifPresent(globalConfigHandle::setConfig), 0, globalConfigRefreshPeriodMillis, TimeUnit.MILLISECONDS); @@ -285,9 +288,9 @@ private void schedulePeriodicGlobalConfigRequests() { private synchronized void fetchInitialPipelineGlobalConfig() { while (!hasReceivedGlobalConfig.get()) { LOG.info("Sending request to get initial global configuration for this worker."); - Optional globalConfig = fetchGlobalConfig(); + Optional globalConfig = fetchGlobalConfig(); if (globalConfig.isPresent()) { - configManager.setConfig(globalConfig.get()); + globalConfigHandle.setConfig(globalConfig.get()); hasReceivedGlobalConfig.set(true); break; } @@ -298,7 +301,7 @@ private synchronized void fetchInitialPipelineGlobalConfig() { LOG.info("Initial global configuration received, harness is now ready"); } - private Optional fetchGlobalConfig() { + private Optional fetchGlobalConfig() { return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem) .map(config -> createPipelineConfig(config)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java similarity index 89% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java index d64f8b4d10cc..8f76f5ec27af 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java @@ -27,10 +27,10 @@ /** Global pipeline config for pipelines running in Streaming Engine mode. */ @AutoValue @Internal -public abstract class StreamingEnginePipelineConfig { +public abstract class StreamingGlobalConfig { - public static StreamingEnginePipelineConfig.Builder builder() { - return new AutoValue_StreamingEnginePipelineConfig.Builder() + public static StreamingGlobalConfig.Builder builder() { + return new AutoValue_StreamingGlobalConfig.Builder() .setWindmillServiceEndpoints(ImmutableSet.of()) .setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build()) .setOperationalLimits(OperationalLimits.builder().build()); @@ -51,6 +51,6 @@ public abstract static class Builder { public abstract Builder setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings); - public abstract StreamingEnginePipelineConfig build(); + public abstract StreamingGlobalConfig build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManager.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java similarity index 88% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManager.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java index 3ab842ac2c7a..646f3e212e31 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManager.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java @@ -24,16 +24,16 @@ @Internal @ThreadSafe -public interface StreamingEnginePipelineConfigManager { +public interface StreamingGlobalConfigHandle { /* * Returns the latest StreamingEnginePipelineConfig */ - StreamingEnginePipelineConfig getConfig(); + StreamingGlobalConfig getConfig(); /* * Subscribe to config updates by registering a callback. * Callback should be called the first time with settings, if any, inline before the method returns. */ - void onConfig(@Nonnull Consumer callback); + void onConfig(@Nonnull Consumer callback); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManagerImpl.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java similarity index 79% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManagerImpl.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java index 1a8228e1c74e..7c9e4ab8f6dc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManagerImpl.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java @@ -28,19 +28,19 @@ @Internal @ThreadSafe -public class StreamingEnginePipelineConfigManagerImpl - implements StreamingEnginePipelineConfigManager { +public class StreamingGlobalConfigHandleImpl implements StreamingGlobalConfigHandle { - private final AtomicReference streamingEngineConfig = + private final AtomicReference streamingEngineConfig = new AtomicReference<>(); - private final CopyOnWriteArrayList> config_callbacks = + private final CopyOnWriteArrayList> config_callbacks = new CopyOnWriteArrayList<>(); /* * Returns the latest StreamingEnginePipelineConfig */ - public StreamingEnginePipelineConfig getConfig() { + @Override + public StreamingGlobalConfig getConfig() { Preconditions.checkState( streamingEngineConfig.get() != null, "Global config should be set before any processing is done"); @@ -51,8 +51,9 @@ public StreamingEnginePipelineConfig getConfig() { * Subscribe to config updates by registering a callback. * Callback will be called the first time with settings, if any, inline before the method returns. */ - public void onConfig(@Nonnull Consumer callback) { - StreamingEnginePipelineConfig config; + @Override + public void onConfig(@Nonnull Consumer callback) { + StreamingGlobalConfig config; synchronized (this) { config_callbacks.add(callback); config = streamingEngineConfig.get(); @@ -65,8 +66,8 @@ public void onConfig(@Nonnull Consumer callback) /* * Package private setter for setting config */ - void setConfig(@Nonnull StreamingEnginePipelineConfig config) { - Iterator> iterator; + void setConfig(@Nonnull StreamingGlobalConfig config) { + Iterator> iterator; synchronized (this) { if (config.equals(streamingEngineConfig.get())) { return; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index 5088ef334069..4654845c83da 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -39,8 +39,8 @@ import org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider; import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; @@ -80,7 +80,7 @@ public final class StreamingWorkerStatusPages { private final DebugCapture.@Nullable Manager debugCapture; private final @Nullable ChannelzServlet channelzServlet; - private final AtomicReference seConfig = new AtomicReference<>(); + private final AtomicReference seConfig = new AtomicReference<>(); StreamingWorkerStatusPages( Supplier clock, @@ -96,7 +96,7 @@ public final class StreamingWorkerStatusPages { Consumer getDataStatusProvider, BoundedQueueExecutor workUnitExecutor, ScheduledExecutorService statusPageDumper, - StreamingEnginePipelineConfigManager configManager) { + StreamingGlobalConfigHandle globalConfigHandle) { this.clock = clock; this.clientId = clientId; this.isRunning = isRunning; @@ -110,7 +110,7 @@ public final class StreamingWorkerStatusPages { this.getDataStatusProvider = getDataStatusProvider; this.workUnitExecutor = workUnitExecutor; this.statusPageDumper = statusPageDumper; - configManager.onConfig(seConfig::set); + globalConfigHandle.onConfig(seConfig::set); } public static StreamingWorkerStatusPages.Builder builder() { @@ -273,7 +273,7 @@ public interface Builder { Builder setStatusPageDumper(ScheduledExecutorService statusPageDumper); - Builder setConfigManager(StreamingEnginePipelineConfigManager configManager); + Builder setglobalConfigHandle(StreamingGlobalConfigHandle globalConfigHandle); StreamingWorkerStatusPages build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index d34826a9b9a8..4a264b84db9c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; @@ -148,7 +148,7 @@ public boolean hasInitializedEndpoints() { return dispatcherStubs.get().hasInitializedEndpoints(); } - public void onJobConfig(StreamingEnginePipelineConfig config) { + public void onJobConfig(StreamingGlobalConfig config) { if (config.windmillServiceEndpoints().isEmpty()) { return; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java index cd08677573be..c1a761c8b41a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java @@ -47,7 +47,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor; import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor; import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter; import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation; @@ -95,7 +95,7 @@ final class ComputationWorkExecutorFactory { private final long maxSinkBytes; private final IdGenerator idGenerator; - private final StreamingEnginePipelineConfigManager configManager; + private final StreamingGlobalConfigHandle configManager; private final boolean throwExceptionOnLargeOutput; ComputationWorkExecutorFactory( @@ -106,7 +106,7 @@ final class ComputationWorkExecutorFactory { DataflowExecutionStateSampler sampler, CounterSet pendingDeltaCounters, IdGenerator idGenerator, - StreamingEnginePipelineConfigManager configManager) { + StreamingGlobalConfigHandle configManager) { this.options = options; this.mapTaskExecutorFactory = mapTaskExecutorFactory; this.readerCache = readerCache; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 6df49bba9b30..3c968f71bf44 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -42,7 +42,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcherFactory; @@ -84,7 +84,7 @@ public final class StreamingWorkScheduler { private final HotKeyLogger hotKeyLogger; private final ConcurrentMap stageInfoMap; private final DataflowExecutionStateSampler sampler; - private final StreamingEnginePipelineConfigManager configManager; + private final StreamingGlobalConfigHandle configManager; public StreamingWorkScheduler( DataflowWorkerHarnessOptions options, @@ -98,7 +98,7 @@ public StreamingWorkScheduler( HotKeyLogger hotKeyLogger, ConcurrentMap stageInfoMap, DataflowExecutionStateSampler sampler, - StreamingEnginePipelineConfigManager configManager) { + StreamingGlobalConfigHandle configManager) { this.options = options; this.clock = clock; this.computationWorkExecutorFactory = computationWorkExecutorFactory; @@ -126,7 +126,7 @@ public static StreamingWorkScheduler create( HotKeyLogger hotKeyLogger, DataflowExecutionStateSampler sampler, IdGenerator idGenerator, - StreamingEnginePipelineConfigManager configManager, + StreamingGlobalConfigHandle configManager, ConcurrentMap stageInfoMap) { ComputationWorkExecutorFactory computationWorkExecutorFactory = new ComputationWorkExecutorFactory( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 06adb93f2a79..eafd0d1f6f49 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -102,8 +102,8 @@ import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC; import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; @@ -277,8 +277,7 @@ public Long get() { @Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC(); @Rule public ErrorCollector errorCollector = new ErrorCollector(); WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class); - StreamingEnginePipelineConfigManager mockConfigManager = - mock(StreamingEnginePipelineConfigManager.class); + StreamingGlobalConfigHandleImpl mockConfigManager = mock(StreamingGlobalConfigHandleImpl.class); HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class); private @Nullable ComputationStateCache computationStateCache = null; @@ -841,7 +840,7 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args private StreamingDataflowWorker makeWorker( StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) { when(mockConfigManager.getConfig()) - .thenReturn(streamingDataflowWorkerTestParams.streamingEnginePipelineConfig()); + .thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig()); StreamingDataflowWorker worker = StreamingDataflowWorker.forTesting( streamingDataflowWorkerTestParams.stateNameMappings(), @@ -1218,8 +1217,8 @@ public void testKeyCommitTooLargeException() throws Exception { makeWorker( defaultWorkerParams() .setInstructions(instructions) - .setStreamingEnginePipelineConfig( - StreamingEnginePipelineConfig.builder() + .setStreamingGlobalConfig( + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build()) .build()) @@ -1293,8 +1292,8 @@ public void testOutputKeyTooLargeException() throws Exception { makeWorker( defaultWorkerParams("--experiments=throw_exceptions_on_large_output") .setInstructions(instructions) - .setStreamingEnginePipelineConfig( - StreamingEnginePipelineConfig.builder() + .setStreamingGlobalConfig( + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder().setMaxOutputKeyBytes(15).build()) .build()) @@ -1330,8 +1329,8 @@ public void testOutputValueTooLargeException() throws Exception { makeWorker( defaultWorkerParams("--experiments=throw_exceptions_on_large_output") .setInstructions(instructions) - .setStreamingEnginePipelineConfig( - StreamingEnginePipelineConfig.builder() + .setStreamingGlobalConfig( + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder().setMaxOutputValueBytes(15).build()) .build()) @@ -4528,7 +4527,7 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { .setLocalRetryTimeoutMs(-1) .setPublishCounters(false) .setClock(Instant::now) - .setStreamingEnginePipelineConfig(StreamingEnginePipelineConfig.builder().build()); + .setStreamingGlobalConfig(StreamingGlobalConfig.builder().build()); } abstract ImmutableMap stateNameMappings(); @@ -4545,7 +4544,7 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { abstract int localRetryTimeoutMs(); - abstract StreamingEnginePipelineConfig streamingEnginePipelineConfig(); + abstract StreamingGlobalConfig streamingGlobalConfig(); @AutoValue.Builder abstract static class Builder { @@ -4580,7 +4579,7 @@ final Builder publishCounters() { abstract Builder setLocalRetryTimeoutMs(int value); - abstract Builder setStreamingEnginePipelineConfig(StreamingEnginePipelineConfig config); + abstract Builder setStreamingGlobalConfig(StreamingGlobalConfig config); abstract StreamingDataflowWorkerTestParams build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 3263a2c942a0..f178576c085f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -59,9 +59,9 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.config.FixedPipelineConfigManagerImpl; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager; +import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; @@ -110,8 +110,8 @@ public void setUp() { options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); CounterSet counterSet = new CounterSet(); ConcurrentHashMap stateNameMap = new ConcurrentHashMap<>(); - StreamingEnginePipelineConfigManager configManager = - new FixedPipelineConfigManagerImpl(StreamingEnginePipelineConfig.builder().build()); + StreamingGlobalConfigHandle configManager = + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily"); executionContext = new StreamingModeExecutionContext( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 82e209605e3c..6d56ec78f4eb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -90,9 +90,9 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.config.FixedPipelineConfigManagerImpl; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager; +import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; @@ -597,8 +597,8 @@ public void testReadUnboundedReader() throws Exception { StreamingModeExecutionStateRegistry executionStateRegistry = new StreamingModeExecutionStateRegistry(); ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1), Runnable::run); - StreamingEnginePipelineConfigManager configManager = - new FixedPipelineConfigManagerImpl(StreamingEnginePipelineConfig.builder().build()); + StreamingGlobalConfigHandle configManager = + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); StreamingModeExecutionContext context = new StreamingModeExecutionContext( counterSet, @@ -965,8 +965,8 @@ public void testFailedWorkItemsAbort() throws Exception { CounterSet counterSet = new CounterSet(); StreamingModeExecutionStateRegistry executionStateRegistry = new StreamingModeExecutionStateRegistry(); - StreamingEnginePipelineConfigManager configManager = - new FixedPipelineConfigManagerImpl(StreamingEnginePipelineConfig.builder().build()); + StreamingGlobalConfigHandle configManager = + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); StreamingModeExecutionContext context = new StreamingModeExecutionContext( counterSet, diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedPipelineConfigManagerImplTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java similarity index 56% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedPipelineConfigManagerImplTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java index 13dd635da182..823e0c42822c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedPipelineConfigManagerImplTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java @@ -18,7 +18,11 @@ package org.apache.beam.runners.dataflow.worker.streaming.config; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.runners.dataflow.worker.OperationalLimits; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; @@ -28,12 +32,12 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class FixedPipelineConfigManagerImplTest { +public class FixedGlobalConfigHandleTest { @Test public void getConfig() { - StreamingEnginePipelineConfig config = - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfig config = + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxOutputValueBytes(123) @@ -46,7 +50,35 @@ public void getConfig() { .setUseSeparateWindmillHeartbeatStreams(false) .build()) .build(); - FixedPipelineConfigManagerImpl configManager = new FixedPipelineConfigManagerImpl(config); + FixedGlobalConfigHandle configManager = new FixedGlobalConfigHandle(config); + assertEquals(config, configManager.getConfig()); + } + + @Test + public void onConfig() throws InterruptedException { + StreamingGlobalConfig config = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + FixedGlobalConfigHandle configManager = new FixedGlobalConfigHandle(config); + AtomicReference configFromCallback = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + configManager.onConfig( + cbConfig -> { + configFromCallback.set(cbConfig); + latch.countDown(); + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); assertEquals(config, configManager.getConfig()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java index f39c98c61b19..2586ae2be86f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java @@ -137,6 +137,8 @@ public void testGetComputationConfig_onFetchConfigError() { } private StreamingApplianceComputationConfigFetcher createStreamingApplianceConfigLoader() { - return new StreamingApplianceComputationConfigFetcher(mockWindmillServer::getConfig); + return new StreamingApplianceComputationConfigFetcher( + mockWindmillServer::getConfig, + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java index 01df8f6a95c1..24fe85aa5471 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java @@ -55,13 +55,13 @@ public class StreamingEngineComputationConfigFetcherTest { private StreamingEngineComputationConfigFetcher createConfigFetcher( boolean waitForInitialConfig, long globalConfigRefreshPeriod, - StreamingEnginePipelineConfigManagerImpl configManager) { + StreamingGlobalConfigHandleImpl globalConfigHandle) { return StreamingEngineComputationConfigFetcher.forTesting( !waitForInitialConfig, globalConfigRefreshPeriod, mockDataflowServiceClient, - ignored -> Executors.newSingleThreadScheduledExecutor(), - configManager); + globalConfigHandle, + ignored -> Executors.newSingleThreadScheduledExecutor()); } @After @@ -76,12 +76,11 @@ public void testStart_requiresInitialConfig() throws IOException, InterruptedExc .setJobId("job") .setStreamingConfigTask(new StreamingConfigTask().setMaxWorkItemCommitBytes(10L)); CountDownLatch waitForInitialConfig = new CountDownLatch(1); - Set receivedPipelineConfig = new HashSet<>(); + Set receivedPipelineConfig = new HashSet<>(); when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem()) .thenReturn(Optional.of(initialConfig)); - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); - configManager.onConfig( + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + globalConfigHandle.onConfig( config -> { try { receivedPipelineConfig.add(config); @@ -91,13 +90,13 @@ public void testStart_requiresInitialConfig() throws IOException, InterruptedExc } }); streamingEngineConfigFetcher = - createConfigFetcher(/* waitForInitialConfig= */ true, 0, configManager); + createConfigFetcher(/* waitForInitialConfig= */ true, 0, globalConfigHandle); Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start); asyncStartConfigLoader.start(); waitForInitialConfig.countDown(); asyncStartConfigLoader.join(); - StreamingEnginePipelineConfig.Builder configBuilder = - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfig.Builder configBuilder = + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxWorkItemCommitBytes( @@ -121,7 +120,7 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru .setJobId("job") .setStreamingConfigTask(new StreamingConfigTask().setMaxWorkItemCommitBytes(100L)); CountDownLatch numExpectedRefreshes = new CountDownLatch(3); - Set receivedPipelineConfig = new HashSet<>(); + Set receivedPipelineConfig = new HashSet<>(); when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem()) .thenReturn(Optional.of(firstConfig)) .thenReturn(Optional.of(secondConfig)) @@ -131,16 +130,15 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru // ConfigFetcher should not do anything with a config that doesn't contain a // StreamingConfigTask. .thenReturn(Optional.of(new WorkItem().setJobId("jobId"))); - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); - configManager.onConfig( + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + globalConfigHandle.onConfig( config -> { receivedPipelineConfig.add(config); numExpectedRefreshes.countDown(); }); streamingEngineConfigFetcher = createConfigFetcher( - /* waitForInitialConfig= */ true, Duration.millis(100).getMillis(), configManager); + /* waitForInitialConfig= */ true, Duration.millis(100).getMillis(), globalConfigHandle); Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start); asyncStartConfigLoader.start(); @@ -148,21 +146,21 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru asyncStartConfigLoader.join(); assertThat(receivedPipelineConfig) .containsExactly( - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxWorkItemCommitBytes( firstConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) .build()) .build(), - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxWorkItemCommitBytes( secondConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) .build()) .build(), - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxWorkItemCommitBytes( @@ -173,10 +171,9 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru @Test public void testGetComputationConfig() throws IOException { - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); streamingEngineConfigFetcher = - createConfigFetcher(/* waitForInitialConfig= */ false, 0, configManager); + createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle); String computationId = "computationId"; String stageName = "stageName"; String systemName = "systemName"; @@ -209,12 +206,11 @@ public void testGetComputationConfig() throws IOException { @Test public void testGetComputationConfig_noComputationPresent() throws IOException { - Set receivedPipelineConfig = new HashSet<>(); - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); - configManager.onConfig(receivedPipelineConfig::add); + Set receivedPipelineConfig = new HashSet<>(); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + globalConfigHandle.onConfig(receivedPipelineConfig::add); streamingEngineConfigFetcher = - createConfigFetcher(/* waitForInitialConfig= */ false, 0, configManager); + createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle); when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString())) .thenReturn(Optional.empty()); Optional pipelineConfig = @@ -225,10 +221,9 @@ public void testGetComputationConfig_noComputationPresent() throws IOException { @Test public void testGetComputationConfig_fetchConfigFromDataflowError() throws IOException { - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); streamingEngineConfigFetcher = - createConfigFetcher(/* waitForInitialConfig= */ false, 0, configManager); + createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle); RuntimeException e = new RuntimeException("something bad happened."); when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString())).thenThrow(e); Throwable fetchConfigError = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManagerImplTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java similarity index 75% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManagerImplTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java index fe99331eeda0..c025dbc476c0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfigManagerImplTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java @@ -34,13 +34,12 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class StreamingEnginePipelineConfigManagerImplTest { +public class StreamingGlobalConfigHandleImplTest { @Test public void getConfig() { - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); - StreamingEnginePipelineConfig config = - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig config = + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxOutputValueBytes(123) @@ -53,17 +52,16 @@ public void getConfig() { .setUseSeparateWindmillHeartbeatStreams(false) .build()) .build(); - configManager.setConfig(config); - assertEquals(config, configManager.getConfig()); + globalConfigHandle.setConfig(config); + assertEquals(config, globalConfigHandle.getConfig()); } @Test public void onConfig_configSetAfterRegisteringCallback() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); - StreamingEnginePipelineConfig configToSet = - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig configToSet = + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxOutputValueBytes(123) @@ -76,24 +74,23 @@ public void onConfig_configSetAfterRegisteringCallback() throws InterruptedExcep .setUseSeparateWindmillHeartbeatStreams(false) .build()) .build(); - AtomicReference configFromCallback = new AtomicReference<>(); - configManager.onConfig( + AtomicReference configFromCallback = new AtomicReference<>(); + globalConfigHandle.onConfig( config -> { configFromCallback.set(config); latch.countDown(); }); - configManager.setConfig(configToSet); + globalConfigHandle.setConfig(configToSet); assertTrue(latch.await(10, TimeUnit.SECONDS)); - assertEquals(configFromCallback.get(), configManager.getConfig()); + assertEquals(configFromCallback.get(), globalConfigHandle.getConfig()); } @Test public void onConfig_configSetBeforeRegisteringCallback() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); - StreamingEnginePipelineConfig configToSet = - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig configToSet = + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxOutputValueBytes(123) @@ -106,26 +103,25 @@ public void onConfig_configSetBeforeRegisteringCallback() throws InterruptedExce .setUseSeparateWindmillHeartbeatStreams(false) .build()) .build(); - AtomicReference configFromCallback = new AtomicReference<>(); - configManager.setConfig(configToSet); - configManager.onConfig( + AtomicReference configFromCallback = new AtomicReference<>(); + globalConfigHandle.setConfig(configToSet); + globalConfigHandle.onConfig( config -> { configFromCallback.set(config); latch.countDown(); }); assertTrue(latch.await(10, TimeUnit.SECONDS)); - assertEquals(configFromCallback.get(), configManager.getConfig()); + assertEquals(configFromCallback.get(), globalConfigHandle.getConfig()); } @Test public void onConfig_shouldNotCallCallbackForIfConfigRemainsSame() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger callbackCount = new AtomicInteger(0); - StreamingEnginePipelineConfigManagerImpl configManager = - new StreamingEnginePipelineConfigManagerImpl(); - Supplier configToSet = + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + Supplier configToSet = () -> - StreamingEnginePipelineConfig.builder() + StreamingGlobalConfig.builder() .setOperationalLimits( OperationalLimits.builder() .setMaxOutputValueBytes(123) @@ -138,14 +134,14 @@ public void onConfig_shouldNotCallCallbackForIfConfigRemainsSame() throws Interr .setUseSeparateWindmillHeartbeatStreams(false) .build()) .build(); - configManager.onConfig( + globalConfigHandle.onConfig( config -> { callbackCount.incrementAndGet(); latch.countDown(); }); - configManager.setConfig(configToSet.get()); + globalConfigHandle.setConfig(configToSet.get()); // call setter again with same config - configManager.setConfig(configToSet.get()); + globalConfigHandle.setConfig(configToSet.get()); assertTrue(latch.await(10, TimeUnit.SECONDS)); assertEquals(1, callbackCount.get()); }