From 642f5217560aa85dacf4443b75038c481f4ce29c Mon Sep 17 00:00:00 2001 From: Michael Sippel Date: Tue, 12 Sep 2023 22:23:05 +0200 Subject: [PATCH] factor AtomicBitfield & WorkerPool out from scheduler --- examples/2_functors.cpp | 1 + examples/mpi.cpp | 12 +- redGrapes/context.hpp | 15 +- redGrapes/dispatch/thread/execute.cpp | 4 +- redGrapes/dispatch/thread/local.hpp | 2 +- redGrapes/dispatch/thread/worker.cpp | 189 ++++++++++ redGrapes/dispatch/thread/worker.hpp | 177 ++-------- redGrapes/dispatch/thread/worker_pool.cpp | 81 +++++ redGrapes/dispatch/thread/worker_pool.hpp | 82 +++++ redGrapes/redGrapes.cpp | 42 ++- redGrapes/redGrapes.hpp | 51 +-- redGrapes/scheduler/default_scheduler.hpp | 399 +++++----------------- redGrapes/scheduler/event.cpp | 3 +- redGrapes/scheduler/event.hpp | 2 +- redGrapes/scheduler/scheduler.hpp | 23 +- redGrapes/task/queue.hpp | 1 + redGrapes/task/task.hpp | 1 - redGrapes/task/task_space.cpp | 7 +- redGrapes/task/task_space.hpp | 7 +- redGrapes/util/bitfield.hpp | 202 +++++++++++ redGrapesConfig.cmake | 2 + 21 files changed, 767 insertions(+), 536 deletions(-) create mode 100644 redGrapes/dispatch/thread/worker.cpp create mode 100644 redGrapes/dispatch/thread/worker_pool.cpp create mode 100644 redGrapes/dispatch/thread/worker_pool.hpp create mode 100644 redGrapes/util/bitfield.hpp diff --git a/examples/2_functors.cpp b/examples/2_functors.cpp index 3255cca9..f0f3be6f 100644 --- a/examples/2_functors.cpp +++ b/examples/2_functors.cpp @@ -18,6 +18,7 @@ int square (int x) int main() { + spdlog::set_level(spdlog::level::trace); redGrapes::init(1); fmt::print( diff --git a/examples/mpi.cpp b/examples/mpi.cpp index ea0ebc2c..a5d64b73 100644 --- a/examples/mpi.cpp +++ b/examples/mpi.cpp @@ -40,14 +40,18 @@ int main() { //spdlog::set_pattern("[thread %t] %^[%l]%$ %v"); //spdlog::set_level( spdlog::level::trace ); + /* int prov; MPI_Init_thread( nullptr, nullptr, MPI_THREAD_MULTIPLE, &prov ); assert( prov == MPI_THREAD_MULTIPLE ); */ + MPI_Init( nullptr, nullptr ); - auto default_scheduler = std::make_shared( 2 ); + rg::init_allocator(4); + + auto default_scheduler = std::make_shared(); auto mpi_request_pool = std::make_shared(); auto mpi_fifo = std::make_shared(); @@ -60,11 +64,11 @@ int main() rg::dispatch::thread::execute_task( *task ); }; - rg::init( - rg::scheduler::make_tag_match_scheduler( ) + rg::init(4, + rg::scheduler::make_tag_match_scheduler() .add({}, default_scheduler) .add({ SCHED_MPI }, mpi_fifo)); - + // initialize MPI config rg::IOResource< MPIConfig > mpi_config; rg::emplace_task( diff --git a/redGrapes/context.hpp b/redGrapes/context.hpp index 1f9712ab..56131e32 100644 --- a/redGrapes/context.hpp +++ b/redGrapes/context.hpp @@ -1,9 +1,7 @@ #pragma once #include -#include - -#include +#include namespace redGrapes { @@ -11,6 +9,16 @@ namespace redGrapes struct Task; struct TaskSpace; +namespace dispatch { +namespace thread{ +struct WorkerPool; +} +} + +namespace scheduler { +struct IScheduler; +} + /*! 
global context
 */
 extern thread_local Task * current_task;
@@ -18,6 +26,7 @@ extern thread_local std::function< void () > idle;
 
 extern std::shared_ptr< TaskSpace > top_space;
 extern std::shared_ptr< scheduler::IScheduler > top_scheduler;
+extern std::shared_ptr< dispatch::thread::WorkerPool > worker_pool;
 
 unsigned scope_depth();
 
diff --git a/redGrapes/dispatch/thread/execute.cpp b/redGrapes/dispatch/thread/execute.cpp
index 12a60bea..7aa1e16d 100644
--- a/redGrapes/dispatch/thread/execute.cpp
+++ b/redGrapes/dispatch/thread/execute.cpp
@@ -1,4 +1,4 @@
-/* Copyright 2022 Michael Sippel
+/* Copyright 2022-2023 Michael Sippel
  *
  * This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -21,7 +21,7 @@ namespace dispatch
 namespace thread
 {
 
-thread_local scheduler::WakerID current_waker_id;
+thread_local scheduler::WakerId current_waker_id;
 thread_local std::shared_ptr< WorkerThread > current_worker;
 
 void execute_task( Task & task )
diff --git a/redGrapes/dispatch/thread/local.hpp b/redGrapes/dispatch/thread/local.hpp
index e5f2e7c4..24e5cc06 100644
--- a/redGrapes/dispatch/thread/local.hpp
+++ b/redGrapes/dispatch/thread/local.hpp
@@ -17,7 +17,7 @@ namespace thread
 
 struct WorkerThread;
 
-extern thread_local scheduler::WakerID current_waker_id;
+extern thread_local scheduler::WakerId current_waker_id;
 extern thread_local std::shared_ptr< WorkerThread > current_worker;
 
 } // namespace thread
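The new worker.cpp below parks the spawned thread on a condition variable until start() is called, because a thread launched from the constructor must not touch shared_from_this() before a shared_ptr owns the object. The same handshake, reduced to plain std:: primitives (editor's sketch, not part of the patch; all names are illustrative):

    // Editor's sketch: the start-flag handshake used by WorkerThread below.
    // start() must be called before the object is destroyed.
    #include <atomic>
    #include <condition_variable>
    #include <memory>
    #include <mutex>
    #include <thread>

    struct Runner : std::enable_shared_from_this< Runner >
    {
        std::atomic_bool started{ false };
        std::mutex m;
        std::condition_variable cv;
        std::thread thread;

        Runner()
            : thread([this] {
                  std::unique_lock< std::mutex > lock(m);
                  cv.wait(lock, [this] { return started.load(); });
                  // only safe now: a shared_ptr already owns *this
                  auto self = shared_from_this();
                  /* ... work loop ... */
              })
        {}

        void start()
        {
            started.store(true);
            cv.notify_one();
        }

        ~Runner() { thread.join(); }
    };

    // usage: auto r = std::make_shared<Runner>(); r->start();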
diff --git a/redGrapes/dispatch/thread/worker.cpp b/redGrapes/dispatch/thread/worker.cpp
new file mode 100644
index 00000000..0da85734
--- /dev/null
+++ b/redGrapes/dispatch/thread/worker.cpp
@@ -0,0 +1,189 @@
+/* Copyright 2020-2023 Michael Sippel
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include 
+#include 
+
+namespace redGrapes
+{
+namespace dispatch
+{
+namespace thread
+{
+
+WorkerThread::WorkerThread( WorkerId worker_id )
+    : id( worker_id ),
+      thread(
+          [this] {
+              /* setup membind- & cpubind policies using hwloc
+               */
+              this->cpubind();
+              this->membind();
+
+              /* since we are in a worker, there should always
+               * be a task running (we always have a parent task,
+               * and therefore yield() guarantees to do
+               * a context-switch instead of idling)
+               */
+              redGrapes::idle = [this] {
+                  throw std::runtime_error("idle in worker thread!");
+              };
+
+              /* wait for the start-flag to be triggered in order
+               * to avoid premature access to `shared_from_this`
+               */
+              while( ! m_start.load(std::memory_order_consume) )
+                  cv.wait();
+
+              /* initialize thread-local variables
+               */
+              current_worker = this->shared_from_this();
+              current_waker_id = this->get_waker_id();
+              memory::current_arena = this->get_worker_id();
+
+              /* execute tasks until stop()
+               */
+              this->work_loop();
+
+              SPDLOG_TRACE("Worker Finished!");
+          }
+      )
+{
+}
+
+void WorkerThread::start()
+{
+    m_start.store(true, std::memory_order_release);
+    wake();
+}
+
+void WorkerThread::stop()
+{
+    SPDLOG_TRACE("Worker::stop()");
+    m_stop.store(true, std::memory_order_release);
+    wake();
+    thread.join();
+}
+
+void WorkerThread::cpubind()
+{
+    hwloc_obj_t obj = hwloc_get_obj_by_type(topology, HWLOC_OBJ_PU, this->id);
+
+    if( hwloc_set_cpubind(topology, obj->cpuset, HWLOC_CPUBIND_THREAD | HWLOC_CPUBIND_STRICT) )
+    {
+        char *str;
+        int error = errno;
+        hwloc_bitmap_asprintf(&str, obj->cpuset);
+        spdlog::warn("Couldn't cpubind to cpuset {}: {}\n", str, strerror(error));
+        free(str);
+    }
+}
+
+void WorkerThread::membind()
+{
+    hwloc_obj_t obj = hwloc_get_obj_by_type(topology, HWLOC_OBJ_PU, this->id);
+
+    if( hwloc_set_membind(topology, obj->cpuset, HWLOC_MEMBIND_BIND, HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_STRICT ) )
+    {
+        char *str;
+        int error = errno;
+        hwloc_bitmap_asprintf(&str, obj->cpuset);
+        spdlog::warn("Couldn't membind to cpuset {}: {}\n", str, strerror(error));
+        free(str);
+    }
+}
+
+void WorkerThread::work_loop()
+{
+    SPDLOG_TRACE("Worker {} start work_loop()", id);
+    while( ! m_stop.load(std::memory_order_consume) )
+    {
+        while( Task * task = this->gather_task() )
+        {
+            worker_pool->set_worker_state( id, dispatch::thread::WorkerState::BUSY );
+            dispatch::thread::execute_task( *task );
+        }
+
+        worker_pool->set_worker_state( id, dispatch::thread::WorkerState::AVAILABLE );
+
+        if( !m_stop.load(std::memory_order_consume) )
+            cv.wait();
+    }
+    SPDLOG_TRACE("Worker {} end work_loop()", id);
+}
+
+Task * WorkerThread::gather_task()
+{
+    Task * task = nullptr;
+
+    /* STAGE 1:
+     *
+     * first, execute all tasks in the ready queue
+     */
+    SPDLOG_TRACE("Worker {}: consume ready queue", id);
+    if( (task = ready_queue.pop()) )
+        return task;
+
+    /* STAGE 2:
+     *
+     * after the ready queue is fully consumed,
+     * try initializing new tasks until one
+     * of them is found to be ready
+     */
+    SPDLOG_TRACE("Worker {}: try init new tasks", id);
+    while( this->init_dependencies( task, true ) )
+        if( task )
+            return task;
+
+    /* set worker state to signal that we are requesting tasks
+     */
+    worker_pool->set_worker_state( id, dispatch::thread::WorkerState::AVAILABLE );
+
+#ifndef ENABLE_WORKSTEALING
+#define ENABLE_WORKSTEALING 1
+#endif
+
+#if ENABLE_WORKSTEALING
+
+    /* STAGE 3:
+     *
+     * if all local tasks are exhausted,
+     * try to steal a task from other workers
+     */
+    SPDLOG_TRACE("Worker {}: try to steal tasks", id);
+    task = top_scheduler->steal_task( *this );
+
+#endif
+
+    return task;
+}
+
+bool WorkerThread::init_dependencies( Task* & t, bool claimed )
+{
+    if(Task * task = emplacement_queue.pop())
+    {
+        SPDLOG_DEBUG("init task {}", task->task_id);
+
+        task->pre_event.up();
+        task->init_graph();
+
+        if( task->get_pre_event().notify( claimed ) )
+            t = task;
+        else
+            t = nullptr;
+
+        return true;
+    }
+    else
+        return false;
+}
+
+} // namespace thread
+} // namespace dispatch
+} // namespace redGrapes
+
diff --git a/redGrapes/dispatch/thread/worker.hpp b/redGrapes/dispatch/thread/worker.hpp
index 48bff1cb..19a5141c 100644
--- a/redGrapes/dispatch/thread/worker.hpp
+++ b/redGrapes/dispatch/thread/worker.hpp
@@ -1,4 +1,4 @@
-/* Copyright 2020 Michael Sippel
+/* Copyright 
2020-2023 Michael Sippel * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -13,13 +13,9 @@ #include #include -#include -#include -#include -#include +#include #include #include -#include namespace redGrapes { @@ -31,9 +27,18 @@ namespace dispatch namespace thread { +using WorkerId = unsigned; +enum WorkerState { + BUSY = 0, + AVAILABLE = 1 +}; + + + + void execute_task( Task & task_id ); -extern thread_local scheduler::WakerID current_waker_id; +extern thread_local scheduler::WakerId current_waker_id; extern thread_local std::shared_ptr< WorkerThread > current_worker; /*! @@ -45,6 +50,7 @@ extern thread_local std::shared_ptr< WorkerThread > current_worker; struct WorkerThread : std::enable_shared_from_this { private: + WorkerId id; /*! * if true, the thread shall start @@ -58,13 +64,9 @@ struct WorkerThread : std::enable_shared_from_this */ std::atomic_bool m_stop{ false }; -public: - unsigned id; std::atomic task_count{ 0 }; - std::atomic_bool ready{false}; - //! condition variable for waiting if queue is empty CondVar cv; @@ -75,126 +77,39 @@ struct WorkerThread : std::enable_shared_from_this task::Queue ready_queue{ queue_capacity }; std::thread thread; -public: - WorkerThread( unsigned id ) : - id( id ), - thread( - [this] - { - ready = true; - - while( ! m_start.load(std::memory_order_consume) ) - cv.wait(); - - this->cpubind(); - this->membind(); - - /* since we are in a worker, there should always - * be a task running (we always have a parent task - * and therefore yield() guarantees to do - * a context-switch instead of idling - */ - redGrapes::idle = - [this] - { - throw std::runtime_error("idle in worker thread!"); - }; - - current_worker = this->shared_from_this(); - current_waker_id = this->get_waker_id(); - memory::current_arena = get_worker_id(); - - while( ! 
m_stop.load(std::memory_order_consume) ) - { - SPDLOG_TRACE("Worker: work on queue"); - - while( Task * task = ready_queue.pop() ) - dispatch::thread::execute_task( *task ); - - if( Task * task = redGrapes::schedule( *this ) ) - dispatch::thread::execute_task( *task ); - - else if( !m_stop.load(std::memory_order_consume) ) - { - SPDLOG_TRACE("worker sleep"); - //TRACE_EVENT("Worker", "sleep"); - cv.wait(); - } - } - - SPDLOG_TRACE("Worker Finished!"); - } - ) - { - } - ~WorkerThread() - { - } + WorkerThread( WorkerId id ); - void cpubind() - { - hwloc_obj_t obj = hwloc_get_obj_by_type(topology, HWLOC_OBJ_PU, this->id); - - if( hwloc_set_cpubind(topology, obj->cpuset, HWLOC_CPUBIND_THREAD | HWLOC_CPUBIND_STRICT) ) - { - char *str; - int error = errno; - hwloc_bitmap_asprintf(&str, obj->cpuset); - spdlog::warn("Couldn't cpubind to cpuset {}: {}\n", str, strerror(error)); - free(str); - } - } + inline unsigned get_worker_id() { return id; } + inline scheduler::WakerId get_waker_id() { return id + 1; } + inline bool wake() { return cv.notify(); } - void membind() - { - hwloc_obj_t obj = hwloc_get_obj_by_type(topology, HWLOC_OBJ_PU, this->id); - - if( hwloc_set_membind(topology, obj->cpuset, HWLOC_MEMBIND_BIND, HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_STRICT ) ) - { - char *str; - int error = errno; - hwloc_bitmap_asprintf(&str, obj->cpuset); - spdlog::warn("Couldn't membind to cpuset {}: {}\n", str, strerror(error)); - free(str); - } - } + void start(); + void stop(); - inline unsigned get_worker_id() - { - return id; - } - - inline scheduler::WakerID get_waker_id() - { - return id + 1; - } + void cpubind(); + void membind(); - inline bool wake() - { - return cv.notify(); - } - - void start() + /* adds a new task to the emplacement queue + * and wakes up thread to kickstart execution + */ + inline void emplace_task( Task * task ) { - m_start.store(true, std::memory_order_release); + emplacement_queue.push( task ); wake(); } - void stop() - { - SPDLOG_TRACE("Worker::stop()"); - m_stop.store(true, std::memory_order_release); - wake(); - thread.join(); - } +private: - void emplace_task( Task * task ) - { - emplacement_queue.push( task ); - wake(); - } + /* repeatedly try to find and execute tasks + * until stop-flag is triggered by stop() + */ + void work_loop(); + /* find a task that shall be executed next + */ + Task * gather_task(); + /*! take a task from the emplacement queue and initialize it, * @param t is set to the task if the new task is ready, * @param t is set to nullptr if the new task is blocked. @@ -203,27 +118,7 @@ struct WorkerThread : std::enable_shared_from_this * * @return false if queue is empty */ - bool init_dependencies( Task* & t, bool claimed = true ) - { - if(Task * task = emplacement_queue.pop()) - { - SPDLOG_DEBUG("init task {}", task->task_id); - - task->pre_event.up(); - task->init_graph(); - - if( task->get_pre_event().notify( claimed ) ) - t = task; - else - { - t = nullptr; - } - - return true; - } - else - return false; - } + bool init_dependencies( Task* & t, bool claimed = true ); }; } // namespace thread diff --git a/redGrapes/dispatch/thread/worker_pool.cpp b/redGrapes/dispatch/thread/worker_pool.cpp new file mode 100644 index 00000000..f5c1b0dd --- /dev/null +++ b/redGrapes/dispatch/thread/worker_pool.cpp @@ -0,0 +1,81 @@ +/* Copyright 2022-2023 Michael Sippel + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. 
If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+#include 
+#include 
+
+namespace redGrapes
+{
+namespace dispatch
+{
+namespace thread
+{
+
+WorkerPool::WorkerPool( size_t n_workers )
+    : worker_state( n_workers )
+{
+    workers.reserve( n_workers );
+
+    unsigned n_pus = hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_PU);
+    if( n_workers > n_pus )
+        spdlog::warn("{} worker-threads requested, but only {} PUs available!", n_workers, n_pus);
+
+    SPDLOG_INFO("create WorkerPool with {} workers", n_workers);
+    for( size_t i = 0; i < n_workers; ++i )
+    {
+        // allocate worker with id `i` on arena `i`
+        auto worker = memory::alloc_shared_bind< dispatch::thread::WorkerThread >( i, i );
+        workers.emplace_back( worker );
+    }
+
+    redGrapes::dispatch::thread::current_waker_id = 0;
+}
+
+void WorkerPool::start()
+{
+    for( auto & worker : workers )
+        worker->start();
+}
+void WorkerPool::stop()
+{
+    for( auto & worker : workers )
+        worker->stop();
+}
+
+int WorkerPool::find_free_worker()
+{
+    TRACE_EVENT("Scheduler", "find_worker");
+
+    SPDLOG_TRACE("find worker...");
+
+    unsigned start_idx = 0;
+    if(auto w = dispatch::thread::current_worker)
+        start_idx = w->get_worker_id();
+
+    std::optional< unsigned > idx =
+        this->probe_worker_by_state< unsigned >(
+            [this](unsigned idx) -> std::optional< unsigned >
+            {
+                if(set_worker_state(idx, WorkerState::BUSY))
+                    return idx;
+                else
+                    return std::nullopt;
+            },
+            dispatch::thread::WorkerState::AVAILABLE, // find a free worker
+            start_idx,
+            false);
+
+    if( idx )
+        return *idx;
+    else
+        // no free worker found
+        return -1;
+}
+
+} // namespace thread
+} // namespace dispatch
+} // namespace redGrapes
+
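find_free_worker() claims a worker by flipping its state bit inside the probe functor, so the test and the claim are a single atomic step and two schedulers can never claim the same worker. The same idea, reduced to one 64-bit word exactly as the old alloc_worker() did (editor's sketch, not part of the patch):

    // Editor's sketch: atomic test-and-claim on a bitfield. A worker is
    // "free" while its bit is 1; only the thread that observes the 1->0
    // transition owns the claim.
    #include <atomic>
    #include <cstdint>
    #include <optional>

    std::atomic< uint64_t > free_mask{ ~uint64_t(0) }; // all 64 workers free

    std::optional< unsigned > claim_worker( unsigned idx )
    {
        uint64_t bit = uint64_t(1) << idx;
        uint64_t old = free_mask.fetch_and(~bit, std::memory_order_acquire);
        if( old & bit )
            return idx;      // we flipped the bit: the worker is ours
        return std::nullopt; // someone else was faster (or it was busy)
    }

The new AtomicBitfield below generalizes this to an arbitrary number of 64-bit chunks.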
diff --git a/redGrapes/dispatch/thread/worker_pool.hpp b/redGrapes/dispatch/thread/worker_pool.hpp
new file mode 100644
index 00000000..27e5468d
--- /dev/null
+++ b/redGrapes/dispatch/thread/worker_pool.hpp
@@ -0,0 +1,82 @@
+/* Copyright 2022-2023 Michael Sippel
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+#pragma once
+
+#include 
+#include 
+
+namespace redGrapes
+{
+namespace dispatch
+{
+namespace thread
+{
+
+struct WorkerPool
+{
+    WorkerPool( size_t n_workers = 1 );
+
+    /* get the number of workers in this pool
+     */
+    inline size_t size()
+    {
+        return workers.size();
+    }
+
+    /* signals all workers to start executing tasks
+     */
+    void start();
+
+    /* signals all workers that no new tasks will be added
+     */
+    void stop();
+
+    inline WorkerThread & get_worker( WorkerId worker_id )
+    {
+        return *workers[ worker_id ];
+    }
+
+    inline WorkerState get_worker_state( WorkerId worker_id )
+    {
+        return worker_state.get(worker_id) ? WorkerState::AVAILABLE : WorkerState::BUSY;
+    }
+
+    /* return true on success
+     */
+    inline bool set_worker_state( WorkerId worker_id, WorkerState state )
+    {
+        return worker_state.set( worker_id, state ) != state;
+    }
+
+    template < typename T, typename F >
+    inline std::optional< T >
+    probe_worker_by_state(
+        F && f,
+        bool expected_worker_state,
+        unsigned start_worker_idx,
+        bool exclude_start = true)
+    {
+        return worker_state.template probe_by_value< T >( std::move(f), expected_worker_state, start_worker_idx, exclude_start );
+    }
+
+    /*! tries to find a free worker and mark it busy
+     *
+     * @return worker_id of the claimed worker,
+     *         or -1 if no free worker is available
+     */
+    int find_free_worker();
+
+private:
+    std::vector< std::shared_ptr< WorkerThread > > workers;
+    AtomicBitfield worker_state;
+};
+
+} // namespace thread
+} // namespace dispatch
+} // namespace redGrapes
+
diff --git a/redGrapes/redGrapes.cpp b/redGrapes/redGrapes.cpp
index afddbc8a..f23734f9 100644
--- a/redGrapes/redGrapes.cpp
+++ b/redGrapes/redGrapes.cpp
@@ -7,11 +7,14 @@
 
 #include 
 #include 
+#include 
 
 #include 
-#include 
 #include 
-#include 
+#include 
+#include 
+
+#include 
 
 #include 
 
@@ -32,6 +35,7 @@ thread_local unsigned current_arena;
 } // namespace memory
 
 std::shared_ptr< TaskSpace > top_space;
+std::shared_ptr< dispatch::thread::WorkerPool > worker_pool;
 std::shared_ptr< scheduler::IScheduler > top_scheduler;
 
 #if REDGRAPES_ENABLE_TRACE
@@ -96,14 +100,14 @@ std::vector< std::reference_wrapper< Task > > backtrace()
     return bt;
 }
 
-void init( size_t n_threads )
+void init_allocator( size_t n_arenas )
 {
     hwloc_topology_init(&topology);
     hwloc_topology_load(topology);
 
     // use one arena with 8 MiB chunksize per worker
     size_t chunk_size = 8 * 1024 * 1024 - sizeof(memory::BumpAllocChunk);
-    memory::alloc = std::make_shared< memory::MultiArenaAlloc >( chunk_size, n_threads );
+    memory::alloc = std::make_shared< memory::MultiArenaAlloc >( chunk_size, n_arenas );
 
 #if REDGRAPES_ENABLE_TRACE
     perfetto::TracingInitArgs args;
@@ -113,10 +117,22 @@
     tracing_session = StartTracing();
 #endif
+}
 
+void init( size_t n_workers, std::shared_ptr< scheduler::IScheduler > scheduler )
+{
     top_space = std::make_shared< TaskSpace >();
-    top_scheduler = std::make_shared< scheduler::DefaultScheduler >(n_threads);
 
-    top_scheduler->start();
+    worker_pool = std::make_shared< dispatch::thread::WorkerPool >( n_workers );
+
+    top_scheduler = scheduler;
+
+    worker_pool->start();
+}
+
+void init( size_t n_workers )
+{
+    init_allocator( n_workers );
+    init(n_workers, std::make_shared< scheduler::DefaultScheduler >());
 }
 
 /*! wait until all tasks in the current task space finished
@@ -130,7 +146,8 @@ void barrier()
 void finalize()
 {
     barrier();
-    top_scheduler->stop();
+    worker_pool->stop();
+
     top_scheduler.reset();
     top_space.reset();
 
@@ -157,17 +174,6 @@ void yield( scheduler::EventPtr event )
     }
 }
 
-Task * schedule( dispatch::thread::WorkerThread & worker )
-{
-    auto sched = top_scheduler;
-    auto space = top_space;
-
-    if( sched && space )
-        return sched->schedule(worker);
-
-    return nullptr;
-}
-
 //! apply a patch to the properties of the currently running task
 void update_properties(typename TaskProperties::Patch const& patch)
 {
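The init() entry point is now split: init_allocator() sets up hwloc and the arena allocator, and init(n_workers, scheduler) wires up the TaskSpace, the WorkerPool and the scheduler, as the mpi.cpp change above already demonstrates. A minimal sketch of the two resulting usage paths (editor's illustration, not part of the patch; the task body is elided):

    // Editor's sketch: using the new init API introduced in this diff.
    #include <redGrapes/redGrapes.hpp>
    #include <redGrapes/scheduler/default_scheduler.hpp>

    namespace rg = redGrapes;

    int main()
    {
        // simple path: arenas + WorkerPool + DefaultScheduler,
        // one allocator arena per worker
        rg::init( 4 );

        // custom-scheduler path (what mpi.cpp does):
        //   rg::init_allocator( 4 );
        //   rg::init( 4, std::make_shared< rg::scheduler::DefaultScheduler >() );

        rg::emplace_task( [] { /* ... */ } );

        rg::finalize();
    }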
diff --git a/redGrapes/redGrapes.hpp b/redGrapes/redGrapes.hpp
index 820ac2d6..deb8f7ea 100644
--- a/redGrapes/redGrapes.hpp
+++ b/redGrapes/redGrapes.hpp
@@ -7,9 +7,9 @@
 
 #pragma once
 
+
+#include 
 #include 
-#include 
-#include 
 #include 
 #include 
 #include 
@@ -17,36 +17,40 @@
 #include 
 #include 
 #include 
+#include 
+#include 
+
 #include 
 #include 
 
 namespace redGrapes
 {
 
-    /* USER INTERFACE */
-    void init(std::shared_ptr< scheduler::IScheduler > scheduler);
-    void init(size_t n_threads = std::thread::hardware_concurrency());
-    void finalize();
+void init_allocator( size_t n_arenas );
+void init( size_t n_workers, std::shared_ptr< scheduler::IScheduler > scheduler );
+void init( size_t n_workers = std::thread::hardware_concurrency() );
 
-    //! wait until all tasks in the current task space finished
-    void barrier();
+void finalize();
 
-    //! pause the currently running task at least until event is reached
-    void yield(scheduler::EventPtr event);
+//! wait until all tasks in the current task space finished
+void barrier();
 
-    //! apply a patch to the properties of the currently running task
-    void update_properties(typename TaskProperties::Patch const& patch);
+//! pause the currently running task at least until event is reached
+void yield(scheduler::EventPtr event);
 
-    //! get backtrace from currently running task
-    std::vector< std::reference_wrapper< Task > > backtrace();
+//! apply a patch to the properties of the currently running task
+void update_properties(typename TaskProperties::Patch const& patch);
 
-    /*! Create an event on which the termination of the current task depends.
-     * A task must currently be running.
-     *
-     * @return Handle to flag the event with `reach_event` later.
-     *         nullopt if there is no task running currently
-     */
-    std::optional< scheduler::EventPtr > create_event();
+//! get backtrace from currently running task
+std::vector< std::reference_wrapper< Task > > backtrace();
+
+/*! Create an event on which the termination of the current task depends.
+ * A task must currently be running.
+ *
+ * @return Handle to flag the event with `reach_event` later.
+ *         nullopt if there is no task running currently
+ */
+std::optional< scheduler::EventPtr > create_event();
 
 /*! create a new task, as child of the currently running task (if there is one)
 *
@@ -65,10 +69,11 @@ inline auto emplace_task(Callable&& f, Args&&... args)
 {
     static std::atomic< unsigned int > next_worker(0);
 
-    // Fixme: hardcoded 64
-    unsigned worker_id = next_worker.fetch_add(1) % 64;
+    dispatch::thread::WorkerId worker_id = next_worker.fetch_add(1) % worker_pool->size();
     memory::current_arena = worker_id;
 
+    SPDLOG_INFO("emplace task to worker {}", worker_id);
+
     return std::move(TaskBuilder< Callable, Args... >( std::move(f), std::forward< Args >(args)... ));
 }
diff --git a/redGrapes/scheduler/default_scheduler.hpp b/redGrapes/scheduler/default_scheduler.hpp
index 5ba91b1f..7fc9fe1e 100644
--- a/redGrapes/scheduler/default_scheduler.hpp
+++ b/redGrapes/scheduler/default_scheduler.hpp
@@ -5,9 +5,13 @@
 
 #include 
 #include 
+#include 
+#include 
+
+#include 
 #include 
+
 #include 
-#include 
 
 namespace redGrapes
 {
 namespace scheduler
 {
 
 /*
- * Combines a FIFO with worker threads
+ * Uses a simple round-robin algorithm to distribute tasks to workers
+ * and implements work-stealing
  */
 struct DefaultScheduler : public IScheduler
 {
     CondVar cv;
-    unsigned n_workers;
-
-    //! bit is true if worker available, false if worker busy
-    static constexpr uint64_t bitfield_len = 64;
-    std::vector< std::atomic< uint64_t > > worker_state;
-    std::vector< std::shared_ptr< dispatch::thread::WorkerThread > > threads;
 
-    DefaultScheduler( size_t n_threads = std::thread::hardware_concurrency() )
-        : n_workers( n_threads )
-        , worker_state( ceil_div(n_threads, bitfield_len) )
+    DefaultScheduler()
     {
-        threads.reserve( n_threads );
-
-        unsigned n_pus = hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_PU);
-        if( n_threads > n_pus )
-            spdlog::warn("{} worker-threads requested, but only {} PUs available!", n_threads, n_pus);
-
-        for( size_t i = 0; i < n_threads; ++i )
-        {
-            auto worker = memory::alloc_shared_bind< dispatch::thread::WorkerThread >( i, i );
-            threads.emplace_back( worker );
-        }
-
-        redGrapes::dispatch::thread::current_waker_id = 0;
-
         // if not configured otherwise,
         // the main thread will simply wait
         redGrapes::idle =
             [this]
             {
                 SPDLOG_TRACE("DefaultScheduler::idle()");
                 cv.wait();
             };
     }
 
-    /* signals all workers to start executing tasks
-     */
-    void start()
-    {
-        bool all_ready = false;
-        while( ! all_ready )
-        {
-            all_ready = true;
-            for( auto & worker : threads )
-                all_ready &= worker->ready.load();
-        }
-
-        for( auto & worker : threads )
-            worker->start();
-    }
-
-    /* signals all workers that no new tasks will be added
-     */
-    void stop()
-    {
-        for( auto & worker : threads )
-            worker->stop();
-    }
-
-    inline bool is_worker_free( unsigned worker_id )
-    {
-        return worker_state[ worker_id/64 ] & ((uint64_t)1 << (worker_id%64));
-    }
-    inline bool is_worker_busy( unsigned worker_id )
-    {
-        return !is_worker_free( worker_id );
-    }
-
-    /* sets the worker-state bitfield to free for the specified worker
-     */
-    inline void free_worker( unsigned id )
-    {
-        SPDLOG_TRACE("free worker", id);
-        //assert( id < n_workers );
-        worker_state[ id / 64 ].fetch_or( (uint64_t)1 << ( id % 64 ), std::memory_order_release);
-    }
-
-    /* try to allocate worker with id
-     * and mark it to be busy
-     *
-     * @return true if worker was free and is now allocated,
-     *         false if worker is already busy
-     */
-    inline bool alloc_worker( unsigned id )
-    {
-        unsigned j = id / 64;
-        unsigned k = id % 64;
-
-        uint64_t old_val = worker_state[j].fetch_and(~((uint64_t)1 << k), std::memory_order_acquire);
-        return old_val & ((uint64_t)1 << k);
-    }
-
-    // find index of first set bit
-    // taken from https://graphics.stanford.edu/~seander/bithacks.html#ZerosOnRightParallel
-    inline unsigned int first_one_idx( uint64_t v )
-    {
-        unsigned int c = 64; // c will be the number of zero bits on the right
-        v &= -int64_t(v);
-        if (v) c--;
-        if (v & 0x00000000FFFFFFFF) c -= 32;
-        if (v & 0x0000FFFF0000FFFF) c -= 16;
-        if (v & 0x00FF00FF00FF00FF) c -= 8;
-        if (v & 0x0F0F0F0F0F0F0F0F) c -= 4;
-        if (v & 0x3333333333333333) c -= 2;
-        if (v & 0x5555555555555555) c -= 1;
-
-        return c;
-    }
-
-    /*! calculates ceil( a / b )
-     */
-    inline uint64_t ceil_div( uint64_t a, uint64_t b )
-    {
-        return (a+b-1)/b;
-    }
-
-    template < typename T, typename F >
-    inline std::optional< T > find_worker_in_field( unsigned j, uint64_t mask, bool expected_worker_state, F && f )
-    {
-        while( true )
-        {
-            uint64_t field = expected_worker_state ?
-                uint64_t(worker_state[j]) : ~uint64_t(worker_state[j]);
-
-            uint64_t masked_field = field & mask;
-            if( masked_field == 0 )
-                break;
-
-            // find index of first worker
-            unsigned int k = first_one_idx( masked_field );
-
-            if( k < bitfield_len )
-            {
-                unsigned int idx = j * bitfield_len + k;
-                //spdlog::info("find worker: j = {}, k = {}, idx= {}", j , k, idx);
-
-                if( std::optional< T > x = f( idx ) )
-                    return x;
-
-                // dont check this worker again
-                mask &= ~(uint64_t(1) << k);
-            }
-        }
-        return std::nullopt;
-    }
-
-    template < typename T, typename F >
-    inline std::optional< T >
-    find_worker(
-        F && f,
-        bool expected_worker_state,
-        unsigned start_worker_idx,
-        bool exclude_start = true)
-    {
-        uint64_t start_field_idx = start_worker_idx / bitfield_len;
-        uint64_t first_mask = (uint64_t(1) << (start_worker_idx%bitfield_len)) - 1;
-
-        {
-            uint64_t mask = ~first_mask;
-            if( start_field_idx == worker_state.size() - 1 && n_workers % bitfield_len != 0 )
-                mask &= (uint64_t(1) << (n_workers % bitfield_len)) - 1;
-
-            if( exclude_start )
-                mask &= ~(uint64_t(1) << (start_worker_idx%bitfield_len));
-
-            if( auto x = find_worker_in_field( start_field_idx, mask, expected_worker_state, f ) )
-                return x;
-        }
-
-        for(
-            uint64_t b = 1;
-            b < ceil_div(n_workers, bitfield_len);
-            ++b
-        ) {
-            uint64_t field_idx = (start_field_idx + b) % worker_state.size();
-            uint64_t mask = ~0;
-
-            if( field_idx == worker_state.size() - 1 && n_workers % bitfield_len != 0 )
-                mask &= (uint64_t(1) << (n_workers % bitfield_len)) - 1;
-
-            if( auto x = find_worker_in_field( field_idx, mask, expected_worker_state, f ) )
-                return x;
-        }
-
-        {
-            uint64_t mask = first_mask;
-            if( start_field_idx == worker_state.size() - 1 && n_workers % bitfield_len != 0 )
-                mask &= (uint64_t(1) << (n_workers % bitfield_len)) - 1;
-            if( auto x = find_worker_in_field( start_field_idx, mask, expected_worker_state, f ) )
-                return x;
-        }
-
-        return std::nullopt;
-    }
-
-    /*
-     * try to find an available worker,
-     * returns a busy worker if no free worker is available
-     * @return worker_id
-     */
-    inline int find_free_worker()
-    {
-        TRACE_EVENT("Scheduler", "find_worker");
-
-        SPDLOG_TRACE("find worker...");
-
-        unsigned start_idx = 0;
-        if(auto w = dispatch::thread::current_worker)
-            start_idx = w->get_worker_id();
-
-        std::optional< unsigned > idx = find_worker(
-            [this](unsigned idx) -> std::optional< unsigned >
-            {
-                if(alloc_worker(idx))
-                    return idx;
-                else
-                    return std::nullopt;
-            },
-            true, // find a free worker
-            start_idx,
-            false);
-
-        if( idx )
-            return *idx;
-        else
-            // no free worker found,
-            return -1;
-    }
-
+    /* send the new task to a worker
+     */
+    void emplace_task( Task & task )
+    {
+        // todo: properly store affinity information in task
+        dispatch::thread::WorkerId worker_id = task->arena_id % worker_pool->size();
+
+        worker_pool->get_worker(worker_id).emplace_task( &task );
+    }
+
+    /* activate a task that already exists in the task-graph,
+     * but was so far only reachable through follower-lists
+     * and therefore not yet assigned to a worker.
+     * since the task is now ready, find a worker for it
+     */
+    void activate_task( Task & task )
+    {
+        //! worker id to use in case all workers are busy
+        static thread_local std::atomic< unsigned int > next_worker(dispatch::thread::current_worker ?
+            dispatch::thread::current_worker->get_worker_id() + 1 : 0);
+
+        TRACE_EVENT("Scheduler", "activate_task");
+        SPDLOG_TRACE("DefaultScheduler::activate_task({})", task.task_id);
+
+        int worker_id = worker_pool->find_free_worker();
+        if( worker_id < 0 )
+        {
+            worker_id = next_worker.fetch_add(1) % worker_pool->size();
+            if( worker_id == dispatch::thread::current_worker->get_worker_id() )
+                worker_id = next_worker.fetch_add(1) % worker_pool->size();
+        }
+
+        worker_pool->get_worker( worker_id ).ready_queue.push(&task);
+        worker_pool->set_worker_state( worker_id, dispatch::thread::WorkerState::BUSY );
+        worker_pool->get_worker( worker_id ).wake();
+    }
+
-    /* tries to find a ready task in any queue of other workers
-     * and removes it from the queue
+    /* tries to find a task with uninitialized dependency edges in the
+     * task-graph in the emplacement queues of other workers
+     * and removes it from there
      */
     Task * steal_new_task( dispatch::thread::WorkerThread & worker )
     {
-        std::optional< Task* > task = find_worker(
-            [this, &worker](unsigned idx) -> std::optional< Task* >
+        std::optional< Task* > task = worker_pool->probe_worker_by_state< Task* >(
+            [&worker](unsigned idx) -> std::optional< Task* >
             {
-                if(Task* t = this->threads[idx]->emplacement_queue.pop())
+                // we have a candidate of a busy worker,
+                // now check its queue
+                if(Task* t = worker_pool->get_worker(idx).emplacement_queue.pop())
                     return t;
+
+                // otherwise check own queue again
                 else if(Task* t = worker.emplacement_queue.pop())
                     return t;
+
+                // else continue search
                 else
                     return std::nullopt;
             },
-            false, // find a busy worker
+
+            // find a busy worker
+            dispatch::thread::WorkerState::BUSY,
+
+            // start next to current worker
             worker.get_worker_id());
 
-        if(task)
-            return *task;
-        else
-            return nullptr;
+        return task ? *task : nullptr;
     }
 
     /* tries to find a ready task in any queue of other workers
      * and removes it from the queue
      */
     Task * steal_ready_task( dispatch::thread::WorkerThread & worker )
     {
-        std::optional< Task* > task = find_worker(
-            [this, &worker](unsigned idx) -> std::optional< Task* >
+        std::optional< Task* > task = worker_pool->probe_worker_by_state< Task* >(
+            [&worker](unsigned idx) -> std::optional< Task* >
             {
-                if(Task* t = this->threads[idx]->ready_queue.pop())
+                // we have a candidate of a busy worker,
+                // now check its queue
+                if(Task* t = worker_pool->get_worker(idx).ready_queue.pop())
                     return t;
+
+                // otherwise check own queue again
                 else if(Task* t = worker.ready_queue.pop())
                     return t;
+
+                // else continue search
                 else
                     return std::nullopt;
             },
-            false, // find a busy worker
+
+            // find a busy worker
+            dispatch::thread::WorkerState::BUSY,
+
+            // start next to current worker
             worker.get_worker_id());
 
-        if( task )
-            return *task;
-        else
-            return nullptr;
+        return task ? *task : nullptr;
     }
 
     // give worker a ready task if available
     // @return task if a new task was found, nullptr otherwise
-    Task * schedule( dispatch::thread::WorkerThread & worker )
+    Task * steal_task( dispatch::thread::WorkerThread & worker )
     {
         unsigned worker_id = worker.get_worker_id();
 
-        TRACE_EVENT("Scheduler", "schedule");
-        SPDLOG_INFO("schedule worker {}", worker_id);
-
-        Task * task = nullptr;
-        while( worker.init_dependencies( task, true ) )
-        {
-            if( task )
-                return task;
-        }
-
-        free_worker( worker_id );
+        SPDLOG_INFO("steal task for worker {}", worker_id);
 
-        #ifndef ENABLE_WORKSTEALING
-        #define ENABLE_WORKSTEALING 1
-        #endif
-
-        #if ENABLE_WORKSTEALING
-
-        if( task = steal_ready_task( worker ) )
+        if( Task * task = steal_ready_task( worker ) )
         {
-            alloc_worker( worker_id );
+            worker_pool->set_worker_state( worker_id, dispatch::thread::WorkerState::BUSY );
             return task;
         }
 
-        if( task = steal_new_task( worker ) )
+        if( Task * task = steal_new_task( worker ) )
         {
             task->pre_event.up();
             task->init_graph();
 
             if( task->get_pre_event().notify( true ) )
             {
-                alloc_worker( worker_id );
+                worker_pool->set_worker_state( worker_id, dispatch::thread::WorkerState::BUSY );
                 return task;
             }
         }
 
-        #endif
-
         return nullptr;
     }
 
-    void emplace_task( Task & task )
-    {
-        unsigned worker_id = task->arena_id % n_workers;
-        auto & worker = threads[ worker_id ];
-        worker->emplace_task( &task );
-    }
-
-    void activate_task( Task & task )
-    {
-        //! worker id to use in case all workers are busy
-        static thread_local std::atomic< unsigned int > next_worker(dispatch::thread::current_worker ? 
- dispatch::thread::current_worker->get_worker_id() + 1 : 0); - - TRACE_EVENT("Scheduler", "activate_task"); - SPDLOG_TRACE("DefaultScheduler::activate_task({})", task.task_id); - - int worker_id = find_free_worker(); - if( worker_id < 0 ) - { - worker_id = next_worker.fetch_add(1) % n_workers; - if( worker_id == dispatch::thread::current_worker->get_worker_id() ) - worker_id = next_worker.fetch_add(1) % n_workers; - } - - threads[ worker_id ]->ready_queue.push(&task); - alloc_worker(worker_id); - - threads[ worker_id ]->wake(); - } - - bool wake_one_worker() + /* Wakeup some worker or the main thread + * + * WakerId = 0 for main thread + * WakerId = WorkerId + 1 + * + * @return true if thread was indeed asleep + */ + bool wake( WakerId id = 0 ) { - TRACE_EVENT("Scheduler", "wake_one_worker"); - SPDLOG_INFO("DefaultScheduler: wake_one_worker()"); - - int worker_id = find_free_worker(); - if( worker_id < 0 ) - { - // FIXME: this is a workaround for a racecondition - // a worker may be searching for a task and thus not marked free, - // so alloc_worker will not return this worker and wake_one_worker - // will notify no one. - // shortly after that the worker is marked as free and begins to sleep, - // but the newly created task will not be executed - SPDLOG_INFO("no busy worker found, wake all"); - - wake_all_workers(); - return false; - } + if( id == 0 ) + return cv.notify(); + else if( id > 0 && id <= worker_pool->size() ) + return worker_pool->get_worker(id - 1).wake(); else - { - SPDLOG_INFO("wake worker {}", worker_id); - threads[ worker_id ]->wake(); - return true; - } + return false; } - void wake_all_workers() + /* wakeup all wakers (workers + main thread) + */ + void wake_all() { - for( uint16_t i = 0; i <= threads.size(); ++i ) + for( uint16_t i = 0; i <= worker_pool->size(); ++i ) this->wake( i ); } - - bool wake( WakerID id = 0 ) - { - if( id == 0 ) - return cv.notify(); - else if( id > 0 && id <= threads.size() ) - return threads[ id - 1 ]->wake(); - else - return false; - } }; } // namespace scheduler diff --git a/redGrapes/scheduler/event.cpp b/redGrapes/scheduler/event.cpp index ce23f64f..29a569a6 100644 --- a/redGrapes/scheduler/event.cpp +++ b/redGrapes/scheduler/event.cpp @@ -12,14 +12,13 @@ #include #include +#include #include #include #include - #include #include #include -#include #include namespace redGrapes diff --git a/redGrapes/scheduler/event.hpp b/redGrapes/scheduler/event.hpp index b0e743ae..fc152582 100644 --- a/redGrapes/scheduler/event.hpp +++ b/redGrapes/scheduler/event.hpp @@ -95,7 +95,7 @@ struct Event std::atomic_uint16_t state; //! waker that is waiting for this event - WakerID waker_id; + WakerId waker_id; //! the set of subsequent events ChunkedList< EventPtr > followers; diff --git a/redGrapes/scheduler/scheduler.hpp b/redGrapes/scheduler/scheduler.hpp index 0e48a878..6d8e41ae 100644 --- a/redGrapes/scheduler/scheduler.hpp +++ b/redGrapes/scheduler/scheduler.hpp @@ -26,7 +26,7 @@ struct WorkerThread; namespace scheduler { -using WakerID = int16_t; +using WakerId = int16_t; /*! Scheduler Interface */ @@ -36,14 +36,6 @@ struct IScheduler { } - virtual void start() - { - } - - virtual void stop() - { - } - /*! whats the task dependency type for the edge a -> b (task a precedes task b) * @return true if task b depends on the pre event of task a, false if task b depends on the post event of task b. */ @@ -59,20 +51,13 @@ struct IScheduler virtual void activate_task( Task & task ) {} //! 
give worker work if available - virtual Task * schedule( dispatch::thread::WorkerThread & worker ) + virtual Task * steal_task( dispatch::thread::WorkerThread & worker ) { return nullptr; } - virtual bool wake( WakerID id = 0 ) - { - return false; - } - - virtual void wake_all_workers() - {} - - virtual bool wake_one_worker() + virtual void wake_all() {} + virtual bool wake( WakerId id = 0 ) { return false; } diff --git a/redGrapes/task/queue.hpp b/redGrapes/task/queue.hpp index 0b4b0302..22246694 100644 --- a/redGrapes/task/queue.hpp +++ b/redGrapes/task/queue.hpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace redGrapes { diff --git a/redGrapes/task/task.hpp b/redGrapes/task/task.hpp index 7ae5308b..d9e6ce1e 100644 --- a/redGrapes/task/task.hpp +++ b/redGrapes/task/task.hpp @@ -16,7 +16,6 @@ #include #include - // defines REDGRAPES_TASK_PROPERTIES #include diff --git a/redGrapes/task/task_space.cpp b/redGrapes/task/task_space.cpp index 95e69dbc..54d9e92c 100644 --- a/redGrapes/task/task_space.cpp +++ b/redGrapes/task/task_space.cpp @@ -1,4 +1,4 @@ -/* Copyright 2021-2022 Michael Sippel +/* Copyright 2021-2023 Michael Sippel * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -62,10 +62,7 @@ namespace redGrapes // -> never have current_task = nullptr //spdlog::info("kill task... {} remaining", count); if( count == 0 ) - { - //spdlog::info("last task, wake all"); - top_scheduler->wake_all_workers(); - } + top_scheduler->wake_all(); } void TaskSpace::submit( Task * task ) diff --git a/redGrapes/task/task_space.hpp b/redGrapes/task/task_space.hpp index 247650e7..63f45abe 100644 --- a/redGrapes/task/task_space.hpp +++ b/redGrapes/task/task_space.hpp @@ -19,7 +19,7 @@ namespace redGrapes { -/*! +/*! TaskSpace handles sub-taskspaces of child tasks */ struct TaskSpace : std::enable_shared_from_this { @@ -42,9 +42,12 @@ struct TaskSpace : std::enable_shared_from_this virtual bool is_serial( Task& a, Task& b ); virtual bool is_superset( Task& a, Task& b ); - void free_task( Task * task ); + // add a new task to the task-space void submit( Task * task ); + // remove task from task-space + void free_task( Task * task ); + bool empty() const; }; diff --git a/redGrapes/util/bitfield.hpp b/redGrapes/util/bitfield.hpp new file mode 100644 index 00000000..51159ff0 --- /dev/null +++ b/redGrapes/util/bitfield.hpp @@ -0,0 +1,202 @@ +/* Copyright 2023 Michael Sippel + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */ + +#pragma once + +#include +#include +#include + +namespace redGrapes +{ + +struct AtomicBitfield +{ + AtomicBitfield( size_t m_size ) + : m_size(m_size) + , state( ceil_div(m_size, bitfield_len) ) + { + + } + + size_t size() + { + return m_size; + } + +#define __INDEX_CALC(i, j, k, m) \ + unsigned j = i / bitfield_len; \ + unsigned k = i % bitfield_len; \ + uint64_t m = (uint64_t)1 << k; + + /* atomically update bit at index `idx` + * + * @return previous value + */ + inline bool set( unsigned idx, bool new_value ) + { + __INDEX_CALC(idx, chunk_idx, k, mask) + unsigned old_val; + + switch(new_value) { + case false: + old_val = state[chunk_idx].fetch_and(~mask, std::memory_order_acquire); + break; + + case true: + old_val = state[chunk_idx].fetch_or(mask, std::memory_order_release); + break; + } + + return old_val & mask; + } + + /* get current value of bit at `idx` + */ + inline bool get( unsigned idx ) + { + __INDEX_CALC(idx,chunk_idx,k,mask) + state[chunk_idx] & mask; + } + + /* searches for a bit which is of state `expected_value` + * and suffices the condition given by `f`. + * + * @param start_idx gives a initial position where + * elements in the same chunk as `start_idx` are preferred over + * elements from differening chunks + * and elements following `start_idx` are preferred over preceding ones + * + * @return element given by `f(idx)` where `state[idx] == expected_value` + */ + template + inline std::optional< T > + probe_by_value( + F && f, + bool expected_value, + unsigned start_idx, + bool exclude_start = true) + { + uint64_t start_field_idx = start_idx / bitfield_len; + uint64_t first_mask = (uint64_t(-1) << (start_idx%bitfield_len)); + uint64_t second_mask = ~first_mask; + + /* probe second-half of current chunk + */ + if( start_field_idx == state.size() - 1 && size() % bitfield_len != 0 ) + second_mask &= (uint64_t(1) << (size() % bitfield_len)) - 1; + + if( exclude_start ) + second_mask &= ~(uint64_t(1) << (start_idx%bitfield_len)); + + if( auto x = probe_chunk_by_value( start_field_idx, second_mask, expected_value, f ) ) + return x; + + /* probe first half of current chunk + */ + if( start_field_idx == state.size() - 1 && size() % bitfield_len != 0 ) + first_mask &= (uint64_t(1) << (size() % bitfield_len)) - 1; + if( auto x = probe_chunk_by_value( start_field_idx, first_mask, expected_value, f ) ) + return x; + + /* probe remaining chunks + */ + for( + uint64_t b = 1; + b < ceil_div(size(), bitfield_len); + ++b + ) { + uint64_t field_idx = (start_field_idx + b) % state.size(); + uint64_t mask = ~0; + + if( field_idx == state.size() - 1 && size() % bitfield_len != 0 ) + mask &= (uint64_t(1) << (size() % bitfield_len)) - 1; + + if( auto x = probe_chunk_by_value( field_idx, mask, expected_value, f ) ) + return x; + } + + return std::nullopt; + } + + + +private: + // TODO: try different values, e.g. 8 + // to add hierarchy matching the NUMA architecture + static constexpr uint64_t bitfield_len = 64; + + size_t m_size; + std::vector< std::atomic< uint64_t > > state; + + + /*! 
calculates ceil( a / b ) + */ + static inline uint64_t ceil_div( uint64_t a, uint64_t b ) + { + return (a+b-1)/b; + } + + // find index of first set bit + // taken from https://graphics.stanford.edu/~seander/bithacks.html#ZerosOnRightParallel + static inline unsigned int first_one_idx( uint64_t v ) + { + unsigned int c = 64; // c will be the number of zero bits on the right + v &= -int64_t(v); + if (v) c--; + if (v & 0x00000000FFFFFFFF) c -= 32; + if (v & 0x0000FFFF0000FFFF) c -= 16; + if (v & 0x00FF00FF00FF00FF) c -= 8; + if (v & 0x0F0F0F0F0F0F0F0F) c -= 4; + if (v & 0x3333333333333333) c -= 2; + if (v & 0x5555555555555555) c -= 1; + + return c; + } + + + /* searches for a bit which is of state `expected_value` + * and suffices the condition given by `f` in the chunk `j`. + * + * @return element given by `f(idx)` where `state[idx] == expected_value` + */ + template + inline std::optional< T > probe_chunk_by_value( unsigned j, uint64_t mask, bool expected_value, F && f ) + { + while( true ) + { + uint64_t field = expected_value ? + uint64_t(state[j]) : ~uint64_t(state[j]); + + uint64_t masked_field = field & mask; + if( masked_field == 0 ) + break; + + // find index of first worker + unsigned int k = first_one_idx( masked_field ); + + if( k < bitfield_len ) + { + unsigned int idx = j * bitfield_len + k; + //spdlog::info("find worker: j = {}, k = {}, idx= {}", j , k, idx); + + if( std::optional x = f( idx ) ) + return x; + + // dont check this worker again + mask &= ~(uint64_t(1) << k); + } + } + + return std::nullopt; + } + + +}; + +} // namespace redGrapes + diff --git a/redGrapesConfig.cmake b/redGrapesConfig.cmake index 1d552b76..84e6c202 100644 --- a/redGrapesConfig.cmake +++ b/redGrapesConfig.cmake @@ -14,6 +14,8 @@ add_library(redGrapes ${CMAKE_CURRENT_LIST_DIR}/redGrapes/resource/resource.cpp ${CMAKE_CURRENT_LIST_DIR}/redGrapes/dispatch/thread/execute.cpp ${CMAKE_CURRENT_LIST_DIR}/redGrapes/dispatch/thread/cpuset.cpp + ${CMAKE_CURRENT_LIST_DIR}/redGrapes/dispatch/thread/worker.cpp + ${CMAKE_CURRENT_LIST_DIR}/redGrapes/dispatch/thread/worker_pool.cpp ${CMAKE_CURRENT_LIST_DIR}/redGrapes/scheduler/event.cpp ${CMAKE_CURRENT_LIST_DIR}/redGrapes/scheduler/event_ptr.cpp ${CMAKE_CURRENT_LIST_DIR}/redGrapes/task/property/graph.cpp
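Taken together, the new AtomicBitfield replaces the ad-hoc worker_state logic that DefaultScheduler carried before, and WorkerPool builds its probe helpers on top of it. A short usage sketch of the interface introduced above (editor's illustration, not part of the patch):

    // Editor's sketch: basic use of the new AtomicBitfield.
    // probe_by_value() scans for a bit with the expected value, preferring
    // indices at and after `start_idx`, and returns whatever the functor
    // yields first.
    #include <redGrapes/util/bitfield.hpp>
    #include <optional>

    void example()
    {
        redGrapes::AtomicBitfield field( 128 ); // 128 flags, two 64-bit chunks

        field.set( 5, true );       // returns the previous value of bit 5
        bool b = field.get( 5 );    // true

        // find any set bit near index 64 and atomically clear (claim) it
        std::optional< unsigned > claimed =
            field.probe_by_value< unsigned >(
                [&field](unsigned idx) -> std::optional< unsigned >
                {
                    // set() returns the old value: true means we cleared a 1
                    if( field.set( idx, false ) )
                        return idx;
                    return std::nullopt;
                },
                true,  // look for bits that are currently 1
                64 );  // start the search at this index

        (void) b; (void) claimed;
    }

This is exactly the pattern WorkerPool::find_free_worker() uses, with WorkerState::AVAILABLE as the expected bit value and the claim performed by set_worker_state().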