From e2e3734c72c881a14db389d716394696d115c13b Mon Sep 17 00:00:00 2001 From: Marcus Holland-Moritz Date: Sat, 18 Nov 2023 20:34:21 +0100 Subject: [PATCH] feat: add multi queue block merger + unit tests --- CMakeLists.txt | 4 + include/dwarfs/block_merger.h | 37 +++ include/dwarfs/multi_queue_block_merger.h | 143 ++++++++++++ test/block_merger_test.cpp | 266 ++++++++++++++++++++++ 4 files changed, 450 insertions(+) create mode 100644 include/dwarfs/block_merger.h create mode 100644 include/dwarfs/multi_queue_block_merger.h create mode 100644 test/block_merger_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 57e3213e7..bc28ef9cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -598,6 +598,10 @@ if(WITH_TESTS) target_link_libraries(dwarfs_utils_test gtest gtest_main) list(APPEND TEST_TARGETS dwarfs_utils_test) + add_executable(block_merger_test test/block_merger_test.cpp) + target_link_libraries(block_merger_test gtest gtest_main) + list(APPEND TEST_TARGETS block_merger_test) + add_executable(dwarfs_pcm_sample_transformer_test test/pcm_sample_transformer_test.cpp) target_link_libraries(dwarfs_pcm_sample_transformer_test gtest gtest_main) list(APPEND TEST_TARGETS dwarfs_pcm_sample_transformer_test) diff --git a/include/dwarfs/block_merger.h b/include/dwarfs/block_merger.h new file mode 100644 index 000000000..27c92b0c9 --- /dev/null +++ b/include/dwarfs/block_merger.h @@ -0,0 +1,37 @@ +/* vim:set ts=2 sw=2 sts=2 et: */ +/** + * \author Marcus Holland-Moritz (github@mhxnet.de) + * \copyright Copyright (c) Marcus Holland-Moritz + * + * This file is part of dwarfs. + * + * dwarfs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * dwarfs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dwarfs. If not, see . + */ + +#pragma once + +namespace dwarfs { + +/** + * Abstract interface for objects that accept blocks tagged with the + * source they belong to. + * + * The SourceT and BlockT template parameters are re-exported as + * source_type and block_type. + */ +template +class block_merger { + public: + using source_type = SourceT; + using block_type = BlockT; + + virtual ~block_merger() = default; + + /** + * Hand block blk of source src to the merger. + * + * NOTE(review): is_last presumably marks the final block of src; the + * exact contract is defined by the implementing class. + */ + virtual void add(source_type src, block_type blk, bool is_last) = 0; +}; + +} // namespace dwarfs diff --git a/include/dwarfs/multi_queue_block_merger.h b/include/dwarfs/multi_queue_block_merger.h new file mode 100644 index 000000000..0f8e7ab65 --- /dev/null +++ b/include/dwarfs/multi_queue_block_merger.h @@ -0,0 +1,143 @@ +/* vim:set ts=2 sw=2 sts=2 et: */ +/** + * \author Marcus Holland-Moritz (github@mhxnet.de) + * \copyright Copyright (c) Marcus Holland-Moritz + * + * This file is part of dwarfs. + * + * dwarfs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dwarfs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dwarfs. If not, see . 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "dwarfs/block_merger.h" + +namespace dwarfs { + +/** + * block_merger implementation that emits blocks through on_block_merged + * by cycling over a fixed set of active slots, each holding one source. + * + * Blocks are queued per source. add() holds mx_ for its whole duration + * and waits on a condition variable while source_distance(src) is not + * below the remaining budget num_queueable_. + */ +template +class multi_queue_block_merger : public block_merger { + public: + using source_type = SourceT; + using block_type = BlockT; + + /** + * Starts with a budget of max_queued_blocks queued blocks and fills up + * to num_active_slots slots from the front of sources; the remaining + * sources stay pending in sources_. + * + * NOTE(review): with num_active_slots == 0, active_ is empty while the + * other members index it and compute modulo active_.size(). + */ + multi_queue_block_merger(size_t num_active_slots, size_t max_queued_blocks, + std::vector const& sources, + std::function on_block_merged) + : num_queueable_{max_queued_blocks} + , sources_{sources.begin(), sources.end()} + , active_(num_active_slots) + , on_block_merged_{on_block_merged} { + for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) { + active_[i] = sources_.front(); + sources_.pop_front(); + } + } + + /** + * Queues blk for src, then merges as many blocks as currently possible. + * + * Waits until source_distance(src) < num_queueable_, takes one unit of + * budget, and finally wakes all waiters. on_block_merged_ is invoked + * from here (via try_merge_block) while mx_ is still held. + */ + void add(source_type src, block_type blk, bool is_last) override { + std::unique_lock lock{mx_}; + + cv_.wait(lock, + [this, &src] { return source_distance(src) < num_queueable_; }); + + --num_queueable_; + + queues_[src].emplace(std::move(blk), is_last); + + /* Drain everything that became mergeable; the loop body is empty on purpose. */ + while (try_merge_block()) { + } + + cv_.notify_all(); + } + + private: + /** + * Number of occupied slots between active_index_ and the slot holding + * src, walking forward with wrap-around. The walk stops early at an + * empty slot. If a full cycle finds no match, the position of src in + * sources_ is added (sources_.size() when it is not found there). + */ + size_t source_distance(source_type src) const { + auto ix = active_index_; + size_t distance{0}; + + while (active_[ix] && active_[ix].value() != src) { + ++distance; + ix = (ix + 1) % active_.size(); + + if (ix == active_index_) { + auto it = std::find(begin(sources_), end(sources_), src); + distance += std::distance(begin(sources_), it); + break; + } + } + + return distance; + } + + /** + * Emits the front block queued for the source in the current slot. + * + * Returns false if nothing is queued for that source. Otherwise the + * block goes to on_block_merged_, one unit of budget is returned, and + * active_index_ moves on to the next occupied slot; when is_last is + * set the source queue is erased and the slot is refilled or cleared + * first. The result is then false only if the search wrapped around + * to the starting slot and that slot is now empty. + */ + bool try_merge_block() { + auto const ix = active_index_; + + /* The current slot is expected to be occupied whenever this is called. */ + assert(active_[ix]); + + auto src = active_[ix].value(); + auto it = queues_.find(src); + + if (it == queues_.end() || it->second.empty()) { + return false; + } + + auto [blk, is_last] = std::move(it->second.front()); + it->second.pop(); + + on_block_merged_(std::move(blk)); + + ++num_queueable_; + + if (is_last) { + queues_.erase(it); + update_active(ix); + } + + /* Advance to the next occupied slot, giving up after one full cycle. */ + do { + active_index_ = (active_index_ + 1) % active_.size(); + } while (active_index_ != ix && 
+ !active_[active_index_]); + + return active_index_ != ix || active_[active_index_]; + } + + /** Refill slot ix from the front of sources_, or clear it if none are pending. */ + void update_active(size_t ix) { + if (!sources_.empty()) { + active_[ix] = sources_.front(); + sources_.pop_front(); + } else { + active_[ix].reset(); + } + } + + std::mutex mx_; + std::condition_variable cv_; + /* Index of the slot whose source supplies the next merged block. */ + size_t active_index_{0}; + /* Remaining queue budget: decremented per add(), incremented per merged block. */ + size_t num_queueable_; + /* Per-source queues of (block, is_last) entries. */ + std::unordered_map>> + queues_; + /* Sources not yet assigned to a slot, in the order given to the constructor. */ + std::deque sources_; + /* Active slots; an empty entry means the slot currently has no source. */ + std::vector> active_; + std::function on_block_merged_; +}; + +} // namespace dwarfs diff --git a/test/block_merger_test.cpp b/test/block_merger_test.cpp new file mode 100644 index 000000000..357872da9 --- /dev/null +++ b/test/block_merger_test.cpp @@ -0,0 +1,266 @@ +/* vim:set ts=2 sw=2 sts=2 et: */ +/** + * \author Marcus Holland-Moritz (github@mhxnet.de) + * \copyright Copyright (c) Marcus Holland-Moritz + * + * This file is part of dwarfs. + * + * dwarfs is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dwarfs is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dwarfs. If not, see . 
+ */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "dwarfs/multi_queue_block_merger.h" + +using namespace dwarfs; + +namespace { + +/* 0 is silent; above 0 prints per-run configuration and efficiency; above 1 also dumps merged sequences. */ +constexpr int const debuglevel{0}; + +/* Number of distinct run seeds, worker threads executing runs, and repeat runs compared against each reference. */ +constexpr size_t const max_runs{250}; +constexpr size_t const num_runner_threads{16}; +constexpr size_t const num_repetitions{4}; + +/* A block is identified by (source id, index within that source); see source::next(). */ +using block = std::pair; + +// Use std::shared_mutex because folly::SharedMutex might trigger TSAN +template +using synchronized = folly::Synchronized; + +template +using sync_queue = synchronized>; + +/** + * Simulated block source: a fixed list of per-block delay values that is + * handed out one block at a time. + */ +class source { + public: + source(size_t id, std::mt19937& delay_rng, std::mt19937& rng, + size_t max_blocks = 20, double ips = 5000.0) + : id_{id} + , blocks_{init_blocks(delay_rng, rng, max_blocks, ips)} {} + + /** + * Returns the next block as (id, index), whether it was the last one, + * and the delay value stored for it. There is no bounds check, so it + * must not be called again after the last block was returned. + */ + std::tuple next() { + auto idx = idx_++; + return {std::make_pair(id_, idx), idx_ >= blocks_.size(), blocks_[idx]}; + } + + size_t id() const { return id_; } + + size_t num_blocks() const { return blocks_.size(); } + + /** Sum of all stored delays, treated as seconds and converted to nanoseconds. */ + std::chrono::nanoseconds total_time() const { + auto seconds = std::accumulate(begin(blocks_), end(blocks_), 0.0); + return std::chrono::duration_cast( + std::chrono::duration(seconds)); + } + + private: + /** + * Builds the delay list: the block count is drawn uniformly from + * 1..max_blocks using rng, each delay from an exponential distribution + * with rate ips using delay_rng. + */ + static std::vector + init_blocks(std::mt19937& delay_rng, std::mt19937& rng, size_t max_blocks, + double ips) { + std::uniform_int_distribution<> idist(1, max_blocks); + std::exponential_distribution<> edist(ips); + std::vector blocks; + blocks.resize(idist(rng)); + std::generate(begin(blocks), end(blocks), [&] { return edist(delay_rng); }); + return blocks; + } + + size_t idx_{0}; + size_t id_; + std::vector blocks_; +}; + +/** + * Worker loop: repeatedly pops one source from the shared queue and feeds + * all of its blocks to merger, sleeping for the stored delay before each + * add(). Returns once the queue is empty. + */ +void emitter(sync_queue& sources, + dwarfs::block_merger& merger) { + for (;;) { + auto src = sources.withWLock([](auto&& q) { + std::optional src; + + if (!q.empty()) { + src = std::move(q.front()); + q.pop(); + } + + return src; + }); + + if (!src) { + break; + } + + for (;;) { + auto [blk, is_last, wait] = src->next(); + + 
+ std::this_thread::sleep_for(std::chrono::duration(wait)); + + merger.add(blk.first, blk, is_last); + + if (is_last) { + break; + } + } + } +} + +/** + * Executes one merge and returns the sequence of merged blocks. + * + * The number of sources, the number of slots and the block count of each + * source come from an rng seeded with run. The thread count, in-flight + * limit and all delays come from delay_rng, so they can differ between + * calls that share the same run value. + */ +std::vector +do_run(std::mutex& out_mx, size_t run, std::mt19937& delay_rng) { + std::mt19937 rng(run); + std::exponential_distribution<> sources_dist(0.1); + std::exponential_distribution<> threads_dist(0.1); + std::exponential_distribution<> slots_dist(0.1); + std::exponential_distribution<> inflight_dist(0.1); + std::uniform_real_distribution<> speed_dist(0.1, 10.0); + auto const num_sources{std::max(1, sources_dist(rng))}; + auto const num_slots{std::max(1, slots_dist(rng))}; + auto const num_threads{std::max(num_slots, threads_dist(delay_rng))}; + auto const max_in_flight{std::max(1, inflight_dist(delay_rng))}; + + std::vector source_ids; + sync_queue sources; + std::chrono::nanoseconds total_time{}; + + for (size_t i = 0; i < num_sources; ++i) { + auto src = source(i, delay_rng, rng, 30, 10000.0 * speed_dist(delay_rng)); + total_time += src.total_time(); + source_ids.emplace_back(src.id()); + sources.wlock()->emplace(std::move(src)); + } + + auto config = + fmt::format("sources: {}, slots: {}, threads: {}, max in flight: {}", + num_sources, num_slots, num_threads, max_in_flight); + + if constexpr (debuglevel > 0) { + std::lock_guard lock{out_mx}; + std::cout << config << "\n"; + } + + std::vector merged; + + dwarfs::multi_queue_block_merger merger( + num_slots, max_in_flight, source_ids, + [&merged](block blk) { merged.emplace_back(std::move(blk)); }); + + std::vector thr; + + auto t0 = std::chrono::steady_clock::now(); + + for (size_t i = 0; i < num_threads; ++i) { + thr.emplace_back([&] { emitter(sources, merger); }); + } + + for (auto& t : thr) { + t.join(); + } + + auto t1 = std::chrono::steady_clock::now(); + + /* Efficiency is the summed block delay divided by wall time multiplied by the thread count. */ + auto elapsed = num_threads * (t1 - t0); + auto efficiency = + std::chrono::duration_cast>(total_time) + .count() / + std::chrono::duration_cast>(elapsed) + .count(); + + if constexpr (debuglevel > 0) { + 
+ std::lock_guard lock{out_mx}; + std::cout << config + << fmt::format(" => efficiency: {:.2f}%\n", 100.0 * efficiency); + } + + return merged; +} + +/** + * With debuglevel above 1, prints blocks as a comma-separated list of + * first.second pairs on one line while holding out_mx; otherwise does nothing. + */ +[[maybe_unused]] void +dump(std::mutex& out_mx, std::vector const& blocks) { + if constexpr (debuglevel > 1) { + std::lock_guard lock{out_mx}; + for (size_t i = 0; i < blocks.size(); ++i) { + if (i > 0) { + std::cout << ", "; + } + auto const& b = blocks[i]; + std::cout << b.first << "." << b.second; + } + std::cout << "\n"; + } +} + +/** + * Claims run numbers from the shared runs counter until max_runs is + * reached. For each run it produces a reference result and then + * num_repetitions further results with the same run value; every match + * increments passes, every mismatch appends the run number to fails. + * delay_rng is seeded with tid. + * + * NOTE(review): the max_runs parameter shadows the file-level constant + * of the same name. + */ +void runner_thread(size_t tid, std::mutex& out_mx, std::atomic& runs, + size_t const max_runs, std::atomic& passes, + synchronized>& fails) { + std::mt19937 delay_rng(tid); + + for (;;) { + auto run = runs++; + if (run >= max_runs) { + break; + } + if constexpr (debuglevel > 0) { + std::lock_guard lock{out_mx}; + std::cout << "[" << run << "/" << tid << "] ref\n"; + } + auto ref = do_run(out_mx, run, delay_rng); + dump(out_mx, ref); + for (size_t rep = 0; rep < num_repetitions; ++rep) { + if constexpr (debuglevel > 0) { + std::lock_guard lock{out_mx}; + std::cout << "[" << run << "/" << tid << "] test\n"; + } + auto test = do_run(out_mx, run, delay_rng); + dump(out_mx, test); + if (test == ref) { + ++passes; + } else { + fails.wlock()->emplace_back(run); + } + } + } +} + +} // namespace + +TEST(block_merger, random) { + std::mutex out_mx; + std::atomic runs{0}; + std::atomic passes{0}; + synchronized> fails; + + std::vector thr; + + for (size_t i = 0; i < num_runner_threads; ++i) { + thr.emplace_back( + [&, i] { runner_thread(i, out_mx, runs, max_runs, passes, fails); }); + } + + for (auto& t : thr) { + t.join(); + } + + EXPECT_EQ(max_runs * num_repetitions, passes); + EXPECT_TRUE(fails.rlock()->empty()) << folly::join(", ", *fails.rlock()); +}