Skip to content

Commit

Permalink
Merge pull request #21469 from vespa-engine/toregge/compact-indirect-…
Browse files Browse the repository at this point in the history
…keys-and-values-in-btree-stress-test

Compact indirect keys and values in btree stress test.
  • Loading branch information
geirst authored Mar 1, 2022
2 parents 4b75187 + 5080840 commit e87944d
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 44 deletions.
2 changes: 1 addition & 1 deletion vespalib/src/tests/btree/btree-stress/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ vespa_add_executable(vespalib_btree_stress_test_app
vespalib
GTest::GTest
)
vespa_add_test(NAME vespalib_btree_stress_test_app COMMAND vespalib_btree_stress_test_app BENCHMARK)
vespa_add_test(NAME vespalib_btree_stress_test_app COMMAND vespalib_btree_stress_test_app --smoke-test)
203 changes: 167 additions & 36 deletions vespalib/src/tests/btree/btree-stress/btree_stress_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,55 @@
#include <vespa/vespalib/btree/btree.hpp>
#include <vespa/vespalib/btree/btreestore.hpp>
#include <vespa/vespalib/btree/btreeaggregator.hpp>
#include <vespa/vespalib/datastore/atomic_entry_ref.h>
#include <vespa/vespalib/datastore/buffer_type.hpp>
#include <vespa/vespalib/datastore/compaction_spec.h>
#include <vespa/vespalib/datastore/compaction_strategy.h>
#include <vespa/vespalib/datastore/entry_ref_filter.h>

#include <vespa/log/log.h>
LOG_SETUP("btree_stress_test");

using GenerationHandler = vespalib::GenerationHandler;
using RefType = vespalib::datastore::EntryRefT<22>;
using vespalib::btree::NoAggregated;
using vespalib::datastore::AtomicEntryRef;
using vespalib::datastore::CompactionSpec;
using vespalib::datastore::CompactionStrategy;
using vespalib::datastore::EntryRef;
using vespalib::datastore::EntryRefFilter;
using vespalib::makeLambdaTask;
using generation_t = GenerationHandler::generation_t;

