Skip to content

Commit

Permalink
implement ContinuableTask by inheritance from Task and only create Co…
Browse files Browse the repository at this point in the history
…ntinuableTask object by empace_continuable_task() if its really needed
  • Loading branch information
michaelsippel committed Dec 6, 2023
1 parent debfbc0 commit 2bfb808
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 153 deletions.
4 changes: 2 additions & 2 deletions examples/6_resource_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ int main()
rg::init(1);
rg::IOResource< int > a; // scope-level=0

rg::emplace_task(
rg::emplace_continuable_task(
[]( auto a )
{
std::cout << "scope = " << rg::scope_depth() << std::endl;
Expand All @@ -36,7 +36,7 @@ int main()
std::cout << "scope = " << rg::scope_depth() << std::endl;
},
a.read()
).enable_stack_switching();
);

rg::finalize();
}
12 changes: 5 additions & 7 deletions examples/mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ int main()

redGrapes::Task * task;

if( task = mpi_worker->ready_queue.pop() )
if( (task = mpi_worker->ready_queue.pop()) )
redGrapes::SingletonContext::get().execute_task( *task );

while( mpi_worker->init_dependencies( task, true ) )
Expand Down Expand Up @@ -119,7 +119,7 @@ int main()
*/

// Send
rg::emplace_task(
rg::emplace_continuable_task(
[i, current, mpi_request_pool]( auto field, auto mpi_config )
{
int dst = ( mpi_config->world_rank + 1 ) % mpi_config->world_size;
Expand All @@ -131,11 +131,10 @@ int main()
},
field[current].at({3}).read(),
mpi_config.read())
.scheduling_tags({ SCHED_MPI })
.enable_stack_switching();
.scheduling_tags({ SCHED_MPI });

// Receive
rg::emplace_task(
rg::emplace_continuable_task(
[i, current, mpi_request_pool]( auto field, auto mpi_config )
{
int src = ( mpi_config->world_rank - 1 ) % mpi_config->world_size;
Expand All @@ -150,8 +149,7 @@ int main()
},
field[current].at({0}).write(),
mpi_config.read())
.scheduling_tags({ SCHED_MPI })
.enable_stack_switching();
.scheduling_tags({ SCHED_MPI });

/*
* Compute iteration
Expand Down
6 changes: 3 additions & 3 deletions redGrapes/dispatch/thread/execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ void Context::execute_task( Task & task )
current_task = &task;

auto event = task();

if( event )
{
event->get_event().waker_id = current_worker->get_waker_id();
task.sg_pause( *event );
event.get_event().waker_id = current_waker_id;
task.sg_pause( event );

task.pre_event.up();
task.get_pre_event().notify();
Expand Down
2 changes: 1 addition & 1 deletion redGrapes/dispatch/thread/worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void WorkerPool::emplace_workers( size_t n_workers )
REDGRAPES_ALLOC_CHUNKSIZE
);

SingletonContext::get().current_arena = pu_id;
SingletonContext::get().current_arena = worker_id;
auto worker = memory::alloc_shared_bind<WorkerThread>( pu_id, get_alloc(pu_id), hwloc_ctx, obj, worker_id );
// auto worker = std::make_shared< WorkerThread >( get_alloc(i), hwloc_ctx, obj, i );
workers.emplace_back( worker );
Expand Down
30 changes: 22 additions & 8 deletions redGrapes/redGrapes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ struct Context
template< typename Callable, typename... Args >
auto emplace_task(Callable&& f, Args&&... args);

template< typename Callable, typename... Args >
auto emplace_continuable_task(Callable&& f, Args&&... args);

static thread_local Task * current_task;
static thread_local std::function< void () > idle;
static thread_local unsigned next_worker;
Expand Down Expand Up @@ -151,6 +154,15 @@ inline auto emplace_task(Callable&& f, Args&&... args) {
)
); }

template<typename Callable, typename... Args>
inline auto emplace_continuable_task(Callable&& f, Args&&... args) {
return std::move(
SingletonContext::get().emplace_continuable_task(
std::move(f),
std::forward<Args>(args)...
)
); }

} //namespace redGrapes


Expand All @@ -163,19 +175,21 @@ namespace redGrapes
template<typename Callable, typename... Args>
auto Context::emplace_task(Callable&& f, Args&&... args)
{
dispatch::thread::WorkerId worker_id =
// linear
next_worker % worker_pool->size();
dispatch::thread::WorkerId worker_id = next_worker++ % worker_pool->size();
current_arena = worker_id;
SPDLOG_TRACE("emplace task to worker {} next_worker={}", worker_id, next_worker);

// interleaved
// 2*next_worker % worker_pool->size() + ((2*next_worker) / worker_pool->size())%2;
return std::move(TaskBuilder< Callable, Args... >( false, std::move(f), std::forward<Args>(args)... ));
}

next_worker++;
template<typename Callable, typename... Args>
auto Context::emplace_continuable_task(Callable&& f, Args&&... args)
{
dispatch::thread::WorkerId worker_id = next_worker++ % worker_pool->size();
current_arena = worker_id;

SPDLOG_TRACE("emplace task to worker {} next_worker={}", worker_id, next_worker);

return std::move(TaskBuilder< Callable, Args... >( std::move(f), std::forward<Args>(args)... ));
return std::move(TaskBuilder< Callable, Args... >( true, std::move(f), std::forward<Args>(args)... ));
}
} // namespace redGrapes

89 changes: 71 additions & 18 deletions redGrapes/task/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
*/
#pragma once

#include <redGrapes/scheduler/event.hpp>
#include <type_traits>
#include <redGrapes/task/task_base.hpp>
#include <redGrapes/task/property/inherit.hpp>
#include <redGrapes/task/property/trait.hpp>
#include <redGrapes/task/property/id.hpp>
Expand All @@ -23,37 +23,44 @@ namespace redGrapes
{

using TaskProperties = TaskProperties1<
IDProperty,
ResourceProperty,
QueueProperty,
GraphProperty
, ResourceProperty
// , QueueProperty
#ifdef REDGRAPES_TASK_PROPERTIES
, REDGRAPES_TASK_PROPERTIES
#endif
, IDProperty
>;

struct Task :
TaskBase,
TaskProperties
struct Task
: TaskProperties
{
virtual ~Task() {}

unsigned arena_id;
std::atomic_int removal_countdown;
uint16_t arena_id;
std::atomic< uint8_t > removal_countdown;

Task()
: removal_countdown(2)
{}

virtual ~Task() {}

inline scheduler::EventPtr operator() ()
{
return this->run();
}

virtual scheduler::EventPtr run() = 0;
virtual void yield( scheduler::EventPtr event )
{
spdlog::error("Task {} does not support yield()", this->task_id);
}

virtual void * get_result_data()
{
return nullptr;
}
};

// TODO: fuse ResultTask and FunTask into one template
// ---> removes one layer of virtual function calls

template < typename Result >
struct ResultTask : Task
{
Expand All @@ -67,11 +74,11 @@ struct ResultTask : Task
}

virtual Result run_result() = 0;

void run() final
virtual scheduler::EventPtr run()
{
result_data = run_result();
get_result_set_event().notify(); // result event now ready
return scheduler::EventPtr{};
}
};

Expand All @@ -81,11 +88,11 @@ struct ResultTask<void> : Task
virtual ~ResultTask() {}

virtual void run_result() {}

void run() final
virtual scheduler::EventPtr run()
{
run_result();
get_result_set_event().notify();
return scheduler::EventPtr{};
}
};

Expand All @@ -105,3 +112,49 @@ struct FunTask

} // namespace redGrapes



#include <boost/context/continuation.hpp>
#include <redGrapes/scheduler/event.hpp>

namespace redGrapes
{

template< typename F >
struct ContinuableTask
: FunTask< F >
{
boost::context::continuation yield_cont;
boost::context::continuation resume_cont;
scheduler::EventPtr event;

scheduler::EventPtr run()
{
if( ! resume_cont )
{
resume_cont = boost::context::callcc(
[this]( boost::context::continuation && c )
{
this->yield_cont = std::move(c);
this->FunTask< F >::run();
this->event = scheduler::EventPtr{};

return std::move(this->yield_cont);
}
);
}
else
resume_cont = resume_cont.resume();

return event;
}

void yield( scheduler::EventPtr e )
{
this->event = e;
yield_cont = yield_cont.resume();
}
};

} // namespace redGrapes

100 changes: 0 additions & 100 deletions redGrapes/task/task_base.hpp

This file was deleted.

Loading

0 comments on commit 2bfb808

Please sign in to comment.