Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check for cachetoken representing a retry before activating and completing work #29082

Merged
merged 12 commits into from
Feb 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
Expand Down Expand Up @@ -1308,7 +1309,7 @@ public void close() {
// Consider the item invalid. It will eventually be retried by Windmill if it still needs to
// be processed.
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken());
ShardedKey.create(key, workItem.getShardingKey()), work.id());
}
} finally {
// Update total processing time counters. Updating in finally clause ensures that
Expand Down Expand Up @@ -1386,7 +1387,10 @@ private void commitLoop() {
for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) {
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()),
workRequest.getWorkToken());
WorkId.builder()
.setCacheToken(workRequest.getCacheToken())
.setWorkToken(workRequest.getWorkToken())
.build());
}
}
}
Expand All @@ -1406,7 +1410,11 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
.forComputation(state.getComputationId())
.invalidate(request.getKey(), request.getShardingKey());
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken());
ShardedKey.create(request.getKey(), request.getShardingKey()),
WorkId.builder()
.setWorkToken(request.getWorkToken())
.setCacheToken(request.getCacheToken())
.build());
return true;
}

Expand All @@ -1428,7 +1436,10 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
activeCommitBytes.addAndGet(-size);
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
WorkId.builder()
.setCacheToken(request.getCacheToken())
.setWorkToken(request.getWorkToken())
.build());
})) {
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
Expand Down Expand Up @@ -60,7 +62,7 @@
public final class ActiveWorkState {
private static final Logger LOG = LoggerFactory.getLogger(ActiveWorkState.class);

/* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown.*/
/* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown for observability.*/
private static final int MAX_PRINTABLE_COMMIT_PENDING_KEYS = 50;

/**
Expand All @@ -76,7 +78,7 @@ public final class ActiveWorkState {
/**
* Current budget that is being processed or queued on the user worker. Incremented when work is
* activated in {@link #activateWorkForKey(ShardedKey, Work)}, and decremented when work is
* completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, long)}.
* completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, WorkId)}.
*/
private final AtomicReference<GetWorkBudget> activeGetWorkBudget;

Expand Down Expand Up @@ -106,7 +108,7 @@ private static String elapsedString(Instant start, Instant end) {
}

/**
* Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 3 {@link
* Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 4 {@link
* ActivateWorkResult}
*
* <p>1. EXECUTE: The {@link ShardedKey} has not been seen before, create a {@link Queue<Work>}
Expand All @@ -116,7 +118,11 @@ private static String elapsedString(Instant start, Instant end) {
* the {@link ShardedKey}'s work queue, mark the {@link Work} as a duplicate.
*
* <p>3. QUEUED: A work queue for the {@link ShardedKey} exists, and the work is not in the key's
* work queue, queue the work for later processing.
* work queue, OR the work in the work queue is stale, OR the work in the queue has a matching
* work token but different cache token, queue the work for later processing.
*
* <p>4. STALE: A work queue for the {@link ShardedKey} exists, and there is a queued {@link Work}
* with a greater workToken than the passed in {@link Work}.
*/
synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work work) {
Deque<Work> workQueue = activeWork.getOrDefault(shardedKey, new ArrayDeque<>());
Expand All @@ -129,13 +135,29 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
return ActivateWorkResult.EXECUTE;
}

// Ensure we don't already have this work token queued.
// Check to see if we have this work token queued.
// This set is for adding remove-able WorkItems if they exist in the workQueue. We add them to
// this set since a ConcurrentModificationException will be thrown if we modify the workQueue
// and then resume iteration.
Set<WorkId> queuedWorkToRemove = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of building a set and then calling removeIf, how about doing this in a single pass with an iterator and Iterator.remove()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for (Work queuedWork : workQueue) {
if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
if (queuedWork.id().equals(work.id())) {
return ActivateWorkResult.DUPLICATE;
}
if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
if (work.id().workToken() > queuedWork.id().workToken()) {
queuedWorkToRemove.add(queuedWork.id());
// Continue here to possibly remove more non-active stale work that is queued.
} else {
return ActivateWorkResult.STALE;
}
}
}

