Skip to content

Commit

Permalink
clarify & add comments in item-refcounting mechanism of ChunkedList
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsippel committed Dec 14, 2023
1 parent 99249a2 commit a25a596
Showing 1 changed file with 162 additions and 54 deletions.
216 changes: 162 additions & 54 deletions redGrapes/util/chunked_list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ template <
>
struct ChunkedList
{
using chunk_offset_t = uint32_t;
using refcount_t = uint16_t;
using iter_offset_t = uint16_t;
using refcount_t = int16_t;

struct Item
{
Expand All @@ -107,58 +107,126 @@ struct ChunkedList
*/
ItemStorage storage;

/* in case this item is deleted, `iter_offset` gives an
/* this variable tells the distance to the next initialized
* and not already deleted item, where a distance of `0`
* means that this item exists.
* In case this item is deleted, `iter_offset` gives an
* offset by which we can safely jump to find the next
* existing item.
*
* iter_offset = 0 means this item exists
* iter_offset = 1 means previous item exists
* ...
*/
std::atomic< chunk_offset_t > iter_offset;
std::atomic< iter_offset_t > iter_offset;

/* counts the number of iterators pointing
* at this item currently
* at this item currently.
* It is possible that iterators keep their
* reference to an item while this item is being
* deleted. In this case, iter_offset will already
* be set, so any new iterators will now skip
* this item but as long as some iterators referencing
* the already deleted item exist, the item data will
* not be destructed.
*/
std::atomic< refcount_t > refcount;

Item()
: iter_offset( 0 )
// any item starts uninitialized
: iter_offset( 1 )
, refcount( 0 )
, storage( TrivialInit_t{} )
{}

~Item()
{
if( refcount.fetch_sub(1) == 1 )
storage.value.~T();
release<false>();
}

/* initialize value of this item.
* only intended for new elements,
* re-assigning is not allowed.
* Per Item, only one thread is allowed to
* call the assignment operator once.
*/
T & operator=(T const & value)
{
auto old_refcount = refcount.fetch_add(1);
assert( old_refcount == 0 );
assert( iter_offset != 0 );
assert( refcount == 0 );

storage.value = value;

/* here, item.value is now fully initalized,
* so allow iterators to access this item now.
*/
iter_offset = 0;

return storage.value;
}

/* Try to increment `refcount` and check if this
* item is still alive.
*
* @return 0 if acquisition was successful,
* otherwise return iterator distance to the next
* valid item
*/
iter_offset_t acquire()
{
iter_offset_t off = iter_offset.load();
refcount_t old_refcount = refcount.load();

if( iter_offset == 0 && old_refcount >= 0 )
{
old_refcount = refcount.fetch_add(1);
off = iter_offset.load();

if( old_refcount >= 0 )
{
/* The item data is not already destructed,
* but only when `iter_offset` is still set to `0`
* as initialized by `operator=`, the item still exists.
* In case `off > 0`, some thread already called `remove()`
* on this iterator position.
*/

/* keep others from falsely trying to acquire this item
* if it is deleted already.
*/
if( off != 0 )
--refcount;
}
else
/* item data is already destructed.
* just decrement refcount to keep others from trying to
* acquire this item.
*/
--refcount;
}

return off;
}

/* decrement refcount and in case this
* was the last reference, deconstruct the element
* @tparam fail_on_invalid if true, this function will
* throw if the item was already deleted
*/
void remove()
template < bool fail_on_invalid = true >
void release()
{
refcount_t old_refcount = refcount.fetch_sub(1);

if( old_refcount == 1 )
storage.value.~T();

if( old_refcount == 0 )
throw std::runtime_error("remove inexistent item");
{
// item is now deleted, and refcount set to -1
storage.value.~T();
}
else if( old_refcount < 0 )
{
if( fail_on_invalid )
throw std::runtime_error("ChunkedList: try to remove invalid item!");
}
}
};

Expand All @@ -178,7 +246,7 @@ struct ChunkedList
*/
std::atomic< Item * > next_item;

std::atomic< chunk_offset_t > freed_items{ 0 };
std::atomic< size_t > freed_items{ 0 };

Chunk( memory::Block blk )
: first_item( (Item*) blk.ptr )
Expand Down Expand Up @@ -239,20 +307,22 @@ struct ChunkedList
* tries to acquire the element this iterator points to
* by incrementing the reference count, so it will not be
* deleted concurrently to the usage of this iterator.
* @return 0 if acquisition was successful,
* otherwise return iterator distance to the next
* valid item
*/
bool try_acquire()
iter_offset_t try_acquire()
{
if( is_valid_idx() )
{
refcount_t old_refcount = item().refcount.fetch_add(1);
if( old_refcount >= 1 )
{
chunk_offset_t off = item().iter_offset.load();
if( off == 0 )
set_item();
}
iter_offset_t off = item().acquire();
if( off == 0 )
set_item();

return off;
}
return has_item();
else
return 1;
}

/*!
Expand All @@ -262,8 +332,8 @@ struct ChunkedList
{
if( has_item() )
{
item().remove();
unset_item();
item().release();
}
}

Expand All @@ -275,26 +345,30 @@ struct ChunkedList
{
while( is_valid_idx() )
{
uint16_t step = item().iter_offset;

iter_offset_t step = try_acquire();
if( step == 0 )
{
if( try_acquire() )
return;
else
step = 1;
// item was successfully acquired.
assert( has_item() );
return;
}
else
{
// item is not existent
assert( ! has_item() );

assert( ! has_item() );
cur_item = (uintptr_t) (get_item_ptr() - step);
// jump to next valid item
cur_item = (uintptr_t) (get_item_ptr() - step);

if( ! is_valid_idx() )
{
++chunk;
if( chunk )
cur_item = (uintptr_t) chunk->last_item.load();
else
cur_item = 0;
// goto next chunk if necessary
if( ! is_valid_idx() )
{
++chunk;
if( chunk )
cur_item = (uintptr_t) chunk->last_item.load();
else
cur_item = 0;
}
}
}

Expand Down Expand Up @@ -392,7 +466,7 @@ struct ChunkedList
{
this->release();

if( this->get_item_ptr() > this->chunk->first_item )
if( (uintptr_t)(this->get_item_ptr() - 1u) >= (uintptr_t)this->chunk->first_item.load() )
this->cur_item = (uintptr_t) (this->get_item_ptr() - 1);
else
{
Expand Down Expand Up @@ -446,22 +520,49 @@ struct ChunkedList
{
Item * chunk_begin = chunk->first_item;
Item * chunk_end = chunk_begin + T_chunk_size;
Item * next_item = chunk->next_item.fetch_add(1);

if( (uintptr_t)next_item < (uintptr_t)chunk_end )
/* check if there is a chance to get a slot in this chunk
*/
if( (uintptr_t)chunk->next_item.load() <= (uintptr_t)chunk_end )
{
*next_item = item;
chunk->last_item ++;
if( (uintptr_t)next_item == (uintptr_t)(chunk_end - 1u))
Item * next_item = chunk->next_item.fetch_add(1);

if( (uintptr_t)next_item < (uintptr_t)chunk_end )
{
// the thread who took the last item of this chunk must allocate
// the next batch of items
chunks.allocate_item();
/* successfully allocated a slot in the current chunk
*/

// initialize item value
*next_item = item;

/* allow iteration to start at the newly initialized item.
* in case it happens that
*/
chunk->last_item ++;

return MutBackwardIterator( chunk, next_item );
}
else if ( (uintptr_t)next_item == (uintptr_t)chunk_end )
{
/* here we are the first thread that overflows
* the current chunk, so allocate a new chunk here
*/
chunks.allocate_item();
}
else
{
/* another one, but not the first thread that overflowed
* this chunk. wait for the allocation now.
*/
}
return MutBackwardIterator( chunk, next_item );
}
else
chunk->next_item.fetch_sub(1);
{
/* `chunk` is already full,
* don't even attempt to increment `next_item`
* just wait for the allocation of the new chunk to happen...
*/
}
}
else
{
Expand All @@ -486,10 +587,15 @@ struct ChunkedList
else
pos.item().iter_offset = (pos.get_item_ptr() - 1)->iter_offset + 1;


/* TODO: scan in other direction for deleted items too,
and update their `iter_offset`
*/

/* decrement refcount once so the item will be deconstructed
* eventually, when all iterators drop their references
*/
pos.item().remove();
pos.item().release();

release_chunk( pos.chunk );
}
Expand All @@ -510,6 +616,8 @@ struct ChunkedList
return MutBackwardIterator(
c,
( c != chunks.rend() ) ? c->last_item.load() : nullptr
// TODO: change to this when `last_item` is removed
// ( c != chunks.rend() ) ? min(c->last_item.load()-1, c->first_item+T_chunk_size) : nullptr
);
}

Expand Down

0 comments on commit a25a596

Please sign in to comment.