Skip to content

Commit

Permalink
Report Outstanding Bundles and Bytes (#29041)
Browse files Browse the repository at this point in the history
* New active threads metric and initial tests

* create unit tests for max active threads metric

* remove test filter

* fix formatting with spotless apply

* revert format changes

* revert format change straggler

* revert format change straggler

* remove unnecessary comment

* synchronize threads in unit tests

* fix formatting with spotless apply

* remove comments and rename counter

* fix formatting with spotless apply

* fix tests for StreamingDataflowWorker change and fixed createMockWork

* Add outstanding bytes and bundle metrics + outstanding bytes test + refactor metric unit tests

* report outstanding bundles and bytes metrics

* Add variable for max bytes outstanding

* remove duplicate test
  • Loading branch information
edman124 authored Oct 31, 2023
1 parent 28f6e33 commit 41369a7
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public enum StreamingSystemCounterNames {
TIME_AT_MAX_ACTIVE_THREADS("dataflow_time_at_max_active_threads"),
ACTIVE_THREADS("dataflow_active_threads"),
TOTAL_ALLOCATED_THREADS("dataflow_total_allocated_threads"),
OUTSTANDING_BYTES("dataflow_outstanding_bytes"),
MAX_OUTSTANDING_BYTES("dataflow_max_outstanding_bytes"),
OUTSTANDING_BUNDLES("dataflow_outstanding_bundles"),
MAX_OUTSTANDING_BUNDLES("dataflow_max_outstanding_bundles"),
WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs"),
MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ public class StreamingDataflowWorker {
private final Counter<Long, Long> timeAtMaxActiveThreads;
private final Counter<Integer, Integer> activeThreads;
private final Counter<Integer, Integer> totalAllocatedThreads;
private final Counter<Long, Long> outstandingBytes;
private final Counter<Long, Long> maxOutstandingBytes;
private final Counter<Long, Long> outstandingBundles;
private final Counter<Long, Long> maxOutstandingBundles;
private final Counter<Integer, Integer> windmillMaxObservedWorkItemCommitBytes;
private final Counter<Integer, Integer> memoryThrashing;
private final boolean publishCounters;
Expand Down Expand Up @@ -337,6 +341,18 @@ public class StreamingDataflowWorker {
StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName());
this.activeThreads =
pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName());
this.outstandingBytes =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.OUTSTANDING_BYTES.counterName());
this.maxOutstandingBytes =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.MAX_OUTSTANDING_BYTES.counterName());
this.outstandingBundles =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.OUTSTANDING_BUNDLES.counterName());
this.maxOutstandingBundles =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.MAX_OUTSTANDING_BUNDLES.counterName());
this.totalAllocatedThreads =
pendingCumulativeCounters.intSum(
StreamingSystemCounterNames.TOTAL_ALLOCATED_THREADS.counterName());
Expand Down Expand Up @@ -1721,6 +1737,14 @@ private void updateThreadMetrics() {
activeThreads.addValue(workUnitExecutor.activeCount());
totalAllocatedThreads.getAndReset();
totalAllocatedThreads.addValue(chooseMaximumNumberOfThreads());
outstandingBytes.getAndReset();
outstandingBytes.addValue(workUnitExecutor.bytesOutstanding());
maxOutstandingBytes.getAndReset();
maxOutstandingBytes.addValue(workUnitExecutor.maximumBytesOutstanding());
outstandingBundles.getAndReset();
outstandingBundles.addValue(workUnitExecutor.elementsOutstanding());
maxOutstandingBundles.getAndReset();
maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ public int activeCount() {
return activeCount.intValue();
}

/**
 * Returns the current value of the {@code bytesOutstanding} field.
 *
 * <p>NOTE(review): the value is read without entering {@code monitor} (unlike {@code
 * summaryHtml()}); confirm that a possibly stale reading is acceptable to callers.
 */
public long bytesOutstanding() {
  return this.bytesOutstanding;
}

/**
 * Returns the current value of the {@code elementsOutstanding} field.
 *
 * <p>NOTE(review): the value is read without entering {@code monitor} (unlike {@code
 * summaryHtml()}); confirm that a possibly stale reading is acceptable to callers.
 */
public long elementsOutstanding() {
  return this.elementsOutstanding;
}

/** Returns the value of the {@code maximumBytesOutstanding} field. */
public long maximumBytesOutstanding() {
  return this.maximumBytesOutstanding;
}

/** Returns the value of the {@code maximumElementsOutstanding} field. */
public long maximumElementsOutstanding() {
  return this.maximumElementsOutstanding;
}

public String summaryHtml() {
monitor.enter();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand Down Expand Up @@ -226,6 +227,7 @@ public class StreamingDataflowWorkerTest {
private static final ByteString DEFAULT_KEY_BYTES = ByteString.copyFromUtf8(DEFAULT_KEY_STRING);
private static final String DEFAULT_DATA_STRING = "data";
private static final String DEFAULT_DESTINATION_STREAM_ID = "out";
private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final Function<GetDataRequest, GetDataResponse> EMPTY_DATA_RESPONDER =
(GetDataRequest request) -> {
GetDataResponse.Builder builder = GetDataResponse.newBuilder();
Expand Down Expand Up @@ -2747,7 +2749,7 @@ public void testMaxThreadMetric() throws Exception {
threadExpiration,
TimeUnit.SECONDS,
maxThreads,
10000000,
MAXIMUM_BYTES_OUTSTANDING,
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
Expand Down Expand Up @@ -2791,12 +2793,14 @@ public void testMaxThreadMetric() throws Exception {
executor.shutdown();
}

volatile boolean stop = false;

@Test
public void testActiveThreadMetric() throws Exception {
int maxThreads = 5;
int threadExpirationSec = 60;
CountDownLatch processStart1 = new CountDownLatch(2);
CountDownLatch processStart2 = new CountDownLatch(3);
CountDownLatch processStart3 = new CountDownLatch(4);
AtomicBoolean stop = new AtomicBoolean(false);
// setting up actual implementation of executor instead of mocking to keep track of
// active thread count.
BoundedQueueExecutor executor =
Expand All @@ -2805,7 +2809,7 @@ public void testActiveThreadMetric() throws Exception {
threadExpirationSec,
TimeUnit.SECONDS,
maxThreads,
10000000,
MAXIMUM_BYTES_OUTSTANDING,
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
Expand All @@ -2823,11 +2827,11 @@ public void testActiveThreadMetric() throws Exception {

Consumer<Work> sleepProcessWorkFn =
unused -> {
synchronized (this) {
this.notify();
}
processStart1.countDown();
processStart2.countDown();
processStart3.countDown();
int count = 0;
while (!stop) {
while (!stop.get()) {
count += 1;
}
};
Expand All @@ -2840,27 +2844,163 @@ public void testActiveThreadMetric() throws Exception {
assertEquals(0, executor.activeCount());

assertTrue(computationState.activateWork(key1Shard1, m2));
synchronized (this) {
executor.execute(m2, m2.getWorkItem().getSerializedSize());
this.wait();
// Seems current executor executes the initial work item twice
this.wait();
}
// activate work starts executing work if no other work is queued for that shard
executor.execute(m2, m2.getWorkItem().getSerializedSize());
processStart1.await();
assertEquals(2, executor.activeCount());

assertTrue(computationState.activateWork(key1Shard1, m3));
assertTrue(computationState.activateWork(key1Shard1, m4));
synchronized (this) {
executor.execute(m3, m3.getWorkItem().getSerializedSize());
this.wait();
}
executor.execute(m3, m3.getWorkItem().getSerializedSize());
processStart2.await();

assertEquals(3, executor.activeCount());
synchronized (this) {
executor.execute(m4, m4.getWorkItem().getSerializedSize());
this.wait();
}
executor.execute(m4, m4.getWorkItem().getSerializedSize());
processStart3.await();
assertEquals(4, executor.activeCount());
stop = true;
stop.set(true);
executor.shutdown();
}

/**
 * Checks that {@code BoundedQueueExecutor#bytesOutstanding()} grows by the serialized size of
 * each work item handed to the executor while the work is still running.
 *
 * <p>Every work item spins until {@code stop} is set, so the submitted bytes stay outstanding
 * for the duration of the assertions.
 */
@Test
public void testOutstandingBytesMetric() throws Exception {
  int maxThreads = 5;
  int threadExpirationSec = 60;
  // Every running work item counts down all three latches, so latch N opens once N + 1 work
  // items have started.
  CountDownLatch processStart1 = new CountDownLatch(2);
  CountDownLatch processStart2 = new CountDownLatch(3);
  CountDownLatch processStart3 = new CountDownLatch(4);
  AtomicBoolean stop = new AtomicBoolean(false);
  // setting up actual implementation of executor instead of mocking to keep track of
  // outstanding bytes.
  BoundedQueueExecutor executor =
      new BoundedQueueExecutor(
          maxThreads,
          threadExpirationSec,
          TimeUnit.SECONDS,
          maxThreads,
          MAXIMUM_BYTES_OUTSTANDING,
          new ThreadFactoryBuilder()
              .setNameFormat("DataflowWorkUnits-%d")
              .setDaemon(true)
              .build());

  try {
    ComputationState computationState =
        new ComputationState(
            "computation",
            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
            executor,
            ImmutableMap.of(),
            null);

    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1);
    Consumer<Work> sleepProcessWorkFn =
        unused -> {
          processStart1.countDown();
          processStart2.countDown();
          processStart3.countDown();
          int count = 0;
          while (!stop.get()) {
            count += 1;
          }
        };

    Work m2 = createMockWork(2, sleepProcessWorkFn);

    Work m3 = createMockWork(3, sleepProcessWorkFn);

    Work m4 = createMockWork(4, sleepProcessWorkFn);
    assertEquals(0, executor.bytesOutstanding());

    long bytes = m2.getWorkItem().getSerializedSize();
    assertTrue(computationState.activateWork(key1Shard1, m2));
    // activate work starts executing work if no other work is queued for that shard
    bytes += m2.getWorkItem().getSerializedSize();
    executor.execute(m2, m2.getWorkItem().getSerializedSize());
    processStart1.await();
    assertEquals(bytes, executor.bytesOutstanding());

    assertTrue(computationState.activateWork(key1Shard1, m3));
    assertTrue(computationState.activateWork(key1Shard1, m4));

    bytes += m3.getWorkItem().getSerializedSize();
    executor.execute(m3, m3.getWorkItem().getSerializedSize());
    processStart2.await();
    assertEquals(bytes, executor.bytesOutstanding());

    bytes += m4.getWorkItem().getSerializedSize();
    executor.execute(m4, m4.getWorkItem().getSerializedSize());
    processStart3.await();
    assertEquals(bytes, executor.bytesOutstanding());
  } finally {
    // Release the spinning work items even when an assertion fails; otherwise they keep
    // burning CPU for the remainder of the test run.
    stop.set(true);
    executor.shutdown();
  }
}

/**
 * Checks that {@code BoundedQueueExecutor#elementsOutstanding()} increases by one for each work
 * item handed to the executor while the work is still running.
 *
 * <p>Every work item spins until {@code stop} is set, so the submitted items stay outstanding
 * for the duration of the assertions.
 */
@Test
public void testOutstandingBundlesMetric() throws Exception {
  int maxThreads = 5;
  int threadExpirationSec = 60;
  // Every running work item counts down all three latches, so latch N opens once N + 1 work
  // items have started.
  CountDownLatch processStart1 = new CountDownLatch(2);
  CountDownLatch processStart2 = new CountDownLatch(3);
  CountDownLatch processStart3 = new CountDownLatch(4);
  AtomicBoolean stop = new AtomicBoolean(false);
  // setting up actual implementation of executor instead of mocking to keep track of
  // outstanding elements.
  BoundedQueueExecutor executor =
      new BoundedQueueExecutor(
          maxThreads,
          threadExpirationSec,
          TimeUnit.SECONDS,
          maxThreads,
          MAXIMUM_BYTES_OUTSTANDING,
          new ThreadFactoryBuilder()
              .setNameFormat("DataflowWorkUnits-%d")
              .setDaemon(true)
              .build());

  try {
    ComputationState computationState =
        new ComputationState(
            "computation",
            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
            executor,
            ImmutableMap.of(),
            null);

    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1);
    Consumer<Work> sleepProcessWorkFn =
        unused -> {
          processStart1.countDown();
          processStart2.countDown();
          processStart3.countDown();
          int count = 0;
          while (!stop.get()) {
            count += 1;
          }
        };

    Work m2 = createMockWork(2, sleepProcessWorkFn);

    Work m3 = createMockWork(3, sleepProcessWorkFn);

    Work m4 = createMockWork(4, sleepProcessWorkFn);
    assertEquals(0, executor.elementsOutstanding());

    assertTrue(computationState.activateWork(key1Shard1, m2));
    // activate work starts executing work if no other work is queued for that shard
    executor.execute(m2, m2.getWorkItem().getSerializedSize());
    processStart1.await();
    assertEquals(2, executor.elementsOutstanding());

    assertTrue(computationState.activateWork(key1Shard1, m3));
    assertTrue(computationState.activateWork(key1Shard1, m4));

    executor.execute(m3, m3.getWorkItem().getSerializedSize());
    processStart2.await();
    assertEquals(3, executor.elementsOutstanding());

    executor.execute(m4, m4.getWorkItem().getSerializedSize());
    processStart3.await();
    assertEquals(4, executor.elementsOutstanding());
  } finally {
    // Release the spinning work items even when an assertion fails; otherwise they keep
    // burning CPU for the remainder of the test run.
    stop.set(true);
    executor.shutdown();
  }
}

Expand Down

0 comments on commit 41369a7

Please sign in to comment.