Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Sep 20, 2024
1 parent fb080ea commit 92dc8ff
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,26 +256,21 @@ private StreamingDataflowWorker(
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
// Experiment gates the logic till backend changes are rollback safe
if (DataflowRunner.hasExperiment(
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)) {
heartbeatSender =
// If the setting is explicitly passed in via PipelineOptions use it,
// else rely on the global config
options.getUseSeparateWindmillHeartbeatStreams() != null
? StreamPoolHeartbeatSender.Create(
options.getUseSeparateWindmillHeartbeatStreams()
? separateHeartbeatPool(windmillServer)
: getDataStreamPool)
: StreamPoolHeartbeatSender.Create(
separateHeartbeatPool(windmillServer),
getDataStreamPool,
configFetcher.getGlobalConfigHandle());
} else {
if (!DataflowRunner.hasExperiment(
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
|| options.getUseSeparateWindmillHeartbeatStreams() != null) {
heartbeatSender =
StreamPoolHeartbeatSender.Create(
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
? separateHeartbeatPool(windmillServer)
: getDataStreamPool);

} else {
heartbeatSender =
StreamPoolHeartbeatSender.Create(
separateHeartbeatPool(windmillServer),
getDataStreamPool,
configFetcher.getGlobalConfigHandle());
}

stuckCommitDurationMillis =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,24 @@ public static StreamPoolHeartbeatSender Create(
* Creates StreamPoolHeartbeatSender that switches between the passed in stream pools depending on
* global config.
*
* @param heartbeatStreamPool stream to use when using separate streams for heartbeat is enabled.
* @param dedicatedHeartbeatPool stream to use when using separate streams for heartbeat is
* enabled.
* @param getDataPool stream to use when using separate streams for heartbeat is disabled.
*/
public static StreamPoolHeartbeatSender Create(
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool,
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> dedicatedHeartbeatPool,
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
@Nonnull StreamingGlobalConfigHandle configHandle) {
// Use getDataPool as the default, settings callback will
// switch to the separate pool if enabled before processing any elements are processed.
StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(heartbeatStreamPool);
StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(getDataPool);
configHandle.registerConfigObserver(
streamingGlobalConfig ->
heartbeatSender.heartbeatStreamPool.set(
streamingGlobalConfig
.userWorkerJobSettings()
.getUseSeparateWindmillHeartbeatStreams()
? heartbeatStreamPool
? dedicatedHeartbeatPool
: getDataPool));
return heartbeatSender;
}
Expand Down

0 comments on commit 92dc8ff

Please sign in to comment.