Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsippel committed Oct 22, 2023
1 parent 07fd2ff commit 5bd057a
Show file tree
Hide file tree
Showing 18 changed files with 171 additions and 58 deletions.
3 changes: 2 additions & 1 deletion examples/mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ int main()
auto mpi_request_pool = std::make_shared<rg::dispatch::mpi::RequestPool>();

hwloc_obj_t obj = hwloc_get_obj_by_type( redGrapes::hwloc_ctx->topology, HWLOC_OBJ_PU, 1 );
auto mpi_worker = std::make_shared<rg::dispatch::thread::Worker>( redGrapes::hwloc_ctx, obj, 4 );
rg::memory::ChunkedBumpAlloc< rg::memory::HwlocAlloc > mpi_alloc( rg::memory::HwlocAlloc<uint8_t>( redGrapes::hwloc_ctx, obj ) );
auto mpi_worker = std::make_shared<rg::dispatch::thread::Worker>( mpi_alloc, redGrapes::hwloc_ctx, obj, 4 );

// initialize main thread to execute tasks from the mpi-queue and poll
rg::idle =
Expand Down
12 changes: 6 additions & 6 deletions redGrapes/dispatch/thread/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ namespace dispatch
{
namespace thread
{
WorkerThread::WorkerThread( std::shared_ptr< HwlocContext > hwloc_ctx, hwloc_obj_t const & obj, WorkerId worker_id )
: Worker( hwloc_ctx, obj, worker_id ),
thread([this] { this->run(); })
WorkerThread::WorkerThread( memory::ChunkedBumpAlloc< memory::HwlocAlloc > & alloc, std::shared_ptr< HwlocContext > hwloc_ctx, hwloc_obj_t const & obj, WorkerId worker_id )
: Worker( alloc, hwloc_ctx, obj, worker_id )
, thread([this] { this->run(); })
{
}

WorkerThread::~WorkerThread()
{
}

Worker::Worker( std::shared_ptr<HwlocContext> hwloc_ctx, hwloc_obj_t const & obj, WorkerId worker_id )
: hwloc_ctx(hwloc_ctx)
, alloc( memory::HwlocAlloc<uint8_t>( hwloc_ctx, obj ) )
Worker::Worker( memory::ChunkedBumpAlloc<memory::HwlocAlloc> & alloc, std::shared_ptr<HwlocContext> hwloc_ctx, hwloc_obj_t const & obj, WorkerId worker_id )
: alloc( alloc )
, hwloc_ctx( hwloc_ctx )
, id( worker_id )
{
}
Expand Down
6 changes: 3 additions & 3 deletions redGrapes/dispatch/thread/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ struct Worker
static constexpr size_t queue_capacity = 128;

public:
memory::ChunkedBumpAlloc< memory::HwlocAlloc > alloc;
memory::ChunkedBumpAlloc< memory::HwlocAlloc > & alloc;
std::shared_ptr< HwlocContext > hwloc_ctx;

task::Queue emplacement_queue{ queue_capacity };
task::Queue ready_queue{ queue_capacity };

Worker( std::shared_ptr<HwlocContext > hwloc_ctx, hwloc_obj_t const & obj, WorkerId id );
Worker( memory::ChunkedBumpAlloc< memory::HwlocAlloc > & alloc, std::shared_ptr<HwlocContext > hwloc_ctx, hwloc_obj_t const & obj, WorkerId id );
virtual ~Worker();

inline WorkerId get_worker_id() { return id; }
Expand Down Expand Up @@ -129,7 +129,7 @@ struct WorkerThread
{
std::thread thread;

WorkerThread( std::shared_ptr<HwlocContext> hwloc_ctx, hwloc_obj_t const & obj, WorkerId worker_id );
WorkerThread( memory::ChunkedBumpAlloc<memory::HwlocAlloc> & alloc, std::shared_ptr<HwlocContext> hwloc_ctx, hwloc_obj_t const & obj, WorkerId worker_id );
~WorkerThread();

void stop();
Expand Down
26 changes: 20 additions & 6 deletions redGrapes/dispatch/thread/worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
*/
#include <redGrapes/dispatch/thread/worker.hpp>
#include <redGrapes/dispatch/thread/worker_pool.hpp>
#include <redGrapes/util/allocator.hpp>
#include <redGrapes/util/hwloc_alloc.hpp>
#include <redGrapes/util/chunked_bump_alloc.hpp>
#include <redGrapes_config.hpp>

namespace redGrapes
{
Expand All @@ -19,23 +21,33 @@ namespace thread
WorkerPool::WorkerPool( std::shared_ptr< HwlocContext > hwloc_ctx, size_t n_workers )
: worker_state( n_workers )
{
workers.reserve( n_workers );
redGrapes::dispatch::thread::current_waker_id = 0;
}

void WorkerPool::emplace_workers( size_t n_workers )
{
unsigned n_pus = hwloc_get_nbobjs_by_type(hwloc_ctx->topology, HWLOC_OBJ_PU);
if( n_workers > n_pus )
spdlog::warn("{} worker-threads requested, but only {} PUs available!", n_workers, n_pus);

SPDLOG_INFO("create WorkerPool with {} workers", n_workers);
allocs.reserve( n_workers );
workers.reserve( n_workers );

SPDLOG_INFO("populate WorkerPool with {} workers", n_workers);
for( size_t i = 0; i < n_workers; ++i )
{
// allocate worker with id `i` on arena `i`,
hwloc_obj_t obj = hwloc_get_obj_by_type(hwloc_ctx->topology, HWLOC_OBJ_PU, i);
memory::HwlocAlloc< dispatch::thread::WorkerThread > hwloc_alloc( hwloc_ctx, obj );
auto worker = std::allocate_shared< dispatch::thread::WorkerThread >( hwloc_alloc, hwloc_ctx, obj, i );
allocs.emplace_back(
memory::HwlocAlloc<uint8_t>( hwloc_ctx, obj ),
REDGRAPES_ALLOC_CHUNKSIZE
);

memory::current_arena = i;
auto worker = memory::alloc_shared_bind<WorkerThread>( i, get_alloc(i), hwloc_ctx, obj, i );
// auto worker = std::make_shared< WorkerThread >( get_alloc(i), hwloc_ctx, obj, i );
workers.emplace_back( worker );
}

redGrapes::dispatch::thread::current_waker_id = 0;
}

WorkerPool::~WorkerPool()
Expand All @@ -52,6 +64,8 @@ void WorkerPool::stop()
{
for( auto & worker : workers )
worker->stop();

workers.clear();
}

int WorkerPool::find_free_worker()
Expand Down
11 changes: 11 additions & 0 deletions redGrapes/dispatch/thread/worker_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <memory>
#include <redGrapes/util/bitfield.hpp>
#include <redGrapes/util/hwloc_alloc.hpp>
#include <redGrapes/util/chunked_bump_alloc.hpp>

namespace redGrapes
{
Expand All @@ -31,6 +33,8 @@ struct WorkerPool
WorkerPool( std::shared_ptr< HwlocContext > hwloc_ctx, size_t n_workers = 1 );
~WorkerPool();

void emplace_workers( size_t n_workers );

/* get the number of workers in this pool
*/
inline size_t size()
Expand All @@ -46,6 +50,12 @@ struct WorkerPool
*/
void stop();

inline memory::ChunkedBumpAlloc< memory::HwlocAlloc > & get_alloc( WorkerId worker_id )
{
assert( worker_id < allocs.size() );
return allocs[ worker_id ];
}

inline WorkerThread & get_worker( WorkerId worker_id )
{
assert( worker_id < size() );
Expand Down Expand Up @@ -84,6 +94,7 @@ struct WorkerPool
int find_free_worker();

private:
std::vector< memory::ChunkedBumpAlloc< memory::HwlocAlloc > > allocs;
std::vector< std::shared_ptr< dispatch::thread::WorkerThread > > workers;
AtomicBitfield worker_state;
};
Expand Down
1 change: 1 addition & 0 deletions redGrapes/redGrapes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ void init( size_t n_workers, std::shared_ptr<scheduler::IScheduler> scheduler )
{
init_tracing();
worker_pool = std::make_shared<dispatch::thread::WorkerPool>( hwloc_ctx, n_workers );
worker_pool->emplace_workers( n_workers );

top_space = std::make_shared<TaskSpace>();
top_scheduler = scheduler;
Expand Down
1 change: 1 addition & 0 deletions redGrapes/resource/resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <redGrapes/util/allocator.hpp>
#include <redGrapes/util/chunked_list.hpp>
#include <redGrapes/util/spinlock.hpp>
#include <redGrapes/dispatch/thread/worker_pool.hpp>
#include <redGrapes_config.hpp>

#include <fmt/format.h>
Expand Down
33 changes: 16 additions & 17 deletions redGrapes/task/property/graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,24 @@ void GraphProperty::init_graph()
TRACE_EVENT("Graph", "CheckPredecessors");
auto it = r->task_entry;

++it;
for(; it != r->resource->users.rend(); ++it )
{
TRACE_EVENT("Graph", "Check Pred");
Task * preceding_task = *it;

if( preceding_task == this->space->parent )
break;
++it;
for(; it != r->resource->users.rend(); ++it )
{
TRACE_EVENT("Graph", "Check Pred");
Task * preceding_task = *it;

if(
preceding_task->space == this->space &&
this->space->is_serial( *preceding_task, *this->task )
)
{
add_dependency( *preceding_task );
if( preceding_task == this->space->parent )
break;

if( preceding_task->has_sync_access( r->resource ) )
break;
}
if(
preceding_task->space == this->space &&
this->space->is_serial( *preceding_task, *this->task )
)
{
add_dependency( *preceding_task );
if( preceding_task->has_sync_access( r->resource ) )
break;
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions redGrapes/util/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ UntypedAllocator::UntypedAllocator( dispatch::thread::WorkerId worker_id )

void * UntypedAllocator::allocate( size_t n_bytes )
{
return (void*)worker_pool->get_worker( worker_id ).alloc.allocate< uint8_t >( n_bytes );
return (void*)worker_pool->get_alloc( worker_id ).allocate< uint8_t >( n_bytes );
}

void UntypedAllocator::deallocate( void * ptr )
{
worker_pool->get_worker( worker_id ).alloc.deallocate( ptr );
worker_pool->get_alloc( worker_id ).deallocate( ptr );
}

} // namespace memory
Expand Down
11 changes: 10 additions & 1 deletion redGrapes/util/allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@
#include <boost/core/demangle.hpp>
#include <memory>
#include <redGrapes/dispatch/thread/local.hpp>
#include <redGrapes/dispatch/thread/worker_pool.hpp>
// #include <redGrapes/dispatch/thread/worker_pool.hpp>
#include <redGrapes/resource/access/area.hpp>
//#include <redGrapes/context.hpp>

namespace redGrapes
{

namespace dispatch
{
namespace thread
{
using WorkerId = unsigned;
struct WorkerPool;
}
}

extern std::shared_ptr< dispatch::thread::WorkerPool > worker_pool;

namespace memory
Expand Down
4 changes: 2 additions & 2 deletions redGrapes/util/bump_alloc_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ void * BumpAllocChunk::m_alloc( size_t n_bytes )
return nullptr;
}

size_t BumpAllocChunk::m_free( void * )
uint16_t BumpAllocChunk::m_free( void * )
{
return count.fetch_sub(1) - 1;
return count.fetch_sub(1);
}

bool BumpAllocChunk::contains( void * ptr ) const
Expand Down
4 changes: 2 additions & 2 deletions redGrapes/util/bump_alloc_chunk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ struct BumpAllocChunk
void reset();

void * m_alloc( size_t n_bytes );
size_t m_free( void * );
uint16_t m_free( void * );

bool contains( void * ) const;

std::atomic< uintptr_t > next_addr;
uintptr_t const lower_limit;
uintptr_t const upper_limit;
std::atomic< size_t > count;
std::atomic< uint16_t > count;
};

} // namespace memory
Expand Down
10 changes: 8 additions & 2 deletions redGrapes/util/chunked_bump_alloc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <redGrapes/dispatch/thread/local.hpp>
#include <redGrapes/dispatch/thread/cpuset.hpp>
#include <redGrapes/util/trace.hpp>
#include <backward.hpp>

namespace redGrapes
{
Expand Down Expand Up @@ -143,7 +144,7 @@ struct ChunkedBumpAlloc
* and this chunk is not `head`,
* remove this chunk
*/
if( it->m_free((void*)ptr) == 0 )
if( it->m_free((void*)ptr) == 1 )
{
SPDLOG_TRACE("ChunkedBumpAlloc: erase chunk {}", it->lower_limit);
bump_allocators.erase( it );
Expand All @@ -155,7 +156,12 @@ struct ChunkedBumpAlloc
prev = it;
}

spdlog::error("try to deallocate invalid pointer ({})", (void*)ptr);
spdlog::error("try to deallocate invalid pointer ({}). current_arena={}, this={}", (void*)ptr, current_arena, (void*)this);

backward::StackTrace st;
st.load_here(32);
backward::Printer p;
p.print(st);
}
};

Expand Down
2 changes: 1 addition & 1 deletion redGrapes/util/chunked_list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct ChunkedList

~Item()
{
if( iter_offset == 0 )
if( refcount.fetch_sub(1) == 1 )
storage.value.~T();
}

Expand Down
8 changes: 6 additions & 2 deletions redGrapes/util/chunklist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ struct ChunkList
ChunkData * chunk_data;

template < typename... Args >
Chunk( uintptr_t chunk_data, Args&&... args ) : deleted(false), prev(nullptr), chunk_data((ChunkData*)chunk_data)
Chunk( uintptr_t chunk_data, Args&&... args )
: deleted(false)
, prev(nullptr)
, chunk_data((ChunkData*)chunk_data)
{
new ( get() ) ChunkData ( std::forward<Args>(args)... );
}

~Chunk()
{
spdlog::info("destruct chunk {}", (void*)chunk_data);
get()->~ChunkData();
}

Expand Down Expand Up @@ -141,7 +145,7 @@ struct ChunkList

/* TODO: use sizeof( ...shared_ptr_inplace_something... )
*/
size_t const shared_ptr_size = 128;
size_t const shared_ptr_size = 512;

return sizeof(Chunk) + shared_ptr_size;
}
Expand Down
2 changes: 1 addition & 1 deletion redGrapes/util/trace.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

#pragma once

#include <redGrapes_config.hpp>
//#include <redGrapes_config.hpp>

#ifndef REDGRAPES_ENABLE_TRACE
#define REDGRAPES_ENABLE_TRACE 0
Expand Down
Loading

0 comments on commit 5bd057a

Please sign in to comment.