Skip to content

Commit

Permalink
track work before it gets queued/executed as it is awaiting a free th…
Browse files Browse the repository at this point in the history
…read for processing
  • Loading branch information
m-trieu committed Oct 15, 2024
1 parent 1f575d4 commit 7081b95
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
Expand All @@ -37,6 +38,7 @@ public class BoundedQueueExecutor {
private final Monitor monitor = new Monitor();
private int elementsOutstanding = 0;
private long bytesOutstanding = 0;
private final AtomicLong numQueued;

@GuardedBy("this")
private int maximumElementsOutstanding;
Expand All @@ -61,6 +63,7 @@ public BoundedQueueExecutor(
long maximumBytesOutstanding,
ThreadFactory threadFactory) {
this.maximumPoolSize = maximumPoolSize;
this.numQueued = new AtomicLong();
executor =
new ThreadPoolExecutor(
maximumPoolSize,
Expand Down Expand Up @@ -99,6 +102,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// Before adding a Work to the queue, check that there are enough bytes of space or no other
// outstanding elements of work.
public void execute(Runnable work, long workBytes) {
numQueued.incrementAndGet();
monitor.enterWhenUninterruptibly(
new Guard(monitor) {
@Override
Expand All @@ -108,6 +112,7 @@ public boolean isSatisfied() {
&& elementsOutstanding < maximumElementsOutstanding());
}
});
numQueued.decrementAndGet();
executeMonitorHeld(work, workBytes);
}

Expand Down Expand Up @@ -195,6 +200,10 @@ public String summaryHtml() {
builder.append(executor.getActiveCount());
builder.append("<br>/n");

builder.append("Queued Work awaiting execution: ");
builder.append(numQueued.get());
builder.append("<br>/n");

builder.append("Work Queue Size: ");
builder.append(elementsOutstanding);
builder.append("/");
Expand Down

0 comments on commit 7081b95

Please sign in to comment.