Skip to content

Commit

Permalink
Merge pull request #4469 from acmorrow/newsched-dynamic-workers
Browse files Browse the repository at this point in the history
NewParallel only adds worker threads as executable tasks are discovered
  • Loading branch information
bdbaddog authored Feb 13, 2024
2 parents ef925ad + dca250f commit c03c104
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 49 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ RELEASE VERSION/DATE TO BE FILLED IN LATER
- CacheDir writes no longer happen within the taskmaster critical section,
and therefore can run in parallel with both other CacheDir writes and the
taskmaster DAG walk.
- The NewParallel scheduler now adds worker threads only as new work requiring
  execution is discovered, up to the limit set by -j. This should reduce resource
  usage when the achievable parallelism in the DAG is lower than the -j limit.

From Mats Wichmann:
- Add support for Python 3.13 (as of alpha 2). So far only affects
Expand Down
3 changes: 3 additions & 0 deletions RELEASE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ IMPROVEMENTS
(Larger -j values)
- CacheDir writes no longer happen within the taskmaster critical section, and therefore
can run in parallel with both other CacheDir writes and the taskmaster DAG walk.
- The NewParallel scheduler now adds worker threads only as new work requiring
  execution is discovered, up to the limit set by -j. This should reduce resource
  usage when the achievable parallelism in the DAG is lower than the -j limit.


PACKAGING
Expand Down
36 changes: 23 additions & 13 deletions SCons/Taskmaster/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def __exit__(self, *args):

def __init__(self, taskmaster, num, stack_size) -> None:
self.taskmaster = taskmaster
self.num_workers = num
self.max_workers = num
self.stack_size = stack_size
self.interrupted = InterruptState()
self.workers = []
Expand All @@ -484,7 +484,7 @@ def __init__(self, taskmaster, num, stack_size) -> None:
# also protects access to our state that gets updated
# concurrently. The `can_search_cv` is associated with
# this mutex.
self.tm_lock = (threading.Lock if self.num_workers > 1 else NewParallel.FakeLock)()
self.tm_lock = (threading.Lock if self.max_workers > 1 else NewParallel.FakeLock)()

# Guarded under `tm_lock`.
self.jobs = 0
Expand All @@ -493,11 +493,11 @@ def __init__(self, taskmaster, num, stack_size) -> None:
# The `can_search_cv` is used to manage a leader /
# follower pattern for access to the taskmaster, and to
# awaken from stalls.
self.can_search_cv = (threading.Condition if self.num_workers > 1 else NewParallel.FakeCondition)(self.tm_lock)
self.can_search_cv = (threading.Condition if self.max_workers > 1 else NewParallel.FakeCondition)(self.tm_lock)

# The queue of tasks that have completed execution. The
# next thread to obtain `tm_lock`` will retire them.
self.results_queue_lock = (threading.Lock if self.num_workers > 1 else NewParallel.FakeLock)()
self.results_queue_lock = (threading.Lock if self.max_workers > 1 else NewParallel.FakeLock)()
self.results_queue = []

if self.taskmaster.trace:
Expand All @@ -516,22 +516,27 @@ def trace_message(self, message) -> None:
method_name = sys._getframe(1).f_code.co_name + "():"
thread_id=threading.get_ident()
self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message))
# print('%-15s %s' % (method_name, message))

def start(self) -> None:
    """Run the build, returning once all tasks have been processed.

    With a single worker (-j1) the calling thread drives the
    taskmaster inline via `_work` and no threads are created.
    Otherwise one worker thread is started and further workers are
    spawned on demand as executable tasks are discovered (see
    `_maybe_start_worker`). In both cases the taskmaster is cleaned
    up before returning.
    """
    if self.max_workers == 1:
        # Single-threaded mode: do all the work on this thread.
        self._work()
    else:
        # Start a single worker; additional workers are added lazily
        # by the workers themselves as runnable tasks are found.
        self._start_worker()
        # `self.workers` may grow while we are joining (running
        # workers can spawn peers), so re-examine the live list on
        # every iteration rather than iterating over a snapshot.
        while len(self.workers) > 0:
            self.workers[0].join()
            self.workers.pop(0)
    self.taskmaster.cleanup()

def _start_workers(self) -> None:
def _maybe_start_worker(self) -> None:
if self.max_workers > 1 and len(self.workers) < self.max_workers:
if self.jobs >= len(self.workers):
self._start_worker()

def _start_worker(self) -> None:
    """Create one new worker thread and add it to `self.workers`.

    The thread stack size is adjusted before the worker is created
    so the new thread picks up the configured stack, and the prior
    setting is restored immediately afterwards.
    """
    saved_stack_size = self._adjust_stack_size()
    if self.trace:
        self.trace_message("Starting new worker thread")
    self.workers.append(NewParallel.Worker(self))
    self._restore_stack_size(saved_stack_size)

def _adjust_stack_size(self):
Expand Down Expand Up @@ -680,6 +685,11 @@ def _work(self):
self.trace_message("Found task requiring execution")
self.state = NewParallel.State.READY
self.can_search_cv.notify()
# This thread will be busy taking care of
# `execute`ing this task. If we haven't
# reached the limit, spawn a new thread to
# turn the crank and find the next task.
self._maybe_start_worker()

else:
# We failed to find a task, so this thread
Expand Down
3 changes: 1 addition & 2 deletions test/option/fixture/taskmaster_expected_new_parallel.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Job.NewParallel._start_worker(): [Thread:XXXXX] Starting new worker thread
Job.NewParallel._work(): [Thread:XXXXX] Gained exclusive access
Job.NewParallel._work(): [Thread:XXXXX] Starting search
Job.NewParallel._work(): [Thread:XXXXX] Found 0 completed tasks to process
Expand Down Expand Up @@ -86,5 +87,3 @@ Taskmaster: No candidate anymore.
Job.NewParallel._work(): [Thread:XXXXX] Found no task requiring execution, and have no jobs: marking complete
Job.NewParallel._work(): [Thread:XXXXX] Gained exclusive access
Job.NewParallel._work(): [Thread:XXXXX] Completion detected, breaking from main loop
Job.NewParallel._work(): [Thread:XXXXX] Gained exclusive access
Job.NewParallel._work(): [Thread:XXXXX] Completion detected, breaking from main loop
Loading

0 comments on commit c03c104

Please sign in to comment.