namespace {

constexpr uint32_t value_offset = 1000000000;

bool smoke_test = false;
const vespalib::string smoke_test_option = "--smoke-test";

class RealIntStore {
vespalib::datastore::DataStore<uint32_t> _store;
using StoreType = vespalib::datastore::DataStore<uint32_t>;
using StoreRefType = StoreType::RefType;
StoreType _store;
public:
RealIntStore();
~RealIntStore();
EntryRef add(uint32_t value) { return _store.addEntry(value); }
void hold(EntryRef ref) { _store.holdElem(ref, 1); }
AtomicEntryRef add_relaxed(uint32_t value) { return AtomicEntryRef(add(value)); }
void hold(const AtomicEntryRef& ref) { _store.holdElem(ref.load_relaxed(), 1); }
EntryRef move(EntryRef ref);
void transfer_hold_lists(generation_t gen) { _store.transferHoldLists(gen); }
void trim_hold_lists(generation_t gen) { _store.trimHoldLists(gen); }

uint32_t get(EntryRef ref) const { return _store.getEntry(ref); }
uint32_t get_acquire(const AtomicEntryRef& ref) const { return get(ref.load_acquire()); }
uint32_t get_relaxed(const AtomicEntryRef& ref) const { return get(ref.load_relaxed()); }
std::vector<uint32_t> start_compact();
void finish_compact(std::vector<uint32_t> to_hold);
static constexpr bool is_indirect = true;
static uint32_t get_offset_bits() { return StoreRefType::offset_bits; }
static uint32_t get_num_buffers() { return StoreRefType::numBuffers(); }
bool has_held_buffers() const noexcept { return _store.has_held_buffers(); }
};

RealIntStore::RealIntStore()
Expand All @@ -58,6 +79,27 @@ RealIntStore::RealIntStore()

RealIntStore::~RealIntStore() = default;

std::vector<uint32_t>
RealIntStore::start_compact()
{
// Use a compaction strategy that will compact all active buffers
CompactionStrategy compaction_strategy(0.0, 0.0, get_num_buffers(), 1.0);
CompactionSpec compaction_spec(true, false);
return _store.startCompactWorstBuffers(compaction_spec, compaction_strategy);
}

void
RealIntStore::finish_compact(std::vector<uint32_t> to_hold)
{
_store.finishCompact(to_hold);
}

EntryRef
RealIntStore::move(EntryRef ref)
{
return add(get(ref));
}

class RealIntStoreCompare
{
const RealIntStore& _store;
Expand All @@ -71,10 +113,10 @@ class RealIntStoreCompare
uint32_t get(EntryRef ref) const {
return (ref.valid() ? _store.get(ref) : _lookup_key);
}
bool operator()(EntryRef lhs, EntryRef rhs) const {
return get(lhs) < get(rhs);
bool operator()(const AtomicEntryRef& lhs, const AtomicEntryRef& rhs) const {
return get(lhs.load_acquire()) < get(rhs.load_acquire());
}
static EntryRef lookup_key() noexcept { return EntryRef(); }
static AtomicEntryRef lookup_key() noexcept { return AtomicEntryRef(); }
const RealIntStoreCompare& get_compare() const noexcept { return *this; }
};

Expand All @@ -83,10 +125,14 @@ class NoIntStore {
NoIntStore() = default;
~NoIntStore() = default;
static uint32_t add(uint32_t value) noexcept { return value; }
static uint32_t add_relaxed(uint32_t value) noexcept { return value; }
static void hold(uint32_t) noexcept { }
static void transfer_hold_lists(generation_t) noexcept { }
static void trim_hold_lists(generation_t) noexcept { }
static uint32_t get(uint32_t value) noexcept { return value; }
static uint32_t get_acquire(uint32_t value) noexcept { return value; }
static uint32_t get_relaxed(uint32_t value) noexcept { return value; }
static constexpr bool is_indirect = false;
};

class NoIntStoreCompare
Expand All @@ -109,7 +155,7 @@ class NoIntStoreCompare
struct IndirectKeyValueParams {
using IntStore = RealIntStore;
using MyCompare = RealIntStoreCompare;
using MyTree = vespalib::btree::BTree<EntryRef, EntryRef, NoAggregated, RealIntStoreCompare>;
using MyTree = vespalib::btree::BTree<AtomicEntryRef, AtomicEntryRef, NoAggregated, RealIntStoreCompare>;
};

struct DirectKeyValueParams {
Expand All @@ -118,6 +164,29 @@ struct DirectKeyValueParams {
using MyTree = vespalib::btree::BTree<uint32_t, uint32_t>;
};

template <uint32_t divisor, uint32_t remainder>
class ConsiderCompact {
uint32_t _count;
bool _want_compact;
public:
ConsiderCompact()
: _count(0u),
_want_compact(false)
{
}
bool consider(uint32_t idx) {
if ((idx % divisor) == remainder) {
_want_compact = true;
}
return _want_compact;
}
void track_compacted() {
++_count;
_want_compact = false;
}
uint32_t get_count() const noexcept { return _count; }
};

template <typename Params>
class Fixture : public testing::Test
{
Expand All @@ -141,9 +210,9 @@ class Fixture : public testing::Test
std::atomic<long> _doneReadWork;
std::atomic<bool> _stopRead;
bool _reportWork;
bool _want_compact_tree;
uint32_t _consider_compact_tree_checks;
uint32_t _compact_tree_count;
ConsiderCompact<1000, 0> _compact_tree;
ConsiderCompact<1000, 300> _compact_keys;
ConsiderCompact<1000, 600> _compact_values;

Fixture();
~Fixture() override;
Expand All @@ -152,7 +221,9 @@ class Fixture : public testing::Test
void insert(uint32_t key);
void remove(uint32_t key);
void compact_tree();
void consider_compact_tree();
void compact_keys();
void compact_values();
void consider_compact(uint32_t idx);

void readWork(uint32_t cnt);
void readWork();
Expand Down Expand Up @@ -180,9 +251,9 @@ Fixture<Params>::Fixture()
_doneReadWork(0),
_stopRead(false),
_reportWork(false),
_want_compact_tree(false),
_consider_compact_tree_checks(0u),
_compact_tree_count(0u)
_compact_tree(),
_compact_keys(),
_compact_values()
{
_rnd.srand48(32);
}
Expand Down Expand Up @@ -226,23 +297,23 @@ bool
Fixture<Params>::adjustWriteIterator(uint32_t key)
{
MyCompare compare(_keys, key);
if (_writeItr.valid() && _keys.get(_writeItr.getKey()) < key) {
if (_writeItr.valid() && _keys.get_relaxed(_writeItr.getKey()) < key) {
_writeItr.binarySeek(compare.lookup_key(), compare.get_compare());
} else {
_writeItr.lower_bound(compare.lookup_key(), compare.get_compare());
}
assert(!_writeItr.valid() || _keys.get(_writeItr.getKey()) >= key);
return (_writeItr.valid() && _keys.get(_writeItr.getKey()) == key);
assert(!_writeItr.valid() || _keys.get_relaxed(_writeItr.getKey()) >= key);
return (_writeItr.valid() && _keys.get_relaxed(_writeItr.getKey()) == key);
}

template <typename Params>
void
Fixture<Params>::insert(uint32_t key)
{
if (!adjustWriteIterator(key)) {
_tree.insert(_writeItr, _keys.add(key), _values.add(key + value_offset));
_tree.insert(_writeItr, _keys.add_relaxed(key), _values.add_relaxed(key + value_offset));
} else {
EXPECT_EQ(key + value_offset, _values.get(_writeItr.getData()));
EXPECT_EQ(key + value_offset, _values.get_relaxed(_writeItr.getData()));
}
}

Expand All @@ -251,7 +322,7 @@ void
Fixture<Params>::remove(uint32_t key)
{
if (adjustWriteIterator(key)) {
EXPECT_EQ(key + value_offset, _values.get(_writeItr.getData()));
EXPECT_EQ(key + value_offset, _values.get_relaxed(_writeItr.getData()));
_keys.hold(_writeItr.getKey());
_values.hold(_writeItr.getData());
_tree.remove(_writeItr);
Expand All @@ -266,20 +337,69 @@ Fixture<Params>::compact_tree()
CompactionStrategy compaction_strategy(0.0, 0.0, RefType::numBuffers(), 1.0);
_tree.compact_worst(compaction_strategy);
_writeItr = _tree.begin();
_compact_tree.track_compacted();
}

template <typename Params>
void
Fixture<Params>::compact_keys()
{
if constexpr (_keys.is_indirect) {
auto to_hold = _keys.start_compact();
EntryRefFilter filter(_keys.get_num_buffers(), _keys.get_offset_bits());
filter.add_buffers(to_hold);
auto itr = _tree.begin();
while (itr.valid()) {
auto old_ref = itr.getKey().load_relaxed();
if (filter.has(old_ref)) {
auto new_ref = _keys.move(old_ref);
itr.writeKey(AtomicEntryRef(new_ref));
}
++itr;
}
_keys.finish_compact(std::move(to_hold));
}
_compact_keys.track_compacted();
}

template <typename Params>
void
Fixture<Params>::consider_compact_tree()
Fixture<Params>::compact_values()
{
if ((_consider_compact_tree_checks % 1000) == 0) {
_want_compact_tree = true;
if constexpr (_values.is_indirect) {
auto to_hold = _values.start_compact();
EntryRefFilter filter(_values.get_num_buffers(), _values.get_offset_bits());
filter.add_buffers(to_hold);
auto itr = _tree.begin();
while (itr.valid()) {
auto old_ref = itr.getData().load_relaxed();
if (filter.has(old_ref)) {
auto new_ref = _values.move(old_ref);
itr.getWData().store_release(new_ref);
}
++itr;
}
_values.finish_compact(std::move(to_hold));
}
++_consider_compact_tree_checks;
if (_want_compact_tree && !_tree.getAllocator().getNodeStore().has_held_buffers()) {
_compact_values.track_compacted();
}

template <typename Params>
void
Fixture<Params>::consider_compact(uint32_t idx)
{
if (_compact_tree.consider(idx) && !_tree.getAllocator().getNodeStore().has_held_buffers()) {
compact_tree();
_want_compact_tree = false;
++_compact_tree_count;
}
if constexpr (_keys.is_indirect) {
if (_compact_keys.consider(idx) && !_keys.has_held_buffers()) {
compact_keys();
}
}
if constexpr (_values.is_indirect) {
if (_compact_values.consider(idx) && !_values.has_held_buffers()) {
compact_values();
}
}
}

Expand All @@ -296,9 +416,9 @@ Fixture<Params>::readWork(uint32_t cnt)
uint32_t key = rnd.lrand48() % (_keyLimit + 1);
MyCompare compare(_keys, key);
MyTreeConstIterator itr = _tree.getFrozenView().lowerBound(compare.lookup_key(), compare.get_compare());
assert(!itr.valid() || _keys.get(itr.getKey()) >= key);
if (itr.valid() && _keys.get(itr.getKey()) == key) {
EXPECT_EQ(key + value_offset, _values.get(itr.getData()));
assert(!itr.valid() || _keys.get_acquire(itr.getKey()) >= key);
if (itr.valid() && _keys.get_acquire(itr.getKey()) == key) {
EXPECT_EQ(key + value_offset, _values.get_acquire(itr.getData()));
++hits;
}
}
Expand All @@ -319,7 +439,7 @@ Fixture<Params>::writeWork(uint32_t cnt)
{
vespalib::Rand48 &rnd(_rnd);
for (uint32_t i = 0; i < cnt; ++i) {
consider_compact_tree();
consider_compact(i);
uint32_t key = rnd.lrand48() % _keyLimit;
if ((rnd.lrand48() & 1) == 0) {
insert(key);
Expand All @@ -330,7 +450,10 @@ Fixture<Params>::writeWork(uint32_t cnt)
}
_doneWriteWork += cnt;
_stopRead = true;
LOG(info, "done %u write work, %u compact tree", cnt, _compact_tree_count);
LOG(info, "done %u write work, %u compact tree, %u compact keys, %u compact values", cnt,
_compact_tree.get_count(),
_compact_keys.get_count(),
_compact_values.get_count());
}

template <typename Params>
Expand All @@ -348,7 +471,7 @@ Fixture<Params>::basic_lower_bound()
MyCompare compare(_keys, 3);
auto itr = _tree.getFrozenView().lowerBound(compare.lookup_key(), compare.get_compare());
ASSERT_TRUE(itr.valid());
EXPECT_EQ(4u, _keys.get(itr.getKey()));
EXPECT_EQ(4u, _keys.get_acquire(itr.getKey()));
}

template <typename Params>
Expand All @@ -365,7 +488,7 @@ template <typename Params>
void
Fixture<Params>::single_lower_bound_reader_during_updates()
{
uint32_t cnt = 1000000;
uint32_t cnt = smoke_test ? 10000 : 1000000;
_reportWork = true;
_writer.execute(makeLambdaTask([this, cnt]() { writeWork(cnt); }));
_readers.execute(makeLambdaTask([this]() { readWork(); }));
Expand All @@ -377,7 +500,7 @@ template <typename Params>
void
Fixture<Params>::multiple_lower_bound_readers_during_updates()
{
uint32_t cnt = 1000000;
uint32_t cnt = smoke_test ? 10000 : 1000000;
_reportWork = true;
_writer.execute(makeLambdaTask([this, cnt]() { writeWork(cnt); }));
_readers.execute(makeLambdaTask([this]() { readWork(); }));
Expand Down Expand Up @@ -423,4 +546,12 @@ TYPED_TEST(BTreeStressTest, multiple_lower_bound_readers_during_updates)

#pragma GCC diagnostic pop

GTEST_MAIN_RUN_ALL_TESTS()
int main(int argc, char **argv) {
if (argc > 1 && argv[1] == smoke_test_option) {
smoke_test = true;
++argv;
--argc;
}
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading

0 comments on commit e87944d

Please sign in to comment.