Skip to content

Commit

Permalink
Integrate direct path with StreamingDataflowWorker code path (#32778)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu authored Nov 26, 2024
1 parent ad8545c commit aa21e4a
Show file tree
Hide file tree
Showing 14 changed files with 561 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.Duration;
Expand Down Expand Up @@ -219,10 +220,8 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillServiceStreamMaxBackoffMillis(int value);

@Description(
"If true, Dataflow streaming pipeline will be running in direct path mode."
+ " VMs must have IPv6 enabled for this to work.")
@Default.Boolean(false)
@Description("Enables direct path mode for streaming engine.")
@Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
boolean getIsWindmillServiceDirectPathEnabled();

void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled);
Expand Down Expand Up @@ -300,4 +299,12 @@ public Integer create(PipelineOptions options) {
return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1;
}
}

/**
 * Supplies the default for {@code isWindmillServiceDirectPathEnabled}: direct path is enabled only
 * when the {@code enable_windmill_service_direct_path} experiment is present on the options.
 */
class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory<Boolean> {
  /** Name of the experiment that turns on Windmill service direct path by default. */
  private static final String DIRECT_PATH_EXPERIMENT = "enable_windmill_service_direct_path";

  /**
   * Reports whether the direct path experiment is set on the given options.
   *
   * @param options the pipeline options whose experiments are inspected
   * @return the result of the experiment lookup, used as the option's default value
   */
  @Override
  public Boolean create(PipelineOptions options) {
    return ExperimentalOptions.hasExperiment(options, DIRECT_PATH_EXPERIMENT);
  }
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ private void closeStreamSender(Endpoint endpoint, StreamSender sender) {
}

/** Add up all the throttle times of all streams including GetWorkerMetadataStream. */
@Override
public long getAndResetThrottleTime() {
return backends.get().windmillStreams().values().stream()
.map(WindmillStreamSender::getAndResetThrottleTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
Expand Down Expand Up @@ -66,6 +67,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
private final Function<String, Optional<ComputationState>> computationStateFetcher;
private final ExecutorService workProviderExecutor;
private final GetWorkSender getWorkSender;
private final ThrottledTimeTracker throttledTimeTracker;

SingleSourceWorkerHarness(
WorkCommitter workCommitter,
Expand All @@ -74,7 +76,8 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
StreamingWorkScheduler streamingWorkScheduler,
Runnable waitForResources,
Function<String, Optional<ComputationState>> computationStateFetcher,
GetWorkSender getWorkSender) {
GetWorkSender getWorkSender,
ThrottledTimeTracker throttledTimeTracker) {
this.workCommitter = workCommitter;
this.getDataClient = getDataClient;
this.heartbeatSender = heartbeatSender;
Expand All @@ -90,6 +93,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
.build());
this.isRunning = new AtomicBoolean(false);
this.getWorkSender = getWorkSender;
this.throttledTimeTracker = throttledTimeTracker;
}

public static SingleSourceWorkerHarness.Builder builder() {
Expand Down Expand Up @@ -144,6 +148,11 @@ public void shutdown() {
workCommitter.stop();
}

/**
 * Delegates to the {@code ThrottledTimeTracker} supplied at construction and returns whatever
 * throttle time it reports.
 */
@Override
public long getAndResetThrottleTime() {
  final long throttledTime = throttledTimeTracker.getAndResetThrottleTime();
  return throttledTime;
}

private void streamingEngineDispatchLoop(
Function<WorkItemReceiver, WindmillStream.GetWorkStream> getWorkStreamFactory) {
while (isRunning.get()) {
Expand Down Expand Up @@ -254,6 +263,8 @@ Builder setComputationStateFetcher(

Builder setGetWorkSender(GetWorkSender getWorkSender);

Builder setThrottledTimeTracker(ThrottledTimeTracker throttledTimeTracker);

SingleSourceWorkerHarness build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
*/
package org.apache.beam.runners.dataflow.worker.streaming.harness;

import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
import org.apache.beam.sdk.annotations.Internal;

/** Provides an interface to start streaming worker processing. */
@Internal
public interface StreamingWorkerHarness {
public interface StreamingWorkerHarness extends ThrottledTimeTracker {
void start();

void shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public interface Builder {

Builder setDebugCapture(DebugCapture.Manager debugCapture);

Builder setChannelzServlet(ChannelzServlet channelzServlet);
Builder setChannelzServlet(@Nullable ChannelzServlet channelzServlet);

Builder setStateCache(WindmillStateCache stateCache);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import com.google.api.services.dataflow.model.WorkerMessageResponse;
import com.google.auto.value.AutoBuilder;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
Expand All @@ -51,6 +52,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -78,7 +80,7 @@ public final class StreamingWorkerStatusReporter {
private final int initialMaxThreadCount;
private final int initialMaxBundlesOutstanding;
private final WorkUnitClient dataflowServiceClient;
private final Supplier<Long> windmillQuotaThrottleTime;
private final ThrottledTimeTracker windmillQuotaThrottleTime;
private final Supplier<Collection<StageInfo>> allStageInfo;
private final FailureTracker failureTracker;
private final StreamingCounters streamingCounters;
Expand All @@ -97,10 +99,10 @@ public final class StreamingWorkerStatusReporter {
// Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics.
private final AtomicLong workerMessagesIndex;

private StreamingWorkerStatusReporter(
StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
Supplier<Long> windmillQuotaThrottleTime,
ThrottledTimeTracker windmillQuotaThrottleTime,
Supplier<Collection<StageInfo>> allStageInfo,
FailureTracker failureTracker,
StreamingCounters streamingCounters,
Expand Down Expand Up @@ -131,57 +133,13 @@ private StreamingWorkerStatusReporter(
this.workerMessagesIndex = new AtomicLong();
}

/**
 * Creates a reporter that publishes counters and runs its scheduled work on single-threaded
 * scheduled executors whose thread name format is the name passed to the executor factory.
 */
public static StreamingWorkerStatusReporter create(
    WorkUnitClient workUnitClient,
    Supplier<Long> windmillQuotaThrottleTime,
    Supplier<Collection<StageInfo>> allStageInfo,
    FailureTracker failureTracker,
    StreamingCounters streamingCounters,
    MemoryMonitor memoryMonitor,
    BoundedQueueExecutor workExecutor,
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  // Default factory: one single-threaded scheduled executor per requested thread name.
  Function<String, ScheduledExecutorService> singleThreadExecutorFactory =
      threadName ->
          Executors.newSingleThreadScheduledExecutor(
              new ThreadFactoryBuilder().setNameFormat(threadName).build());
  boolean publishCounters = true;
  return new StreamingWorkerStatusReporter(
      publishCounters,
      workUnitClient,
      windmillQuotaThrottleTime,
      allStageInfo,
      failureTracker,
      streamingCounters,
      memoryMonitor,
      workExecutor,
      singleThreadExecutorFactory,
      windmillHarnessUpdateReportingPeriodMillis,
      perWorkerMetricsUpdateReportingPeriodMillis);
}

@VisibleForTesting
public static StreamingWorkerStatusReporter forTesting(
boolean publishCounters,
WorkUnitClient workUnitClient,
Supplier<Long> windmillQuotaThrottleTime,
Supplier<Collection<StageInfo>> allStageInfo,
FailureTracker failureTracker,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
publishCounters,
workUnitClient,
windmillQuotaThrottleTime,
allStageInfo,
failureTracker,
streamingCounters,
memoryMonitor,
workExecutor,
executorFactory,
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
/**
 * Returns a {@link Builder} preconfigured to publish counters and to create single-threaded
 * scheduled executors, using the name handed to the factory as the thread name format. Callers
 * may override either default through the corresponding setter.
 */
public static Builder builder() {
  Function<String, ScheduledExecutorService> defaultExecutorFactory =
      threadName ->
          Executors.newSingleThreadScheduledExecutor(
              new ThreadFactoryBuilder().setNameFormat(threadName).build());
  return new AutoBuilder_StreamingWorkerStatusReporter_Builder()
      .setPublishCounters(true)
      .setExecutorFactory(defaultExecutorFactory);
}

/**
Expand Down Expand Up @@ -228,6 +186,22 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}
}

/**
 * Computes how many WorkerMessages RPC intervals make up one PerWorkerMetrics reporting period,
 * so that PerWorkerMetrics reporting lines up with the RPC schedule.
 *
 * <p>The desired period ({@code perWorkerMetricsUpdateReportingPeriodMillis}) is divided by the
 * RPC interval ({@code windmillHarnessUpdateReportingPeriodMillis}) and rounded up to a whole
 * number of intervals. An RPC interval of 0 yields 0 rather than attempting the division.
 */
private static long getPerWorkerMetricsUpdateFrequency(
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  return windmillHarnessUpdateReportingPeriodMillis == 0
      ? 0
      : LongMath.divide(
          perWorkerMetricsUpdateReportingPeriodMillis,
          windmillHarnessUpdateReportingPeriodMillis,
          RoundingMode.CEILING);
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
reportHarnessStartup();
Expand Down Expand Up @@ -276,27 +250,13 @@ private void reportHarnessStartup() {
}
}

// Derives the PerWorkerMetrics reporting frequency as a count of WorkerMessages RPC intervals.
// The requested period (perWorkerMetricsUpdateReportingPeriodMillis) is divided by the RPC
// interval (windmillHarnessUpdateReportingPeriodMillis), rounding up to a whole number of
// intervals. When the RPC interval is 0 the result is 0 and no division is performed.
private static long getPerWorkerMetricsUpdateFrequency(
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  if (windmillHarnessUpdateReportingPeriodMillis != 0) {
    return LongMath.divide(
        perWorkerMetricsUpdateReportingPeriodMillis,
        windmillHarnessUpdateReportingPeriodMillis,
        RoundingMode.CEILING);
  }
  return 0;
}

/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
// Throttle time is tracked by the windmillServer but is reported to DFE here.
streamingCounters.windmillQuotaThrottling().addValue(windmillQuotaThrottleTime.get());
streamingCounters
.windmillQuotaThrottling()
.addValue(windmillQuotaThrottleTime.getAndResetThrottleTime());
if (memoryMonitor.isThrashing()) {
streamingCounters.memoryThrashing().addValue(1);
}
Expand Down Expand Up @@ -496,4 +456,33 @@ private void updateThreadMetrics() {
.maxOutstandingBundles()
.addValue((long) workExecutor.maximumElementsOutstanding());
}

/**
 * Builder for {@code StreamingWorkerStatusReporter}. The implementation is generated from the
 * {@code @AutoBuilder} annotation; each setter returns a {@code Builder} so calls can be chained,
 * and {@link #build()} produces the reporter.
 */
@AutoBuilder
public interface Builder {
// Whether the reporter publishes counters.
Builder setPublishCounters(boolean publishCounters);

Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient);

// Source of the throttle time that the reporter reads via getAndResetThrottleTime().
Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker windmillQuotaThrottledTimeTracker);

Builder setAllStageInfo(Supplier<Collection<StageInfo>> allStageInfo);

Builder setFailureTracker(FailureTracker failureTracker);

Builder setStreamingCounters(StreamingCounters streamingCounters);

Builder setMemoryMonitor(MemoryMonitor memoryMonitor);

Builder setWorkExecutor(BoundedQueueExecutor workExecutor);

// Maps a thread name to the ScheduledExecutorService the reporter should use.
Builder setExecutorFactory(Function<String, ScheduledExecutorService> executorFactory);

// Interval of the WorkerMessages RPC schedule, in milliseconds.
Builder setWindmillHarnessUpdateReportingPeriodMillis(
long windmillHarnessUpdateReportingPeriodMillis);

// Desired PerWorkerMetrics reporting period, in milliseconds.
Builder setPerWorkerMetricsUpdateReportingPeriodMillis(
long perWorkerMetricsUpdateReportingPeriodMillis);

StreamingWorkerStatusReporter build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* CommitWork are both blocked for x, totalTime will be 2x. However, if 2 GetWork streams are both
* blocked for x totalTime will be x. All methods are thread safe.
*/
public final class ThrottleTimer {
public final class ThrottleTimer implements ThrottledTimeTracker {
// This is -1 if not currently being throttled or the time in
// milliseconds when throttling for this type started.
private long startTime = -1;
Expand Down Expand Up @@ -56,6 +56,7 @@ public synchronized boolean throttled() {
}

/** Returns the combined total of all throttle times and resets those times to 0. */
@Override
public synchronized long getAndResetThrottleTime() {
if (throttled()) {
stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.throttling;

import org.apache.beam.sdk.annotations.Internal;

/**
 * Tracks time spent in a throttled state due to {@code Status.RESOURCE_EXHAUSTED} errors returned
 * from gRPC calls.
 *
 * <p>Declared as a functional interface, so a lambda or method reference returning a {@code long}
 * can serve as an implementation.
 */
@Internal
@FunctionalInterface
public interface ThrottledTimeTracker {

/**
 * Returns the combined total of all throttle times and resets those times to 0.
 *
 * @return the accumulated throttle time since the previous call (presumably milliseconds — TODO
 *     confirm against implementations)
 */
long getAndResetThrottleTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private StreamPoolHeartbeatSender(
this.heartbeatStreamPool.set(heartbeatStreamPool);
}

public static StreamPoolHeartbeatSender Create(
public static StreamPoolHeartbeatSender create(
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
return new StreamPoolHeartbeatSender(heartbeatStreamPool);
}
Expand All @@ -55,7 +55,7 @@ public static StreamPoolHeartbeatSender Create(
* enabled.
* @param getDataPool stream to use when using separate streams for heartbeat is disabled.
*/
public static StreamPoolHeartbeatSender Create(
public static StreamPoolHeartbeatSender create(
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> dedicatedHeartbeatPool,
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
@Nonnull StreamingGlobalConfigHandle configHandle) {
Expand Down
Loading

0 comments on commit aa21e4a

Please sign in to comment.