From f10798e8a8cc3d180df5bbc73568e0e39617267f Mon Sep 17 00:00:00 2001 From: Timo Glane Date: Wed, 20 Nov 2024 23:23:46 +0100 Subject: [PATCH] Refactored --- tokio/src/runtime/task/harness.rs | 20 +++++----- tokio/src/runtime/task/mod.rs | 2 +- tokio/src/runtime/task/state.rs | 61 ++++++++++++++++++++----------- 3 files changed, 51 insertions(+), 32 deletions(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index eebaafda3fe..cc8f11ee2b6 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -339,6 +339,17 @@ where // in task/mod.rs, since the JOIN_WAKER bit is set and the call // to transition_to_complete() above set the COMPLETE bit. self.trailer().wake_join(); + + // If JOIN_INTEREST is still set at this point the `JoinHandle` was not + // dropped since setting COMPLETE so we unset JOIN_WAKER so that the + // `JoinHandle` is able to drop the waker when itself gets dropped. + if self.state().unset_waker_if_join_interested().is_err() { + // If JOIN_INTEREST got unset since setting COMPLETE we need to drop + // the waker here because the `JoinHandle` is already dropped. + unsafe { + self.trailer().set_waker(None); + } + } } })); @@ -355,15 +366,6 @@ where })); } - if snapshot.is_join_interested() && snapshot.is_join_waker_set() { - // If JOIN_INTEREST and JOIN_WAKER are still set at this point, the runtime should - // drop the join waker as the join handle is not allowed to modify the waker - // following rule 6 in task/mod.rs - unsafe { - self.trailer().set_waker(None); - } - } - // The task has completed execution and will no longer be scheduled. let num_release = self.release(); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index b60a679849b..33f54003d38 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -37,7 +37,7 @@ //! * `RUNNING` - Tracks whether the task is currently being polled or cancelled. //! This bit functions as a lock around the task. //! -//! * `COMPLETE` - Is one once the future has fully completed and the future is +//! * `COMPLETE` - Is one once the future has fully completed and has been //! dropped. Never unset once set. Never set together with RUNNING. //! //! * `NOTIFIED` - Tracks whether a Notified object currently exists. diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index aa03dc8f5e5..d4046a9178e 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -190,24 +190,14 @@ impl State { /// /// Returns true if the task should be deallocated. pub(super) fn transition_to_terminal(&self, count: usize) -> bool { - self.fetch_update_action(|mut snapshot| { - assert!( - snapshot.ref_count() >= count, - "current: {}, sub: {}", - snapshot.ref_count(), - count - ); - - snapshot.0 -= count * REF_ONE; - if snapshot.is_join_interested() { - // If there is still a join handle alive at this point we unset the - // JOIN_WAKER bit so that the join handle gains exclusive access to - // the waker field to actually drop it. - snapshot.unset_join_waker(); - } - - (snapshot.ref_count() == 0, Some(snapshot)) - }) + let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel)); + assert!( + prev.ref_count() >= count, + "current: {}, sub: {}", + prev.ref_count(), + count + ); + prev.ref_count() == count } /// Transitions the state to `NOTIFIED`. @@ -389,13 +379,20 @@ impl State { self.fetch_update(|curr| { assert!(curr.is_join_interested()); - if curr.is_complete() { - return None; - } + // if curr.is_complete() { + // return None; + // } + + // let mut next = curr; + // next.unset_join_interested(); + + // Some(next) let mut next = curr; next.unset_join_interested(); - + if !curr.is_complete() { + next.unset_join_waker(); + } Some(next) }) } @@ -440,6 +437,26 @@ impl State { }) } + /// Unsets the `JOIN_WAKER` bit only if the `JOIN_INTEREST` is still set. + /// + /// Returns `Ok` has been unset, `Err` otherwise. This operation requires + /// the task to be completed. + pub(super) fn unset_waker_if_join_interested(&self) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_complete()); + assert!(curr.is_join_waker_set()); + + if !curr.is_join_interested() { + return None; + } + + let mut next = curr; + next.unset_join_waker(); + + Some(next) + }) + } + pub(super) fn ref_inc(&self) { use std::process; use std::sync::atomic::Ordering::Relaxed;