Skip to content

Commit

Permalink
Backport on_backpressure_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed Sep 14, 2024
1 parent 442fd10 commit 10afbbc
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 3 deletions.
44 changes: 44 additions & 0 deletions libcaf_core/caf/flow/backpressure_overflow_strategy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/main/LICENSE.

#pragma once

#include "caf/default_enum_inspect.hpp"
#include "caf/detail/core_export.hpp"

#include <cstdint>
#include <string>
#include <type_traits>

namespace caf::flow {

/// Selects a strategy for handling backpressure.
enum class backpressure_overflow_strategy {
/// Drops the newest item when the buffer is full.
drop_newest,
/// Drops the oldest item when the buffer is full.
drop_oldest,
/// Raises an error when the buffer is full.
fail
};

/// @relates backpressure_overflow_strategy
CAF_CORE_EXPORT std::string to_string(backpressure_overflow_strategy);

/// @relates backpressure_overflow_strategy
CAF_CORE_EXPORT bool from_string(std::string_view,
backpressure_overflow_strategy&);

/// @relates backpressure_overflow_strategy
CAF_CORE_EXPORT bool
from_integer(std::underlying_type_t<backpressure_overflow_strategy>,
backpressure_overflow_strategy&);

/// @relates backpressure_overflow_strategy
template <class Inspector>
bool inspect(Inspector& f, backpressure_overflow_strategy& x) {
return default_enum_inspect(f, x);
}

} // namespace caf::flow
22 changes: 19 additions & 3 deletions libcaf_core/caf/flow/observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "caf/flow/op/interval.hpp"
#include "caf/flow/op/merge.hpp"
#include "caf/flow/op/never.hpp"
#include "caf/flow/op/on_backpressure_buffer.hpp"
#include "caf/flow/op/prefix_and_tail.hpp"
#include "caf/flow/op/publish.hpp"
#include "caf/flow/step/all.hpp"
Expand Down Expand Up @@ -292,6 +293,13 @@ class observable_def {
return add_step(step::do_finally<output_type, F>{std::move(f)});
}

/// @copydoc observable::on_backpressure_buffer
auto on_backpressure_buffer(size_t buffer_size,
backpressure_overflow_strategy strategy
= backpressure_overflow_strategy::fail) {
return materialize().on_backpressure_buffer(buffer_size, strategy);
}

auto on_error_complete() {
return add_step(step::on_error_complete<output_type>{});
}
Expand Down Expand Up @@ -553,6 +561,14 @@ transformation<step::on_error_complete<T>> observable<T>::on_error_complete() {
return transform(step::on_error_complete<T>{});
}

template <class T>
observable<T>
observable<T>::on_backpressure_buffer(size_t buffer_size,
backpressure_overflow_strategy strategy) {
using impl_t = op::on_backpressure_buffer<T>;
return make_observable<impl_t>(ctx(), *this, buffer_size, strategy);
}

