diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 996f0f2d9b4..eebaafda3fe 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -284,21 +284,34 @@ where } pub(super) fn drop_join_handle_slow(self) { - // Try to unset `JOIN_INTEREST`. This must be done as a first step in + // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in // case the task concurrently completed. - if self.state().unset_join_interested().is_err() { - // It is our responsibility to drop the output. This is critical as - // the task output may not be `Send` and as such must remain with - // the scheduler or `JoinHandle`. i.e. if the output remains in the - // task structure until the task is deallocated, it may be dropped - // by a Waker on any arbitrary thread. - // - // Panics are delivered to the user via the `JoinHandle`. Given that - // they are dropping the `JoinHandle`, we assume they are not - // interested in the panic and swallow it. - let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { - self.core().drop_future_or_output(); - })); + let snapshot = match self.state().unset_join_interested_and_waker() { + Ok(snapshot) => snapshot, + Err(snapshot) => { + // It is our responsibility to drop the output. This is critical as + // the task output may not be `Send` and as such must remain with + // the scheduler or `JoinHandle`. i.e. if the output remains in the + // task structure until the task is deallocated, it may be dropped + // by a Waker on any arbitrary thread. + // + // Panics are delivered to the user via the `JoinHandle`. Given that + // they are dropping the `JoinHandle`, we assume they are not + // interested in the panic and swallow it. + let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + self.core().drop_future_or_output(); + })); + snapshot + } + }; + + if !snapshot.is_join_waker_set() { + // If the JOIN_WAKER bit is not set the join handle has exclusive access to the waker + // at this point following rule 2 in task/mod.rs so we drop the waker at this point + // together with the join handle. + unsafe { + self.trailer().set_waker(None); + } } // Drop the `JoinHandle` reference, possibly deallocating the task @@ -311,7 +324,6 @@ where fn complete(self) { // The future has completed and its output has been written to the task // stage. We transition from running to complete. - let snapshot = self.state().transition_to_complete(); // We catch panics here in case dropping the future or waking the @@ -343,6 +355,15 @@ where })); } + if snapshot.is_join_interested() && snapshot.is_join_waker_set() { + // If JOIN_INTEREST and JOIN_WAKER are still set at this point, the runtime should + // drop the join waker as the join handle is not allowed to modify the waker + // following rule 6 in task/mod.rs + unsafe { + self.trailer().set_waker(None); + } + } + // The task has completed execution and will no longer be scheduled. let num_release = self.release(); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 33f54003d38..b60a679849b 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -37,7 +37,7 @@ //! * `RUNNING` - Tracks whether the task is currently being polled or cancelled. //! This bit functions as a lock around the task. //! -//! * `COMPLETE` - Is one once the future has fully completed and has been +//! * `COMPLETE` - Is one once the future has fully completed and the future is //! dropped. Never unset once set. Never set together with RUNNING. //! //! * `NOTIFIED` - Tracks whether a Notified object currently exists. diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 0fc7bb0329b..26ed450b248 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -89,6 +89,21 @@ pub(crate) enum TransitionToNotifiedByRef { Submit, } +#[must_use] +pub(crate) enum TransitionToJoinHandleDrop { + DoNothing, + DropOutput, + DropJoinWaker, + DropBoth, +} + +#[must_use] +pub(crate) enum TransitionToTerminal { + OkDoNothing, + OkDealloc, + FailedDropJoinWaker, +} + /// All transitions are performed via RMW operations. This establishes an /// unambiguous modification order. impl State { @@ -190,14 +205,24 @@ impl State { /// /// Returns true if the task should be deallocated. pub(super) fn transition_to_terminal(&self, count: usize) -> bool { - let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel)); - assert!( - prev.ref_count() >= count, - "current: {}, sub: {}", - prev.ref_count(), - count - ); - prev.ref_count() == count + self.fetch_update_action(|mut snapshot| { + assert!( + snapshot.ref_count() >= count, + "current: {}, sub: {}", + snapshot.ref_count(), + count + ); + + snapshot.0 -= count * REF_ONE; + if snapshot.is_join_interested() { + // If there is still a join handle alive at this point we unset the + // JOIN_WAKER bit so that the join handle gains exclusive access to + // the waker field to actually drop it. + snapshot.unset_join_waker(); + } + + (snapshot.ref_count() == 0, Some(snapshot)) + }) } /// Transitions the state to `NOTIFIED`. @@ -371,11 +396,11 @@ impl State { .map_err(|_| ()) } - /// Tries to unset the `JOIN_INTEREST` flag. + /// Tries to unset the `JOIN_INTEREST` and `JOIN_WAKER` flag. /// /// Returns `Ok` if the operation happens before the task transitions to a /// completed state, `Err` otherwise. - pub(super) fn unset_join_interested(&self) -> UpdateResult { + pub(super) fn unset_join_interested_and_waker(&self) -> UpdateResult { self.fetch_update(|curr| { assert!(curr.is_join_interested()); diff --git a/tokio/src/runtime/tests/loom_multi_thread.rs b/tokio/src/runtime/tests/loom_multi_thread.rs index ddd14b7fb3f..e2706e65c65 100644 --- a/tokio/src/runtime/tests/loom_multi_thread.rs +++ b/tokio/src/runtime/tests/loom_multi_thread.rs @@ -10,6 +10,7 @@ mod yield_now; /// In order to speed up the C use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; +use crate::sync::mpsc::channel; use crate::{spawn, task}; use tokio_test::assert_ok; @@ -459,3 +460,32 @@ impl Future for Track { }) } } + +#[test] +fn drop_tasks_with_reference_cycle() { + loom::model(|| { + let pool = mk_pool(2); + + pool.block_on(async move { + let (tx, mut rx) = channel(1); + + let (a_closer, mut wait_for_close_a) = channel::<()>(1); + let (b_closer, mut wait_for_close_b) = channel::<()>(1); + + let a = spawn(async move { + let b = rx.recv().await.unwrap(); + + futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await; + }); + + let b = spawn(async move { + let _ = a.await; + let _ = b_closer.send(()).await; + }); + + tx.send(b).await.unwrap(); + + futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await; + }); + }); +}