remove processing/scheduling logic from StreamingDataflowWorker (#31317)
* use work processing context in work processing
* break out work scheduling/processing from StreamingDataflowWorker
m-trieu authored May 31, 2024
1 parent 7b6f941 commit f93a67a
Showing 44 changed files with 2,212 additions and 1,400 deletions.
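The headline change is the second bullet: queueing, activation, and budget bookkeeping move out of StreamingDataflowWorker and operate on an ExecutableWork wrapper instead of raw Work. The diffs below only show its call sites (ExecutableWork::work, executableWork.id(), and queues of ExecutableWork), so the following is a minimal sketch of what such a wrapper could look like, not the commit's actual definition; the executeWorkFn field and the Runnable shape are assumptions, and Work/WorkId are the worker's existing types.

import java.util.function.Consumer;

// Hedged sketch of an ExecutableWork-style wrapper: it pairs a Work item with
// the callback that processes it, so scheduling code can queue and run the
// pair without reaching back into StreamingDataflowWorker.
final class ExecutableWorkSketch implements Runnable {
  private final Work work;
  private final Consumer<Work> executeWorkFn; // assumed field

  ExecutableWorkSketch(Work work, Consumer<Work> executeWorkFn) {
    this.work = work;
    this.executeWorkFn = executeWorkFn;
  }

  Work work() {
    return work;
  }

  // Delegates to the wrapped Work so queues can deduplicate by work/cache token.
  WorkId id() {
    return work.id();
  }

  @Override
  public void run() {
    executeWorkFn.accept(work);
  }
}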
@@ -104,7 +104,7 @@ public NativeReader<?> create(
 
   @Override
   public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
-    return new PubsubReaderIterator(context.getWork());
+    return new PubsubReaderIterator(context.getWorkItem());
   }
 
   class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
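This is the first of several identical call-site updates (UngroupedWindmillReader and WindowingWindmillReader follow the same pattern below): the execution context's accessor for the Windmill proto is renamed from getWork() to getWorkItem(), matching Work.getWorkItem() used throughout the rest of the diff. A hedged sketch of the renamed accessor; the delegating field is an assumption:

// Hedged sketch: the context exposes the raw Windmill proto under the proto's
// own name, so "work" can unambiguously mean the worker's Work wrapper.
final class WorkProcessingContextSketch {
  private final Work work; // assumed field

  WorkProcessingContextSketch(Work work) {
    this.work = work;
  }

  Windmill.WorkItem getWorkItem() {
    return work.getWorkItem(); // formerly exposed as getWork()
  }
}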
(next file)
@@ -21,6 +21,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
@@ -39,7 +40,8 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-class ReaderCache {
+@Internal
+public class ReaderCache {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReaderCache.class);
   private final Executor invalidationExecutor;
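ReaderCache widens from package-private to public and gains @Internal, a plausible consequence of the break-out: the scheduling/processing classes now live outside StreamingDataflowWorker's package, so they need access, while @Internal marks the class as off-limits to user code. The imports above (Guava Cache plus an invalidation Executor) suggest the following caching pattern; the key type, expiry value, and helper names here are assumptions for illustration:

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListener;

// Hedged sketch of the pattern the imports imply: keep open readers between
// work items so unbounded sources resume cheaply, and close evicted readers
// on a separate executor so the cost stays off the work-processing path.
final class ReaderCacheSketch {
  private final Cache<String, UnboundedSource.UnboundedReader<?>> readers;

  ReaderCacheSketch(Executor invalidationExecutor) {
    RemovalListener<String, UnboundedSource.UnboundedReader<?>> onRemoval =
        notification -> {
          if (notification.wasEvicted()) { // don't close readers taken out for use
            invalidationExecutor.execute(() -> closeQuietly(notification.getValue()));
          }
        };
    this.readers =
        CacheBuilder.newBuilder()
            .expireAfterAccess(1, TimeUnit.MINUTES) // expiry value assumed
            .removalListener(onRemoval)
            .build();
  }

  void cacheReader(String key, UnboundedSource.UnboundedReader<?> reader) {
    readers.put(key, reader);
  }

  // Takes a reader out of the cache for use; explicit removal is not an
  // eviction, so the removal listener leaves the returned reader open.
  UnboundedSource.UnboundedReader<?> acquireReader(String key) {
    UnboundedSource.UnboundedReader<?> reader = readers.getIfPresent(key);
    if (reader != null) {
      readers.invalidate(key);
    }
    return reader; // may be null if nothing was cached
  }

  private static void closeQuietly(UnboundedSource.UnboundedReader<?> reader) {
    try {
      if (reader != null) {
        reader.close();
      }
    } catch (Exception e) {
      // Best-effort close; a real implementation would log here.
    }
  }
}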

Large diffs are not rendered by default.

Large diffs are not rendered by default.

(next file)
@@ -91,7 +91,7 @@ public NativeReader<?> create(
 
   @Override
   public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
-    return new UngroupedWindmillReaderIterator(context.getWork());
+    return new UngroupedWindmillReaderIterator(context.getWorkItem());
   }
 
   class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase {
(next file)
@@ -26,6 +26,7 @@
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import org.apache.beam.sdk.coders.Coder;
@@ -65,45 +66,33 @@ class WindmillTimerInternals implements TimerInternals {
   // though technically in Windmill this is only enforced per ID and namespace
   // and TimeDomain. This TimerInternals is scoped to a step and key, shared
   // across namespaces.
-  private Table<String, StateNamespace, TimerData> timers = HashBasedTable.create();
+  private final Table<String, StateNamespace, TimerData> timers = HashBasedTable.create();
 
   // Map from timer id to whether it is to be deleted or set
-  private Table<String, StateNamespace, Boolean> timerStillPresent = HashBasedTable.create();
+  private final Table<String, StateNamespace, Boolean> timerStillPresent = HashBasedTable.create();
 
-  private Instant inputDataWatermark;
-  private Instant processingTime;
-  private @Nullable Instant outputDataWatermark;
-  private @Nullable Instant synchronizedProcessingTime;
-  private String stateFamily;
-  private WindmillNamespacePrefix prefix;
-  private Consumer<TimerData> onTimerModified;
+  private final Watermarks watermarks;
+  private final Instant processingTime;
+  private final String stateFamily;
+  private final WindmillNamespacePrefix prefix;
+  private final Consumer<TimerData> onTimerModified;
 
   public WindmillTimerInternals(
       String stateFamily, // unique identifies a step
       WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s"
-      Instant inputDataWatermark,
       Instant processingTime,
-      @Nullable Instant outputDataWatermark,
-      @Nullable Instant synchronizedProcessingTime,
+      Watermarks watermarks,
      Consumer<TimerData> onTimerModified) {
-    this.inputDataWatermark = checkNotNull(inputDataWatermark);
+    this.watermarks = watermarks;
     this.processingTime = checkNotNull(processingTime);
-    this.outputDataWatermark = outputDataWatermark;
-    this.synchronizedProcessingTime = synchronizedProcessingTime;
     this.stateFamily = stateFamily;
     this.prefix = prefix;
     this.onTimerModified = onTimerModified;
   }
 
   public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
     return new WindmillTimerInternals(
-        stateFamily,
-        prefix,
-        inputDataWatermark,
-        processingTime,
-        outputDataWatermark,
-        synchronizedProcessingTime,
-        onTimerModified);
+        stateFamily, prefix, processingTime, watermarks, onTimerModified);
   }
 
   @Override
@@ -170,7 +159,7 @@ public Instant currentProcessingTime() {
 
   @Override
   public @Nullable Instant currentSynchronizedProcessingTime() {
-    return synchronizedProcessingTime;
+    return watermarks.synchronizedProcessingTime();
   }
 
   /**
@@ -184,7 +173,7 @@ public Instant currentProcessingTime() {
    */
   @Override
   public Instant currentInputWatermarkTime() {
-    return inputDataWatermark;
+    return watermarks.inputDataWatermark();
   }
 
   /**
@@ -198,7 +187,7 @@ public Instant currentInputWatermarkTime() {
    */
   @Override
   public @Nullable Instant currentOutputWatermarkTime() {
-    return outputDataWatermark;
+    return watermarks.outputDataWatermark();
   }
 
   public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
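The constructor change above folds three separately passed timestamps into one Watermarks value object; the accessors used in this file (inputDataWatermark(), outputDataWatermark(), synchronizedProcessingTime()) pin down its shape. A minimal sketch consistent with those call sites; the plain-constructor construction is an assumption about a class that likely uses a builder:

import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

// Hedged sketch of the Watermarks value class implied by the call sites above:
// a non-null input watermark plus two nullable companions, bundled so every
// constructor that used to take three Instants now takes one object.
final class WatermarksSketch {
  private final Instant inputDataWatermark;
  private final @Nullable Instant outputDataWatermark;
  private final @Nullable Instant synchronizedProcessingTime;

  WatermarksSketch(
      Instant inputDataWatermark,
      @Nullable Instant outputDataWatermark,
      @Nullable Instant synchronizedProcessingTime) {
    this.inputDataWatermark = inputDataWatermark;
    this.outputDataWatermark = outputDataWatermark;
    this.synchronizedProcessingTime = synchronizedProcessingTime;
  }

  Instant inputDataWatermark() {
    return inputDataWatermark;
  }

  @Nullable Instant outputDataWatermark() {
    return outputDataWatermark;
  }

  @Nullable Instant synchronizedProcessingTime() {
    return synchronizedProcessingTime;
  }
}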
(next file)
@@ -117,7 +117,7 @@ public static <K, T> WindowingWindmillReader<K, T> create(
   @Override
   public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throws IOException {
     final K key = keyCoder.decode(context.getSerializedKey().newInput(), Coder.Context.OUTER);
-    final WorkItem workItem = context.getWork();
+    final WorkItem workItem = context.getWorkItem();
     KeyedWorkItem<K, T> keyedWorkItem =
         new WindmillKeyedWorkItem<>(key, workItem, windowCoder, windowsCoder, valueCoder);
     final boolean isEmptyWorkItem =
(next file)
@@ -445,7 +445,7 @@ public NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> iterator() thro
 
       UnboundedSource<T, UnboundedSource.CheckpointMark> splitSource = parseSource(splitIndex);
 
-      UnboundedSource.CheckpointMark checkpoint = null;
+      UnboundedSource.@Nullable CheckpointMark checkpoint = null;
       if (splitSource.getCheckpointMarkCoder() != null) {
         checkpoint = context.getReaderCheckpoint(splitSource.getCheckpointMarkCoder());
       }
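The checkpoint declaration above switches to a type-use nullness annotation. For a nested type, Java's type-annotation syntax (JSR 308) requires the annotation immediately before the innermost type name, which is why it reads UnboundedSource.@Nullable CheckpointMark rather than @Nullable UnboundedSource.CheckpointMark. A small self-contained illustration, assuming the Checker Framework's type-use @Nullable that Beam uses elsewhere:

import org.checkerframework.checker.nullness.qual.Nullable;

class TypeUseAnnotationDemo {
  static class Outer {
    static class Inner {}
  }

  // A type-use annotation on a nested type goes directly before the member
  // type's simple name, not before the qualified name as a whole.
  Outer.@Nullable Inner value = null;
}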
(next file)
@@ -69,20 +69,20 @@ public final class ActiveWorkState {
    * Queue<Work>} is actively processing.
    */
   @GuardedBy("this")
-  private final Map<ShardedKey, Deque<Work>> activeWork;
+  private final Map<ShardedKey, Deque<ExecutableWork>> activeWork;
 
   @GuardedBy("this")
   private final WindmillStateCache.ForComputation computationStateCache;
 
   /**
    * Current budget that is being processed or queued on the user worker. Incremented when work is
-   * activated in {@link #activateWorkForKey(ShardedKey, Work)}, and decremented when work is
+   * activated in {@link #activateWorkForKey(ExecutableWork)}, and decremented when work is
    * completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, WorkId)}.
    */
   private final AtomicReference<GetWorkBudget> activeGetWorkBudget;
 
   private ActiveWorkState(
-      Map<ShardedKey, Deque<Work>> activeWork,
+      Map<ShardedKey, Deque<ExecutableWork>> activeWork,
       WindmillStateCache.ForComputation computationStateCache) {
     this.activeWork = activeWork;
     this.computationStateCache = computationStateCache;
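The javadoc above describes the invariant: every activation adds to activeGetWorkBudget and every completion subtracts the same amount, so the AtomicReference always reflects work that is queued or in flight. A hedged sketch of that bookkeeping, with GetWorkBudget reduced to the two counters it plausibly tracks (items and bytes); the real type and its method names may differ:

import java.util.concurrent.atomic.AtomicReference;

// Hedged sketch of the budget bookkeeping: activation pins one item plus the
// work's serialized size; completion releases exactly what activation pinned.
final class ActiveBudgetSketch {
  static final class BudgetSketch {
    final long items;
    final long bytes;

    BudgetSketch(long items, long bytes) {
      this.items = items;
      this.bytes = bytes;
    }

    BudgetSketch apply(long itemDelta, long byteDelta) {
      return new BudgetSketch(items + itemDelta, bytes + byteDelta);
    }
  }

  private final AtomicReference<BudgetSketch> activeGetWorkBudget =
      new AtomicReference<>(new BudgetSketch(0, 0));

  void incrementActiveWorkBudget(long serializedSize) {
    activeGetWorkBudget.getAndUpdate(b -> b.apply(1, serializedSize));
  }

  void decrementActiveWorkBudget(long serializedSize) {
    activeGetWorkBudget.getAndUpdate(b -> b.apply(-1, -serializedSize));
  }
}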
@@ -95,7 +95,7 @@ static ActiveWorkState create(WindmillStateCache.ForComputation computationState
 
   @VisibleForTesting
   static ActiveWorkState forTesting(
-      Map<ShardedKey, Deque<Work>> activeWork,
+      Map<ShardedKey, Deque<ExecutableWork>> activeWork,
       WindmillStateCache.ForComputation computationStateCache) {
     return new ActiveWorkState(activeWork, computationStateCache);
   }
@@ -107,13 +107,14 @@ private static String elapsedString(Instant start, Instant end) {
   }
 
   private static Stream<HeartbeatRequest> toHeartbeatRequestStream(
-      Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
+      Entry<ShardedKey, Deque<ExecutableWork>> shardedKeyAndWorkQueue,
       Instant refreshDeadline,
       DataflowExecutionStateSampler sampler) {
     ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
-    Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();
+    Deque<ExecutableWork> workQueue = shardedKeyAndWorkQueue.getValue();
 
     return workQueue.stream()
+        .map(ExecutableWork::work)
         .filter(work -> work.getStartTime().isBefore(refreshDeadline))
         // Don't send heartbeats for queued work we already know is failed.
         .filter(work -> !work.isFailed())
@@ -124,8 +125,7 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream(
                 .setWorkToken(work.getWorkItem().getWorkToken())
                 .setCacheToken(work.getWorkItem().getCacheToken())
                 .addAllLatencyAttribution(
-                    work.getLatencyAttributions(
-                        /* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler))
+                    work.getLatencyAttributions(/* isHeartbeat= */ true, sampler))
                 .build());
   }
 
@@ -146,31 +146,32 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream(
    * <p>4. STALE: A work queue for the {@link ShardedKey} exists, and there is a queued {@link Work}
    * with a greater workToken than the passed in {@link Work}.
    */
-  synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work work) {
-    Deque<Work> workQueue = activeWork.getOrDefault(shardedKey, new ArrayDeque<>());
+  synchronized ActivateWorkResult activateWorkForKey(ExecutableWork executableWork) {
+    ShardedKey shardedKey = executableWork.work().getShardedKey();
+    Deque<ExecutableWork> workQueue = activeWork.getOrDefault(shardedKey, new ArrayDeque<>());
     // This key does not have any work queued up on it. Create one, insert Work, and mark the work
     // to be executed.
     if (!activeWork.containsKey(shardedKey) || workQueue.isEmpty()) {
-      workQueue.addLast(work);
+      workQueue.addLast(executableWork);
       activeWork.put(shardedKey, workQueue);
-      incrementActiveWorkBudget(work);
+      incrementActiveWorkBudget(executableWork.work());
       return ActivateWorkResult.EXECUTE;
     }
 
     // Check to see if we have this work token queued.
-    Iterator<Work> workIterator = workQueue.iterator();
+    Iterator<ExecutableWork> workIterator = workQueue.iterator();
     while (workIterator.hasNext()) {
-      Work queuedWork = workIterator.next();
-      if (queuedWork.id().equals(work.id())) {
+      ExecutableWork queuedWork = workIterator.next();
+      if (queuedWork.id().equals(executableWork.id())) {
        return ActivateWorkResult.DUPLICATE;
       }
-      if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
-        if (work.id().workToken() > queuedWork.id().workToken()) {
+      if (queuedWork.id().cacheToken() == executableWork.id().cacheToken()) {
+        if (executableWork.id().workToken() > queuedWork.id().workToken()) {
          // Check to see if the queuedWork is active. We only want to remove it if it is NOT
          // currently active.
          if (!queuedWork.equals(workQueue.peek())) {
            workIterator.remove();
-            decrementActiveWorkBudget(queuedWork);
+            decrementActiveWorkBudget(queuedWork.work());
          }
          // Continue here to possibly remove more non-active stale work that is queued.
        } else {
@@ -180,8 +181,8 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
     }
 
     // Queue the work for later processing.
-    workQueue.addLast(work);
-    incrementActiveWorkBudget(work);
+    workQueue.addLast(executableWork);
+    incrementActiveWorkBudget(executableWork.work());
     return ActivateWorkResult.QUEUED;
   }
 
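activateWorkForKey now takes the self-contained ExecutableWork, and its four outcomes (EXECUTE, DUPLICATE, QUEUED, and the STALE case from the javadoc) each imply a different caller action. A hedged sketch of how a scheduler might dispatch on the result; the enum constants come from the returns above, while the scheduler shape and the Runnable assumption are illustrative:

import java.util.concurrent.Executor;

// Hedged caller sketch: only EXECUTE runs now; QUEUED waits for
// completeWorkAndGetNextWorkForKey to drain the key's queue, and
// DUPLICATE/STALE drop retries that Windmill will re-deliver anyway.
final class SchedulerSketch {
  private final ActiveWorkState activeWorkState;
  private final Executor workExecutor;

  SchedulerSketch(ActiveWorkState activeWorkState, Executor workExecutor) {
    this.activeWorkState = activeWorkState;
    this.workExecutor = workExecutor;
  }

  void scheduleWork(ExecutableWork executableWork) {
    switch (activeWorkState.activateWorkForKey(executableWork)) {
      case EXECUTE:
        // Assumes ExecutableWork implements Runnable, as sketched earlier.
        workExecutor.execute(executableWork);
        break;
      case QUEUED:
        // Runs later, when the currently active work for this key completes.
        break;
      case DUPLICATE:
      case STALE:
        // Already known or superseded; safe to drop without failing the key.
        break;
    }
  }
}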
@@ -193,11 +194,11 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
   synchronized void failWorkForKey(Multimap<Long, WorkId> failedWork) {
     // Note we can't construct a ShardedKey and look it up in activeWork directly since
     // HeartbeatResponse doesn't include the user key.
-    for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+    for (Entry<ShardedKey, Deque<ExecutableWork>> entry : activeWork.entrySet()) {
       Collection<WorkId> failedWorkIds = failedWork.get(entry.getKey().shardingKey());
       for (WorkId failedWorkId : failedWorkIds) {
-        for (Work queuedWork : entry.getValue()) {
-          WorkItem workItem = queuedWork.getWorkItem();
+        for (ExecutableWork queuedWork : entry.getValue()) {
+          WorkItem workItem = queuedWork.work().getWorkItem();
           if (workItem.getWorkToken() == failedWorkId.workToken()
               && workItem.getCacheToken() == failedWorkId.cacheToken()) {
             LOG.debug(
@@ -210,7 +211,7 @@ synchronized void failWorkForKey(Multimap<Long, WorkId> failedWork) {
                     + " "
                     + failedWorkId.cacheToken()
                     + ". The work will be retried and is not lost.");
-            queuedWork.setFailed();
+            queuedWork.work().setFailed();
             break;
           }
         }
@@ -234,9 +235,9 @@ private void decrementActiveWorkBudget(Work work) {
    * ShardedKey}'s work queue, if one exists else removes the {@link ShardedKey} from {@link
    * #activeWork}.
    */
-  synchronized Optional<Work> completeWorkAndGetNextWorkForKey(
+  synchronized Optional<ExecutableWork> completeWorkAndGetNextWorkForKey(
       ShardedKey shardedKey, WorkId workId) {
-    @Nullable Queue<Work> workQueue = activeWork.get(shardedKey);
+    @Nullable Queue<ExecutableWork> workQueue = activeWork.get(shardedKey);
     if (workQueue == null) {
       // Work may have been completed due to clearing of stuck commits.
       LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
@@ -247,10 +248,10 @@ synchronized Optional<Work> completeWorkAndGetNextWorkForKey(
   }
 
   private synchronized void removeCompletedWorkFromQueue(
-      Queue<Work> workQueue, ShardedKey shardedKey, WorkId workId) {
+      Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
     // avoid Preconditions.checkState here to prevent eagerly evaluating the
     // format string parameters for the error message.
-    Work completedWork = workQueue.peek();
+    ExecutableWork completedWork = workQueue.peek();
     if (completedWork == null) {
       // Work may have been completed due to clearing of stuck commits.
       LOG.warn("Active key {} without work, expected token {}", shardedKey, workId);
@@ -272,11 +273,12 @@ private synchronized void removeCompletedWorkFromQueue(
 
     // We consumed the matching work item.
     workQueue.remove();
-    decrementActiveWorkBudget(completedWork);
+    decrementActiveWorkBudget(completedWork.work());
   }
 
-  private synchronized Optional<Work> getNextWork(Queue<Work> workQueue, ShardedKey shardedKey) {
-    Optional<Work> nextWork = Optional.ofNullable(workQueue.peek());
+  private synchronized Optional<ExecutableWork> getNextWork(
+      Queue<ExecutableWork> workQueue, ShardedKey shardedKey) {
+    Optional<ExecutableWork> nextWork = Optional.ofNullable(workQueue.peek());
     if (!nextWork.isPresent()) {
       Preconditions.checkState(workQueue == activeWork.remove(shardedKey));
     }
@@ -304,10 +306,11 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
     // Determine the stuck commit keys but complete them outside the loop iterating over
     // activeWork as completeWork may delete the entry from activeWork.
     ImmutableMap.Builder<ShardedKey, WorkId> stuckCommits = ImmutableMap.builder();
-    for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+    for (Entry<ShardedKey, Deque<ExecutableWork>> entry : activeWork.entrySet()) {
       ShardedKey shardedKey = entry.getKey();
-      @Nullable Work work = entry.getValue().peek();
-      if (work != null) {
+      @Nullable ExecutableWork executableWork = entry.getValue().peek();
+      if (executableWork != null) {
+        Work work = executableWork.work();
         if (work.isStuckCommittingAt(stuckCommitDeadline)) {
           LOG.error(
               "Detected key {} stuck in COMMITTING state since {}, completing it with error.",
@@ -346,9 +349,9 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
     // Use StringBuilder because we are appending in loop.
     StringBuilder activeWorkStatus = new StringBuilder();
     int commitsPendingCount = 0;
-    for (Map.Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
-      Queue<Work> workQueue = Preconditions.checkNotNull(entry.getValue());
-      Work activeWork = Preconditions.checkNotNull(workQueue.peek());
+    for (Map.Entry<ShardedKey, Deque<ExecutableWork>> entry : activeWork.entrySet()) {
+      Queue<ExecutableWork> workQueue = Preconditions.checkNotNull(entry.getValue());
+      Work activeWork = Preconditions.checkNotNull(workQueue.peek()).work();
       WorkItem workItem = activeWork.getWorkItem();
       if (activeWork.isCommitPending()) {
         if (++commitsPendingCount >= MAX_PRINTABLE_COMMIT_PENDING_KEYS) {
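One detail worth calling out from getStuckCommitsAt above: its comment notes that stuck keys are collected into an ImmutableMap first and completed only afterwards, because completing work mutates activeWork, and mutating a map while iterating its entrySet throws ConcurrentModificationException. A small self-contained illustration of that collect-then-mutate pattern (names hypothetical):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Generic illustration: removing map entries from inside the entrySet loop
// would throw ConcurrentModificationException, so collect the keys in one
// pass and mutate in a second. ActiveWorkState does the same with stuckCommits.
final class CollectThenMutateDemo {
  public static void main(String[] args) {
    Map<String, Deque<String>> active = new HashMap<>();
    active.put("key-a", new ArrayDeque<>(List.of("stuck")));
    active.put("key-b", new ArrayDeque<>(List.of("healthy")));

    List<String> stuckKeys = new ArrayList<>();
    for (Map.Entry<String, Deque<String>> entry : active.entrySet()) {
      if ("stuck".equals(entry.getValue().peek())) {
        stuckKeys.add(entry.getKey()); // collect only; no mutation here
      }
    }
    stuckKeys.forEach(active::remove); // mutate outside the iteration
    System.out.println(active.keySet()); // prints [key-b]
  }
}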