diff --git a/playground/block_merge.cpp b/playground/block_merge.cpp index 6085b6e12..33d101f27 100644 --- a/playground/block_merge.cpp +++ b/playground/block_merge.cpp @@ -57,14 +57,19 @@ class source { std::vector blocks_; }; +template class block_merger { public: + using source_type = SourceT; + using block_type = BlockT; + virtual ~block_merger() = default; - virtual void add(block blk, bool is_last) = 0; - virtual std::vector const& merged() const = 0; + virtual void add(source_type src, block_type blk, bool is_last) = 0; + virtual std::vector const& merged() const = 0; }; +#if 0 class simple_block_merger : public block_merger { public: static constexpr size_t const kEmpty{std::numeric_limits::max()}; @@ -130,122 +135,110 @@ class simple_block_merger : public block_merger { std::vector active_; std::vector merged_; }; +#endif -class multi_queue_block_merger : public block_merger { +template +class multi_queue_block_merger : public block_merger { public: - static constexpr size_t const kEmpty{std::numeric_limits::max()}; + using source_type = SourceT; + using block_type = BlockT; - multi_queue_block_merger(size_t num_slots, size_t max_in_flight, + multi_queue_block_merger(size_t num_slots, size_t max_queued, std::vector const& sources) - : free_{max_in_flight} + : num_queueable_{max_queued} , sources_{sources.begin(), sources.end()} - , active_(num_slots, kEmpty) { + , active_(num_slots) { for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) { active_[i] = sources_.front(); sources_.pop_front(); - // std::cout << "[" << i << "] -> " << active_[i] << "\n"; } } - void add(block blk, bool is_last) override { + void add(source_type src, block_type blk, bool is_last) override { std::unique_lock lock{mx_}; - cv_.wait(lock, [this, &blk] { - auto ix = index_; - size_t dist{0}; - while (active_[ix] != blk.first) { - ++dist; + cv_.wait(lock, [this, &src] { + auto ix = active_index_; + size_t distance{0}; + while (active_[ix] && active_[ix].value() != src) { + ++distance; ix = (ix + 1) % active_.size(); - if (ix == index_) { + if (ix == active_index_) { break; } } - // std::cout << "free: " << free_ << ", dist: " << dist << "\n"; - return free_ > dist; + return distance < num_queueable_; }); - --free_; + --num_queueable_; - queues_[blk.first].emplace(blk, is_last); + queues_[src].emplace(blk, is_last); - // auto it = std::find(begin(active_), end(active_), blk.first); - - // if (it == end(active_)) { - // throw std::runtime_error( - // fmt::format("unexpected source {}.{}", blk.first, blk.second)); - // } - - // auto ix = std::distance(begin(active_), it); + while (try_merge_block()) { + } - for (;;) { - auto const ix = index_; + cv_.notify_all(); + } - auto src = active_[ix]; + std::vector const& merged() const override { return merged_; } - if (src == kEmpty) { - throw std::runtime_error("active source is empty"); - } + private: + bool try_merge_block() { + auto const ix = active_index_; - auto it = queues_.find(src); + if (!active_[ix]) { + throw std::runtime_error("active source is empty"); + } - if (it == queues_.end()) { - // nothing yet... - break; - } + auto src = active_[ix].value(); - if (it->second.empty()) { - // nothing yet... - break; - } + auto it = queues_.find(src); - auto [q_blk, q_is_last] = it->second.front(); - it->second.pop(); + if (it == queues_.end() || it->second.empty()) { + // nothing yet... + return false; + } - ++free_; + auto [blk, is_last] = it->second.front(); + it->second.pop(); - merged_.emplace_back(q_blk); + ++num_queueable_; - if (q_is_last) { - // gone forever - queues_.erase(it); + merged_.emplace_back(blk); - if (sources_.empty()) { - active_[ix] = kEmpty; - } else { - active_[ix] = sources_.front(); - sources_.pop_front(); - } - } + if (is_last) { + // gone forever + queues_.erase(it); - for (;;) { - index_ = (index_ + 1) % active_.size(); - if (index_ == ix || active_[index_] != kEmpty) { - break; - } + if (sources_.empty()) { + active_[ix].reset(); + } else { + active_[ix] = sources_.front(); + sources_.pop_front(); } + } - if (index_ == ix && active_[index_] == kEmpty) { + for (;;) { + active_index_ = (active_index_ + 1) % active_.size(); + if (active_index_ == ix || active_[active_index_]) { break; } } - cv_.notify_all(); + return active_index_ != ix || active_[active_index_]; } - std::vector const& merged() const override { return merged_; } - - private: std::mutex mx_; std::condition_variable cv_; - size_t index_{0}; - size_t free_; - std::unordered_map>> queues_; - std::deque sources_; - std::vector active_; - std::vector merged_; + size_t active_index_{0}; + size_t num_queueable_; + std::unordered_map>> queues_; + std::deque sources_; + std::vector> active_; + std::vector merged_; }; -void emitter(sync_queue& sources, block_merger& merger) { +void emitter(sync_queue& sources, block_merger& merger) { for (;;) { auto src = sources.withWLock([](auto&& q) { std::optional src; @@ -267,7 +260,7 @@ void emitter(sync_queue& sources, block_merger& merger) { std::this_thread::sleep_for(std::chrono::duration(wait)); - merger.add(blk, is_last); + merger.add(blk.first, blk, is_last); if (is_last) { break; @@ -300,7 +293,8 @@ std::vector do_run(size_t run, std::mt19937& delay_rng) { } // block_merger merger(num_threads, source_ids); - multi_queue_block_merger merger(num_threads, max_in_flight, source_ids); + multi_queue_block_merger merger(num_threads, max_in_flight, + source_ids); std::vector thr;