Skip to content

Commit

Permalink
plumb backend worker token to work items (apache#32777)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu authored and reeba212 committed Dec 4, 2024
1 parent 4f2ed62 commit efc1de0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,20 @@ synchronized Optional<ExecutableWork> completeWorkAndGetNextWorkForKey(
@Nullable Queue<ExecutableWork> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
LOG.warn(
"Unable to complete inactive work for key={} and token={}. Work queue for key does not exist.",
shardedKey,
workId);
return Optional.empty();
}

removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
return getNextWork(workQueue, shardedKey);
}

private synchronized void removeCompletedWorkFromQueue(
Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
ExecutableWork completedWork = workQueue.peek();
@Nullable ExecutableWork completedWork = workQueue.peek();
if (completedWork == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Active key {} without work, expected token {}", shardedKey, workId);
Expand Down Expand Up @@ -337,8 +339,18 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
writer.println(
"<table border=\"1\" "
+ "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
// Columns.
writer.println(
"<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th><th>Processing Thread</th></tr>");
"<tr>"
+ "<th>Key</th>"
+ "<th>Token</th>"
+ "<th>Queued</th>"
+ "<th>Active For</th>"
+ "<th>State</th>"
+ "<th>State Active For</th>"
+ "<th>Processing Thread</th>"
+ "<th>Backend</th>"
+ "</tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
int commitsPendingCount = 0;
Expand Down Expand Up @@ -366,6 +378,8 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now));
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(activeWork.getProcessingThreadName());
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(activeWork.backendWorkerToken());
activeWorkStatus.append("</td></tr>\n");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
/**
* Represents the state of an attempt to process a {@link WorkItem} by executing user code.
*
* @implNote Not thread safe, should not be executed or accessed by more than 1 thread at a time.
* @implNote Not thread safe, should not be modified by more than 1 thread at a time.
*/
@NotThreadSafe
@Internal
Expand All @@ -70,7 +70,7 @@ public final class Work implements RefreshableWork {
private final Map<LatencyAttribution.State, Duration> totalDurationPerState;
private final WorkId id;
private final String latencyTrackingId;
private TimedState currentState;
private volatile TimedState currentState;
private volatile boolean isFailed;
private volatile String processingThreadName = "";

Expand Down Expand Up @@ -111,7 +111,18 @@ public static ProcessingContext createProcessingContext(
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
return ProcessingContext.create(computationId, getDataClient, workCommitter, heartbeatSender);
return ProcessingContext.create(
computationId, getDataClient, workCommitter, heartbeatSender, /* backendWorkerToken= */ "");
}

public static ProcessingContext createProcessingContext(
String computationId,
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender,
String backendWorkerToken) {
return ProcessingContext.create(
computationId, getDataClient, workCommitter, heartbeatSender, backendWorkerToken);
}

private static LatencyAttribution.Builder createLatencyAttributionWithActiveLatencyBreakdown(
Expand Down Expand Up @@ -168,6 +179,10 @@ public GlobalData fetchSideInput(GlobalDataRequest request) {
return processingContext.getDataClient().getSideInputData(request);
}

public String backendWorkerToken() {
return processingContext.backendWorkerToken();
}

public Watermarks watermarks() {
return watermarks;
}
Expand Down Expand Up @@ -351,9 +366,10 @@ private static ProcessingContext create(
String computationId,
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
HeartbeatSender heartbeatSender,
String backendWorkerToken) {
return new AutoValue_Work_ProcessingContext(
computationId, getDataClient, heartbeatSender, workCommitter);
computationId, getDataClient, heartbeatSender, workCommitter, backendWorkerToken);
}

/** Computation that the {@link Work} belongs to. */
Expand All @@ -370,6 +386,8 @@ private static ProcessingContext create(
*/
public abstract Consumer<Commit> workCommitter();

public abstract String backendWorkerToken();

private Optional<KeyedGetDataResponse> fetchKeyedState(KeyedGetDataRequest request) {
return Optional.ofNullable(getDataClient().getStateData(computationId(), request));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,11 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {

private Work.ProcessingContext createProcessingContext(String computationId) {
return Work.createProcessingContext(
computationId, getDataClient.get(), workCommitter.get()::commit, heartbeatSender.get());
computationId,
getDataClient.get(),
workCommitter.get()::commit,
heartbeatSender.get(),
backendWorkerToken());
}

@Override
Expand Down

0 comments on commit efc1de0

Please sign in to comment.