From 08253ea8a6ec5c9a6c8f530fb390b0d4e0702356 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 29 Mar 2024 10:10:17 -0700 Subject: [PATCH] [Dataflow Streaming] Add workToken to thread name for easier debugging (#30786) Co-authored-by: Arun Pandian --- .../dataflow/worker/util/BoundedQueueExecutor.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index b1a0e087ef65..f7f6fd91a8c8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -185,10 +186,17 @@ private void executeLockHeld(Runnable work, long workBytes) { try { executor.execute( () -> { + String threadName = Thread.currentThread().getName(); try { + if (work instanceof Work) { + String workToken = + String.format("%016x", ((Work) work).getWorkItem().getWorkToken()); + Thread.currentThread().setName(threadName + ":" + workToken); + } work.run(); } finally { decrementCounters(workBytes); + Thread.currentThread().setName(threadName); } }); } catch (RuntimeException e) {