Skip to content

Commit

Permalink
factor AtomicBitfield & WorkerPool out from scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsippel committed Sep 12, 2023
1 parent 00f0cce commit 642f521
Show file tree
Hide file tree
Showing 21 changed files with 767 additions and 536 deletions.
1 change: 1 addition & 0 deletions examples/2_functors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ int square (int x)

int main()
{
spdlog::set_level(spdlog::level::trace);
redGrapes::init(1);

fmt::print(
Expand Down
12 changes: 8 additions & 4 deletions examples/mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,18 @@ int main()
{
//spdlog::set_pattern("[thread %t] %^[%l]%$ %v");
//spdlog::set_level( spdlog::level::trace );

/*
int prov;
MPI_Init_thread( nullptr, nullptr, MPI_THREAD_MULTIPLE, &prov );
assert( prov == MPI_THREAD_MULTIPLE );
*/

MPI_Init( nullptr, nullptr );

auto default_scheduler = std::make_shared<rg::scheduler::DefaultScheduler>( 2 );
rg::init_allocator(4);

auto default_scheduler = std::make_shared<rg::scheduler::DefaultScheduler>();
auto mpi_request_pool = std::make_shared<rg::dispatch::mpi::RequestPool>();
auto mpi_fifo = std::make_shared<rg::scheduler::FIFO>();

Expand All @@ -60,11 +64,11 @@ int main()
rg::dispatch::thread::execute_task( *task );
};

rg::init(
rg::scheduler::make_tag_match_scheduler( )
rg::init(4,
rg::scheduler::make_tag_match_scheduler()
.add({}, default_scheduler)
.add({ SCHED_MPI }, mpi_fifo));

// initialize MPI config
rg::IOResource< MPIConfig > mpi_config;
rg::emplace_task(
Expand Down
15 changes: 12 additions & 3 deletions redGrapes/context.hpp
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
#pragma once

#include <memory>
#include <moodycamel/concurrentqueue.h>

#include <redGrapes/scheduler/scheduler.hpp>
#include <functional>

namespace redGrapes
{

struct Task;
struct TaskSpace;

namespace dispatch {
namespace thread{
struct WorkerPool;
}
}

namespace scheduler {
struct IScheduler;
}

/*! global context
*/
extern thread_local Task * current_task;
extern thread_local std::function< void () > idle;

extern std::shared_ptr< TaskSpace > top_space;
extern std::shared_ptr< scheduler::IScheduler > top_scheduler;
extern std::shared_ptr< dispatch::thread::WorkerPool > worker_pool;

unsigned scope_depth();

Expand Down
4 changes: 2 additions & 2 deletions redGrapes/dispatch/thread/execute.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2022 Michael Sippel
/* Copyright 2022-2023 Michael Sippel
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
Expand All @@ -21,7 +21,7 @@ namespace dispatch
namespace thread
{

thread_local scheduler::WakerID current_waker_id;
thread_local scheduler::WakerId current_waker_id;
thread_local std::shared_ptr< WorkerThread > current_worker;

void execute_task( Task & task )
Expand Down
2 changes: 1 addition & 1 deletion redGrapes/dispatch/thread/local.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace thread

struct WorkerThread;

extern thread_local scheduler::WakerID current_waker_id;
extern thread_local scheduler::WakerId current_waker_id;
extern thread_local std::shared_ptr< WorkerThread > current_worker;

} // namespace thread
Expand Down
189 changes: 189 additions & 0 deletions redGrapes/dispatch/thread/worker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/* Copyright 2020-2023 Michael Sippel
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include <redGrapes/dispatch/thread/worker.hpp>
#include <redGrapes/dispatch/thread/worker_pool.hpp>

namespace redGrapes
{
namespace dispatch
{
namespace thread
{

WorkerThread::WorkerThread( WorkerId worker_id )
: id( worker_id ),
thread(
[this] {
/* setup membind- & cpubind policies using hwloc
*/
this->cpubind();
this->membind();

/* since we are in a worker, there should always
* be a task running (we always have a parent task
* and therefore yield() guarantees to do
* a context-switch instead of idling
*/
redGrapes::idle = [this] {
throw std::runtime_error("idle in worker thread!");
};

/* wait for start-flag to be triggerd in order
* to avoid premature access to `shared_from_this`
*/
while( ! m_start.load(std::memory_order_consume) )
cv.wait();

/* initialize thread-local variables
*/
current_worker = this->shared_from_this();
current_waker_id = this->get_waker_id();
memory::current_arena = this->get_worker_id();

/* execute tasks until stop()
*/
this->work_loop();

SPDLOG_TRACE("Worker Finished!");
}
)
{
}

void WorkerThread::start()
{
m_start.store(true, std::memory_order_release);
wake();
}

void WorkerThread::stop()
{
SPDLOG_TRACE("Worker::stop()");
m_stop.store(true, std::memory_order_release);
wake();
thread.join();
}

void WorkerThread::cpubind()
{
hwloc_obj_t obj = hwloc_get_obj_by_type(topology, HWLOC_OBJ_PU, this->id);

if( hwloc_set_cpubind(topology, obj->cpuset, HWLOC_CPUBIND_THREAD | HWLOC_CPUBIND_STRICT) )
{
char *str;
int error = errno;
hwloc_bitmap_asprintf(&str, obj->cpuset);
spdlog::warn("Couldn't cpubind to cpuset {}: {}\n", str, strerror(error));
free(str);
}
}

void WorkerThread::membind()
{
hwloc_obj_t obj = hwloc_get_obj_by_type(topology, HWLOC_OBJ_PU, this->id);

if( hwloc_set_membind(topology, obj->cpuset, HWLOC_MEMBIND_BIND, HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_STRICT ) )
{
char *str;
int error = errno;
hwloc_bitmap_asprintf(&str, obj->cpuset);
spdlog::warn("Couldn't membind to cpuset {}: {}\n", str, strerror(error));
free(str);
}
}

void WorkerThread::work_loop()
{
SPDLOG_TRACE("Worker {} start work_loop()", id);
while( ! m_stop.load(std::memory_order_consume) )
{
while( Task * task = this->gather_task() )
{
worker_pool->set_worker_state( id, dispatch::thread::WorkerState::BUSY );
dispatch::thread::execute_task( *task );
}

worker_pool->set_worker_state( id, dispatch::thread::WorkerState::BUSY );

if( !m_stop.load(std::memory_order_consume) )
cv.wait();
}
SPDLOG_TRACE("Worker {} end work_loop()", id);
}

Task * WorkerThread::gather_task()
{
Task * task = nullptr;

/* STAGE 1:
*
* first, execute all tasks in the ready queue
*/
SPDLOG_TRACE("Worker {}: consume ready queue", id);
if( task = ready_queue.pop() )
return task;

/* STAGE 2:
*
* after the ready queue is fully consumed,
* try initializing new tasks until one
* of them is found to be ready
*/
SPDLOG_TRACE("Worker {}: try init new tasks", id);
while( this->init_dependencies( task, true ) )
if( task )
return task;

/* set worker state to signal that we are requesting tasks
*/
worker_pool->set_worker_state( id, dispatch::thread::WorkerState::AVAILABLE );

#ifndef ENABLE_WORKSTEALING
#define ENABLE_WORKSTEALING 1
#endif

#if ENABLE_WORKSTEALING

/* STAGE 3:
*
* after all tasks are workstealing
*/
SPDLOG_TRACE("Worker {}: try to steal tasks", id);
task = top_scheduler->steal_task( *this );

#endif

return task;
}

bool WorkerThread::init_dependencies( Task* & t, bool claimed )
{
if(Task * task = emplacement_queue.pop())
{
SPDLOG_DEBUG("init task {}", task->task_id);

task->pre_event.up();
task->init_graph();

if( task->get_pre_event().notify( claimed ) )
t = task;
else
{
t = nullptr;
}

return true;
}
else
return false;
}

} // namespace thread
} // namespace dispatch
} // namespace redGrapes

Loading

0 comments on commit 642f521

Please sign in to comment.