Skip to content

Commit

Permalink
When failing work items during commit, make sure to call completeWork… (
Browse files Browse the repository at this point in the history
#30215)

* When failing work items during commit, make sure to call completeWorkAndScheduleNextWorkForKey to ensure future work can be processed.
* Also invalidates reader cache and state cache when failing a work item during commit.
  • Loading branch information
acrites authored Feb 6, 2024
1 parent cd5f271 commit b0f2eeb
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1397,12 +1397,20 @@ private void commitLoop() {
// Adds the commit to the commitStream if it fits, returning true iff it is consumed.
private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) {
Preconditions.checkNotNull(commit);
final ComputationState state = commit.computationState();
final Windmill.WorkItemCommitRequest request = commit.request();
// Drop commits for failed work. Such commits will be dropped by Windmill anyway.
if (commit.work().isFailed()) {
readerCache.invalidateReader(
WindmillComputationKey.create(
state.getComputationId(), request.getKey(), request.getShardingKey()));
stateCache
.forComputation(state.getComputationId())
.invalidate(request.getKey(), request.getShardingKey());
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken());
return true;
}
final ComputationState state = commit.computationState();
final Windmill.WorkItemCommitRequest request = commit.request();
final int size = commit.getSize();
commit.work().setState(Work.State.COMMITTING);
activeCommitBytes.addAndGet(size);
Expand Down

0 comments on commit b0f2eeb

Please sign in to comment.