Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async stack support to coroutines #632

Merged
merged 7 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 92 additions & 38 deletions include/unifex/at_coroutine_exit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <unifex/tag_invoke.hpp>
#include <unifex/unhandled_done.hpp>
#include <unifex/unstoppable_token.hpp>
#include <unifex/detail/unifex_fwd.hpp>

#if UNIFEX_NO_COROUTINES
# error "Coroutine support is required to use this header"
Expand Down Expand Up @@ -57,10 +58,25 @@ inline constexpr struct _fn {
} // namespace _xchg_cont
using _xchg_cont::exchange_continuation;

template <bool WithAsyncStackSupport>
struct _cleanup_promise_base {
struct final_awaitable {
bool await_ready() const noexcept { return false; }

template <typename CleanupPromise>
coro::coroutine_handle<> await_suspend_impl(
coro::coroutine_handle<CleanupPromise> h) const noexcept {
if constexpr (WithAsyncStackSupport) {
if (h.promise().parentFrame_ != nullptr) {
popAsyncStackFrameCallee(h.promise().frame_);
}
}

auto continuation = h.promise().next();
h.destroy(); // The cleanup action has finished executing. Destroy it.
return continuation;
}

// Clang before clang-12 has a bug with coroutines that self-destruct in an
// await_suspend that uses symmetric transfer. It appears that MSVC has the
// same bug, while Emscripten, the WebAssembly compiler just doesn't support
Expand All @@ -81,18 +97,14 @@ struct _cleanup_promise_base {
# endif
void
await_suspend(coro::coroutine_handle<CleanupPromise> h) const noexcept {
auto continuation = h.promise().next();
h.destroy(); // The cleanup action has finished executing. Destroy it.
continuation.resume();
await_suspend_impl(h).resume();
}
#else
// No bugs here! OK to use symmetric transfer.
template <typename CleanupPromise>
coro::coroutine_handle<>
await_suspend(coro::coroutine_handle<CleanupPromise> h) const noexcept {
auto continuation = h.promise().next();
h.destroy(); // The cleanup action has finished executing. Destroy it.
return continuation;
return await_suspend_impl(h);
}
#endif

Expand Down Expand Up @@ -135,10 +147,24 @@ struct _cleanup_promise_base {
return p.sched_;
}

template(typename Promise) //
(requires WithAsyncStackSupport AND
convertible_to<Promise, _cleanup_promise_base>) //
friend constexpr AsyncStackFrame* tag_invoke(
tag_t<get_async_stack_frame>, const Promise& p) noexcept {
return &p.frame_;
}

inline static constexpr inline_scheduler _default_scheduler{};
continuation_handle<> continuation_{};
any_scheduler sched_{_default_scheduler};
bool isUnhandledDone_{false};
UNIFEX_NO_UNIQUE_ADDRESS mutable std::
conditional_t<WithAsyncStackSupport, AsyncStackFrame*, detail::_empty<0>>
parentFrame_{};
UNIFEX_NO_UNIQUE_ADDRESS mutable std::
conditional_t<WithAsyncStackSupport, AsyncStackFrame, detail::_empty<1>>
frame_;
};

// The die_on_done algorithm implemented here could be implemented in terms of
Expand Down Expand Up @@ -233,16 +259,16 @@ struct _die_on_done_fn {
}
};

template <typename... Ts>
template <bool WithAsyncStackSupport, typename... Ts>
struct _cleanup_task;

template <typename... Ts>
struct _cleanup_promise : _cleanup_promise_base {
template <bool WithAsyncStackSupport, typename... Ts>
struct _cleanup_promise : _cleanup_promise_base<WithAsyncStackSupport> {
template <typename Action>
explicit _cleanup_promise(Action&&, Ts&... ts) : args_(ts...) {}

_cleanup_task<Ts...> get_return_object() noexcept {
return _cleanup_task<Ts...>(
_cleanup_task<WithAsyncStackSupport, Ts...> get_return_object() noexcept {
return _cleanup_task<WithAsyncStackSupport, Ts...>(
coro::coroutine_handle<_cleanup_promise>::from_promise(*this));
}

Expand All @@ -253,6 +279,12 @@ struct _cleanup_promise : _cleanup_promise_base {
template <typename Value>
decltype(auto) await_transform(Value&& value) noexcept(noexcept(
unifex::await_transform(*this, _die_on_done_fn{}((Value&&)value)))) {
if constexpr (WithAsyncStackSupport) {
if (this->parentFrame_ != nullptr) {
pushAsyncStackFrameCallerCallee(*this->parentFrame_, this->frame_);
}
}

return unifex::await_transform(*this, _die_on_done_fn{}((Value&&)value));
}

Expand All @@ -261,15 +293,15 @@ struct _cleanup_promise : _cleanup_promise_base {
// Record that we are processing an unhandled done signal. This is checked
// in the final_suspend of the cleanup action to know which subsequent
// continuation to resume.
isUnhandledDone_ = true;
this->isUnhandledDone_ = true;
// On unhandled_done, run the cleanup action:
return coro::coroutine_handle<_cleanup_promise>::from_promise(*this);
});
};

template <typename... Ts>
template <bool WithAsyncStackSupport, typename... Ts>
struct [[nodiscard]] _cleanup_task {
using promise_type = _cleanup_promise<Ts...>;
using promise_type = _cleanup_promise<WithAsyncStackSupport, Ts...>;

explicit _cleanup_task(coro::coroutine_handle<promise_type> coro) noexcept
: continuation_(coro) {}
Expand All @@ -279,29 +311,46 @@ struct [[nodiscard]] _cleanup_task {

~_cleanup_task() { UNIFEX_ASSERT(!continuation_); }

bool await_ready() const noexcept { return false; }
struct awaiter {
bool await_ready() const noexcept { return false; }

template <typename Promise>
bool await_suspend_impl_(Promise& parent) noexcept {
continuation_.promise().continuation_ =
exchange_continuation(parent, continuation_);
continuation_.promise().sched_ = get_scheduler(parent);
return false;
}
template <typename Promise>
bool await_suspend_impl_(
Promise& parent,
[[maybe_unused]] instruction_ptr returnAddress =
instruction_ptr::read_return_address()) noexcept {
continuation_.promise().continuation_ =
exchange_continuation(parent, continuation_);
continuation_.promise().sched_ = get_scheduler(parent);
if constexpr (WithAsyncStackSupport) {
continuation_.promise().parentFrame_ = get_async_stack_frame(parent);
continuation_.promise().frame_.setReturnAddress(returnAddress);
}
return false;
}

template <typename Promise>
bool await_suspend(coro::coroutine_handle<Promise> parent) noexcept {
return await_suspend_impl_(parent.promise());
}
template <typename Promise>
UNIFEX_NO_INLINE bool
await_suspend(coro::coroutine_handle<Promise> parent) noexcept {
return await_suspend_impl_(parent.promise());
}

std::tuple<Ts&...> await_resume() noexcept {
return std::move(std::exchange(continuation_, {}).promise().args_);
}
std::tuple<Ts&...> await_resume() noexcept {
return std::move(std::exchange(continuation_, {}).promise().args_);
}

// TODO: how do we address always-inline awaitables
friend constexpr auto tag_invoke(tag_t<blocking>, const awaiter&) noexcept {
return blocking_kind::always_inline;
}

// TODO: how do we address always-inline awaitables
friend constexpr auto
tag_invoke(tag_t<blocking>, const _cleanup_task&) noexcept {
return blocking_kind::always_inline;
continuation_handle<promise_type> continuation_;
};

template <typename Promise>
friend awaiter
tag_invoke(tag_t<await_transform>, Promise&, _cleanup_task task) noexcept {
return awaiter{std::exchange(task.continuation_, {})};
}

private:
Expand All @@ -311,18 +360,23 @@ struct [[nodiscard]] _cleanup_task {
namespace _at_coroutine_exit {
inline constexpr struct _fn {
private:
template <typename Action, typename... Ts>
static _cleanup_task<Ts...> at_coroutine_exit(Action action, Ts... ts) {
template <bool WithAsyncStackSupport, typename Action, typename... Ts>
static _cleanup_task<WithAsyncStackSupport, Ts...>
at_coroutine_exit(Action action, Ts... ts) {
co_await std::move(action)(std::move(ts)...);
}

public:
template(typename Action, typename... Ts) //
template(
typename Action,
typename... Ts,
bool WithAsyncStackSupport = !UNIFEX_NO_ASYNC_STACKS) //
(requires std::
is_invocable_v<std::decay_t<Action>, std::decay_t<Ts>...>) //
_cleanup_task<std::decay_t<Ts>...>
_cleanup_task<WithAsyncStackSupport, std::decay_t<Ts>...>
operator()(Action&& action, Ts&&... ts) const {
return _fn::at_coroutine_exit((Action&&)action, (Ts&&)ts...);
return _fn::at_coroutine_exit<WithAsyncStackSupport>(
(Action&&)action, (Ts&&)ts...);
}
} at_coroutine_exit{};
} // namespace _at_coroutine_exit
Expand Down
Loading
Loading