diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index c80c3a882e52..4607096dd66a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -338,7 +338,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { "
Key | Token | Queued | Active For | State | State Active For | |
---|---|---|---|---|---|---|
Key | Token | Queued | Active For | State | State Active For | Processing Thread | "); activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now)); + activeWorkStatus.append(" | "); + activeWorkStatus.append(activeWork.getProcessingThreadName()); activeWorkStatus.append(" | \n"); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index e77823602eda..03d1e1ae469a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -72,6 +72,7 @@ public final class Work implements RefreshableWork { private final String latencyTrackingId; private TimedState currentState; private volatile boolean isFailed; + private volatile String processingThreadName = ""; private Work( WorkItem workItem, @@ -188,6 +189,14 @@ public void setState(State state) { this.currentState = TimedState.create(state, now); } + public String getProcessingThreadName() { + return processingThreadName; + } + + public void setProcessingThreadName(String processingThreadName) { + this.processingThreadName = processingThreadName; + } + @Override public void setFailed() { this.isFailed = true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 5e3f293f7d5b..9286be84ceaa 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -22,8 +22,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -223,18 +221,10 @@ private void executeMonitorHeld(Runnable work, long workBytes) { try { executor.execute( () -> { - String threadName = Thread.currentThread().getName(); try { - if (work instanceof ExecutableWork) { - String workToken = - debugFormattedWorkToken( - ((ExecutableWork) work).work().getWorkItem().getWorkToken()); - Thread.currentThread().setName(threadName + ":" + workToken); - } work.run(); } finally { decrementCounters(workBytes); - Thread.currentThread().setName(threadName); } }); } catch (RuntimeException e) { @@ -244,11 +234,6 @@ private void executeMonitorHeld(Runnable work, long workBytes) { } } - @VisibleForTesting - public static String debugFormattedWorkToken(long workToken) { - return String.format("%016x", workToken); - } - private void decrementCounters(long workBytes) { monitor.enter(); --elementsOutstanding; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 965a29126dc2..9a3e6eb6b099 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -225,6 +225,7 @@ private void processWork(ComputationState computationState, Work work) { Windmill.WorkItem workItem = work.getWorkItem(); String computationId = computationState.getComputationId(); ByteString key = workItem.getKey(); + work.setProcessingThreadName(Thread.currentThread().getName()); work.setState(Work.State.PROCESSING); setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId); LOG.debug("Starting processing for {}:\n{}", computationId, work); @@ -288,6 +289,7 @@ private void processWork(ComputationState computationState, Work work) { } resetWorkLoggingContext(work.getLatencyTrackingId()); + work.setProcessingThreadName(""); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index ad77958837a1..734925289920 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -293,40 +293,4 @@ public void testRenderSummaryHtml() { + "Work Queue Bytes: 0/10000000