diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 5ee81b2ee362..0ad9728b3063 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -45,6 +45,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; +import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; @@ -69,6 +70,8 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commits; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter; @@ -235,6 +238,7 @@ private StreamingDataflowWorker( Consumer getDataStatusProvider; Supplier currentActiveCommitBytesProvider; if (isDirectPathPipeline(options)) { + WeightedSemaphore maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = FanOutStreamingEngineWorkerHarness.create( createJobHeader(options, clientId), @@ -261,6 +265,8 @@ private StreamingDataflowWorker( Preconditions.checkNotNull(dispatcherClient), commitWorkStream -> StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. + .setCommitByteSemaphore(maxCommitByteSemaphore) .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) .setOnCommitComplete(this::onCompleteCommit) .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) @@ -276,7 +282,12 @@ private StreamingDataflowWorker( options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness; } else { - Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options); + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); GetDataClient getDataClient; HeartbeatSender heartbeatSender; WorkCommitter workCommitter; @@ -301,6 +312,7 @@ private StreamingDataflowWorker( COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) .setNumCommitSenders(numCommitThreads) .setOnCommitComplete(this::onCompleteCommit) .build(); @@ -832,15 +844,6 @@ private static void enableBigQueryMetrics() { BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true); } - private static Windmill.GetWorkRequest createGetWorkRequest( - long clientId, DataflowWorkerHarnessOptions options) { - return Windmill.GetWorkRequest.newBuilder() - .setClientId(clientId) - .setMaxItems(chooseMaxBundlesOutstanding(options)) - .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) - .build(); - } - @VisibleForTesting void reportPeriodicWorkerUpdatesForTest() { workerStatusReporter.reportPeriodicWorkerUpdates(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java index 5a2df4baae61..a65985b91f4e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java @@ -60,6 +60,8 @@ private SingleSourceWorkerHarness createWorkerHarness( .setWaitForResources(waitForResources) .setStreamingWorkScheduler(streamingWorkScheduler) .setComputationStateFetcher(computationStateFetcher) + // no-op throttle time supplier. + .setThrottleTimeSupplier(() -> 0L) .setGetWorkSender(getWorkSender) .build(); }