Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move output IO throttler to IO queue level #2332

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion apps/io_tester/ioinfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ int main(int ac, char** av) {
}
out << YAML::EndMap;

const auto& fg = internal::get_fair_group(ioq, internal::io_direction_and_length::write_idx);
const auto& fg = internal::io_throttle(ioq, internal::io_direction_and_length::write_idx);
out << YAML::Key << "per_tick_grab_threshold" << YAML::Value << fg.per_tick_grab_threshold();

const auto& tb = fg.token_bucket();
Expand Down
12 changes: 6 additions & 6 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public:
/// dispatch them efficiently. The inability can be of two kinds -- either disk cannot
/// cope with the number of arriving requests, or the total size of the data within
/// the given time frame exceeds the disk throughput.
class fair_group {
class shared_throttle {
public:
using capacity_t = fair_queue_entry::capacity_t;
using clock_type = std::chrono::steady_clock;
Expand Down Expand Up @@ -247,8 +247,8 @@ public:
std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
};

explicit fair_group(config cfg, unsigned nr_queues);
fair_group(fair_group&&) = delete;
explicit shared_throttle(config cfg, unsigned nr_queues);
shared_throttle(shared_throttle&&) = delete;

capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
Expand Down Expand Up @@ -299,7 +299,7 @@ public:

using class_id = unsigned int;
class priority_class_data;
using capacity_t = fair_group::capacity_t;
using capacity_t = shared_throttle::capacity_t;
using signed_capacity_t = std::make_signed_t<capacity_t>;

private:
Expand All @@ -322,7 +322,7 @@ private:
};

config _config;
fair_group& _group;
shared_throttle& _group;
clock_type::time_point _group_replenish;
fair_queue_ticket _resources_executing;
fair_queue_ticket _resources_queued;
Expand Down Expand Up @@ -364,7 +364,7 @@ public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
/// \param cfg an instance of the class \ref config
explicit fair_queue(fair_group& shared, config cfg);
explicit fair_queue(shared_throttle& shared, config cfg);
fair_queue(fair_queue&&) = delete;
~fair_queue();

Expand Down
10 changes: 5 additions & 5 deletions include/seastar/core/io_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace seastar {

class io_queue;
namespace internal {
const fair_group& get_fair_group(const io_queue& ioq, unsigned stream);
const shared_throttle& io_throttle(const io_queue& ioq, unsigned stream);
}

#if SEASTAR_API_LEVEL < 7
Expand Down Expand Up @@ -97,7 +97,7 @@ private:
internal::io_sink& _sink;

friend struct ::io_queue_for_tests;
friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream);
friend const shared_throttle& internal::io_throttle(const io_queue& ioq, unsigned stream);

priority_class_data& find_or_create_class(internal::priority_class pc);
future<size_t> queue_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept;
Expand Down Expand Up @@ -218,16 +218,16 @@ public:
private:
friend class io_queue;
friend struct ::io_queue_for_tests;
friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream);
friend const shared_throttle& internal::io_throttle(const io_queue& ioq, unsigned stream);

const io_queue::config _config;
size_t _max_request_length[2];
boost::container::static_vector<fair_group, 2> _fgs;
boost::container::static_vector<shared_throttle, 2> _fgs;
std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
util::spinlock _lock;
const shard_id _allocated_on;

static fair_group::config make_fair_group_config(const io_queue::config& qcfg) noexcept;
static shared_throttle::config make_throttle_config(const io_queue::config& qcfg) noexcept;
priority_class_data& find_or_create_class(internal::priority_class pc);
};

Expand Down
18 changes: 9 additions & 9 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_que
std::max<int32_t>(a._size - b._size, 0));
}

fair_group::fair_group(config cfg, unsigned nr_queues)
shared_throttle::shared_throttle(config cfg, unsigned nr_queues)
: _token_bucket(fixed_point_factor,
std::max<capacity_t>(fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), tokens_capacity(cfg.limit_min_tokens)),
tokens_capacity(cfg.min_tokens)
Expand All @@ -114,16 +114,16 @@ fair_group::fair_group(config cfg, unsigned nr_queues)
}
}

auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t {
auto shared_throttle::grab_capacity(capacity_t cap) noexcept -> capacity_t {
assert(cap <= _token_bucket.limit());
return _token_bucket.grab(cap);
}

void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
void shared_throttle::replenish_capacity(clock_type::time_point now) noexcept {
_token_bucket.replenish(now);
}

void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
void shared_throttle::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
auto now = clock_type::now();
auto extra = _token_bucket.accumulated_in(now - local_ts);

Expand All @@ -133,7 +133,7 @@ void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noex
}
}

auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
auto shared_throttle::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
return _token_bucket.deficiency(from);
}

Expand Down Expand Up @@ -161,7 +161,7 @@ bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const
return lhs->_accumulated > rhs->_accumulated;
}

fair_queue::fair_queue(fair_group& group, config cfg)
fair_queue::fair_queue(shared_throttle& group, config cfg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: in fair_queue.hh the names of parameters of the constructor do not match the ones that are used here. In the header there is fair_queue(shared_throttle& shared, config cfg).

Do we want to keep both places consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will need to update the parameter name in the header too

: _config(std::move(cfg))
, _group(group)
, _group_replenish(clock_type::now())
Expand All @@ -187,7 +187,7 @@ void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept
// duration. For this estimate how many capacity units can be
// accumulated with the current class shares per rate resolution
// and scale it up to tau.
capacity_t max_deviation = fair_group::fixed_point_factor / pc._shares * fair_group::token_bucket_t::rate_cast(_config.tau).count();
capacity_t max_deviation = shared_throttle::fixed_point_factor / pc._shares * shared_throttle::token_bucket_t::rate_cast(_config.tau).count();
// On start this deviation can go to negative values, so as not to
// introduce extra if's for that short corner case, use signed
// arithmetic and make sure the _accumulated value doesn't grow
Expand Down Expand Up @@ -397,10 +397,10 @@ std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(
priority_class_data& pc = *_priority_classes[c];
return std::vector<sm::impl::metric_definition_impl>({
sm::make_counter("consumption",
[&pc] { return fair_group::capacity_tokens(pc._pure_accumulated); },
[&pc] { return shared_throttle::capacity_tokens(pc._pure_accumulated); },
sm::description("Accumulated disk capacity units consumed by this class; an increment per-second rate indicates full utilization")),
sm::make_counter("adjusted_consumption",
[&pc] { return fair_group::capacity_tokens(pc._accumulated); },
[&pc] { return shared_throttle::capacity_tokens(pc._accumulated); },
sm::description("Consumed disk capacity units adjusted for class shares and idling preemption")),
});
}
Expand Down
8 changes: 4 additions & 4 deletions src/core/io_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ sstring io_request::opname() const {
std::abort();
}

const fair_group& get_fair_group(const io_queue& ioq, unsigned stream) {
const shared_throttle& io_throttle(const io_queue& ioq, unsigned stream) {
return ioq._group->_fgs[stream];
}

Expand Down Expand Up @@ -588,8 +588,8 @@ io_queue::io_queue(io_group_ptr group, internal::io_sink& sink)
});
}

