Skip to content

Commit

Permalink
DynamicThreadPool: better thread management
Browse files Browse the repository at this point in the history
1. Only gate thread can create new threads
2. Gate thread tries to wake up a worker before creating new one
3. Keep some pending workers around to minimize creation of new threads
4. Active/Created/Reserve thread stats
5. Name threads to ease memory dump analysis
  • Loading branch information
ezsilmar authored and Gregory LEOCADIE committed Oct 15, 2020
1 parent 48f4af5 commit 3b2896b
Showing 1 changed file with 44 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2144,20 +2144,28 @@ public static void OutputTrace(string s)

internal class DynamicThreadPool
{
private long _createdWorkers;
public long CreatedWorkers => _createdWorkers;

private int _aliveWorkers;
public int AliveWorkers => _aliveWorkers;

private int _reserveWorkers;
public int ReserveWorkers => _reserveWorkers;

private const int ReserveWorkersLimit = 5;
private const int WorkerTimeout = 20 * 1000;
private const int StartWorkerDelay = 500;

private readonly ConcurrentQueue<(WaitCallback callback, object state)> _queue;
private readonly ConcurrentStack<Worker> _pendingWorkers;

private long _lastTickCount;

public DynamicThreadPool(int minSize)
{
_queue = new ConcurrentQueue<(WaitCallback callback, object state)>();
_pendingWorkers = new ConcurrentStack<Worker>();

new Thread(GateThread) { IsBackground = true }.Start();
new Thread(GateThread) { IsBackground = true, Name = "DynamicThreadPoolGate" }.Start();

for (int i = 0; i < minSize; i++)
{
Expand All @@ -2167,87 +2175,70 @@ public DynamicThreadPool(int minSize)

public void QueueWorkItem(IThreadPoolWorkItem item)
{
QueueWorkItem(i => ((IThreadPoolWorkItem)i).Execute(), item);
QueueWorkItem(i => ((IThreadPoolWorkItem)i!).Execute(), item);
}

public void QueueWorkItem(WaitCallback callback, object state)
{
_queue.Enqueue((callback, state));
TryWakeUpWorker();
}

private bool TryWakeUpWorker()
{
if (_queue.IsEmpty)
{
return true;
}

// There's something in the queue. Let's try to wake up a worker.
while (_pendingWorkers.TryPop(out var worker))
{
if (worker.TryWakeUp())
{
// At least one worker is awake, the action will be dequeued eventually
return;
return true;
}

if (_queue.IsEmpty)
{
// The work got processed, no need to wake anybody anymore
return;
// No need to wake up anyone, as the items already got processed
return true;
}
}

// No worker, try to start one
StartWorkerIfNeeded();
// There's still something in the queue, and we failed to wake up a worker
return false;
}

private void GateThread()
{
// There's a subtle race-condition with workers: if an item get enqueued while the worker prepares to sleep, or right after the sleep timed out,
// the worker will be in the "Pending workers" queue, yet busy executing some work.
// It's possible that this work depends synchronously on another item enqueued to the same threadpool, and this same worker
// gets waken up to execute it, leading to a deadlock.
// The gate thread is there as a secondary mechanism to make the threadpool grow if it happens.
// The gate thread adds new worker to the thread pool when needed
while (true)
{
Thread.Sleep(StartWorkerDelay);
StartWorkerIfNeeded();
}
}

private void StartWorkerIfNeeded()
{
if (_queue.IsEmpty)
{
return;
}

while (true)
{
var tickCount = Environment.TickCount64;
var lastTickCount = _lastTickCount;

// Make sure enough time elapsed since we last started a worker
if (tickCount - _lastTickCount > StartWorkerDelay)
if (!TryWakeUpWorker())
{
if (Interlocked.CompareExchange(ref _lastTickCount, tickCount, lastTickCount) != lastTickCount)
{
// Another thread beat us, retry
continue;
}

StartWorker(WorkerTimeout);

return;
}

return;
}
}

private void StartWorker(int timeout)
{
new Thread(() => WorkerThread(timeout)) { IsBackground = true }.Start();
Interlocked.Increment(ref _createdWorkers);
new Thread(() => WorkerThread(timeout)) { IsBackground = true, Name = "DynamicThreadPoolWorker" }.Start();
}

private void WorkerThread(int timeout)
{
Interlocked.Increment(ref _aliveWorkers);
var worker = new Worker();

while (true)
{
// Execute items while the queue is not empty
if (TryExecuteItem())
{
continue;
Expand All @@ -2256,25 +2247,29 @@ private void WorkerThread(int timeout)
// Nothing left in the queue, prepare to sleep
worker.Event.Reset();
_pendingWorkers.Push(worker);

// We need to check the queue again before sleeping, as an item could have been added in the meantime.
if (TryExecuteItem())
if (worker.Event.Wait(timeout))
{
continue;
}

if (worker.Event.Wait(timeout))
// Nobody wakes us up for a long time. Let's decide if the thread pool should be scaled down
// Workers created at startup (minSize) will never reach this line as they have infinite wait timeout
var reserveCount = Interlocked.Increment(ref _reserveWorkers);
if (reserveCount <= ReserveWorkersLimit)
{
// Too early to scale down. This worker will wait "in reserve" until woken up
worker.Event.Wait();
Interlocked.Decrement(ref _reserveWorkers);
continue;
}

// The wait timed out. Scale down the threadpool
// ReserveWorkersLimit is reached, we can try to exit
Interlocked.Decrement(ref _reserveWorkers);
if (worker.TryExit())
{
Interlocked.Decrement(ref _aliveWorkers);
return;
}

// If we got there, it means we lost the race condition and got signaled as we tried to exit. Resume the loop.
}
}

Expand Down

0 comments on commit 3b2896b

Please sign in to comment.