diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 4607096dd66a..aec52cd7d9a6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -240,18 +240,20 @@ synchronized Optional completeWorkAndGetNextWorkForKey( @Nullable Queue workQueue = activeWork.get(shardedKey); if (workQueue == null) { // Work may have been completed due to clearing of stuck commits. - LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId); + LOG.warn( + "Unable to complete inactive work for key={} and token={}. Work queue for key does not exist.", + shardedKey, + workId); return Optional.empty(); } + removeCompletedWorkFromQueue(workQueue, shardedKey, workId); return getNextWork(workQueue, shardedKey); } private synchronized void removeCompletedWorkFromQueue( Queue workQueue, ShardedKey shardedKey, WorkId workId) { - // avoid Preconditions.checkState here to prevent eagerly evaluating the - // format string parameters for the error message. - ExecutableWork completedWork = workQueue.peek(); + @Nullable ExecutableWork completedWork = workQueue.peek(); if (completedWork == null) { // Work may have been completed due to clearing of stuck commits. LOG.warn("Active key {} without work, expected token {}", shardedKey, workId); @@ -337,8 +339,18 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { writer.println( ""); + // Columns. writer.println( - ""); + "" + + "" + + "" + + "" + + "" + + "" + + "" + + "" + + "" + + ""); // Use StringBuilder because we are appending in loop. StringBuilder activeWorkStatus = new StringBuilder(); int commitsPendingCount = 0; @@ -366,6 +378,8 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now)); activeWorkStatus.append("\n"); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index 03d1e1ae469a..6f97cbca9a80 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -56,7 +56,7 @@ /** * Represents the state of an attempt to process a {@link WorkItem} by executing user code. * - * @implNote Not thread safe, should not be executed or accessed by more than 1 thread at a time. + * @implNote Not thread safe, should not be modified by more than 1 thread at a time. */ @NotThreadSafe @Internal @@ -70,7 +70,7 @@ public final class Work implements RefreshableWork { private final Map totalDurationPerState; private final WorkId id; private final String latencyTrackingId; - private TimedState currentState; + private volatile TimedState currentState; private volatile boolean isFailed; private volatile String processingThreadName = ""; @@ -111,7 +111,18 @@ public static ProcessingContext createProcessingContext( GetDataClient getDataClient, Consumer workCommitter, HeartbeatSender heartbeatSender) { - return ProcessingContext.create(computationId, getDataClient, workCommitter, heartbeatSender); + return ProcessingContext.create( + computationId, getDataClient, workCommitter, heartbeatSender, /* backendWorkerToken= */ ""); + } + + public static ProcessingContext createProcessingContext( + String computationId, + GetDataClient getDataClient, + Consumer workCommitter, + HeartbeatSender heartbeatSender, + String backendWorkerToken) { + return ProcessingContext.create( + computationId, getDataClient, workCommitter, heartbeatSender, backendWorkerToken); } private static LatencyAttribution.Builder createLatencyAttributionWithActiveLatencyBreakdown( @@ -168,6 +179,10 @@ public GlobalData fetchSideInput(GlobalDataRequest request) { return processingContext.getDataClient().getSideInputData(request); } + public String backendWorkerToken() { + return processingContext.backendWorkerToken(); + } + public Watermarks watermarks() { return watermarks; } @@ -351,9 +366,10 @@ private static ProcessingContext create( String computationId, GetDataClient getDataClient, Consumer workCommitter, - HeartbeatSender heartbeatSender) { + HeartbeatSender heartbeatSender, + String backendWorkerToken) { return new AutoValue_Work_ProcessingContext( - computationId, getDataClient, heartbeatSender, workCommitter); + computationId, getDataClient, heartbeatSender, workCommitter, backendWorkerToken); } /** Computation that the {@link Work} belongs to. */ @@ -370,6 +386,8 @@ private static ProcessingContext create( */ public abstract Consumer workCommitter(); + public abstract String backendWorkerToken(); + private Optional fetchKeyedState(KeyedGetDataRequest request) { return Optional.ofNullable(getDataClient().getStateData(computationId(), request)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java index 45d010d7cfac..19de998b1da8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java @@ -254,7 +254,11 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) { private Work.ProcessingContext createProcessingContext(String computationId) { return Work.createProcessingContext( - computationId, getDataClient.get(), workCommitter.get()::commit, heartbeatSender.get()); + computationId, + getDataClient.get(), + workCommitter.get()::commit, + heartbeatSender.get(), + backendWorkerToken()); } @Override
KeyTokenQueuedActive ForStateState Active ForProcessing Thread
KeyTokenQueuedActive ForStateState Active ForProcessing ThreadBackend
"); activeWorkStatus.append(activeWork.getProcessingThreadName()); + activeWorkStatus.append(""); + activeWorkStatus.append(activeWork.backendWorkerToken()); activeWorkStatus.append("