diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java index ee2a04af9982..640febc616ba 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java @@ -42,6 +42,10 @@ public enum StreamingSystemCounterNames { TIME_AT_MAX_ACTIVE_THREADS("dataflow_time_at_max_active_threads"), ACTIVE_THREADS("dataflow_active_threads"), TOTAL_ALLOCATED_THREADS("dataflow_total_allocated_threads"), + /* Names of the counters StreamingDataflowWorker fills from its work executor's outstanding/maximum byte and element readings. */ OUTSTANDING_BYTES("dataflow_outstanding_bytes"), + MAX_OUTSTANDING_BYTES("dataflow_max_outstanding_bytes"), + OUTSTANDING_BUNDLES("dataflow_outstanding_bundles"), + MAX_OUTSTANDING_BUNDLES("dataflow_max_outstanding_bundles"), WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs"), MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing"); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 77f5205cf7e9..811250ee785c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -251,6 +251,10 @@ public class StreamingDataflowWorker { private final Counter timeAtMaxActiveThreads; private final Counter activeThreads; private final Counter totalAllocatedThreads; + private final Counter outstandingBytes; + private final Counter
maxOutstandingBytes; + private final Counter outstandingBundles; + private final Counter maxOutstandingBundles; private final Counter windmillMaxObservedWorkItemCommitBytes; private final Counter memoryThrashing; private final boolean publishCounters; @@ -337,6 +341,18 @@ public class StreamingDataflowWorker { StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName()); this.activeThreads = pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName()); + this.outstandingBytes = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.OUTSTANDING_BYTES.counterName()); + this.maxOutstandingBytes = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.MAX_OUTSTANDING_BYTES.counterName()); + this.outstandingBundles = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.OUTSTANDING_BUNDLES.counterName()); + this.maxOutstandingBundles = + pendingCumulativeCounters.longSum( + StreamingSystemCounterNames.MAX_OUTSTANDING_BUNDLES.counterName()); this.totalAllocatedThreads = pendingCumulativeCounters.intSum( StreamingSystemCounterNames.TOTAL_ALLOCATED_THREADS.counterName()); @@ -1721,6 +1737,14 @@ private void updateThreadMetrics() { activeThreads.addValue(workUnitExecutor.activeCount()); totalAllocatedThreads.getAndReset(); totalAllocatedThreads.addValue(chooseMaximumNumberOfThreads()); + /* Each counter is reset and then given the executor's current reading; note the "bundles" counters are fed from the executor's element counts. */ outstandingBytes.getAndReset(); + outstandingBytes.addValue(workUnitExecutor.bytesOutstanding()); + maxOutstandingBytes.getAndReset(); + maxOutstandingBytes.addValue(workUnitExecutor.maximumBytesOutstanding()); + outstandingBundles.getAndReset(); + outstandingBundles.addValue(workUnitExecutor.elementsOutstanding()); + maxOutstandingBundles.getAndReset(); + maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding()); } @VisibleForTesting diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index a160b0e6ad03..dcff1f73f10f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -123,6 +123,22 @@ public int activeCount() { return activeCount.intValue(); } + /** Returns the bytesOutstanding field as-is. NOTE(review): no monitor.enter() here, unlike summaryHtml() below; confirm an unguarded read of this long is acceptable. */ public long bytesOutstanding() { + return bytesOutstanding; + } + + /** Returns the elementsOutstanding field as-is (same unguarded read as bytesOutstanding()). */ public long elementsOutstanding() { + return elementsOutstanding; + } + + /** Returns the maximumBytesOutstanding field as-is. */ public long maximumBytesOutstanding() { + return maximumBytesOutstanding; + } + + /** Returns the maximumElementsOutstanding field as-is. */ public long maximumElementsOutstanding() { + return maximumElementsOutstanding; + } + public String summaryHtml() { monitor.enter(); try { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index fdec36d688e9..6826607513d9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -80,6 +80,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -226,6 +227,7 @@ public class StreamingDataflowWorkerTest { private static final ByteString DEFAULT_KEY_BYTES = ByteString.copyFromUtf8(DEFAULT_KEY_STRING); private static final String DEFAULT_DATA_STRING =
"data"; private static final String DEFAULT_DESTINATION_STREAM_ID = "out"; + /** Value passed to the BoundedQueueExecutor constructor (in place of the literal 10000000) by the executor metric tests. */ private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000; private static final Function EMPTY_DATA_RESPONDER = (GetDataRequest request) -> { GetDataResponse.Builder builder = GetDataResponse.newBuilder(); @@ -2747,7 +2749,7 @@ public void testMaxThreadMetric() throws Exception { threadExpiration, TimeUnit.SECONDS, maxThreads, - 10000000, + MAXIMUM_BYTES_OUTSTANDING, new ThreadFactoryBuilder() .setNameFormat("DataflowWorkUnits-%d") .setDaemon(true) @@ -2791,12 +2793,14 @@ public void testMaxThreadMetric() throws Exception { executor.shutdown(); } - volatile boolean stop = false; - @Test public void testActiveThreadMetric() throws Exception { int maxThreads = 5; int threadExpirationSec = 60; + /* Every work invocation counts down all three latches, so they open once 2, 3 and 4 invocations have started. */ CountDownLatch processStart1 = new CountDownLatch(2); + CountDownLatch processStart2 = new CountDownLatch(3); + CountDownLatch processStart3 = new CountDownLatch(4); + AtomicBoolean stop = new AtomicBoolean(false); // setting up actual implementation of executor instead of mocking to keep track of // active thread count.
BoundedQueueExecutor executor = @@ -2805,7 +2809,7 @@ public void testActiveThreadMetric() throws Exception { threadExpirationSec, TimeUnit.SECONDS, maxThreads, - 10000000, + MAXIMUM_BYTES_OUTSTANDING, new ThreadFactoryBuilder() .setNameFormat("DataflowWorkUnits-%d") .setDaemon(true) @@ -2823,11 +2827,11 @@ public void testActiveThreadMetric() throws Exception { Consumer sleepProcessWorkFn = unused -> { - synchronized (this) { - this.notify(); - } + processStart1.countDown(); + processStart2.countDown(); + processStart3.countDown(); int count = 0; - while (!stop) { + while (!stop.get()) { count += 1; } }; @@ -2840,27 +2844,163 @@ public void testActiveThreadMetric() throws Exception { assertEquals(0, executor.activeCount()); assertTrue(computationState.activateWork(key1Shard1, m2)); - synchronized (this) { - executor.execute(m2, m2.getWorkItem().getSerializedSize()); - this.wait(); - // Seems current executor executes the initial work item twice - this.wait(); - } + // activate work starts executing work if no other work is queued for that shard + executor.execute(m2, m2.getWorkItem().getSerializedSize()); + processStart1.await(); assertEquals(2, executor.activeCount()); assertTrue(computationState.activateWork(key1Shard1, m3)); assertTrue(computationState.activateWork(key1Shard1, m4)); - synchronized (this) { - executor.execute(m3, m3.getWorkItem().getSerializedSize()); - this.wait(); - } + executor.execute(m3, m3.getWorkItem().getSerializedSize()); + processStart2.await(); + assertEquals(3, executor.activeCount()); - synchronized (this) { - executor.execute(m4, m4.getWorkItem().getSerializedSize()); - this.wait(); - } + executor.execute(m4, m4.getWorkItem().getSerializedSize()); + processStart3.await(); assertEquals(4, executor.activeCount()); - stop = true; + stop.set(true); + executor.shutdown(); + } + + @Test + /* Checks that executor.bytesOutstanding() rises by each work item's serialized size as items are submitted. NOTE(review): stop/shutdown only run on the success path. */ public void testOutstandingBytesMetric() throws Exception { + int maxThreads = 5; + int threadExpirationSec = 60; + CountDownLatch processStart1 = new
CountDownLatch(2); + CountDownLatch processStart2 = new CountDownLatch(3); + CountDownLatch processStart3 = new CountDownLatch(4); + AtomicBoolean stop = new AtomicBoolean(false); + // setting up actual implementation of executor instead of mocking to keep track of + // outstanding bytes. + BoundedQueueExecutor executor = + new BoundedQueueExecutor( + maxThreads, + threadExpirationSec, + TimeUnit.SECONDS, + maxThreads, + MAXIMUM_BYTES_OUTSTANDING, + new ThreadFactoryBuilder() + .setNameFormat("DataflowWorkUnits-%d") + .setDaemon(true) + .build()); + + ComputationState computationState = + new ComputationState( + "computation", + defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))), + executor, + ImmutableMap.of(), + null); + + ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); + Consumer sleepProcessWorkFn = + unused -> { + processStart1.countDown(); + processStart2.countDown(); + processStart3.countDown(); + int count = 0; + while (!stop.get()) { + count += 1; + } + }; + + Work m2 = createMockWork(2, sleepProcessWorkFn); + + Work m3 = createMockWork(3, sleepProcessWorkFn); + + Work m4 = createMockWork(4, sleepProcessWorkFn); + assertEquals(0, executor.bytesOutstanding()); + + /* m2's size is added twice (here and before execute() below); per the note below activateWork itself starts the work, so m2 is presumably submitted twice. TODO confirm. */ long bytes = m2.getWorkItem().getSerializedSize(); + assertTrue(computationState.activateWork(key1Shard1, m2)); + // activate work starts executing work if no other work is queued for that shard + bytes += m2.getWorkItem().getSerializedSize(); + executor.execute(m2, m2.getWorkItem().getSerializedSize()); + processStart1.await(); + assertEquals(bytes, executor.bytesOutstanding()); + + assertTrue(computationState.activateWork(key1Shard1, m3)); + assertTrue(computationState.activateWork(key1Shard1, m4)); + + bytes += m3.getWorkItem().getSerializedSize(); + executor.execute(m3, m3.getWorkItem().getSerializedSize()); + processStart2.await(); + assertEquals(bytes, executor.bytesOutstanding()); + + bytes += m4.getWorkItem().getSerializedSize(); +
executor.execute(m4, m4.getWorkItem().getSerializedSize()); + processStart3.await(); + assertEquals(bytes, executor.bytesOutstanding()); + stop.set(true); + executor.shutdown(); + } + + @Test + /* Checks executor.elementsOutstanding() after each submission; 2 is expected after the first execute(), presumably because activateWork also submits m2 (see the note in the body). NOTE(review): stop/shutdown only run on the success path. */ public void testOutstandingBundlesMetric() throws Exception { + int maxThreads = 5; + int threadExpirationSec = 60; + CountDownLatch processStart1 = new CountDownLatch(2); + CountDownLatch processStart2 = new CountDownLatch(3); + CountDownLatch processStart3 = new CountDownLatch(4); + AtomicBoolean stop = new AtomicBoolean(false); + // setting up actual implementation of executor instead of mocking to keep track of + // outstanding elements. + BoundedQueueExecutor executor = + new BoundedQueueExecutor( + maxThreads, + threadExpirationSec, + TimeUnit.SECONDS, + maxThreads, + MAXIMUM_BYTES_OUTSTANDING, + new ThreadFactoryBuilder() + .setNameFormat("DataflowWorkUnits-%d") + .setDaemon(true) + .build()); + + ComputationState computationState = + new ComputationState( + "computation", + defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))), + executor, + ImmutableMap.of(), + null); + + ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); + Consumer sleepProcessWorkFn = + unused -> { + processStart1.countDown(); + processStart2.countDown(); + processStart3.countDown(); + int count = 0; + while (!stop.get()) { + count += 1; + } + }; + + Work m2 = createMockWork(2, sleepProcessWorkFn); + + Work m3 = createMockWork(3, sleepProcessWorkFn); + + Work m4 = createMockWork(4, sleepProcessWorkFn); + assertEquals(0, executor.elementsOutstanding()); + + assertTrue(computationState.activateWork(key1Shard1, m2)); + // activate work starts executing work if no other work is queued for that shard + executor.execute(m2, m2.getWorkItem().getSerializedSize()); + processStart1.await(); + assertEquals(2, executor.elementsOutstanding()); + + assertTrue(computationState.activateWork(key1Shard1, m3)); + assertTrue(computationState.activateWork(key1Shard1,
m4)); + + executor.execute(m3, m3.getWorkItem().getSerializedSize()); + processStart2.await(); + assertEquals(3, executor.elementsOutstanding()); + + executor.execute(m4, m4.getWorkItem().getSerializedSize()); + processStart3.await(); + assertEquals(4, executor.elementsOutstanding()); + stop.set(true); executor.shutdown(); }