workQueue.removeIf(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't updating the activeworkbudget for the removals
see/share removal logic with the failing for heartbeats

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

queuedWork ->
queuedWorkToRemove.contains(queuedWork.id()) && !queuedWork.equals(workQueue.peek()));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't add a comment on the right line because GitHub won't let me, but can you replace FailedTokens below with the new WorkId class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// Queue the work for later processing.
workQueue.addLast(work);
incrementActiveWorkBudget(work);
Expand Down Expand Up @@ -213,34 +235,38 @@ private void decrementActiveWorkBudget(Work work) {
* #activeWork}.
*/
synchronized Optional<Work> completeWorkAndGetNextWorkForKey(
ShardedKey shardedKey, long workToken) {
ShardedKey shardedKey, WorkId workId) {
@Nullable Queue<Work> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workToken);
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: replace token with ids

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

return Optional.empty();
}
removeCompletedWorkFromQueue(workQueue, shardedKey, workToken);
removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
return getNextWork(workQueue, shardedKey);
}

private synchronized void removeCompletedWorkFromQueue(
Queue<Work> workQueue, ShardedKey shardedKey, long workToken) {
Queue<Work> workQueue, ShardedKey shardedKey, WorkId workId) {
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
Work completedWork = workQueue.peek();
if (completedWork == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn(
String.format("Active key %s without work, expected token %d", shardedKey, workToken));
LOG.warn("Active key {} without work, expected token {}", shardedKey, workId);
return;
}

if (completedWork.getWorkItem().getWorkToken() != workToken) {
if (!completedWork.id().equals(workId)) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn(
"Unable to complete due to token mismatch for key {} and token {}, actual token was {}.",
"Unable to complete due to token mismatch for "
+ "key {},"
+ "expected work_id {}, "
+ "actual work_id was {}",
shardedKey,
workToken,
completedWork.getWorkItem().getWorkToken());
workId,
completedWork.id());
return;
}

