Skip to content

Commit

Permalink
rebase and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Nov 19, 2024
1 parent b7dba9e commit 395abf1
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore;
import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
Expand All @@ -69,6 +70,8 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commits;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter;
Expand Down Expand Up @@ -235,6 +238,7 @@ private StreamingDataflowWorker(
Consumer<PrintWriter> getDataStatusProvider;
Supplier<Long> currentActiveCommitBytesProvider;
if (isDirectPathPipeline(options)) {
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
createJobHeader(options, clientId),
Expand All @@ -261,6 +265,8 @@ private StreamingDataflowWorker(
Preconditions.checkNotNull(dispatcherClient),
commitWorkStream ->
StreamingEngineWorkCommitter.builder()
// Share the commitByteSemaphore across all created workCommitters.
.setCommitByteSemaphore(maxCommitByteSemaphore)
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
.setOnCommitComplete(this::onCompleteCommit)
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
Expand All @@ -276,7 +282,12 @@ private StreamingDataflowWorker(
options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
} else {
Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options);
Windmill.GetWorkRequest request =
Windmill.GetWorkRequest.newBuilder()
.setClientId(clientId)
.setMaxItems(chooseMaxBundlesOutstanding(options))
.setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
.build();
GetDataClient getDataClient;
HeartbeatSender heartbeatSender;
WorkCommitter workCommitter;
Expand All @@ -301,6 +312,7 @@ private StreamingDataflowWorker(
COMMIT_STREAM_TIMEOUT,
windmillServer::commitWorkStream)
::getCloseableStream)
.setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
.setNumCommitSenders(numCommitThreads)
.setOnCommitComplete(this::onCompleteCommit)
.build();
Expand Down Expand Up @@ -832,15 +844,6 @@ private static void enableBigQueryMetrics() {
BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
}

private static Windmill.GetWorkRequest createGetWorkRequest(
long clientId, DataflowWorkerHarnessOptions options) {
return Windmill.GetWorkRequest.newBuilder()
.setClientId(clientId)
.setMaxItems(chooseMaxBundlesOutstanding(options))
.setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
.build();
}

@VisibleForTesting
void reportPeriodicWorkerUpdatesForTest() {
workerStatusReporter.reportPeriodicWorkerUpdates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ private SingleSourceWorkerHarness createWorkerHarness(
.setWaitForResources(waitForResources)
.setStreamingWorkScheduler(streamingWorkScheduler)
.setComputationStateFetcher(computationStateFetcher)
// no-op throttle time supplier.
.setThrottleTimeSupplier(() -> 0L)
.setGetWorkSender(getWorkSender)
.build();
}
Expand Down

0 comments on commit 395abf1

Please sign in to comment.