Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for cache token representing a retry before activating and completing work #29082

Merged
merged 12 commits into from
Feb 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.FailedTokens;
import org.apache.beam.runners.dataflow.worker.streaming.Commit;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
Expand All @@ -97,13 +96,15 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
Expand Down Expand Up @@ -1308,7 +1309,7 @@ public void close() {
// Consider the item invalid. It will eventually be retried by Windmill if it still needs to
// be processed.
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken());
ShardedKey.create(key, workItem.getShardingKey()), work.id());
}
} finally {
// Update total processing time counters. Updating in finally clause ensures that
Expand Down Expand Up @@ -1386,7 +1387,10 @@ private void commitLoop() {
for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) {
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()),
workRequest.getWorkToken());
WorkId.builder()
.setCacheToken(workRequest.getCacheToken())
.setWorkToken(workRequest.getWorkToken())
.build());
}
}
}
Expand All @@ -1406,7 +1410,11 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
.forComputation(state.getComputationId())
.invalidate(request.getKey(), request.getShardingKey());
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken());
ShardedKey.create(request.getKey(), request.getShardingKey()),
WorkId.builder()
.setWorkToken(request.getWorkToken())
.setCacheToken(request.getCacheToken())
.build());
return true;
}

Expand All @@ -1428,7 +1436,10 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
activeCommitBytes.addAndGet(-size);
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
WorkId.builder()
.setCacheToken(request.getCacheToken())
.setWorkToken(request.getWorkToken())
.build());
})) {
return true;
} else {
Expand Down Expand Up @@ -1960,20 +1971,19 @@ private void sendWorkerUpdatesToDataflowService(
}
}

public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
public void handleHeartbeatResponses(List<ComputationHeartbeatResponse> responses) {
for (ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
// Multimap from sharding key to WorkId (work token + cache token) for work that should be marked failed.
Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
Multimap<Long, WorkId> failedWork = ArrayListMultimap.create();
for (Windmill.HeartbeatResponse heartbeatResponse :
computationHeartbeatResponse.getHeartbeatResponsesList()) {
if (heartbeatResponse.getFailed()) {
failedWork
.computeIfAbsent(heartbeatResponse.getShardingKey(), key -> new ArrayList<>())
.add(
FailedTokens.newBuilder()
.setWorkToken(heartbeatResponse.getWorkToken())
.setCacheToken(heartbeatResponse.getCacheToken())
.build());
failedWork.put(
heartbeatResponse.getShardingKey(),
WorkId.builder()
.setWorkToken(heartbeatResponse.getWorkToken())
.setCacheToken(heartbeatResponse.getCacheToken())
.build());
}
}
ComputationState state = computationMap.get(computationHeartbeatResponse.getComputationId());
Expand Down
Loading
Loading