Skip to content

Commit

Permalink
wip: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mhx committed Nov 18, 2023
1 parent e37eeb8 commit e943863
Showing 1 changed file with 42 additions and 111 deletions.
153 changes: 42 additions & 111 deletions playground/block_merge.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <iostream>
#include <mutex>
Expand Down Expand Up @@ -66,88 +67,21 @@ class block_merger {
virtual ~block_merger() = default;

virtual void add(source_type src, block_type blk, bool is_last) = 0;
virtual std::vector<block_type> const& merged() const = 0;
};

#if 0
class simple_block_merger : public block_merger {
public:
static constexpr size_t const kEmpty{std::numeric_limits<size_t>::max()};

simple_block_merger(size_t num_slots, std::vector<size_t> const& sources)
: sources_{sources.begin(), sources.end()}
, active_(num_slots, kEmpty) {
for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) {
active_[i] = sources_.front();
sources_.pop_front();
// std::cout << "[" << i << "] -> " << active_[i] << "\n";
}
}

void add(block blk, bool is_last) override {
std::unique_lock lock{mx_};

auto it = std::find(begin(active_), end(active_), blk.first);

// std::cout << "size=" << active_.size() << "\n";
// std::cout << "{" << blk.first << "," << blk.second << "}\n";
// for (size_t i = 0; i < active_.size(); ++i) {
// std::cout << "[" << i << "] -> " << active_[i] << "\n";
// }

if (it == end(active_)) {
throw std::runtime_error(
fmt::format("unexpected source {}.{}", blk.first, blk.second));
}

auto ix = std::distance(begin(active_), it);

cv_.wait(lock, [this, ix] { return ix == index_; });

merged_.emplace_back(blk);

if (is_last) {
if (sources_.empty()) {
active_[ix] = kEmpty;
} else {
active_[ix] = sources_.front();
sources_.pop_front();
}
}

for (;;) {
index_ = (index_ + 1) % active_.size();
if (index_ == ix || active_[index_] != kEmpty) {
break;
}
}

cv_.notify_all();
}

std::vector<block> const& merged() const override { return merged_; }

private:
std::mutex mx_;
std::condition_variable cv_;
size_t index_{0};
std::deque<size_t> sources_;
std::vector<size_t> active_;
std::vector<block> merged_;
};
#endif

template <typename SourceT, typename BlockT>
class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
public:
using source_type = SourceT;
using block_type = BlockT;

multi_queue_block_merger(size_t num_slots, size_t max_queued,
std::vector<size_t> const& sources)
: num_queueable_{max_queued}
multi_queue_block_merger(size_t num_active_slots, size_t max_queued_blocks,
std::vector<source_type> const& sources,
std::function<void(block_type)> on_block_merged)
: num_queueable_{max_queued_blocks}
, sources_{sources.begin(), sources.end()}
, active_(num_slots) {
, active_(num_active_slots)
, on_block_merged_{on_block_merged} {
for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) {
active_[i] = sources_.front();
sources_.pop_front();
Expand All @@ -157,73 +91,66 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
void add(source_type src, block_type blk, bool is_last) override {
std::unique_lock lock{mx_};

cv_.wait(lock, [this, &src] {
auto ix = active_index_;
size_t distance{0};
while (active_[ix] && active_[ix].value() != src) {
++distance;
ix = (ix + 1) % active_.size();
if (ix == active_index_) {
break;
}
}
return distance < num_queueable_;
});
cv_.wait(lock,
[this, &src] { return source_distance(src) < num_queueable_; });

--num_queueable_;

queues_[src].emplace(blk, is_last);
queues_[src].emplace(std::move(blk), is_last);

while (try_merge_block()) {
}

cv_.notify_all();
}

std::vector<block_type> const& merged() const override { return merged_; }

private:
size_t source_distance(source_type src) const {
auto ix = active_index_;
size_t distance{0};
while (active_[ix] && active_[ix].value() != src) {
++distance;
ix = (ix + 1) % active_.size();
if (ix == active_index_) {
break;
}
}
return distance;
}

bool try_merge_block() {
auto const ix = active_index_;

if (!active_[ix]) {
throw std::runtime_error("active source is empty");
}
assert(active_[ix]);

auto src = active_[ix].value();

auto it = queues_.find(src);

if (it == queues_.end() || it->second.empty()) {
// nothing yet...
return false;
}

auto [blk, is_last] = it->second.front();
auto [blk, is_last] = std::move(it->second.front());
it->second.pop();

++num_queueable_;
on_block_merged_(std::move(blk));

merged_.emplace_back(blk);
++num_queueable_;

if (is_last) {
// gone forever
queues_.erase(it);

if (sources_.empty()) {
active_[ix].reset();
} else {
if (!sources_.empty()) {
active_[ix] = sources_.front();
sources_.pop_front();
} else {
active_[ix].reset();
}
}

for (;;) {
do {
active_index_ = (active_index_ + 1) % active_.size();
if (active_index_ == ix || active_[active_index_]) {
break;
}
}
} while (active_index_ != ix && !active_[active_index_]);

return active_index_ != ix || active_[active_index_];
}
Expand All @@ -232,10 +159,11 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
std::condition_variable cv_;
size_t active_index_{0};
size_t num_queueable_;
std::unordered_map<size_t, std::queue<std::pair<block_type, bool>>> queues_;
std::unordered_map<source_type, std::queue<std::pair<block_type, bool>>>
queues_;
std::deque<source_type> sources_;
std::vector<std::optional<source_type>> active_;
std::vector<block_type> merged_;
std::function<void(block_type)> on_block_merged_;
};

void emitter(sync_queue<source>& sources, block_merger<size_t, block>& merger) {
Expand Down Expand Up @@ -285,16 +213,19 @@ std::vector<block> do_run(size_t run, std::mt19937& delay_rng) {
std::chrono::nanoseconds total_time{};

for (size_t i = 0; i < num_sources; ++i) {
auto src = source(i, delay_rng, rng, 30, 2000.0 * speed_dist(delay_rng));
auto src = source(i, delay_rng, rng, 30, 5000.0 * speed_dist(delay_rng));
total_blocks += src.num_blocks();
total_time += src.total_time();
source_ids.emplace_back(src.id());
sources.wlock()->emplace(std::move(src));
}

std::vector<block> merged;

// block_merger merger(num_threads, source_ids);
multi_queue_block_merger<size_t, block> merger(num_threads, max_in_flight,
source_ids);
multi_queue_block_merger<size_t, block> merger(
num_threads, max_in_flight, source_ids,
[&merged](block blk) { merged.emplace_back(std::move(blk)); });

std::vector<std::thread> thr;

Expand Down Expand Up @@ -323,7 +254,7 @@ std::vector<block> do_run(size_t run, std::mt19937& delay_rng) {
100.0 * efficiency)
<< "\n";

return merger.merged();
return merged;
}

[[maybe_unused]] void dump(std::vector<block> const& blocks) {
Expand Down

0 comments on commit e943863

Please sign in to comment.