template <class T>
template <class Init, class Reducer>
transformation<step::reduce<Reducer>>
Expand Down Expand Up @@ -607,9 +623,9 @@ auto observable<T>::merge(Inputs&&... xs) {
static_assert(
sizeof...(Inputs) > 0,
"merge without arguments expects this observable to emit observables");
static_assert(
(std::is_same_v<Out, output_type_t<std::decay_t<Inputs>>> && ...),
"can only merge observables with the same observed type");
static_assert((std::is_same_v<Out, output_type_t<std::decay_t<Inputs>>>
&& ...),
"can only merge observables with the same observed type");
using impl_t = op::merge<Out>;
return make_observable<impl_t>(ctx(), *this, std::forward<Inputs>(xs)...);
}
Expand Down
7 changes: 7 additions & 0 deletions libcaf_core/caf/flow/observable_decl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "caf/defaults.hpp"
#include "caf/detail/is_complete.hpp"
#include "caf/disposable.hpp"
#include "caf/flow/backpressure_overflow_strategy.hpp"
#include "caf/flow/fwd.hpp"
#include "caf/flow/op/base.hpp"
#include "caf/flow/step/fwd.hpp"
Expand Down Expand Up @@ -104,6 +105,12 @@ class observable {
template <class F>
transformation<step::map<F>> map(F f);

/// When producing items faster than the consumer can consume them, the
/// observable will buffer up to `buffer_size` items before raising an error.
observable<T> on_backpressure_buffer(size_t buffer_size,
backpressure_overflow_strategy strategy
= backpressure_overflow_strategy::fail);

/// Recovers from errors by converting `on_error` to `on_complete` events.
transformation<step::on_error_complete<T>> on_error_complete();

Expand Down
222 changes: 222 additions & 0 deletions libcaf_core/caf/flow/op/on_backpressure_buffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/main/LICENSE.

#pragma once

#include "caf/config.hpp"
#include "caf/flow/backpressure_overflow_strategy.hpp"
#include "caf/flow/observer.hpp"
#include "caf/flow/op/hot.hpp"
#include "caf/flow/subscription.hpp"

#include <deque>
#include <utility>

namespace caf::flow::op {

template <class T>
class on_backpressure_buffer_sub : public subscription::impl_base,
public observer_impl<T> {
public:
// -- constructors, destructors, and assignment operators --------------------

on_backpressure_buffer_sub(coordinator* parent, observer<T> out,
size_t buffer_size,
backpressure_overflow_strategy strategy)
: parent_(parent),
out_(std::move(out)),
buffer_size_(buffer_size),
strategy_(strategy) {
// nop
}

// -- implementation of subscription -----------------------------------------

bool disposed() const noexcept override {
return !out_;
}

void dispose() override {
if (out_) {
auto strong_this = intrusive_ptr<on_backpressure_buffer_sub>{this};
parent_->delay_fn([strong_this] { strong_this->do_dispose(); });
}
}

void request(size_t new_demand) override {
if (new_demand == 0)
return;
demand_ += new_demand;
if (demand_ == new_demand && !buffer_.empty()) {
parent_->delay_fn([strong_this = intrusive_ptr{this}] { //
strong_this->on_request();
});
}
}

// -- implementation of observer_impl ----------------------------------------

void ref_coordinated() const noexcept override {
ref();
}

void deref_coordinated() const noexcept override {
deref();
}

void on_subscribe(subscription sub) override {
if (sub_) {
sub.dispose();
return;
}
sub_ = std::move(sub);
sub_.request(buffer_size_);
}

void on_next(const T& item) override {
if (!out_)
return;
if (demand_ > 0 && buffer_.empty()) {
--demand_;
out_.on_next(item);
if (sub_)
sub_.request(1);
return;
}
if (buffer_.size() == buffer_size_) {
switch (strategy_) {
case backpressure_overflow_strategy::drop_newest:
sub_.request(1);
break;
case backpressure_overflow_strategy::drop_oldest:
buffer_.pop_front();
buffer_.push_back(item);
sub_.request(1);
break;
default: // backpressure_overflow_strategy::fail
sub_.dispose();
buffer_.clear();
out_.on_error(make_error(sec::backpressure_overflow));
}
return;
}
buffer_.push_back(item);
sub_.request(1);
}

void on_complete() override {
if (!out_ || src_error_)
return;
src_error_ = error{};
subscription tmp;
tmp.swap(sub_);
if (buffer_.empty())
out_.on_complete();
}

void on_error(const error& what) override {
if (!out_ || src_error_)
return;
src_error_ = what;
subscription tmp;
tmp.swap(sub_);
if (buffer_.empty())
out_.on_error(what);
}

friend void
intrusive_ptr_add_ref(const on_backpressure_buffer_sub* ptr) noexcept {
ptr->ref();
}

friend void
intrusive_ptr_release(const on_backpressure_buffer_sub* ptr) noexcept {
ptr->deref();
}

private:
void do_dispose() {
if (!out_)
return;
sub_.dispose();
out_.on_complete();
}

void on_request() {
while (out_ && demand_ > 0 && !buffer_.empty()) {
--demand_;
if (sub_)
sub_.request(1);
out_.on_next(buffer_.front());
buffer_.pop_front();
}
if (out_ && src_error_) {
CAF_ASSERT(!sub_);
if (*src_error_)
out_.on_error(*src_error_);
else
out_.on_complete();
}
}

/// Stores the context (coordinator) that runs this flow.
coordinator* parent_;

/// Stores a handle to the subscribed observer.
observer<T> out_;

subscription sub_;

size_t buffer_size_ = 0;

size_t demand_ = 0;

backpressure_overflow_strategy strategy_;

/// Stores whether the input observable has signaled on_complete or on_error.
/// A default-constructed error represents on_complete.
std::optional<error> src_error_;

std::deque<T> buffer_;
};

/// An observable that on_backpressure_buffer calls any callbacks on its
/// subscribers.
template <class T>
class on_backpressure_buffer : public hot<T> {
public:
// -- member types -----------------------------------------------------------

using super = hot<T>;

// -- constructors, destructors, and assignment operators --------------------

on_backpressure_buffer(coordinator* parent, observable<T> decorated,
size_t buffer_size,
backpressure_overflow_strategy strategy)
: super(parent),
decorated_(std::move(decorated)),
buffer_size_(buffer_size),
strategy_(strategy) {
// nop
}

// -- implementation of observable_impl<T> -----------------------------------

disposable subscribe(observer<T> out) override {
CAF_ASSERT(out.valid());
using sub_t = on_backpressure_buffer_sub<T>;
auto ptr = make_counted<sub_t>(super::ctx(), out, buffer_size_, strategy_);
out.on_subscribe(subscription{ptr});
decorated_.subscribe(ptr->as_observer());
return disposable{ptr->as_disposable()};
}

private:
observable<T> decorated_;
size_t buffer_size_;
backpressure_overflow_strategy strategy_;
};

} // namespace caf::flow::op
2 changes: 2 additions & 0 deletions libcaf_core/caf/sec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ enum class sec : uint8_t {
protocol_error,
/// Subscribing to a stream failed because it can only be subscribed to once.
cannot_resubscribe_stream,
/// A downstream operator failed to process inputs on time.
backpressure_overflow,
};
// --(rst-sec-end)--

Expand Down

0 comments on commit 10afbbc

Please sign in to comment.