From 10afbbc5ee40263b258b7cf3f0e5abb436f79e89 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Sat, 14 Sep 2024 17:57:12 +0200 Subject: [PATCH] Backport on_backpressure_buffer --- .../flow/backpressure_overflow_strategy.hpp | 44 ++++ libcaf_core/caf/flow/observable.hpp | 22 +- libcaf_core/caf/flow/observable_decl.hpp | 7 + .../caf/flow/op/on_backpressure_buffer.hpp | 222 ++++++++++++++++++ libcaf_core/caf/sec.hpp | 2 + 5 files changed, 294 insertions(+), 3 deletions(-) create mode 100644 libcaf_core/caf/flow/backpressure_overflow_strategy.hpp create mode 100644 libcaf_core/caf/flow/op/on_backpressure_buffer.hpp diff --git a/libcaf_core/caf/flow/backpressure_overflow_strategy.hpp b/libcaf_core/caf/flow/backpressure_overflow_strategy.hpp new file mode 100644 index 0000000000..d95cf8c825 --- /dev/null +++ b/libcaf_core/caf/flow/backpressure_overflow_strategy.hpp @@ -0,0 +1,44 @@ +// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in +// the main distribution directory for license terms and copyright or visit +// https://github.com/actor-framework/actor-framework/blob/main/LICENSE. + +#pragma once + +#include "caf/default_enum_inspect.hpp" +#include "caf/detail/core_export.hpp" + +#include +#include +#include + +namespace caf::flow { + +/// Selects a strategy for handling backpressure. +enum class backpressure_overflow_strategy { + /// Drops the newest item when the buffer is full. + drop_newest, + /// Drops the oldest item when the buffer is full. + drop_oldest, + /// Raises an error when the buffer is full. + fail +}; + +/// @relates backpressure_overflow_strategy +CAF_CORE_EXPORT std::string to_string(backpressure_overflow_strategy); + +/// @relates backpressure_overflow_strategy +CAF_CORE_EXPORT bool from_string(std::string_view, + backpressure_overflow_strategy&); + +/// @relates backpressure_overflow_strategy +CAF_CORE_EXPORT bool +from_integer(std::underlying_type_t, + backpressure_overflow_strategy&); + +/// @relates backpressure_overflow_strategy +template +bool inspect(Inspector& f, backpressure_overflow_strategy& x) { + return default_enum_inspect(f, x); +} + +} // namespace caf::flow diff --git a/libcaf_core/caf/flow/observable.hpp b/libcaf_core/caf/flow/observable.hpp index 71bdb86c24..0f1d140c08 100644 --- a/libcaf_core/caf/flow/observable.hpp +++ b/libcaf_core/caf/flow/observable.hpp @@ -26,6 +26,7 @@ #include "caf/flow/op/interval.hpp" #include "caf/flow/op/merge.hpp" #include "caf/flow/op/never.hpp" +#include "caf/flow/op/on_backpressure_buffer.hpp" #include "caf/flow/op/prefix_and_tail.hpp" #include "caf/flow/op/publish.hpp" #include "caf/flow/step/all.hpp" @@ -292,6 +293,13 @@ class observable_def { return add_step(step::do_finally{std::move(f)}); } + /// @copydoc observable::on_backpressure_buffer + auto on_backpressure_buffer(size_t buffer_size, + backpressure_overflow_strategy strategy + = backpressure_overflow_strategy::fail) { + return materialize().on_backpressure_buffer(buffer_size, strategy); + } + auto on_error_complete() { return add_step(step::on_error_complete{}); } @@ -553,6 +561,14 @@ transformation> observable::on_error_complete() { return transform(step::on_error_complete{}); } +template +observable +observable::on_backpressure_buffer(size_t buffer_size, + backpressure_overflow_strategy strategy) { + using impl_t = op::on_backpressure_buffer; + return make_observable(ctx(), *this, buffer_size, strategy); +} + template template transformation> @@ -607,9 +623,9 @@ auto observable::merge(Inputs&&... xs) { static_assert( sizeof...(Inputs) > 0, "merge without arguments expects this observable to emit observables"); - static_assert( - (std::is_same_v>> && ...), - "can only merge observables with the same observed type"); + static_assert((std::is_same_v>> + && ...), + "can only merge observables with the same observed type"); using impl_t = op::merge; return make_observable(ctx(), *this, std::forward(xs)...); } diff --git a/libcaf_core/caf/flow/observable_decl.hpp b/libcaf_core/caf/flow/observable_decl.hpp index 9ac56ffbcb..67bb09d2b3 100644 --- a/libcaf_core/caf/flow/observable_decl.hpp +++ b/libcaf_core/caf/flow/observable_decl.hpp @@ -8,6 +8,7 @@ #include "caf/defaults.hpp" #include "caf/detail/is_complete.hpp" #include "caf/disposable.hpp" +#include "caf/flow/backpressure_overflow_strategy.hpp" #include "caf/flow/fwd.hpp" #include "caf/flow/op/base.hpp" #include "caf/flow/step/fwd.hpp" @@ -104,6 +105,12 @@ class observable { template transformation> map(F f); + /// When producing items faster than the consumer can consume them, the + /// observable will buffer up to `buffer_size` items before raising an error. + observable on_backpressure_buffer(size_t buffer_size, + backpressure_overflow_strategy strategy + = backpressure_overflow_strategy::fail); + /// Recovers from errors by converting `on_error` to `on_complete` events. transformation> on_error_complete(); diff --git a/libcaf_core/caf/flow/op/on_backpressure_buffer.hpp b/libcaf_core/caf/flow/op/on_backpressure_buffer.hpp new file mode 100644 index 0000000000..014654a044 --- /dev/null +++ b/libcaf_core/caf/flow/op/on_backpressure_buffer.hpp @@ -0,0 +1,222 @@ +// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in +// the main distribution directory for license terms and copyright or visit +// https://github.com/actor-framework/actor-framework/blob/main/LICENSE. + +#pragma once + +#include "caf/config.hpp" +#include "caf/flow/backpressure_overflow_strategy.hpp" +#include "caf/flow/observer.hpp" +#include "caf/flow/op/hot.hpp" +#include "caf/flow/subscription.hpp" + +#include +#include + +namespace caf::flow::op { + +template +class on_backpressure_buffer_sub : public subscription::impl_base, + public observer_impl { +public: + // -- constructors, destructors, and assignment operators -------------------- + + on_backpressure_buffer_sub(coordinator* parent, observer out, + size_t buffer_size, + backpressure_overflow_strategy strategy) + : parent_(parent), + out_(std::move(out)), + buffer_size_(buffer_size), + strategy_(strategy) { + // nop + } + + // -- implementation of subscription ----------------------------------------- + + bool disposed() const noexcept override { + return !out_; + } + + void dispose() override { + if (out_) { + auto strong_this = intrusive_ptr{this}; + parent_->delay_fn([strong_this] { strong_this->do_dispose(); }); + } + } + + void request(size_t new_demand) override { + if (new_demand == 0) + return; + demand_ += new_demand; + if (demand_ == new_demand && !buffer_.empty()) { + parent_->delay_fn([strong_this = intrusive_ptr{this}] { // + strong_this->on_request(); + }); + } + } + + // -- implementation of observer_impl ---------------------------------------- + + void ref_coordinated() const noexcept override { + ref(); + } + + void deref_coordinated() const noexcept override { + deref(); + } + + void on_subscribe(subscription sub) override { + if (sub_) { + sub.dispose(); + return; + } + sub_ = std::move(sub); + sub_.request(buffer_size_); + } + + void on_next(const T& item) override { + if (!out_) + return; + if (demand_ > 0 && buffer_.empty()) { + --demand_; + out_.on_next(item); + if (sub_) + sub_.request(1); + return; + } + if (buffer_.size() == buffer_size_) { + switch (strategy_) { + case backpressure_overflow_strategy::drop_newest: + sub_.request(1); + break; + case backpressure_overflow_strategy::drop_oldest: + buffer_.pop_front(); + buffer_.push_back(item); + sub_.request(1); + break; + default: // backpressure_overflow_strategy::fail + sub_.dispose(); + buffer_.clear(); + out_.on_error(make_error(sec::backpressure_overflow)); + } + return; + } + buffer_.push_back(item); + sub_.request(1); + } + + void on_complete() override { + if (!out_ || src_error_) + return; + src_error_ = error{}; + subscription tmp; + tmp.swap(sub_); + if (buffer_.empty()) + out_.on_complete(); + } + + void on_error(const error& what) override { + if (!out_ || src_error_) + return; + src_error_ = what; + subscription tmp; + tmp.swap(sub_); + if (buffer_.empty()) + out_.on_error(what); + } + + friend void + intrusive_ptr_add_ref(const on_backpressure_buffer_sub* ptr) noexcept { + ptr->ref(); + } + + friend void + intrusive_ptr_release(const on_backpressure_buffer_sub* ptr) noexcept { + ptr->deref(); + } + +private: + void do_dispose() { + if (!out_) + return; + sub_.dispose(); + out_.on_complete(); + } + + void on_request() { + while (out_ && demand_ > 0 && !buffer_.empty()) { + --demand_; + if (sub_) + sub_.request(1); + out_.on_next(buffer_.front()); + buffer_.pop_front(); + } + if (out_ && src_error_) { + CAF_ASSERT(!sub_); + if (*src_error_) + out_.on_error(*src_error_); + else + out_.on_complete(); + } + } + + /// Stores the context (coordinator) that runs this flow. + coordinator* parent_; + + /// Stores a handle to the subscribed observer. + observer out_; + + subscription sub_; + + size_t buffer_size_ = 0; + + size_t demand_ = 0; + + backpressure_overflow_strategy strategy_; + + /// Stores whether the input observable has signaled on_complete or on_error. + /// A default-constructed error represents on_complete. + std::optional src_error_; + + std::deque buffer_; +}; + +/// An observable that on_backpressure_buffer calls any callbacks on its +/// subscribers. +template +class on_backpressure_buffer : public hot { +public: + // -- member types ----------------------------------------------------------- + + using super = hot; + + // -- constructors, destructors, and assignment operators -------------------- + + on_backpressure_buffer(coordinator* parent, observable decorated, + size_t buffer_size, + backpressure_overflow_strategy strategy) + : super(parent), + decorated_(std::move(decorated)), + buffer_size_(buffer_size), + strategy_(strategy) { + // nop + } + + // -- implementation of observable_impl ----------------------------------- + + disposable subscribe(observer out) override { + CAF_ASSERT(out.valid()); + using sub_t = on_backpressure_buffer_sub; + auto ptr = make_counted(super::ctx(), out, buffer_size_, strategy_); + out.on_subscribe(subscription{ptr}); + decorated_.subscribe(ptr->as_observer()); + return disposable{ptr->as_disposable()}; + } + +private: + observable decorated_; + size_t buffer_size_; + backpressure_overflow_strategy strategy_; +}; + +} // namespace caf::flow::op diff --git a/libcaf_core/caf/sec.hpp b/libcaf_core/caf/sec.hpp index ab7d19057d..32b192ecd6 100644 --- a/libcaf_core/caf/sec.hpp +++ b/libcaf_core/caf/sec.hpp @@ -181,6 +181,8 @@ enum class sec : uint8_t { protocol_error, /// Subscribing to a stream failed because it can only be subscribed to once. cannot_resubscribe_stream, +/// A downstream operator failed to process inputs on time. + backpressure_overflow, }; // --(rst-sec-end)--