From c713718dfe6938e39f3c78fe36e6e717bd48de9d Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 12 Sep 2024 19:43:50 -0700 Subject: [PATCH] review comments --- .../worker/StreamingDataflowWorker.java | 2 +- .../worker/StreamingModeExecutionContext.java | 9 +++++---- .../config/FixedGlobalConfigHandle.java | 4 ++-- .../config/StreamingGlobalConfigHandle.java | 12 +++++------- .../StreamingGlobalConfigHandleImpl.java | 18 ++++++------------ .../harness/StreamingWorkerStatusPages.java | 2 +- .../ComputationWorkExecutorFactory.java | 8 ++++---- .../processing/StreamingWorkScheduler.java | 14 +++++++------- .../worker/StreamingDataflowWorkerTest.java | 7 ++++--- .../StreamingModeExecutionContextTest.java | 4 ++-- .../worker/WorkerCustomSourcesTest.java | 8 ++++---- .../config/FixedGlobalConfigHandleTest.java | 12 ++++++------ ...mingEngineComputationConfigFetcherTest.java | 6 +++--- .../StreamingGlobalConfigHandleImplTest.java | 16 ++++++++++------ 14 files changed, 60 insertions(+), 62 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 092e43bf3a6f..c6cc58739f41 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -434,7 +434,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o configFetcher = StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient); - configFetcher.getGlobalConfigHandle().onConfig(dispatcherClient::onJobConfig); + configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig); computationStateCache = computationStateCacheFactory.apply(configFetcher); windmillStreamFactory = windmillStreamFactoryBuilder diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 85af41577358..5ff94884e974 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -108,7 +108,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext stateNameMap; private final WindmillStateCache.ForComputation stateCache; private final ReaderCache readerCache; - private final StreamingGlobalConfigHandle configManager; + private final StreamingGlobalConfigHandle globalConfigHandle; private final boolean throwExceptionOnLargeOutput; private volatile long backlogBytes; @@ -155,7 +155,7 @@ public StreamingModeExecutionContext( MetricsContainerRegistry metricsContainerRegistry, DataflowExecutionStateTracker executionStateTracker, StreamingModeExecutionStateRegistry executionStateRegistry, - StreamingGlobalConfigHandle configManager, + StreamingGlobalConfigHandle globalConfigHandle, long sinkByteLimit, boolean throwExceptionOnLargeOutput) { super( @@ -166,7 +166,7 @@ public StreamingModeExecutionContext( sinkByteLimit); this.computationId = computationId; this.readerCache = readerCache; - this.configManager = configManager; + this.globalConfigHandle = globalConfigHandle; this.sideInputCache = new HashMap<>(); this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; @@ -205,7 +205,8 @@ public void start( this.work = work; this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey()); this.sideInputStateFetcher = sideInputStateFetcher; - this.operationalLimits = configManager.getConfig().operationalLimits(); + // Snapshot the limits for entire bundle processing. + this.operationalLimits = globalConfigHandle.getConfig().operationalLimits(); this.outputBuilder = outputBuilder; this.sideInputCache.clear(); clearSinkFullHint(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java index e0485ae77b71..c244ecb8c7a8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java @@ -25,7 +25,7 @@ @Internal @ThreadSafe /* - * StreamingEnginePipelineConfigManager returning a fixed config + * StreamingGlobalConfigHandle returning a fixed config * initialized during construction. Used for Appliance and Tests. */ public class FixedGlobalConfigHandle implements StreamingGlobalConfigHandle { @@ -42,7 +42,7 @@ public StreamingGlobalConfig getConfig() { } @Override - public void onConfig(@Nonnull Consumer callback) { + public void registerConfigObserver(@Nonnull Consumer callback) { callback.accept(config); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java index 646f3e212e31..f0a8989fbe3c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java @@ -26,14 +26,12 @@ @ThreadSafe public interface StreamingGlobalConfigHandle { - /* - * Returns the latest StreamingEnginePipelineConfig - */ + /** Returns the latest StreamingGlobalConfig */ StreamingGlobalConfig getConfig(); - /* - * Subscribe to config updates by registering a callback. - * Callback should be called the first time with settings, if any, inline before the method returns. + /** + * Subscribe to config updates by registering a callback. Callback should be called the first time + * with settings, if any, inline before the method returns. */ - void onConfig(@Nonnull Consumer callback); + void registerConfigObserver(@Nonnull Consumer callback); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java index 7c9e4ab8f6dc..458f9b1630d9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java @@ -36,9 +36,6 @@ public class StreamingGlobalConfigHandleImpl implements StreamingGlobalConfigHan private final CopyOnWriteArrayList> config_callbacks = new CopyOnWriteArrayList<>(); - /* - * Returns the latest StreamingEnginePipelineConfig - */ @Override public StreamingGlobalConfig getConfig() { Preconditions.checkState( @@ -47,25 +44,22 @@ public StreamingGlobalConfig getConfig() { return streamingEngineConfig.get(); } - /* - * Subscribe to config updates by registering a callback. - * Callback will be called the first time with settings, if any, inline before the method returns. - */ @Override - public void onConfig(@Nonnull Consumer callback) { + public void registerConfigObserver(@Nonnull Consumer callback) { StreamingGlobalConfig config; synchronized (this) { config_callbacks.add(callback); config = streamingEngineConfig.get(); } if (config != null) { - callback.accept(config); + // read config from streamingEngineConfig again + // to prevent calling callback with stale config. + // The cached `config` will be stale if setConfig + // ran after the synchronized block. + callback.accept(streamingEngineConfig.get()); } } - /* - * Package private setter for setting config - */ void setConfig(@Nonnull StreamingGlobalConfig config) { Iterator> iterator; synchronized (this) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index 4654845c83da..b65a5f10658e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -110,7 +110,7 @@ public final class StreamingWorkerStatusPages { this.getDataStatusProvider = getDataStatusProvider; this.workUnitExecutor = workUnitExecutor; this.statusPageDumper = statusPageDumper; - globalConfigHandle.onConfig(seConfig::set); + globalConfigHandle.registerConfigObserver(seConfig::set); } public static StreamingWorkerStatusPages.Builder builder() { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java index c1a761c8b41a..d5e0b3a24e2a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java @@ -95,7 +95,7 @@ final class ComputationWorkExecutorFactory { private final long maxSinkBytes; private final IdGenerator idGenerator; - private final StreamingGlobalConfigHandle configManager; + private final StreamingGlobalConfigHandle globalConfigHandle; private final boolean throwExceptionOnLargeOutput; ComputationWorkExecutorFactory( @@ -106,13 +106,13 @@ final class ComputationWorkExecutorFactory { DataflowExecutionStateSampler sampler, CounterSet pendingDeltaCounters, IdGenerator idGenerator, - StreamingGlobalConfigHandle configManager) { + StreamingGlobalConfigHandle globalConfigHandle) { this.options = options; this.mapTaskExecutorFactory = mapTaskExecutorFactory; this.readerCache = readerCache; this.stateCacheFactory = stateCacheFactory; this.idGenerator = idGenerator; - this.configManager = configManager; + this.globalConfigHandle = globalConfigHandle; this.readerRegistry = ReaderRegistry.defaultRegistry(); this.sinkRegistry = SinkRegistry.defaultRegistry(); this.sampler = sampler; @@ -266,7 +266,7 @@ private StreamingModeExecutionContext createExecutionContext( stageInfo.metricsContainerRegistry(), executionStateTracker, stageInfo.executionStateRegistry(), - configManager, + globalConfigHandle, maxSinkBytes, throwExceptionOnLargeOutput); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 3c968f71bf44..641fd119a42d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -84,7 +84,7 @@ public final class StreamingWorkScheduler { private final HotKeyLogger hotKeyLogger; private final ConcurrentMap stageInfoMap; private final DataflowExecutionStateSampler sampler; - private final StreamingGlobalConfigHandle configManager; + private final StreamingGlobalConfigHandle globalConfigHandle; public StreamingWorkScheduler( DataflowWorkerHarnessOptions options, @@ -98,7 +98,7 @@ public StreamingWorkScheduler( HotKeyLogger hotKeyLogger, ConcurrentMap stageInfoMap, DataflowExecutionStateSampler sampler, - StreamingGlobalConfigHandle configManager) { + StreamingGlobalConfigHandle globalConfigHandle) { this.options = options; this.clock = clock; this.computationWorkExecutorFactory = computationWorkExecutorFactory; @@ -110,7 +110,7 @@ public StreamingWorkScheduler( this.hotKeyLogger = hotKeyLogger; this.stageInfoMap = stageInfoMap; this.sampler = sampler; - this.configManager = configManager; + this.globalConfigHandle = globalConfigHandle; } public static StreamingWorkScheduler create( @@ -126,7 +126,7 @@ public static StreamingWorkScheduler create( HotKeyLogger hotKeyLogger, DataflowExecutionStateSampler sampler, IdGenerator idGenerator, - StreamingGlobalConfigHandle configManager, + StreamingGlobalConfigHandle globalConfigHandle, ConcurrentMap stageInfoMap) { ComputationWorkExecutorFactory computationWorkExecutorFactory = new ComputationWorkExecutorFactory( @@ -137,7 +137,7 @@ public static StreamingWorkScheduler create( sampler, streamingCounters.pendingDeltaCounters(), idGenerator, - configManager); + globalConfigHandle); return new StreamingWorkScheduler( options, @@ -151,7 +151,7 @@ public static StreamingWorkScheduler create( hotKeyLogger, stageInfoMap, sampler, - configManager); + globalConfigHandle); } private static long computeShuffleBytesRead(Windmill.WorkItem workItem) { @@ -295,7 +295,7 @@ private Windmill.WorkItemCommitRequest validateCommitRequestSize( Windmill.WorkItemCommitRequest commitRequest, String computationId, Windmill.WorkItem workItem) { - long byteLimit = configManager.getConfig().operationalLimits().getMaxWorkItemCommitBytes(); + long byteLimit = globalConfigHandle.getConfig().operationalLimits().getMaxWorkItemCommitBytes(); int commitSize = commitRequest.getSerializedSize(); int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index eafd0d1f6f49..95d3a9f7ce55 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -277,7 +277,8 @@ public Long get() { @Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC(); @Rule public ErrorCollector errorCollector = new ErrorCollector(); WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class); - StreamingGlobalConfigHandleImpl mockConfigManager = mock(StreamingGlobalConfigHandleImpl.class); + StreamingGlobalConfigHandleImpl mockglobalConfigHandle = + mock(StreamingGlobalConfigHandleImpl.class); HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class); private @Nullable ComputationStateCache computationStateCache = null; @@ -839,7 +840,7 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args private StreamingDataflowWorker makeWorker( StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) { - when(mockConfigManager.getConfig()) + when(mockglobalConfigHandle.getConfig()) .thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig()); StreamingDataflowWorker worker = StreamingDataflowWorker.forTesting( @@ -854,7 +855,7 @@ private StreamingDataflowWorker makeWorker( hotKeyLogger, streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), - mockConfigManager, + mockglobalConfigHandle, streamingDataflowWorkerTestParams.localRetryTimeoutMs()); this.computationStateCache = worker.getComputationStateCache(); return worker; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index f178576c085f..a1d4210f3dbc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -110,7 +110,7 @@ public void setUp() { options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); CounterSet counterSet = new CounterSet(); ConcurrentHashMap stateNameMap = new ConcurrentHashMap<>(); - StreamingGlobalConfigHandle configManager = + StreamingGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily"); executionContext = @@ -132,7 +132,7 @@ public void setUp() { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, - configManager, + globalConfigHandle, Long.MAX_VALUE, /*throwExceptionOnLargeOutput=*/ false); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 6d56ec78f4eb..8ad73a5145bc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -597,7 +597,7 @@ public void testReadUnboundedReader() throws Exception { StreamingModeExecutionStateRegistry executionStateRegistry = new StreamingModeExecutionStateRegistry(); ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1), Runnable::run); - StreamingGlobalConfigHandle configManager = + StreamingGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); StreamingModeExecutionContext context = new StreamingModeExecutionContext( @@ -615,7 +615,7 @@ public void testReadUnboundedReader() throws Exception { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, - configManager, + globalConfigHandle, Long.MAX_VALUE, /*throwExceptionOnLargeOutput=*/ false); @@ -965,7 +965,7 @@ public void testFailedWorkItemsAbort() throws Exception { CounterSet counterSet = new CounterSet(); StreamingModeExecutionStateRegistry executionStateRegistry = new StreamingModeExecutionStateRegistry(); - StreamingGlobalConfigHandle configManager = + StreamingGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); StreamingModeExecutionContext context = new StreamingModeExecutionContext( @@ -986,7 +986,7 @@ public void testFailedWorkItemsAbort() throws Exception { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, - configManager, + globalConfigHandle, Long.MAX_VALUE, /*throwExceptionOnLargeOutput=*/ false); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java index 823e0c42822c..b5cb85a58c12 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java @@ -50,12 +50,12 @@ public void getConfig() { .setUseSeparateWindmillHeartbeatStreams(false) .build()) .build(); - FixedGlobalConfigHandle configManager = new FixedGlobalConfigHandle(config); - assertEquals(config, configManager.getConfig()); + FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config); + assertEquals(config, globalConfigHandle.getConfig()); } @Test - public void onConfig() throws InterruptedException { + public void registerConfigObserver() throws InterruptedException { StreamingGlobalConfig config = StreamingGlobalConfig.builder() .setOperationalLimits( @@ -70,15 +70,15 @@ public void onConfig() throws InterruptedException { .setUseSeparateWindmillHeartbeatStreams(false) .build()) .build(); - FixedGlobalConfigHandle configManager = new FixedGlobalConfigHandle(config); + FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config); AtomicReference configFromCallback = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - configManager.onConfig( + globalConfigHandle.registerConfigObserver( cbConfig -> { configFromCallback.set(cbConfig); latch.countDown(); }); assertTrue(latch.await(10, TimeUnit.SECONDS)); - assertEquals(config, configManager.getConfig()); + assertEquals(configFromCallback.get(), globalConfigHandle.getConfig()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java index 24fe85aa5471..9fa17588c94d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java @@ -80,7 +80,7 @@ public void testStart_requiresInitialConfig() throws IOException, InterruptedExc when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem()) .thenReturn(Optional.of(initialConfig)); StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); - globalConfigHandle.onConfig( + globalConfigHandle.registerConfigObserver( config -> { try { receivedPipelineConfig.add(config); @@ -131,7 +131,7 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru // StreamingConfigTask. .thenReturn(Optional.of(new WorkItem().setJobId("jobId"))); StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); - globalConfigHandle.onConfig( + globalConfigHandle.registerConfigObserver( config -> { receivedPipelineConfig.add(config); numExpectedRefreshes.countDown(); @@ -208,7 +208,7 @@ public void testGetComputationConfig() throws IOException { public void testGetComputationConfig_noComputationPresent() throws IOException { Set receivedPipelineConfig = new HashSet<>(); StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); - globalConfigHandle.onConfig(receivedPipelineConfig::add); + globalConfigHandle.registerConfigObserver(receivedPipelineConfig::add); streamingEngineConfigFetcher = createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle); when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString())) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java index c025dbc476c0..0fae05050205 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java @@ -35,6 +35,7 @@ @RunWith(JUnit4.class) public class StreamingGlobalConfigHandleImplTest { + @Test public void getConfig() { StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); @@ -57,7 +58,8 @@ public void getConfig() { } @Test - public void onConfig_configSetAfterRegisteringCallback() throws InterruptedException { + public void registerConfigObserver_configSetAfterRegisteringCallback() + throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); StreamingGlobalConfig configToSet = @@ -75,7 +77,7 @@ public void onConfig_configSetAfterRegisteringCallback() throws InterruptedExcep .build()) .build(); AtomicReference configFromCallback = new AtomicReference<>(); - globalConfigHandle.onConfig( + globalConfigHandle.registerConfigObserver( config -> { configFromCallback.set(config); latch.countDown(); @@ -86,7 +88,8 @@ public void onConfig_configSetAfterRegisteringCallback() throws InterruptedExcep } @Test - public void onConfig_configSetBeforeRegisteringCallback() throws InterruptedException { + public void registerConfigObserver_configSetBeforeRegisteringCallback() + throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); StreamingGlobalConfig configToSet = @@ -105,7 +108,7 @@ public void onConfig_configSetBeforeRegisteringCallback() throws InterruptedExce .build(); AtomicReference configFromCallback = new AtomicReference<>(); globalConfigHandle.setConfig(configToSet); - globalConfigHandle.onConfig( + globalConfigHandle.registerConfigObserver( config -> { configFromCallback.set(config); latch.countDown(); @@ -115,7 +118,8 @@ public void onConfig_configSetBeforeRegisteringCallback() throws InterruptedExce } @Test - public void onConfig_shouldNotCallCallbackForIfConfigRemainsSame() throws InterruptedException { + public void registerConfigObserver_shouldNotCallCallbackForIfConfigRemainsSame() + throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger callbackCount = new AtomicInteger(0); StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); @@ -134,7 +138,7 @@ public void onConfig_shouldNotCallCallbackForIfConfigRemainsSame() throws Interr .setUseSeparateWindmillHeartbeatStreams(false) .build()) .build(); - globalConfigHandle.onConfig( + globalConfigHandle.registerConfigObserver( config -> { callbackCount.incrementAndGet(); latch.countDown();