Skip to content

Commit

Permalink
Report total active threads (#28513)
Browse files Browse the repository at this point in the history
* New active threads metric and initial tests

* create unit tests for max active threads metric

* remove test filter

* fix formatting with spotless apply

* revert format changes

* revert format change straggler

* revert format change straggler

* remove unnecessary comment

* synchronize threads in unit tests

* fix formatting with spotless apply

* remove comments and rename counter

* fix formatting with spotless apply

* fix tests for StreamingDataflowWorker change and fixed createMockWork
  • Loading branch information
edman124 authored Oct 2, 2023
1 parent f2194f6 commit bf1e829
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public enum StreamingSystemCounterNames {
JAVA_HARNESS_MAX_MEMORY("dataflow_java_harness_max_memory"),
JAVA_HARNESS_RESTARTS("dataflow_java_harness_restarts"),
TIME_AT_MAX_ACTIVE_THREADS("dataflow_time_at_max_active_threads"),
ACTIVE_THREADS("dataflow_active_threads"),
TOTAL_ALLOCATED_THREADS("dataflow_total_allocated_threads"),
WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs"),
MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ public class StreamingDataflowWorker {
private final Counter<Long, Long> javaHarnessUsedMemory;
private final Counter<Long, Long> javaHarnessMaxMemory;
private final Counter<Long, Long> timeAtMaxActiveThreads;
private final Counter<Integer, Integer> activeThreads;
private final Counter<Integer, Integer> totalAllocatedThreads;
private final Counter<Integer, Integer> windmillMaxObservedWorkItemCommitBytes;
private final Counter<Integer, Integer> memoryThrashing;
private final boolean publishCounters;
Expand Down Expand Up @@ -330,6 +332,11 @@ public class StreamingDataflowWorker {
this.timeAtMaxActiveThreads =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName());
this.activeThreads =
pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName());
this.totalAllocatedThreads =
pendingCumulativeCounters.intSum(
StreamingSystemCounterNames.TOTAL_ALLOCATED_THREADS.counterName());
this.windmillMaxObservedWorkItemCommitBytes =
pendingCumulativeCounters.intMax(
StreamingSystemCounterNames.WINDMILL_MAX_WORK_ITEM_COMMIT_BYTES.counterName());
Expand Down Expand Up @@ -1702,6 +1709,10 @@ private void updateVMMetrics() {
private void updateThreadMetrics() {
timeAtMaxActiveThreads.getAndReset();
timeAtMaxActiveThreads.addValue(workUnitExecutor.allThreadsActiveTime());
activeThreads.getAndReset();
activeThreads.addValue(workUnitExecutor.activeCount());
totalAllocatedThreads.getAndReset();
totalAllocatedThreads.addValue(chooseMaximumNumberOfThreads());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public long allThreadsActiveTime() {
return totalTimeMaxActiveThreadsUsed;
}

public int activeCount() {
return activeCount.intValue();
}

public String summaryHtml() {
monitor.enter();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ static Work createMockWork(long workToken, Consumer<Work> processWorkFn) {
Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(),
Instant::now,
Collections.emptyList(),
work -> {});
processWorkFn);
}

private byte[] intervalWindowBytes(IntervalWindow window) throws Exception {
Expand Down Expand Up @@ -2793,6 +2793,79 @@ public void testMaxThreadMetric() throws Exception {
executor.shutdown();
}

volatile boolean stop = false;

@Test
public void testActiveThreadMetric() throws Exception {
int maxThreads = 5;
int threadExpirationSec = 60;
// setting up actual implementation of executor instead of mocking to keep track of
// active thread count.
BoundedQueueExecutor executor =
new BoundedQueueExecutor(
maxThreads,
threadExpirationSec,
TimeUnit.SECONDS,
maxThreads,
10000000,
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
.build());

ComputationState computationState =
new ComputationState(
"computation",
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
executor,
ImmutableMap.of(),
null);

ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1);

Consumer<Work> sleepProcessWorkFn =
unused -> {
synchronized (this) {
this.notify();
}
int count = 0;
while (!stop) {
count += 1;
}
};

Work m2 = createMockWork(2, sleepProcessWorkFn);

Work m3 = createMockWork(3, sleepProcessWorkFn);

Work m4 = createMockWork(4, sleepProcessWorkFn);
assertEquals(0, executor.activeCount());

assertTrue(computationState.activateWork(key1Shard1, m2));
synchronized (this) {
executor.execute(m2, m2.getWorkItem().getSerializedSize());
this.wait();
// Seems current executor executes the initial work item twice
this.wait();
}
assertEquals(2, executor.activeCount());

assertTrue(computationState.activateWork(key1Shard1, m3));
assertTrue(computationState.activateWork(key1Shard1, m4));
synchronized (this) {
executor.execute(m3, m3.getWorkItem().getSerializedSize());
this.wait();
}
assertEquals(3, executor.activeCount());
synchronized (this) {
executor.execute(m4, m4.getWorkItem().getSerializedSize());
this.wait();
}
assertEquals(4, executor.activeCount());
stop = true;
executor.shutdown();
}

@Test
public void testExceptionInvalidatesCache() throws Exception {
// We'll need to force the system to limit bundles to one message at a time.
Expand Down

0 comments on commit bf1e829

Please sign in to comment.