Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Sep 13, 2024
1 parent a8bb92f commit c713718
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
configFetcher =
StreamingEngineComputationConfigFetcher.create(
options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient);
configFetcher.getGlobalConfigHandle().onConfig(dispatcherClient::onJobConfig);
configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig);
computationStateCache = computationStateCacheFactory.apply(configFetcher);
windmillStreamFactory =
windmillStreamFactoryBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
private final ImmutableMap<String, String> stateNameMap;
private final WindmillStateCache.ForComputation stateCache;
private final ReaderCache readerCache;
private final StreamingGlobalConfigHandle configManager;
private final StreamingGlobalConfigHandle globalConfigHandle;
private final boolean throwExceptionOnLargeOutput;
private volatile long backlogBytes;

Expand Down Expand Up @@ -155,7 +155,7 @@ public StreamingModeExecutionContext(
MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingGlobalConfigHandle configManager,
StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
boolean throwExceptionOnLargeOutput) {
super(
Expand All @@ -166,7 +166,7 @@ public StreamingModeExecutionContext(
sinkByteLimit);
this.computationId = computationId;
this.readerCache = readerCache;
this.configManager = configManager;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
Expand Down Expand Up @@ -205,7 +205,8 @@ public void start(
this.work = work;
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
this.operationalLimits = configManager.getConfig().operationalLimits();
// Snapshot the limits for entire bundle processing.
this.operationalLimits = globalConfigHandle.getConfig().operationalLimits();
this.outputBuilder = outputBuilder;
this.sideInputCache.clear();
clearSinkFullHint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@Internal
@ThreadSafe
/*
* StreamingEnginePipelineConfigManager returning a fixed config
* StreamingGlobalConfigHandle returning a fixed config
* initialized during construction. Used for Appliance and Tests.
*/
public class FixedGlobalConfigHandle implements StreamingGlobalConfigHandle {
Expand All @@ -42,7 +42,7 @@ public StreamingGlobalConfig getConfig() {
}

@Override
public void onConfig(@Nonnull Consumer<StreamingGlobalConfig> callback) {
public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> callback) {
callback.accept(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@
@ThreadSafe
public interface StreamingGlobalConfigHandle {

/*
* Returns the latest StreamingEnginePipelineConfig
*/
/** Returns the latest StreamingGlobalConfig */
StreamingGlobalConfig getConfig();

/*
* Subscribe to config updates by registering a callback.
* Callback should be called the first time with settings, if any, inline before the method returns.
/**
* Subscribe to config updates by registering a callback. Callback should be called the first time
* with settings, if any, inline before the method returns.
*/
void onConfig(@Nonnull Consumer<StreamingGlobalConfig> callback);
void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ public class StreamingGlobalConfigHandleImpl implements StreamingGlobalConfigHan
private final CopyOnWriteArrayList<Consumer<StreamingGlobalConfig>> config_callbacks =
new CopyOnWriteArrayList<>();

/*
* Returns the latest StreamingEnginePipelineConfig
*/
@Override
public StreamingGlobalConfig getConfig() {
Preconditions.checkState(
Expand All @@ -47,25 +44,22 @@ public StreamingGlobalConfig getConfig() {
return streamingEngineConfig.get();
}

/*
* Subscribe to config updates by registering a callback.
* Callback will be called the first time with settings, if any, inline before the method returns.
*/
@Override
public void onConfig(@Nonnull Consumer<StreamingGlobalConfig> callback) {
public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> callback) {
StreamingGlobalConfig config;
synchronized (this) {
config_callbacks.add(callback);
config = streamingEngineConfig.get();
}
if (config != null) {
callback.accept(config);
// read config from streamingEngineConfig again
// to prevent calling callback with stale config.
// The cached `config` will be stale if setConfig
// ran after the synchronized block.
callback.accept(streamingEngineConfig.get());
}
}

/*
* Package private setter for setting config
*/
void setConfig(@Nonnull StreamingGlobalConfig config) {
Iterator<Consumer<StreamingGlobalConfig>> iterator;
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public final class StreamingWorkerStatusPages {
this.getDataStatusProvider = getDataStatusProvider;
this.workUnitExecutor = workUnitExecutor;
this.statusPageDumper = statusPageDumper;
globalConfigHandle.onConfig(seConfig::set);
globalConfigHandle.registerConfigObserver(seConfig::set);
}

public static StreamingWorkerStatusPages.Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ final class ComputationWorkExecutorFactory {

private final long maxSinkBytes;
private final IdGenerator idGenerator;
private final StreamingGlobalConfigHandle configManager;
private final StreamingGlobalConfigHandle globalConfigHandle;
private final boolean throwExceptionOnLargeOutput;

ComputationWorkExecutorFactory(
Expand All @@ -106,13 +106,13 @@ final class ComputationWorkExecutorFactory {
DataflowExecutionStateSampler sampler,
CounterSet pendingDeltaCounters,
IdGenerator idGenerator,
StreamingGlobalConfigHandle configManager) {
StreamingGlobalConfigHandle globalConfigHandle) {
this.options = options;
this.mapTaskExecutorFactory = mapTaskExecutorFactory;
this.readerCache = readerCache;
this.stateCacheFactory = stateCacheFactory;
this.idGenerator = idGenerator;
this.configManager = configManager;
this.globalConfigHandle = globalConfigHandle;
this.readerRegistry = ReaderRegistry.defaultRegistry();
this.sinkRegistry = SinkRegistry.defaultRegistry();
this.sampler = sampler;
Expand Down Expand Up @@ -266,7 +266,7 @@ private StreamingModeExecutionContext createExecutionContext(
stageInfo.metricsContainerRegistry(),
executionStateTracker,
stageInfo.executionStateRegistry(),
configManager,
globalConfigHandle,
maxSinkBytes,
throwExceptionOnLargeOutput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public final class StreamingWorkScheduler {
private final HotKeyLogger hotKeyLogger;
private final ConcurrentMap<String, StageInfo> stageInfoMap;
private final DataflowExecutionStateSampler sampler;
private final StreamingGlobalConfigHandle configManager;
private final StreamingGlobalConfigHandle globalConfigHandle;

public StreamingWorkScheduler(
DataflowWorkerHarnessOptions options,
Expand All @@ -98,7 +98,7 @@ public StreamingWorkScheduler(
HotKeyLogger hotKeyLogger,
ConcurrentMap<String, StageInfo> stageInfoMap,
DataflowExecutionStateSampler sampler,
StreamingGlobalConfigHandle configManager) {
StreamingGlobalConfigHandle globalConfigHandle) {
this.options = options;
this.clock = clock;
this.computationWorkExecutorFactory = computationWorkExecutorFactory;
Expand All @@ -110,7 +110,7 @@ public StreamingWorkScheduler(
this.hotKeyLogger = hotKeyLogger;
this.stageInfoMap = stageInfoMap;
this.sampler = sampler;
this.configManager = configManager;
this.globalConfigHandle = globalConfigHandle;
}

public static StreamingWorkScheduler create(
Expand All @@ -126,7 +126,7 @@ public static StreamingWorkScheduler create(
HotKeyLogger hotKeyLogger,
DataflowExecutionStateSampler sampler,
IdGenerator idGenerator,
StreamingGlobalConfigHandle configManager,
StreamingGlobalConfigHandle globalConfigHandle,
ConcurrentMap<String, StageInfo> stageInfoMap) {
ComputationWorkExecutorFactory computationWorkExecutorFactory =
new ComputationWorkExecutorFactory(
Expand All @@ -137,7 +137,7 @@ public static StreamingWorkScheduler create(
sampler,
streamingCounters.pendingDeltaCounters(),
idGenerator,
configManager);
globalConfigHandle);

return new StreamingWorkScheduler(
options,
Expand All @@ -151,7 +151,7 @@ public static StreamingWorkScheduler create(
hotKeyLogger,
stageInfoMap,
sampler,
configManager);
globalConfigHandle);
}

private static long computeShuffleBytesRead(Windmill.WorkItem workItem) {
Expand Down Expand Up @@ -295,7 +295,7 @@ private Windmill.WorkItemCommitRequest validateCommitRequestSize(
Windmill.WorkItemCommitRequest commitRequest,
String computationId,
Windmill.WorkItem workItem) {
long byteLimit = configManager.getConfig().operationalLimits().getMaxWorkItemCommitBytes();
long byteLimit = globalConfigHandle.getConfig().operationalLimits().getMaxWorkItemCommitBytes();
int commitSize = commitRequest.getSerializedSize();
int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ public Long get() {
@Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC();
@Rule public ErrorCollector errorCollector = new ErrorCollector();
WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class);
StreamingGlobalConfigHandleImpl mockConfigManager = mock(StreamingGlobalConfigHandleImpl.class);
StreamingGlobalConfigHandleImpl mockglobalConfigHandle =
mock(StreamingGlobalConfigHandleImpl.class);
HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class);

private @Nullable ComputationStateCache computationStateCache = null;
Expand Down Expand Up @@ -839,7 +840,7 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args

private StreamingDataflowWorker makeWorker(
StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) {
when(mockConfigManager.getConfig())
when(mockglobalConfigHandle.getConfig())
.thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig());
StreamingDataflowWorker worker =
StreamingDataflowWorker.forTesting(
Expand All @@ -854,7 +855,7 @@ private StreamingDataflowWorker makeWorker(
hotKeyLogger,
streamingDataflowWorkerTestParams.clock(),
streamingDataflowWorkerTestParams.executorSupplier(),
mockConfigManager,
mockglobalConfigHandle,
streamingDataflowWorkerTestParams.localRetryTimeoutMs());
this.computationStateCache = worker.getComputationStateCache();
return worker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void setUp() {
options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
CounterSet counterSet = new CounterSet();
ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
StreamingGlobalConfigHandle configManager =
StreamingGlobalConfigHandle globalConfigHandle =
new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily");
executionContext =
Expand All @@ -132,7 +132,7 @@ public void setUp() {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
configManager,
globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ public void testReadUnboundedReader() throws Exception {
StreamingModeExecutionStateRegistry executionStateRegistry =
new StreamingModeExecutionStateRegistry();
ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1), Runnable::run);
StreamingGlobalConfigHandle configManager =
StreamingGlobalConfigHandle globalConfigHandle =
new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
Expand All @@ -615,7 +615,7 @@ public void testReadUnboundedReader() throws Exception {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
configManager,
globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);

Expand Down Expand Up @@ -965,7 +965,7 @@ public void testFailedWorkItemsAbort() throws Exception {
CounterSet counterSet = new CounterSet();
StreamingModeExecutionStateRegistry executionStateRegistry =
new StreamingModeExecutionStateRegistry();
StreamingGlobalConfigHandle configManager =
StreamingGlobalConfigHandle globalConfigHandle =
new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
Expand All @@ -986,7 +986,7 @@ public void testFailedWorkItemsAbort() throws Exception {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
configManager,
globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public void getConfig() {
.setUseSeparateWindmillHeartbeatStreams(false)
.build())
.build();
FixedGlobalConfigHandle configManager = new FixedGlobalConfigHandle(config);
assertEquals(config, configManager.getConfig());
FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config);
assertEquals(config, globalConfigHandle.getConfig());
}

@Test
public void onConfig() throws InterruptedException {
public void registerConfigObserver() throws InterruptedException {
StreamingGlobalConfig config =
StreamingGlobalConfig.builder()
.setOperationalLimits(
Expand All @@ -70,15 +70,15 @@ public void onConfig() throws InterruptedException {
.setUseSeparateWindmillHeartbeatStreams(false)
.build())
.build();
FixedGlobalConfigHandle configManager = new FixedGlobalConfigHandle(config);
FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config);
AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
configManager.onConfig(
globalConfigHandle.registerConfigObserver(
cbConfig -> {
configFromCallback.set(cbConfig);
latch.countDown();
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(config, configManager.getConfig());
assertEquals(configFromCallback.get(), globalConfigHandle.getConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testStart_requiresInitialConfig() throws IOException, InterruptedExc
when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
.thenReturn(Optional.of(initialConfig));
StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
globalConfigHandle.onConfig(
globalConfigHandle.registerConfigObserver(
config -> {
try {
receivedPipelineConfig.add(config);
Expand Down Expand Up @@ -131,7 +131,7 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru
// StreamingConfigTask.
.thenReturn(Optional.of(new WorkItem().setJobId("jobId")));
StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
globalConfigHandle.onConfig(
globalConfigHandle.registerConfigObserver(
config -> {
receivedPipelineConfig.add(config);
numExpectedRefreshes.countDown();
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testGetComputationConfig() throws IOException {
public void testGetComputationConfig_noComputationPresent() throws IOException {
Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
globalConfigHandle.onConfig(receivedPipelineConfig::add);
globalConfigHandle.registerConfigObserver(receivedPipelineConfig::add);
streamingEngineConfigFetcher =
createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle);
when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString()))
Expand Down
Loading

0 comments on commit c713718

Please sign in to comment.