Skip to content

Commit

Permalink
Add WorkProvider interfaces and implementations (#31883)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu authored Aug 5, 2024
1 parent 7e75087 commit ca744ae
Show file tree
Hide file tree
Showing 21 changed files with 839 additions and 608 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
package org.apache.beam.runners.dataflow.worker.streaming.harness;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
Expand Down Expand Up @@ -47,6 +47,8 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.StreamGetDataClient;
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.ThrottlingGetDataMetricTracker;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler;
Expand All @@ -68,16 +70,19 @@
import org.slf4j.LoggerFactory;

/**
* Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the budget and starts the
* {@link WindmillStream.GetWorkStream}(s).
* {@link StreamingWorkerHarness} implementation that manages fan out to multiple backend
* destinations. Given a {@link GetWorkBudget}, divides the budget and starts the {@link
* WindmillStream.GetWorkStream}(s).
*/
@Internal
@CheckReturnValue
@ThreadSafe
public final class StreamingEngineClient {
private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineClient.class);
public final class FanOutStreamingEngineWorkerHarness implements StreamingWorkerHarness {
private static final Logger LOG =
LoggerFactory.getLogger(FanOutStreamingEngineWorkerHarness.class);
private static final String PUBLISH_NEW_WORKER_METADATA_THREAD = "PublishNewWorkerMetadataThread";
private static final String CONSUME_NEW_WORKER_METADATA_THREAD = "ConsumeNewWorkerMetadataThread";

private final JobHeader jobHeader;
private final GrpcWindmillStreamFactory streamFactory;
private final WorkItemScheduler workItemScheduler;
Expand All @@ -101,7 +106,7 @@ public final class StreamingEngineClient {
private volatile boolean started;

@SuppressWarnings("FutureReturnValueIgnored")
private StreamingEngineClient(
private FanOutStreamingEngineWorkerHarness(
JobHeader jobHeader,
GetWorkBudget totalGetWorkBudget,
GrpcWindmillStreamFactory streamFactory,
Expand Down Expand Up @@ -152,23 +157,15 @@ private StreamingEngineClient(

private static ExecutorService singleThreadedExecutorServiceOf(String threadName) {
return Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat(threadName)
.setUncaughtExceptionHandler(
(t, e) -> {
LOG.error(
"{} failed due to uncaught exception during execution. ", t.getName(), e);
throw new StreamingEngineClientException(e);
})
.build());
new ThreadFactoryBuilder().setNameFormat(threadName).build());
}

/**
* Creates an instance of {@link StreamingEngineClient} in a non-started state.
* Creates an instance of {@link FanOutStreamingEngineWorkerHarness} in a non-started state.
*
* @implNote Does not block the calling thread. Callers must explicitly call {@link #start()}.
*/
public static StreamingEngineClient create(
public static FanOutStreamingEngineWorkerHarness create(
JobHeader jobHeader,
GetWorkBudget totalGetWorkBudget,
GrpcWindmillStreamFactory streamingEngineStreamFactory,
Expand All @@ -178,7 +175,7 @@ public static StreamingEngineClient create(
GrpcDispatcherClient dispatcherClient,
Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
ThrottlingGetDataMetricTracker getDataMetricTracker) {
return new StreamingEngineClient(
return new FanOutStreamingEngineWorkerHarness(
jobHeader,
totalGetWorkBudget,
streamingEngineStreamFactory,
Expand All @@ -192,7 +189,7 @@ public static StreamingEngineClient create(
}

@VisibleForTesting
static StreamingEngineClient forTesting(
static FanOutStreamingEngineWorkerHarness forTesting(
JobHeader jobHeader,
GetWorkBudget totalGetWorkBudget,
GrpcWindmillStreamFactory streamFactory,
Expand All @@ -203,8 +200,8 @@ static StreamingEngineClient forTesting(
long clientId,
Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
ThrottlingGetDataMetricTracker getDataMetricTracker) {
StreamingEngineClient streamingEngineClient =
new StreamingEngineClient(
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkProvider =
new FanOutStreamingEngineWorkerHarness(
jobHeader,
totalGetWorkBudget,
streamFactory,
Expand All @@ -215,11 +212,12 @@ static StreamingEngineClient forTesting(
clientId,
workCommitterFactory,
getDataMetricTracker);
streamingEngineClient.start();
return streamingEngineClient;
fanOutStreamingEngineWorkProvider.start();
return fanOutStreamingEngineWorkProvider;
}

@SuppressWarnings("ReturnValueIgnored")
@Override
public synchronized void start() {
Preconditions.checkState(!started, "StreamingEngineClient cannot start twice.");
// Starts the stream, this value is memoized.
Expand Down Expand Up @@ -270,7 +268,8 @@ private void startWorkerMetadataConsumer() {
}

@VisibleForTesting
public synchronized void finish() {
@Override
public synchronized void shutdown() {
Preconditions.checkState(started, "StreamingEngineClient never started.");
getWorkerMetadataStream.get().halfClose();
getWorkBudgetRefresher.stop();
Expand Down Expand Up @@ -334,10 +333,13 @@ private synchronized ImmutableMap<Endpoint, WindmillConnection> createNewWindmil
.collect(
toImmutableMap(
Function.identity(),
// Reuse existing stubs if they exist.
endpoint ->
currentConnections.getOrDefault(
endpoint, WindmillConnection.from(endpoint, this::createWindmillStub))));
// Reuse existing stubs if they exist. Optional.orElseGet only calls the
// supplier if the value is not present, preventing constructing expensive
// objects.
Optional.ofNullable(currentConnections.get(endpoint))
.orElseGet(
() -> WindmillConnection.from(endpoint, this::createWindmillStub))));
}

private synchronized ImmutableMap<WindmillConnection, WindmillStreamSender>
Expand Down Expand Up @@ -423,11 +425,4 @@ private CloudWindmillServiceV1Alpha1Stub createWindmillStub(Endpoint endpoint) {
.map(channelCachingStubFactory::createWindmillServiceStub)
.orElseGet(dispatcherClient::getWindmillServiceStub);
}

private static class StreamingEngineClientException extends IllegalStateException {

private StreamingEngineClientException(Throwable exception) {
super(exception);
}
}
}
Loading

0 comments on commit ca744ae

Please sign in to comment.