fair_group::config io_group::make_fair_group_config(const io_queue::config& qcfg) noexcept {
fair_group::config cfg;
shared_throttle::config io_group::make_throttle_config(const io_queue::config& qcfg) noexcept {
shared_throttle::config cfg;
cfg.label = fmt::format("io-queue-{}", qcfg.devid);
double min_weight = std::min(io_queue::read_request_base_count, qcfg.disk_req_write_to_read_multiplier);
double min_size = std::min(io_queue::read_request_base_count, qcfg.disk_blocks_write_to_read_multiplier);
Expand All @@ -609,7 +609,7 @@ io_group::io_group(io_queue::config io_cfg, unsigned nr_queues)
: _config(std::move(io_cfg))
, _allocated_on(this_shard_id())
{
auto fg_cfg = make_fair_group_config(_config);
auto fg_cfg = make_throttle_config(_config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: maybe we could also adjust the variable name to throttle_cfg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point

_fgs.emplace_back(fg_cfg, nr_queues);
if (_config.duplex) {
_fgs.emplace_back(fg_cfg, nr_queues);
Expand Down
14 changes: 7 additions & 7 deletions tests/perf/fair_queue_perf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@
static constexpr fair_queue::class_id cid = 0;

struct local_fq_and_class {
seastar::fair_group fg;
seastar::shared_throttle fg;
xemul marked this conversation as resolved.
Show resolved Hide resolved
seastar::fair_queue fq;
seastar::fair_queue sfq;
unsigned executed = 0;

static fair_group::config fg_config() {
fair_group::config cfg;
static shared_throttle::config fg_config() {
xemul marked this conversation as resolved.
Show resolved Hide resolved
shared_throttle::config cfg;
return cfg;
}

seastar::fair_queue& queue(bool local) noexcept { return local ? fq : sfq; }

local_fq_and_class(seastar::fair_group& sfg)
local_fq_and_class(seastar::shared_throttle& sfg)
: fg(fg_config(), 1)
, fq(fg, seastar::fair_queue::config())
, sfq(sfg, seastar::fair_queue::config())
Expand Down Expand Up @@ -75,10 +75,10 @@ struct perf_fair_queue {

seastar::sharded<local_fq_and_class> local_fq;

seastar::fair_group shared_fg;
seastar::shared_throttle shared_fg;

static fair_group::config fg_config() {
fair_group::config cfg;
static shared_throttle::config fg_config() {
shared_throttle::config cfg;
return cfg;
}
xemul marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
43 changes: 26 additions & 17 deletions tests/unit/fair_queue_test.cc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled the change to my local repository and fair_queue_test.cc fails.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK, fails for me too

Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ struct request {
};

class test_env {
fair_group _fg;
shared_throttle _fg;
xemul marked this conversation as resolved.
Show resolved Hide resolved
fair_queue _fq;
std::vector<int> _results;
std::vector<std::vector<std::exception_ptr>> _exceptions;
fair_queue::class_id _nr_classes = 0;
std::vector<request> _inflight;

static fair_group::config fg_config(unsigned cap) {
fair_group::config cfg;
static shared_throttle::config fg_config(unsigned cap) {
xemul marked this conversation as resolved.
Show resolved Hide resolved
shared_throttle::config cfg;
cfg.rate_limit_duration = std::chrono::microseconds(cap);
return cfg;
}
Expand All @@ -75,7 +75,16 @@ class test_env {
}

void drain() {
do {} while (tick() != 0);
while (true) {
if (tick()) {
continue;
}
auto ts = _fq.next_pending_aio();
if (ts == std::chrono::steady_clock::time_point::max()) {
break;
}
sleep(ts - std::chrono::steady_clock::now()).get();
}
}
public:
test_env(unsigned capacity)
Expand All @@ -90,27 +99,27 @@ class test_env {
// Because of this property, one useful use of tick() is to implement a drain()
// method (see above) in which all requests currently sent to the queue are drained
// before the queue is destroyed.
unsigned tick(unsigned n = 1) {
unsigned tick(unsigned n = 0) {
unsigned processed = 0;
_fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1));
_fq.dispatch_requests([] (fair_queue_entry& ent) {
boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
});
while (true) {
_fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1));
_fq.dispatch_requests([] (fair_queue_entry& ent) {
boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
});

for (unsigned i = 0; i < n; ++i) {
std::vector<request> curr;
curr.swap(_inflight);

for (auto& req : curr) {
processed++;
_results[req.index]++;
if (processed < n) {
_results[req.index]++;
}
_fq.notify_request_finished(req.fqent.capacity());
processed++;
}
if (processed >= n) {
break;
}

_fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1));
_fq.dispatch_requests([] (fair_queue_entry& ent) {
boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
});
}
return processed;
}
Expand Down