Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate direct path with StreamingDataflowWorker code path #32778

Merged
merged 6 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.Duration;
Expand Down Expand Up @@ -219,10 +220,8 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillServiceStreamMaxBackoffMillis(int value);

@Description(
"If true, Dataflow streaming pipeline will be running in direct path mode."
+ " VMs must have IPv6 enabled for this to work.")
@Default.Boolean(false)
@Description("Enables direct path mode for streaming engine.")
@Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
boolean getIsWindmillServiceDirectPathEnabled();

void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled);
Expand Down Expand Up @@ -300,4 +299,12 @@ public Integer create(PipelineOptions options) {
return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1;
}
}

/** EnableStreamingEngine defaults to false unless one of the experiment is set. */
class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory<Boolean> {
@Override
public Boolean create(PipelineOptions options) {
return ExperimentalOptions.hasExperiment(options, "enable_windmill_service_direct_path");
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ private void closeStreamSender(Endpoint endpoint, Closeable sender) {
}

/** Add up all the throttle times of all streams including GetWorkerMetadataStream. */
@Override
public long getAndResetThrottleTime() {
return backends.get().windmillStreams().values().stream()
.map(WindmillStreamSender::getAndResetThrottleTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
private final Function<String, Optional<ComputationState>> computationStateFetcher;
private final ExecutorService workProviderExecutor;
private final GetWorkSender getWorkSender;
private final Supplier<Long> throttleTimeSupplier;

SingleSourceWorkerHarness(
WorkCommitter workCommitter,
Expand All @@ -74,7 +75,8 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
StreamingWorkScheduler streamingWorkScheduler,
Runnable waitForResources,
Function<String, Optional<ComputationState>> computationStateFetcher,
GetWorkSender getWorkSender) {
GetWorkSender getWorkSender,
Supplier<Long> throttleTimeSupplier) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make a functional interface instead of a supplier? It's not clear that the supplier should get+reset, generally Supplier seems like it would just get.

Seems like this could then take supplier to vend but also could implement the same interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

this.workCommitter = workCommitter;
this.getDataClient = getDataClient;
this.heartbeatSender = heartbeatSender;
Expand All @@ -90,6 +92,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
.build());
this.isRunning = new AtomicBoolean(false);
this.getWorkSender = getWorkSender;
this.throttleTimeSupplier = throttleTimeSupplier;
}

public static SingleSourceWorkerHarness.Builder builder() {
Expand Down Expand Up @@ -144,6 +147,11 @@ public void shutdown() {
workCommitter.stop();
}

@Override
public long getAndResetThrottleTime() {
return throttleTimeSupplier.get();
}

private void streamingEngineDispatchLoop(
Function<WorkItemReceiver, WindmillStream.GetWorkStream> getWorkStreamFactory) {
while (isRunning.get()) {
Expand Down Expand Up @@ -254,6 +262,8 @@ Builder setComputationStateFetcher(

Builder setGetWorkSender(GetWorkSender getWorkSender);

Builder setThrottleTimeSupplier(Supplier<Long> throttleTimeSupplier);

SingleSourceWorkerHarness build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public interface StreamingWorkerHarness {
void start();

void shutdown();

long getAndResetThrottleTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto could make StreamingWorkerHarness implement the ThrottlingTimerInterface (or whatever you name it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public interface Builder {

Builder setDebugCapture(DebugCapture.Manager debugCapture);

Builder setChannelzServlet(ChannelzServlet channelzServlet);
Builder setChannelzServlet(@Nullable ChannelzServlet channelzServlet);

Builder setStateCache(WindmillStateCache stateCache);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import com.google.api.services.dataflow.model.WorkerMessageResponse;
import com.google.auto.value.AutoBuilder;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
Expand Down Expand Up @@ -97,7 +98,7 @@ public final class StreamingWorkerStatusReporter {
// Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics.
private final AtomicLong workerMessagesIndex;

private StreamingWorkerStatusReporter(
StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
Supplier<Long> windmillQuotaThrottleTime,
Expand Down Expand Up @@ -131,57 +132,13 @@ private StreamingWorkerStatusReporter(
this.workerMessagesIndex = new AtomicLong();
}

public static StreamingWorkerStatusReporter create(
WorkUnitClient workUnitClient,
Supplier<Long> windmillQuotaThrottleTime,
Supplier<Collection<StageInfo>> allStageInfo,
FailureTracker failureTracker,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
/* publishCounters= */ true,
workUnitClient,
windmillQuotaThrottleTime,
allStageInfo,
failureTracker,
streamingCounters,
memoryMonitor,
workExecutor,
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build()),
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
}

@VisibleForTesting
public static StreamingWorkerStatusReporter forTesting(
boolean publishCounters,
WorkUnitClient workUnitClient,
Supplier<Long> windmillQuotaThrottleTime,
Supplier<Collection<StageInfo>> allStageInfo,
FailureTracker failureTracker,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
publishCounters,
workUnitClient,
windmillQuotaThrottleTime,
allStageInfo,
failureTracker,
streamingCounters,
memoryMonitor,
workExecutor,
executorFactory,
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
public static Builder builder() {
return new AutoBuilder_StreamingWorkerStatusReporter_Builder()
.setPublishCounters(true)
.setExecutorFactory(
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build()));
}

/**
Expand Down Expand Up @@ -228,6 +185,22 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}
}

// Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the
// WorkerMessages RPC schedule. The desired reporting period
// (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple
// of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
private static long getPerWorkerMetricsUpdateFrequency(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
if (windmillHarnessUpdateReportingPeriodMillis == 0) {
return 0;
}
return LongMath.divide(
perWorkerMetricsUpdateReportingPeriodMillis,
windmillHarnessUpdateReportingPeriodMillis,
RoundingMode.CEILING);
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
reportHarnessStartup();
Expand Down Expand Up @@ -276,22 +249,6 @@ private void reportHarnessStartup() {
}
}

// Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the
// WorkerMessages RPC schedule. The desired reporting period
// (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple
// of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
private static long getPerWorkerMetricsUpdateFrequency(
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
if (windmillHarnessUpdateReportingPeriodMillis == 0) {
return 0;
}
return LongMath.divide(
perWorkerMetricsUpdateReportingPeriodMillis,
windmillHarnessUpdateReportingPeriodMillis,
RoundingMode.CEILING);
}

/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
Expand Down Expand Up @@ -496,4 +453,33 @@ private void updateThreadMetrics() {
.maxOutstandingBundles()
.addValue((long) workExecutor.maximumElementsOutstanding());
}

@AutoBuilder
public interface Builder {
Builder setPublishCounters(boolean publishCounters);

Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient);

Builder setWindmillQuotaThrottleTime(Supplier<Long> windmillQuotaThrottleTime);

Builder setAllStageInfo(Supplier<Collection<StageInfo>> allStageInfo);

Builder setFailureTracker(FailureTracker failureTracker);

Builder setStreamingCounters(StreamingCounters streamingCounters);

Builder setMemoryMonitor(MemoryMonitor memoryMonitor);

Builder setWorkExecutor(BoundedQueueExecutor workExecutor);

Builder setExecutorFactory(Function<String, ScheduledExecutorService> executorFactory);

Builder setWindmillHarnessUpdateReportingPeriodMillis(
long windmillHarnessUpdateReportingPeriodMillis);

Builder setPerWorkerMetricsUpdateReportingPeriodMillis(
long perWorkerMetricsUpdateReportingPeriodMillis);

StreamingWorkerStatusReporter build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private StreamPoolHeartbeatSender(
this.heartbeatStreamPool.set(heartbeatStreamPool);
}

public static StreamPoolHeartbeatSender Create(
public static StreamPoolHeartbeatSender create(
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
return new StreamPoolHeartbeatSender(heartbeatStreamPool);
}
Expand All @@ -55,7 +55,7 @@ public static StreamPoolHeartbeatSender Create(
* enabled.
* @param getDataPool stream to use when using separate streams for heartbeat is disabled.
*/
public static StreamPoolHeartbeatSender Create(
public static StreamPoolHeartbeatSender create(
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> dedicatedHeartbeatPool,
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
@Nonnull StreamingGlobalConfigHandle configHandle) {
Expand Down
Loading
Loading