Skip to content

Commit

Permalink
Add async stack support to coroutines
Browse files Browse the repository at this point in the history
This change extends the work in #616 to support async stack frames in
`task<>` coroutines, including those that invoke `at_coroutine_exit()`.

In `task<>`, when `UNIFEX_NO_ASYNC_STACKS` is falsey, the awaiter returned from
`task<>`'s customization of `unifex::await_transform` stores an
`AsyncStackFrame`. The awaiter pushes its frame onto the current async stack in
`await_suspend()` and pops it again in `await_resume()`; since
`await_resume()` is only invoked for value and error completions, this
arrangement leaves it up to the waiting task to pop the awaiter's frame
when the awaited task completes with done. This can be expressed as a
new rule:

- when a coroutine completes with a value or an error, it is responsible
  for popping its own `AsyncStackFrame`; but
- when a coroutine completes with done, the *caller* is responsible for
  popping the callee's `AsyncStackFrame` as a part of the caller's
  `unhandled_done()` coroutine.

To support this new requirement of `unhandled_done()` (that it is
responsible for popping the callee's stack frame), this change
introduces `popAsyncStackFrameFromCaller`, which takes the caller's
stack frame by reference so that it can assert that, after popping the
current async frame (whatever it is), the new top frame is the caller's
frame.

A `task<>` promise has an `AsyncStackFrame*` that, when it's not
`nullptr`, points to the `AsyncStackFrame` in the awaiter waiting for
the task. This pointer exists even when `UNIFEX_NO_ASYNC_STACKS` is
truthy to help mitigate against ODR violations; linking together two TUs
with `UNIFEX_NO_ASYNC_STACKS` set differently is not explicitly
supported but, by ensuring this pointer always exists, some ODR problems
are avoided. When a `task<>` is awaited from a TU with async stack
support enabled, the awaited task's awaiter sets the promise's
`AsyncStackFrame*` to point to the awaiter's frame; when a `task<>` is
awaited from a TU with async stack support disabled, this assignment
never happens and the promise's pointer remains null.

The above description of `task<>`'s async stack maintenance only covers
the recursive case of on coroutine awaiting another. The base case is
handled in `connect_awaitable()`, where an `AsyncStackRoot` is set up
before starting the connected awaitable.

`stop_if_requested` used to model both `sender` and `awaitable` so that
`co_await stop_if_requested();` could take advantage of symmetric
transfer. The `stop_if_requested` sender now customizes
`await_transform` to express its participation in async stack
management. This means of expressing async stack awareness is
unsatisfying but I don't have any better ideas right now.

Lastly, `unifex::await_transform()` now wraps naturally-awaitable
arguments in an `awaiter_wrapper` that ensures the `coroutine_handle<>`
passed to the wrapped awaitable is one that establishes an active
`AsyncStackRoot` before resuming the real waiting coroutine.
  • Loading branch information
ispeters committed Sep 11, 2024
1 parent c93740d commit 45ca45b
Show file tree
Hide file tree
Showing 8 changed files with 763 additions and 176 deletions.
130 changes: 92 additions & 38 deletions include/unifex/at_coroutine_exit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <unifex/tag_invoke.hpp>
#include <unifex/unhandled_done.hpp>
#include <unifex/unstoppable_token.hpp>
#include <unifex/detail/unifex_fwd.hpp>

#if UNIFEX_NO_COROUTINES
# error "Coroutine support is required to use this header"
Expand Down Expand Up @@ -57,10 +58,25 @@ inline constexpr struct _fn {
} // namespace _xchg_cont
using _xchg_cont::exchange_continuation;

template <bool WithAsyncStackSupport>
struct _cleanup_promise_base {
struct final_awaitable {
bool await_ready() const noexcept { return false; }

template <typename CleanupPromise>
coro::coroutine_handle<> await_suspend_impl(
coro::coroutine_handle<CleanupPromise> h) const noexcept {
if constexpr (WithAsyncStackSupport) {
if (h.promise().parentFrame_ != nullptr) {
popAsyncStackFrameCallee(h.promise().frame_);
}
}

auto continuation = h.promise().next();
h.destroy(); // The cleanup action has finished executing. Destroy it.
return continuation;
}

// Clang before clang-12 has a bug with coroutines that self-destruct in an
// await_suspend that uses symmetric transfer. It appears that MSVC has the
// same bug, while Emscripten, the WebAssembly compiler just doesn't support
Expand All @@ -81,18 +97,14 @@ struct _cleanup_promise_base {
# endif
void
await_suspend(coro::coroutine_handle<CleanupPromise> h) const noexcept {
auto continuation = h.promise().next();
h.destroy(); // The cleanup action has finished executing. Destroy it.
continuation.resume();
await_suspend_impl(h).resume();
}
#else
// No bugs here! OK to use symmetric transfer.
template <typename CleanupPromise>
coro::coroutine_handle<>
await_suspend(coro::coroutine_handle<CleanupPromise> h) const noexcept {
auto continuation = h.promise().next();
h.destroy(); // The cleanup action has finished executing. Destroy it.
return continuation;
return await_suspend_impl(h);
}
#endif

Expand Down Expand Up @@ -135,10 +147,24 @@ struct _cleanup_promise_base {
return p.sched_;
}

template(typename Promise) //
(requires WithAsyncStackSupport AND
convertible_to<Promise, _cleanup_promise_base>) //
friend constexpr AsyncStackFrame* tag_invoke(
tag_t<get_async_stack_frame>, const Promise& p) noexcept {
return &p.frame_;
}

inline static constexpr inline_scheduler _default_scheduler{};
continuation_handle<> continuation_{};
any_scheduler sched_{_default_scheduler};
bool isUnhandledDone_{false};
UNIFEX_NO_UNIQUE_ADDRESS mutable std::
conditional_t<WithAsyncStackSupport, AsyncStackFrame*, detail::_empty<0>>
parentFrame_{};
UNIFEX_NO_UNIQUE_ADDRESS mutable std::
conditional_t<WithAsyncStackSupport, AsyncStackFrame, detail::_empty<1>>
frame_;
};

// The die_on_done algorithm implemented here could be implemented in terms of
Expand Down Expand Up @@ -233,16 +259,16 @@ struct _die_on_done_fn {
}
};

template <typename... Ts>
template <bool WithAsyncStackSupport, typename... Ts>
struct _cleanup_task;

template <typename... Ts>
struct _cleanup_promise : _cleanup_promise_base {
template <bool WithAsyncStackSupport, typename... Ts>
struct _cleanup_promise : _cleanup_promise_base<WithAsyncStackSupport> {
template <typename Action>
explicit _cleanup_promise(Action&&, Ts&... ts) : args_(ts...) {}

_cleanup_task<Ts...> get_return_object() noexcept {
return _cleanup_task<Ts...>(
_cleanup_task<WithAsyncStackSupport, Ts...> get_return_object() noexcept {
return _cleanup_task<WithAsyncStackSupport, Ts...>(
coro::coroutine_handle<_cleanup_promise>::from_promise(*this));
}

Expand All @@ -253,6 +279,12 @@ struct _cleanup_promise : _cleanup_promise_base {
template <typename Value>
decltype(auto) await_transform(Value&& value) noexcept(noexcept(
unifex::await_transform(*this, _die_on_done_fn{}((Value&&)value)))) {
if constexpr (WithAsyncStackSupport) {
if (this->parentFrame_ != nullptr) {
pushAsyncStackFrameCallerCallee(*this->parentFrame_, this->frame_);
}
}

return unifex::await_transform(*this, _die_on_done_fn{}((Value&&)value));
}

Expand All @@ -261,15 +293,15 @@ struct _cleanup_promise : _cleanup_promise_base {
// Record that we are processing an unhandled done signal. This is checked
// in the final_suspend of the cleanup action to know which subsequent
// continuation to resume.
isUnhandledDone_ = true;
this->isUnhandledDone_ = true;
// On unhandled_done, run the cleanup action:
return coro::coroutine_handle<_cleanup_promise>::from_promise(*this);
});
};

template <typename... Ts>
template <bool WithAsyncStackSupport, typename... Ts>
struct [[nodiscard]] _cleanup_task {
using promise_type = _cleanup_promise<Ts...>;
using promise_type = _cleanup_promise<WithAsyncStackSupport, Ts...>;

explicit _cleanup_task(coro::coroutine_handle<promise_type> coro) noexcept
: continuation_(coro) {}
Expand All @@ -279,29 +311,46 @@ struct [[nodiscard]] _cleanup_task {

~_cleanup_task() { UNIFEX_ASSERT(!continuation_); }

bool await_ready() const noexcept { return false; }
struct awaiter {
bool await_ready() const noexcept { return false; }

template <typename Promise>
bool await_suspend_impl_(Promise& parent) noexcept {
continuation_.promise().continuation_ =
exchange_continuation(parent, continuation_);
continuation_.promise().sched_ = get_scheduler(parent);
return false;
}
template <typename Promise>
bool await_suspend_impl_(
Promise& parent,
[[maybe_unused]] instruction_ptr returnAddress =
instruction_ptr::read_return_address()) noexcept {
continuation_.promise().continuation_ =
exchange_continuation(parent, continuation_);
continuation_.promise().sched_ = get_scheduler(parent);
if constexpr (WithAsyncStackSupport) {
continuation_.promise().parentFrame_ = get_async_stack_frame(parent);
continuation_.promise().frame_.setReturnAddress(returnAddress);
}
return false;
}

template <typename Promise>
bool await_suspend(coro::coroutine_handle<Promise> parent) noexcept {
return await_suspend_impl_(parent.promise());
}
template <typename Promise>
UNIFEX_NO_INLINE bool
await_suspend(coro::coroutine_handle<Promise> parent) noexcept {
return await_suspend_impl_(parent.promise());
}

std::tuple<Ts&...> await_resume() noexcept {
return std::move(std::exchange(continuation_, {}).promise().args_);
}
std::tuple<Ts&...> await_resume() noexcept {
return std::move(std::exchange(continuation_, {}).promise().args_);
}

// TODO: how do we address always-inline awaitables
friend constexpr auto tag_invoke(tag_t<blocking>, const awaiter&) noexcept {
return blocking_kind::always_inline;
}

// TODO: how do we address always-inline awaitables
friend constexpr auto
tag_invoke(tag_t<blocking>, const _cleanup_task&) noexcept {
return blocking_kind::always_inline;
continuation_handle<promise_type> continuation_;
};

template <typename Promise>
friend awaiter
tag_invoke(tag_t<await_transform>, Promise&, _cleanup_task task) noexcept {
return awaiter{std::exchange(task.continuation_, {})};
}

private:
Expand All @@ -311,18 +360,23 @@ struct [[nodiscard]] _cleanup_task {
namespace _at_coroutine_exit {
inline constexpr struct _fn {
private:
template <typename Action, typename... Ts>
static _cleanup_task<Ts...> at_coroutine_exit(Action action, Ts... ts) {
template <bool WithAsyncStackSupport, typename Action, typename... Ts>
static _cleanup_task<WithAsyncStackSupport, Ts...>
at_coroutine_exit(Action action, Ts... ts) {
co_await std::move(action)(std::move(ts)...);
}

public:
template(typename Action, typename... Ts) //
template(
typename Action,
typename... Ts,
bool WithAsyncStackSupport = !UNIFEX_NO_ASYNC_STACKS) //
(requires std::
is_invocable_v<std::decay_t<Action>, std::decay_t<Ts>...>) //
_cleanup_task<std::decay_t<Ts>...>
_cleanup_task<WithAsyncStackSupport, std::decay_t<Ts>...>
operator()(Action&& action, Ts&&... ts) const {
return _fn::at_coroutine_exit((Action&&)action, (Ts&&)ts...);
return _fn::at_coroutine_exit<WithAsyncStackSupport>(
(Action&&)action, (Ts&&)ts...);
}
} at_coroutine_exit{};
} // namespace _at_coroutine_exit
Expand Down
Loading

0 comments on commit 45ca45b

Please sign in to comment.