From 0d037a993b51406e6a6f630791a8d17bfbb608a7 Mon Sep 17 00:00:00 2001 From: martin trieu Date: Tue, 11 Jun 2024 00:35:47 -0700 Subject: [PATCH] Fix incorrect Work.java cast and logging (#31528) --- .../dataflow/worker/streaming/Work.java | 5 +- .../worker/util/BoundedQueueExecutor.java | 13 +++- .../worker/util/BoundedQueueExecutorTest.java | 76 +++++++++++++++++-- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index fa46bac36b58..ed3f2671b40c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -58,7 +58,7 @@ */ @NotThreadSafe @Internal -public class Work { +public final class Work { private final ShardedKey shardedKey; private final WorkItem workItem; private final ProcessingContext processingContext; @@ -196,8 +196,7 @@ public String getLatencyTrackingId() { return latencyTrackingId; } - public final void queueCommit( - WorkItemCommitRequest commitRequest, ComputationState computationState) { + public void queueCommit(WorkItemCommitRequest commitRequest, ComputationState computationState) { setState(State.COMMIT_QUEUED); processingContext.workCommitter().accept(Commit.create(commitRequest, computationState, this)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 9a4811693500..5e3f293f7d5b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -22,7 +22,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -224,9 +225,10 @@ private void executeMonitorHeld(Runnable work, long workBytes) { () -> { String threadName = Thread.currentThread().getName(); try { - if (work instanceof Work) { + if (work instanceof ExecutableWork) { String workToken = - String.format("%016x", ((Work) work).getWorkItem().getWorkToken()); + debugFormattedWorkToken( + ((ExecutableWork) work).work().getWorkItem().getWorkToken()); Thread.currentThread().setName(threadName + ":" + workToken); } work.run(); @@ -242,6 +244,11 @@ private void executeMonitorHeld(Runnable work, long workBytes) { } } + @VisibleForTesting + public static String debugFormattedWorkToken(long workToken) { + return String.format("%016x", workToken); + } + private void decrementCounters(long workBytes) { monitor.enter(); --elementsOutstanding; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index c0620952ef9e..e08c951975fa 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -23,9 +23,17 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; +import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -39,13 +47,31 @@ // released (2.11.0) @SuppressWarnings("unused") public class BoundedQueueExecutorTest { - @Rule public transient Timeout globalTimeout = Timeout.seconds(300); private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000; private static final int DEFAULT_MAX_THREADS = 2; private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60; - + @Rule public transient Timeout globalTimeout = Timeout.seconds(300); private BoundedQueueExecutor executor; + private static ExecutableWork createWork(Consumer executeWorkFn) { + return ExecutableWork.create( + Work.create( + Windmill.WorkItem.newBuilder() + .setKey(ByteString.EMPTY) + .setShardingKey(1) + .setWorkToken(33) + .setCacheToken(1) + .build(), + Watermarks.builder().setInputDataWatermark(Instant.now()).build(), + Work.createProcessingContext( + "computationId", + (a, b) -> Windmill.KeyedGetDataResponse.getDefaultInstance(), + ignored -> {}), + Instant::now, + Collections.emptyList()), + executeWorkFn); + } + private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) { Runnable runnable = () -> { @@ -203,14 +229,14 @@ public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception { executor.execute(m3, 1); assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS)); - assertEquals(0l, executor.allThreadsActiveTime()); + assertEquals(0L, executor.allThreadsActiveTime()); stop.countDown(); while (executor.activeCount() != 0) { // Waiting for all threads to be ended. Thread.sleep(200); } // Max pool size was reached so the allThreadsActiveTime() was updated. - assertThat(executor.allThreadsActiveTime(), greaterThan(0l)); + assertThat(executor.allThreadsActiveTime(), greaterThan(0L)); executor.shutdown(); } @@ -241,7 +267,7 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated() executor.execute(m3, 1); assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS)); - assertEquals(0l, executor.allThreadsActiveTime()); + assertEquals(0L, executor.allThreadsActiveTime()); // Increase the max thread count executor.setMaximumPoolSize(5, 105); stop.countDown(); @@ -251,13 +277,13 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated() } // Max pool size was updated during execution but allThreadsActiveTime() was still recorded // for the thread which reached the old max pool size. - assertThat(executor.allThreadsActiveTime(), greaterThan(0l)); + assertThat(executor.allThreadsActiveTime(), greaterThan(0L)); executor.shutdown(); } @Test - public void testRenderSummaryHtml() throws Exception { + public void testRenderSummaryHtml() { String expectedSummaryHtml = "Worker Threads: 0/2
/n" + "Active Threads: 0
/n" @@ -265,4 +291,40 @@ public void testRenderSummaryHtml() throws Exception { + "Work Queue Bytes: 0/10000000
/n"; assertEquals(expectedSummaryHtml, executor.summaryHtml()); } + + @Test + public void testExecute_updatesThreadNameForExecutableWork() throws InterruptedException { + CountDownLatch waitForWorkExecution = new CountDownLatch(1); + ExecutableWork executableWork = + createWork( + work -> { + assertTrue( + Thread.currentThread() + .getName() + .contains( + BoundedQueueExecutor.debugFormattedWorkToken( + work.getWorkItem().getWorkToken()))); + waitForWorkExecution.countDown(); + }); + executor.execute(executableWork, executableWork.getWorkItem().getSerializedSize()); + waitForWorkExecution.await(); + } + + @Test + public void testForceExecute_updatesThreadNameForExecutableWork() throws InterruptedException { + CountDownLatch waitForWorkExecution = new CountDownLatch(1); + ExecutableWork executableWork = + createWork( + work -> { + assertTrue( + Thread.currentThread() + .getName() + .contains( + BoundedQueueExecutor.debugFormattedWorkToken( + work.getWorkItem().getWorkToken()))); + waitForWorkExecution.countDown(); + }); + executor.forceExecute(executableWork, executableWork.getWorkItem().getSerializedSize()); + waitForWorkExecution.await(); + } }