Expand All @@ -263,21 +289,21 @@ private synchronized Optional<Work> getNextWork(Queue<Work> workQueue, ShardedKe
* before the stuckCommitDeadline.
*/
synchronized void invalidateStuckCommits(
Instant stuckCommitDeadline, BiConsumer<ShardedKey, Long> shardedKeyAndWorkTokenConsumer) {
for (Entry<ShardedKey, Long> shardedKeyAndWorkToken :
Instant stuckCommitDeadline, BiConsumer<ShardedKey, WorkId> shardedKeyAndWorkTokenConsumer) {
for (Entry<ShardedKey, WorkId> shardedKeyAndWorkId :
getStuckCommitsAt(stuckCommitDeadline).entrySet()) {
ShardedKey shardedKey = shardedKeyAndWorkToken.getKey();
long workToken = shardedKeyAndWorkToken.getValue();
ShardedKey shardedKey = shardedKeyAndWorkId.getKey();
WorkId workId = shardedKeyAndWorkId.getValue();
computationStateCache.invalidate(shardedKey.key(), shardedKey.shardingKey());
shardedKeyAndWorkTokenConsumer.accept(shardedKey, workToken);
shardedKeyAndWorkTokenConsumer.accept(shardedKey, workId);
}
}

private synchronized ImmutableMap<ShardedKey, Long> getStuckCommitsAt(
private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
Instant stuckCommitDeadline) {
// Determine the stuck commit keys but complete them outside the loop iterating over
// activeWork as completeWork may delete the entry from activeWork.
ImmutableMap.Builder<ShardedKey, Long> stuckCommits = ImmutableMap.builder();
ImmutableMap.Builder<ShardedKey, WorkId> stuckCommits = ImmutableMap.builder();
for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
ShardedKey shardedKey = entry.getKey();
@Nullable Work work = entry.getValue().peek();
Expand All @@ -287,7 +313,7 @@ private synchronized ImmutableMap<ShardedKey, Long> getStuckCommitsAt(
"Detected key {} stuck in COMMITTING state since {}, completing it with error.",
shardedKey,
work.getStateStartTime());
stuckCommits.put(shardedKey, work.getWorkItem().getWorkToken());
stuckCommits.put(shardedKey, work.id());
}
}
}
Expand Down Expand Up @@ -320,7 +346,8 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream(
.setWorkToken(work.getWorkItem().getWorkToken())
.setCacheToken(work.getWorkItem().getCacheToken())
.addAllLatencyAttribution(
work.getLatencyAttributions(true, work.getLatencyTrackingId(), sampler))
work.getLatencyAttributions(
/* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler))
.build());
}

Expand Down Expand Up @@ -386,6 +413,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
enum ActivateWorkResult {
QUEUED,
EXECUTE,
DUPLICATE
DUPLICATE,
STALE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {

/**
* Mark the given {@link ShardedKey} and {@link Work} as active, and schedules execution of {@link
* Work} if there is no active {@link Work} for the {@link ShardedKey} already processing.
* Work} if there is no active {@link Work} for the {@link ShardedKey} already processing. Returns
* whether the {@link Work} will be activated, either immediately or sometime in the future.
*/
public boolean activateWork(ShardedKey shardedKey, Work work) {
switch (activeWorkState.activateWorkForKey(shardedKey, work)) {
case DUPLICATE:
// Fall through intentionally. Work was not and will not be activated in these cases.
case STALE:
return false;
case QUEUED:
return true;
Expand All @@ -107,9 +110,9 @@ public void failWork(Map<Long, List<FailedTokens>> failedWork) {
/**
* Marks the work for the given shardedKey as complete. Schedules queued work for the key if any.
*/
public void completeWorkAndScheduleNextWorkForKey(ShardedKey shardedKey, long workToken) {
public void completeWorkAndScheduleNextWorkForKey(ShardedKey shardedKey, WorkId workId) {
activeWorkState
.completeWorkAndGetNextWorkForKey(shardedKey, workToken)
.completeWorkAndGetNextWorkForKey(shardedKey, workId)
.ifPresent(this::forceExecute);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class Work implements Runnable {
private final Instant startTime;
private final Map<Windmill.LatencyAttribution.State, Duration> totalDurationPerState;
private final Consumer<Work> processWorkFn;
private final WorkId id;
private TimedState currentState;
private volatile boolean isFailed;

Expand All @@ -58,6 +59,11 @@ private Work(Windmill.WorkItem workItem, Supplier<Instant> clock, Consumer<Work>
this.totalDurationPerState = new EnumMap<>(Windmill.LatencyAttribution.State.class);
this.currentState = TimedState.initialState(startTime);
this.isFailed = false;
this.id =
WorkId.builder()
.setCacheToken(workItem.getCacheToken())
.setWorkToken(workItem.getWorkToken())
.build();
}

public static Work create(
Expand Down Expand Up @@ -116,6 +122,10 @@ public String getLatencyTrackingId() {
return workIdBuilder.toString();
}

/**
 * Returns the {@link WorkId} for this unit of work, precomputed at construction time from the
 * underlying work item's workToken and cacheToken.
 */
public WorkId id() {
return id;
}

private void recordGetWorkStreamLatencies(
Collection<Windmill.LatencyAttribution> getWorkStreamLatencies) {
for (Windmill.LatencyAttribution latency : getWorkStreamLatencies) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import com.google.auto.value.AutoValue;

/**
* A composite key used to identify a unit of {@link Work}. If multiple units of {@link Work} have
* the same workToken AND cacheToken, the {@link Work} is a duplicate. If multiple units of {@link
* Work} have the same workToken, but different cacheTokens, the {@link Work} is a retry. If
* multiple units of {@link Work} have the same cacheToken, but different workTokens, the {@link
* Work} is obsolete.
*/
@AutoValue
public abstract class WorkId {

/** Returns a builder for assembling a {@link WorkId} from its two token components. */
public static Builder builder() {
return new AutoValue_WorkId.Builder();
}

// Token identifying the cached processing state for this key. Equal workTokens with
// differing cacheTokens indicate a retry of the same work (see class Javadoc).
abstract long cacheToken();

// Token identifying this unit of work. For a given cacheToken, a smaller workToken is
// obsolete relative to a larger one (see class Javadoc).
abstract long workToken();

/** Builder for {@link WorkId}; both tokens must be set before {@link #build()}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setCacheToken(long value);

public abstract Builder setWorkToken(long value);

public abstract WorkId build();
}
}
Loading
Loading