Skip to content

Commit

Permalink
fix class hierarchy
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Sep 9, 2024
1 parent e7b705d commit a8bb92f
Show file tree
Hide file tree
Showing 20 changed files with 211 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,11 @@
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.FixedPipelineConfigManagerImpl;
import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManagerImpl;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness;
import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
Expand Down Expand Up @@ -180,7 +179,6 @@ private StreamingDataflowWorker(
WorkFailureProcessor workFailureProcessor,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
StreamingEnginePipelineConfigManager configManager,
GrpcWindmillStreamFactory windmillStreamFactory,
Function<String, ScheduledExecutorService> executorSupplier,
ConcurrentMap<String, StageInfo> stageInfoMap) {
Expand Down Expand Up @@ -237,7 +235,7 @@ private StreamingDataflowWorker(
hotKeyLogger,
sampler,
ID_GENERATOR,
configManager,
configFetcher.getGlobalConfigHandle(),
stageInfoMap);

ThrottlingGetDataMetricTracker getDataMetricTracker =
Expand Down Expand Up @@ -297,7 +295,7 @@ private StreamingDataflowWorker(
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
.setGetDataStatusProvider(getDataClient::printHtml)
.setWorkUnitExecutor(workUnitExecutor)
.setConfigManager(configManager)
.setglobalConfigHandle(configFetcher.getGlobalConfigHandle())
.build();

Windmill.GetWorkRequest request =
Expand Down Expand Up @@ -346,20 +344,13 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
new ThreadFactoryBuilder().setNameFormat(threadName).build());
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
createGrpcwindmillStreamFactoryBuilder(options, clientId);
StreamingEnginePipelineConfigManager configManager =
options.isEnableStreamingEngine()
? new StreamingEnginePipelineConfigManagerImpl()
: new FixedPipelineConfigManagerImpl(
StreamingEnginePipelineConfig.builder()
.build()); // appliance is initialized with default settings

ConfigFetcherComputationStateCacheAndWindmillClient
configFetcherComputationStateCacheAndWindmillClient =
createConfigFetcherComputationStateCacheAndWindmillClient(
options,
dataflowServiceClient,
windmillStreamFactoryBuilder,
configManager,
configFetcher ->
ComputationStateCache.create(
configFetcher,
Expand Down Expand Up @@ -416,7 +407,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
workFailureProcessor,
streamingCounters,
memoryMonitor,
configManager,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
Expand All @@ -433,7 +423,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
DataflowWorkerHarnessOptions options,
WorkUnitClient dataflowServiceClient,
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
StreamingEnginePipelineConfigManager configManager,
Function<ComputationConfig.Fetcher, ComputationStateCache> computationStateCacheFactory) {
ComputationConfig.Fetcher configFetcher;
WindmillServerStub windmillServer;
Expand All @@ -442,12 +431,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
if (options.isEnableStreamingEngine()) {
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(createStubFactory(options));
configManager.onConfig(dispatcherClient::onJobConfig);
configFetcher =
StreamingEngineComputationConfigFetcher.create(
options.getGlobalConfigRefreshPeriod().getMillis(),
dataflowServiceClient,
configManager);
options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient);
configFetcher.getGlobalConfigHandle().onConfig(dispatcherClient::onJobConfig);
computationStateCache = computationStateCacheFactory.apply(configFetcher);
windmillStreamFactory =
windmillStreamFactoryBuilder
Expand Down Expand Up @@ -475,7 +462,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport());
}

configFetcher = new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
configFetcher =
new StreamingApplianceComputationConfigFetcher(
windmillServer::getConfig,
new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
computationStateCache = computationStateCacheFactory.apply(configFetcher);
}

Expand All @@ -495,7 +485,7 @@ static StreamingDataflowWorker forTesting(
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier,
StreamingEnginePipelineConfigManager configManager,
StreamingGlobalConfigHandleImpl globalConfigHandle,
int localRetryTimeoutMs) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
Expand All @@ -504,19 +494,20 @@ static StreamingDataflowWorker forTesting(
.setSizeMb(options.getWorkerCacheMb())
.setSupportMapViaMultimap(options.isEnableStreamingEngine())
.build();
StreamingEnginePipelineConfig config = configManager.getConfig();
if (!config.windmillServiceEndpoints().isEmpty()) {
windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints());
}
ComputationConfig.Fetcher configFetcher =
options.isEnableStreamingEngine()
? StreamingEngineComputationConfigFetcher.forTesting(
/* hasReceivedGlobalConfig= */ true,
options.getGlobalConfigRefreshPeriod().getMillis(),
workUnitClient,
executorSupplier,
configManager)
: new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
globalConfigHandle,
executorSupplier)
: new StreamingApplianceComputationConfigFetcher(
windmillServer::getConfig, globalConfigHandle);
StreamingGlobalConfig config = configFetcher.getGlobalConfigHandle().getConfig();
if (!config.windmillServiceEndpoints().isEmpty()) {
windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints());
}
ConcurrentMap<String, String> stateNameMap =
new ConcurrentHashMap<>(prePopulatedStateNameMappings);
ComputationStateCache computationStateCache =
Expand Down Expand Up @@ -583,7 +574,6 @@ static StreamingDataflowWorker forTesting(
workFailureProcessor,
streamingCounters,
memoryMonitor,
configManager,
options.isEnableStreamingEngine()
? windmillStreamFactory
.setHealthCheckIntervalMillis(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfigManager;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
Expand Down Expand Up @@ -108,7 +108,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
private final ImmutableMap<String, String> stateNameMap;
private final WindmillStateCache.ForComputation stateCache;
private final ReaderCache readerCache;
private final StreamingEnginePipelineConfigManager configManager;
private final StreamingGlobalConfigHandle configManager;
private final boolean throwExceptionOnLargeOutput;
private volatile long backlogBytes;

Expand Down Expand Up @@ -155,7 +155,7 @@ public StreamingModeExecutionContext(
MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingEnginePipelineConfigManager configManager,
StreamingGlobalConfigHandle configManager,
long sinkByteLimit,
boolean throwExceptionOnLargeOutput) {
super(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ public static ComputationConfig create(
public abstract ImmutableMap<String, String> stateNameMap();

/** Interface to fetch configurations for a specific computation. */
@FunctionalInterface
public interface Fetcher {
default void start() {}

default void stop() {}

Optional<ComputationConfig> fetchConfig(String computationId);

StreamingGlobalConfigHandle getGlobalConfigHandle();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,21 @@
* StreamingEnginePipelineConfigManager returning a fixed config
* initialized during construction. Used for Appliance and Tests.
*/
public class FixedPipelineConfigManagerImpl implements StreamingEnginePipelineConfigManager {
public class FixedGlobalConfigHandle implements StreamingGlobalConfigHandle {

private final StreamingEnginePipelineConfig config;
private final StreamingGlobalConfig config;

public FixedPipelineConfigManagerImpl(StreamingEnginePipelineConfig config) {
public FixedGlobalConfigHandle(StreamingGlobalConfig config) {
this.config = config;
}

public StreamingEnginePipelineConfig getConfig() {
@Override
public StreamingGlobalConfig getConfig() {
return config;
}

public void onConfig(@Nonnull Consumer<StreamingEnginePipelineConfig> callback) {}
@Override
public void onConfig(@Nonnull Consumer<StreamingGlobalConfig> callback) {
callback.accept(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ public final class StreamingApplianceComputationConfigFetcher implements Computa

private final ApplianceComputationConfigFetcher applianceComputationConfigFetcher;
private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
private final StreamingGlobalConfigHandle globalConfigHandle;

public StreamingApplianceComputationConfigFetcher(
ApplianceComputationConfigFetcher applianceComputationConfigFetcher) {
ApplianceComputationConfigFetcher applianceComputationConfigFetcher,
StreamingGlobalConfigHandle globalConfigHandle) {
this.applianceComputationConfigFetcher = applianceComputationConfigFetcher;
this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
this.globalConfigHandle = globalConfigHandle;
}

/** Returns a {@code Table<ComputationId, TransformUserName, StateFamilyName>} */
Expand Down Expand Up @@ -112,6 +115,11 @@ public Optional<ComputationConfig> fetchConfig(String computationId) {
.collect(toImmutableMap(NameMapEntry::getUserName, NameMapEntry::getSystemName)));
}

@Override
public StreamingGlobalConfigHandle getGlobalConfigHandle() {
return globalConfigHandle;
}

private Optional<ComputationConfig> createComputationConfig(
String serializedMapTask,
Table<String, String, String> transformUserNameToStateFamilyByComputationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,48 +74,46 @@ public final class StreamingEngineComputationConfigFetcher implements Computatio
private final long globalConfigRefreshPeriodMillis;
private final WorkUnitClient dataflowServiceClient;
private final ScheduledExecutorService globalConfigRefresher;
private final StreamingEnginePipelineConfigManager configManager;
private final StreamingGlobalConfigHandleImpl globalConfigHandle;
private final AtomicBoolean hasReceivedGlobalConfig;

private StreamingEngineComputationConfigFetcher(
boolean hasReceivedGlobalConfig,
long globalConfigRefreshPeriodMillis,
WorkUnitClient dataflowServiceClient,
ScheduledExecutorService globalConfigRefresher,
StreamingEnginePipelineConfigManager configManager) {
StreamingGlobalConfigHandleImpl globalConfigHandle,
ScheduledExecutorService globalConfigRefresher) {
this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
this.dataflowServiceClient = dataflowServiceClient;
this.globalConfigRefresher = globalConfigRefresher;
this.configManager = configManager;
this.globalConfigHandle = globalConfigHandle;
this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
}

public static StreamingEngineComputationConfigFetcher create(
long globalConfigRefreshPeriodMillis,
WorkUnitClient dataflowServiceClient,
StreamingEnginePipelineConfigManager configManager) {
long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient) {
return new StreamingEngineComputationConfigFetcher(
/* hasReceivedGlobalConfig= */ false,
globalConfigRefreshPeriodMillis,
dataflowServiceClient,
new StreamingGlobalConfigHandleImpl(),
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()),
configManager);
new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()));
}

@VisibleForTesting
public static StreamingEngineComputationConfigFetcher forTesting(
boolean hasReceivedGlobalConfig,
long globalConfigRefreshPeriodMillis,
WorkUnitClient dataflowServiceClient,
Function<String, ScheduledExecutorService> executorSupplier,
StreamingEnginePipelineConfigManager configManager) {
StreamingGlobalConfigHandleImpl globalConfigHandle,
Function<String, ScheduledExecutorService> executorSupplier) {
return new StreamingEngineComputationConfigFetcher(
hasReceivedGlobalConfig,
globalConfigRefreshPeriodMillis,
dataflowServiceClient,
executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME),
configManager);
globalConfigHandle,
executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME));
}

@VisibleForTesting
Expand Down Expand Up @@ -159,8 +157,8 @@ private static Optional<StreamingConfigTask> fetchConfigWithRetry(
}
}

private StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) {
StreamingEnginePipelineConfig.Builder pipelineConfig = StreamingEnginePipelineConfig.builder();
private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) {
StreamingGlobalConfig.Builder pipelineConfig = StreamingGlobalConfig.builder();
OperationalLimits.Builder operationalLimits = OperationalLimits.builder();

if (config.getWindmillServiceEndpoint() != null
Expand Down Expand Up @@ -246,6 +244,11 @@ public Optional<ComputationConfig> fetchConfig(String computationId) {
.flatMap(StreamingEngineComputationConfigFetcher::createComputationConfig);
}

@Override
public StreamingGlobalConfigHandle getGlobalConfigHandle() {
return globalConfigHandle;
}

@Override
public void stop() {
// We have already shutdown or start has not been called.
Expand All @@ -272,7 +275,7 @@ public void stop() {
@SuppressWarnings("FutureReturnValueIgnored")
private void schedulePeriodicGlobalConfigRequests() {
globalConfigRefresher.scheduleWithFixedDelay(
() -> fetchGlobalConfig().ifPresent(configManager::setConfig),
() -> fetchGlobalConfig().ifPresent(globalConfigHandle::setConfig),
0,
globalConfigRefreshPeriodMillis,
TimeUnit.MILLISECONDS);
Expand All @@ -285,9 +288,9 @@ private void schedulePeriodicGlobalConfigRequests() {
private synchronized void fetchInitialPipelineGlobalConfig() {
while (!hasReceivedGlobalConfig.get()) {
LOG.info("Sending request to get initial global configuration for this worker.");
Optional<StreamingEnginePipelineConfig> globalConfig = fetchGlobalConfig();
Optional<StreamingGlobalConfig> globalConfig = fetchGlobalConfig();
if (globalConfig.isPresent()) {
configManager.setConfig(globalConfig.get());
globalConfigHandle.setConfig(globalConfig.get());
hasReceivedGlobalConfig.set(true);
break;
}
Expand All @@ -298,7 +301,7 @@ private synchronized void fetchInitialPipelineGlobalConfig() {
LOG.info("Initial global configuration received, harness is now ready");
}

private Optional<StreamingEnginePipelineConfig> fetchGlobalConfig() {
private Optional<StreamingGlobalConfig> fetchGlobalConfig() {
return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem)
.map(config -> createPipelineConfig(config));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
/** Global pipeline config for pipelines running in Streaming Engine mode. */
@AutoValue
@Internal
public abstract class StreamingEnginePipelineConfig {
public abstract class StreamingGlobalConfig {

public static StreamingEnginePipelineConfig.Builder builder() {
return new AutoValue_StreamingEnginePipelineConfig.Builder()
public static StreamingGlobalConfig.Builder builder() {
return new AutoValue_StreamingGlobalConfig.Builder()
.setWindmillServiceEndpoints(ImmutableSet.of())
.setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build())
.setOperationalLimits(OperationalLimits.builder().build());
Expand All @@ -51,6 +51,6 @@ public abstract static class Builder {

public abstract Builder setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings);

public abstract StreamingEnginePipelineConfig build();
public abstract StreamingGlobalConfig build();
}
}
Loading

0 comments on commit a8bb92f

Please sign in to comment.