From 23dcb7ec1d539759f5c587a6dfee357ad250db72 Mon Sep 17 00:00:00 2001 From: scwhittle Date: Tue, 6 Feb 2024 11:23:57 +0100 Subject: [PATCH] =?UTF-8?q?Revert=20"When=20failing=20work=20items=20durin?= =?UTF-8?q?g=20commit,=20make=20sure=20to=20call=20completeWork=E2=80=A6"?= =?UTF-8?q?=20(#30228)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit b0f2eebb0244302ac2315dc260536512d229401f. --- .../dataflow/worker/StreamingDataflowWorker.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index b48032677ff1..3ba27bd852fc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1397,20 +1397,12 @@ private void commitLoop() { // Adds the commit to the commitStream if it fits, returning true iff it is consumed. private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) { Preconditions.checkNotNull(commit); - final ComputationState state = commit.computationState(); - final Windmill.WorkItemCommitRequest request = commit.request(); // Drop commits for failed work. Such commits will be dropped by Windmill anyway. if (commit.work().isFailed()) { - readerCache.invalidateReader( - WindmillComputationKey.create( - state.getComputationId(), request.getKey(), request.getShardingKey())); - stateCache - .forComputation(state.getComputationId()) - .invalidate(request.getKey(), request.getShardingKey()); - state.completeWorkAndScheduleNextWorkForKey( - ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken()); return true; } + final ComputationState state = commit.computationState(); + final Windmill.WorkItemCommitRequest request = commit.request(); final int size = commit.getSize(); commit.work().setState(Work.State.COMMITTING); activeCommitBytes.addAndGet(size);