Skip to content

Commit

Permalink
Drop the join waker of a task eagerly when the task completes and the…
Browse files Browse the repository at this point in the history
…re is no

join interest
  • Loading branch information
tglane committed Nov 20, 2024
1 parent bb7ca75 commit e94c7d8
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 26 deletions.
51 changes: 36 additions & 15 deletions tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,34 @@ where
}

pub(super) fn drop_join_handle_slow(self) {
// Try to unset `JOIN_INTEREST`. This must be done as a first step in
// Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in
// case the task concurrently completed.
if self.state().unset_join_interested().is_err() {
// It is our responsibility to drop the output. This is critical as
// the task output may not be `Send` and as such must remain with
// the scheduler or `JoinHandle`. i.e. if the output remains in the
// task structure until the task is deallocated, it may be dropped
// by a Waker on any arbitrary thread.
//
// Panics are delivered to the user via the `JoinHandle`. Given that
// they are dropping the `JoinHandle`, we assume they are not
// interested in the panic and swallow it.
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
self.core().drop_future_or_output();
}));
let snapshot = match self.state().unset_join_interested_and_waker() {
Ok(snapshot) => snapshot,
Err(snapshot) => {
// It is our responsibility to drop the output. This is critical as
// the task output may not be `Send` and as such must remain with
// the scheduler or `JoinHandle`. i.e. if the output remains in the
// task structure until the task is deallocated, it may be dropped
// by a Waker on any arbitrary thread.
//
// Panics are delivered to the user via the `JoinHandle`. Given that
// they are dropping the `JoinHandle`, we assume they are not
// interested in the panic and swallow it.
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
self.core().drop_future_or_output();
}));
snapshot
}
};

if !snapshot.is_join_waker_set() {
// If the JOIN_WAKER bit is not set the join handle has exclusive access to the waker
// at this point following rule 2 in task/mod.rs so we drop the waker at this point
// together with the join handle.
unsafe {
self.trailer().set_waker(None);
}
}

// Drop the `JoinHandle` reference, possibly deallocating the task
Expand All @@ -311,7 +324,6 @@ where
fn complete(self) {
// The future has completed and its output has been written to the task
// stage. We transition from running to complete.

let snapshot = self.state().transition_to_complete();

// We catch panics here in case dropping the future or waking the
Expand Down Expand Up @@ -343,6 +355,15 @@ where
}));
}

if snapshot.is_join_interested() && snapshot.is_join_waker_set() {
// If JOIN_INTEREST and JOIN_WAKER are still set at this point, the runtime should
// drop the join waker as the join handle is not allowed to modify the waker
// following rule 6 in task/mod.rs
unsafe {
self.trailer().set_waker(None);
}
}

// The task has completed execution and will no longer be scheduled.
let num_release = self.release();

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
//! * `RUNNING` - Tracks whether the task is currently being polled or cancelled.
//! This bit functions as a lock around the task.
//!
//! * `COMPLETE` - Is one once the future has fully completed and has been
//! * `COMPLETE` - Is one once the future has fully completed and the future is
//! dropped. Never unset once set. Never set together with RUNNING.
//!
//! * `NOTIFIED` - Tracks whether a Notified object currently exists.
Expand Down
30 changes: 20 additions & 10 deletions tokio/src/runtime/task/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,24 @@ impl State {
///
/// Returns true if the task should be deallocated.
pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
assert!(
prev.ref_count() >= count,
"current: {}, sub: {}",
prev.ref_count(),
count
);
prev.ref_count() == count
self.fetch_update_action(|mut snapshot| {
assert!(
snapshot.ref_count() >= count,
"current: {}, sub: {}",
snapshot.ref_count(),
count
);

snapshot.0 -= count * REF_ONE;
if snapshot.is_join_interested() {
// If there is still a join handle alive at this point we unset the
// JOIN_WAKER bit so that the join handle gains exclusive access to
// the waker field to actually drop it.
snapshot.unset_join_waker();
}

(snapshot.ref_count() == 0, Some(snapshot))
})
}

/// Transitions the state to `NOTIFIED`.
Expand Down Expand Up @@ -371,11 +381,11 @@ impl State {
.map_err(|_| ())
}

/// Tries to unset the `JOIN_INTEREST` flag.
/// Tries to unset the `JOIN_INTEREST` and `JOIN_WAKER` flag.
///
/// Returns `Ok` if the operation happens before the task transitions to a
/// completed state, `Err` otherwise.
pub(super) fn unset_join_interested(&self) -> UpdateResult {
pub(super) fn unset_join_interested_and_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());

Expand Down
30 changes: 30 additions & 0 deletions tokio/src/runtime/tests/loom_multi_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod yield_now;
/// In order to speed up the C
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::{self, Runtime};
use crate::sync::mpsc::channel;
use crate::{spawn, task};
use tokio_test::assert_ok;

Expand Down Expand Up @@ -459,3 +460,32 @@ impl<T: Future> Future for Track<T> {
})
}
}

#[test]
fn drop_tasks_with_reference_cycle() {
loom::model(|| {
let pool = mk_pool(2);

pool.block_on(async move {
let (tx, mut rx) = channel(1);

let (a_closer, mut wait_for_close_a) = channel::<()>(1);
let (b_closer, mut wait_for_close_b) = channel::<()>(1);

let a = spawn(async move {
let b = rx.recv().await.unwrap();

futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await;
});

let b = spawn(async move {
let _ = a.await;
let _ = b_closer.send(()).await;
});

tx.send(b).await.unwrap();

futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await;
});
});
}

0 comments on commit e94c7d8

Please sign in to comment.