From ba5177d258a04cbbb371865abecaff25dc2c1401 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sun, 21 Jul 2024 12:18:50 +0200 Subject: [PATCH 1/5] Move DropWaker to its own module --- src/drop_waker.rs | 60 +++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 56 ++----------------------------------------- 2 files changed, 62 insertions(+), 54 deletions(-) create mode 100644 src/drop_waker.rs diff --git a/src/drop_waker.rs b/src/drop_waker.rs new file mode 100644 index 0000000..35430bf --- /dev/null +++ b/src/drop_waker.rs @@ -0,0 +1,60 @@ +//! [`DropWaker`] trait and implementations. +//! +//! See [`drop_task_waker`]. + +use std::sync::Arc; +use std::task; + +/// Create a [`task::Waker`] that will drop itself when the waker is dropped. +/// +/// # Safety +/// +/// The returned `task::Waker` cannot be cloned, it will panic. +pub(crate) unsafe fn drop_task_waker(to_drop: T) -> task::Waker { + unsafe fn drop_by_ptr(data: *const ()) { + T::drop_from_waker_data(data); + } + + // SAFETY: we meet the `task::Waker` and `task::RawWaker` requirements. + unsafe { + task::Waker::from_raw(task::RawWaker::new( + to_drop.into_waker_data(), + &task::RawWakerVTable::new( + |_| panic!("attempted to clone `a10::drop_task_waker`"), + // SAFETY: `wake` takes ownership, so dropping is safe. + drop_by_ptr::, + |_| { /* `wake_by_ref` is a no-op. */ }, + drop_by_ptr::, + ), + )) + } +} + +/// Trait used by [`drop_task_waker`]. +pub(crate) trait DropWaker { + /// Return itself as waker data. + fn into_waker_data(self) -> *const (); + + /// Drop the waker `data` created by `into_waker_data`. + unsafe fn drop_from_waker_data(data: *const ()); +} + +impl DropWaker for Box { + fn into_waker_data(self) -> *const () { + Box::into_raw(self).cast() + } + + unsafe fn drop_from_waker_data(data: *const ()) { + drop(Box::::from_raw(data.cast_mut().cast())); + } +} + +impl DropWaker for Arc { + fn into_waker_data(self) -> *const () { + Arc::into_raw(self).cast() + } + + unsafe fn drop_from_waker_data(data: *const ()) { + drop(Arc::::from_raw(data.cast_mut().cast())); + } +} diff --git a/src/lib.rs b/src/lib.rs index 48850e0..11eb357 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,6 +147,7 @@ use std::{fmt, ptr}; mod bitmap; mod config; +mod drop_waker; mod op; mod sys; @@ -167,6 +168,7 @@ use bitmap::AtomicBitMap; pub use cancel::Cancel; use config::munmap; pub use config::Config; +use drop_waker::drop_task_waker; #[doc(no_inline)] pub use extract::Extract; #[doc(no_inline)] @@ -1013,60 +1015,6 @@ impl fmt::Display for QueueFull { } } -/// Create a [`task::Waker`] that will drop itself when the waker is dropped. -/// -/// # Safety -/// -/// The returned `task::Waker` cannot be cloned, it will panic. -unsafe fn drop_task_waker(to_drop: T) -> task::Waker { - unsafe fn drop_by_ptr(data: *const ()) { - T::drop_from_waker_data(data); - } - - // SAFETY: we meet the `task::Waker` and `task::RawWaker` requirements. - unsafe { - task::Waker::from_raw(task::RawWaker::new( - to_drop.into_waker_data(), - &task::RawWakerVTable::new( - |_| panic!("attempted to clone `a10::drop_task_waker`"), - // SAFETY: `wake` takes ownership, so dropping is safe. - drop_by_ptr::, - |_| { /* `wake_by_ref` is a no-op. */ }, - drop_by_ptr::, - ), - )) - } -} - -/// Trait used by [`drop_task_waker`]. -trait DropWaker { - /// Return itself as waker data. - fn into_waker_data(self) -> *const (); - - /// Drop the waker `data` created by `into_waker_data`. - unsafe fn drop_from_waker_data(data: *const ()); -} - -impl DropWaker for Box { - fn into_waker_data(self) -> *const () { - Box::into_raw(self).cast() - } - - unsafe fn drop_from_waker_data(data: *const ()) { - drop(Box::::from_raw(data.cast_mut().cast())); - } -} - -impl DropWaker for Arc { - fn into_waker_data(self) -> *const () { - Arc::into_raw(self).cast() - } - - unsafe fn drop_from_waker_data(data: *const ()) { - drop(Arc::::from_raw(data.cast_mut().cast())); - } -} - /// Queue of completion events. #[derive(Debug)] struct CompletionQueue { From e7de419991ea97d777715d6b8f7dffd7b79986ab Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sun, 21 Jul 2024 12:27:30 +0200 Subject: [PATCH 2/5] Add more implementations for DropWaker --- src/drop_waker.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/drop_waker.rs b/src/drop_waker.rs index 35430bf..607ee5e 100644 --- a/src/drop_waker.rs +++ b/src/drop_waker.rs @@ -2,6 +2,9 @@ //! //! See [`drop_task_waker`]. +use std::cell::UnsafeCell; +use std::ffi::CString; +use std::ptr; use std::sync::Arc; use std::task; @@ -39,6 +42,42 @@ pub(crate) trait DropWaker { unsafe fn drop_from_waker_data(data: *const ()); } +impl DropWaker for UnsafeCell +where + T: DropWaker, +{ + fn into_waker_data(self) -> *const () { + self.into_inner().into_waker_data() + } + + unsafe fn drop_from_waker_data(data: *const ()) { + T::drop_from_waker_data(data); + } +} + +impl DropWaker for (T,) +where + T: DropWaker, +{ + fn into_waker_data(self) -> *const () { + self.0.into_waker_data() + } + + unsafe fn drop_from_waker_data(data: *const ()) { + T::drop_from_waker_data(data); + } +} + +impl DropWaker for () { + fn into_waker_data(self) -> *const () { + ptr::null() + } + + unsafe fn drop_from_waker_data(_: *const ()) { + // Nothing. + } +} + impl DropWaker for Box { fn into_waker_data(self) -> *const () { Box::into_raw(self).cast() @@ -58,3 +97,13 @@ impl DropWaker for Arc { drop(Arc::::from_raw(data.cast_mut().cast())); } } + +impl DropWaker for CString { + fn into_waker_data(self) -> *const () { + CString::into_raw(self).cast() + } + + unsafe fn drop_from_waker_data(data: *const ()) { + drop(CString::from_raw(data.cast_mut().cast())); + } +} From 93030a563cf21d8cf56bd93e57de64573e20775f Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sun, 21 Jul 2024 12:40:09 +0200 Subject: [PATCH 3/5] Use DropWaker SubmissionQueue::cancel_op Instead of always using the Box implementation, meaning we always allocate, we now require the caller to pass resources that implement DropWaker. This means we can avoid the allocation for various types that implement DropWaker already. The downside is that in the case where we got no more events the allocation is made upfront (for certain types) and then dropped, without actually being in a waker. --- src/fs.rs | 16 ++++++++-------- src/io/mod.rs | 4 ++++ src/lib.rs | 5 +++-- src/net.rs | 7 +++++++ src/op.rs | 29 +++++++++++++++++++++++++++-- src/process.rs | 2 +- 6 files changed, 50 insertions(+), 13 deletions(-) diff --git a/src/fs.rs b/src/fs.rs index 0cbbc9a..1d73009 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -909,14 +909,14 @@ impl Drop for Rename { match self.state { OpState::Running(op_index) => { - let result = self - .sq - .cancel_op(op_index, (from, to), |submission| unsafe { - submission.cancel_op(op_index); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = + self.sq + .cancel_op(op_index, Box::from((from, to)), |submission| unsafe { + submission.cancel_op(op_index); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }); if let Err(err) = result { log::error!("dropped a10::CreateDir before completion, attempt to cancel failed: {err}"); } diff --git a/src/io/mod.rs b/src/io/mod.rs index 47aebbc..3546737 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -381,6 +381,7 @@ op_future! { /// access it safely. buf: B, }, + drop_using: Box, setup_state: offset: u64, setup: |submission, fd, (buf,), offset| unsafe { let (ptr, len) = buf.parts_mut(); @@ -479,6 +480,7 @@ op_future! { /// to heap allocation. iovecs: [libc::iovec; N], }, + drop_using: Box, /// `iovecs` can't move until the kernel has read the submission. impl !Unpin, setup_state: offset: u64, @@ -596,6 +598,7 @@ op_future! { /// access it safely. buf: B, }, + drop_using: Box, setup_state: offset: u64, setup: |submission, fd, (buf,), offset| unsafe { let (ptr, len) = buf.parts(); @@ -723,6 +726,7 @@ op_future! { /// to heap allocation. iovecs: [libc::iovec; N], }, + drop_using: Box, /// `iovecs` can't move until the kernel has read the submission. impl !Unpin, setup_state: offset: u64, diff --git a/src/lib.rs b/src/lib.rs index 11eb357..a948e74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -168,7 +168,7 @@ use bitmap::AtomicBitMap; pub use cancel::Cancel; use config::munmap; pub use config::Config; -use drop_waker::drop_task_waker; +use drop_waker::{drop_task_waker, DropWaker}; #[doc(no_inline)] pub use extract::Extract; #[doc(no_inline)] @@ -826,6 +826,7 @@ impl SubmissionQueue { ) -> Result<(), QueueFull> where F: FnOnce(&mut Submission), + T: DropWaker, { log::trace!(op_index = op_index.0; "canceling operation"); if let Some(operation) = self.shared.queued_ops.get(op_index.0) { @@ -858,7 +859,7 @@ impl SubmissionQueue { // resources in `resources`. let waker = if needs_drop::() { // SAFETY: we're not going to clone the `waker`. - Some(unsafe { drop_task_waker(Box::from(resources)) }) + Some(unsafe { drop_task_waker(resources) }) } else { // Of course if we don't need to drop `T`, then we don't // have to use a special waker. But we still don't want to diff --git a/src/net.rs b/src/net.rs index d2abd52..c93e14e 100644 --- a/src/net.rs +++ b/src/net.rs @@ -483,6 +483,7 @@ op_future! { /// access it safely. buf: B, }, + drop_using: Box, setup_state: flags: (u8, libc::c_int), setup: |submission, fd, (buf,), (op, flags)| unsafe { let (ptr, len) = buf.parts(); @@ -579,6 +580,7 @@ op_future! { /// Address to send to. address: A, }, + drop_using: Box, setup_state: flags: (u8, libc::c_int), setup: |submission, fd, (buf, address), (op, flags)| unsafe { let (buf, buf_len) = buf.parts(); @@ -615,6 +617,7 @@ op_future! { msg: libc::msghdr, iovecs: [libc::iovec; N], }, + drop_using: Box, /// `msg` and `iovecs` can't move until the kernel has read the submission. impl !Unpin, setup_state: flags: (u8, libc::c_int), @@ -732,6 +735,7 @@ op_future! { /// access it safely. buf: B, }, + drop_using: Box, setup_state: flags: libc::c_int, setup: |submission, fd, (buf,), flags| unsafe { let (ptr, len) = buf.parts_mut(); @@ -849,6 +853,7 @@ op_future! { /// allocation. iovecs: [libc::iovec; N], }, + drop_using: Box, /// `iovecs` can't move until the kernel has read the submission. impl !Unpin, setup_state: flags: libc::c_int, @@ -943,6 +948,7 @@ op_future! { /// allocation. iovec: libc::iovec, }, + drop_using: Box, /// `iovec` can't move until the kernel has read the submission. impl !Unpin, setup_state: flags: libc::c_int, @@ -990,6 +996,7 @@ op_future! { /// allocation. iovecs: [libc::iovec; N], }, + drop_using: Box, /// `iovecs` can't move until the kernel has read the submission. impl !Unpin, setup_state: flags: libc::c_int, diff --git a/src/op.rs b/src/op.rs index 1e0b839..b5d2e30 100644 --- a/src/op.rs +++ b/src/op.rs @@ -1038,6 +1038,10 @@ macro_rules! op_future { $field: ident : $value: ty, )* }, + // Whether or not to use a special type to implemnt `DropWake`. + $( + drop_using: $drop_wake: tt, + )? // Whether or not the structure should be `!Unpin` by including // `PhantomPinned`. $( @@ -1055,7 +1059,7 @@ macro_rules! op_future { // Mapping function for `Extractor` implementation. See above. extract: |$extract_self: ident, $extract_resources: tt, $extract_flags: ident, $extract_arg: ident| -> $extract_result: ty $extract_map: block, ) => { - $crate::op::op_future!{ + $crate::op::op_future! { fn $type::$method -> $result, struct $name<$lifetime $(, $generic $(: $trait )? )* $(; const $const_generic: $const_ty )*> { $( @@ -1063,6 +1067,9 @@ macro_rules! op_future { $field: $value, )* }, + $( + drop_using: $drop_wake, + )? $( $(#[ $phantom_doc ])* impl !Unpin, @@ -1111,6 +1118,9 @@ macro_rules! op_future { $field: ident : $value: ty, )* }, + $( + drop_using: $drop_wake: tt, + )? $( $(#[ $phantom_doc: meta ])* impl !Unpin, @@ -1217,6 +1227,9 @@ macro_rules! op_future { if let std::option::Option::Some(resources) = self.resources.take() { match self.state { $crate::op::OpState::Running(op_index) => { + // Use a different type for the `DropWake` + // implementation. + $( let resources = $drop_wake::from(resources); )? let result = self.fd.sq.cancel_op(op_index, resources, |submission| unsafe { submission.cancel_op(op_index); // We'll get a canceled completion event if we succeeded, which @@ -1245,6 +1258,9 @@ macro_rules! op_future { $field: ident : $value: ty, )* }, + $( + drop_using: $drop_wake: tt, + )? $( $(#[ $phantom_doc: meta ])* impl !Unpin, @@ -1254,7 +1270,7 @@ macro_rules! op_future { map_result: |$self: ident, $resources: tt, $map_arg: ident| $map_result: expr, $( extract: |$extract_self: ident, $extract_resources: tt, $extract_flags: ident, $extract_arg: ident| -> $extract_result: ty $extract_map: block, )? ) => { - $crate::op::op_future!{ + $crate::op::op_future! { fn $type::$method -> $result, struct $name<$lifetime $(, $generic $(: $trait )? )* $(; const $const_generic: $const_ty )*> { $( @@ -1262,6 +1278,9 @@ macro_rules! op_future { $field: $value, )* }, + $( + drop_using: $drop_wake, + )? $( $(#[ $phantom_doc ])* impl !Unpin, @@ -1281,6 +1300,9 @@ macro_rules! op_future { $field: ident : $value: ty, )* }, + $( + drop_using: $drop_wake: tt, + )? $( $(#[ $phantom_doc: meta ])* impl !Unpin, @@ -1298,6 +1320,9 @@ macro_rules! op_future { $field: $value, )* }, + $( + drop_using: $drop_wake, + )? $( $(#[ $phantom_doc ])* impl !Unpin, diff --git a/src/process.rs b/src/process.rs index 871435e..120cd6f 100644 --- a/src/process.rs +++ b/src/process.rs @@ -432,7 +432,7 @@ impl Drop for ToSignalsDirect { let signals = unsafe { ManuallyDrop::take(&mut self.signals) }; let result = self.signals.fd.sq.cancel_op( op_index, - (signals, direct_fd), + Box::from((signals, direct_fd)), |submission| unsafe { submission.cancel_op(op_index); // We'll get a canceled completion event if we succeeded, which From 10b265df24cc4922384eecf1f5e3db4cb3b7c672 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sun, 21 Jul 2024 12:44:05 +0200 Subject: [PATCH 4/5] Rename DropWaker to DropWake A more action-y name for the trait, which matches the usual Rust trait naming convention. --- src/drop_waker.rs | 24 ++++++++++++------------ src/lib.rs | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/drop_waker.rs b/src/drop_waker.rs index 607ee5e..51c9571 100644 --- a/src/drop_waker.rs +++ b/src/drop_waker.rs @@ -1,4 +1,4 @@ -//! [`DropWaker`] trait and implementations. +//! [`DropWake`] trait and implementations. //! //! See [`drop_task_waker`]. @@ -13,8 +13,8 @@ use std::task; /// # Safety /// /// The returned `task::Waker` cannot be cloned, it will panic. -pub(crate) unsafe fn drop_task_waker(to_drop: T) -> task::Waker { - unsafe fn drop_by_ptr(data: *const ()) { +pub(crate) unsafe fn drop_task_waker(to_drop: T) -> task::Waker { + unsafe fn drop_by_ptr(data: *const ()) { T::drop_from_waker_data(data); } @@ -34,7 +34,7 @@ pub(crate) unsafe fn drop_task_waker(to_drop: T) -> task::Waker { } /// Trait used by [`drop_task_waker`]. -pub(crate) trait DropWaker { +pub(crate) trait DropWake { /// Return itself as waker data. fn into_waker_data(self) -> *const (); @@ -42,9 +42,9 @@ pub(crate) trait DropWaker { unsafe fn drop_from_waker_data(data: *const ()); } -impl DropWaker for UnsafeCell +impl DropWake for UnsafeCell where - T: DropWaker, + T: DropWake, { fn into_waker_data(self) -> *const () { self.into_inner().into_waker_data() @@ -55,9 +55,9 @@ where } } -impl DropWaker for (T,) +impl DropWake for (T,) where - T: DropWaker, + T: DropWake, { fn into_waker_data(self) -> *const () { self.0.into_waker_data() @@ -68,7 +68,7 @@ where } } -impl DropWaker for () { +impl DropWake for () { fn into_waker_data(self) -> *const () { ptr::null() } @@ -78,7 +78,7 @@ impl DropWaker for () { } } -impl DropWaker for Box { +impl DropWake for Box { fn into_waker_data(self) -> *const () { Box::into_raw(self).cast() } @@ -88,7 +88,7 @@ impl DropWaker for Box { } } -impl DropWaker for Arc { +impl DropWake for Arc { fn into_waker_data(self) -> *const () { Arc::into_raw(self).cast() } @@ -98,7 +98,7 @@ impl DropWaker for Arc { } } -impl DropWaker for CString { +impl DropWake for CString { fn into_waker_data(self) -> *const () { CString::into_raw(self).cast() } diff --git a/src/lib.rs b/src/lib.rs index a948e74..5211b30 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -168,7 +168,7 @@ use bitmap::AtomicBitMap; pub use cancel::Cancel; use config::munmap; pub use config::Config; -use drop_waker::{drop_task_waker, DropWaker}; +use drop_waker::{drop_task_waker, DropWake}; #[doc(no_inline)] pub use extract::Extract; #[doc(no_inline)] @@ -826,7 +826,7 @@ impl SubmissionQueue { ) -> Result<(), QueueFull> where F: FnOnce(&mut Submission), - T: DropWaker, + T: DropWake, { log::trace!(op_index = op_index.0; "canceling operation"); if let Some(operation) = self.shared.queued_ops.get(op_index.0) { From 1fd4ec6d0b306543a7ff954067da9351c813ba07 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sun, 21 Jul 2024 12:53:35 +0200 Subject: [PATCH 5/5] Delay allocate of drop waker type This is useful for the types that don't implement DropWake themselves and thus have to use a Box. That Box allocation can now be delayed until it's actually needed, which might not be needed if we expect no further events for the submission. --- src/fs.rs | 66 ++++++++++++++++++++++++++++++-------------------- src/lib.rs | 24 +++++++++--------- src/op.rs | 9 ++++--- src/poll.rs | 34 ++++++++++++++++---------- src/process.rs | 38 ++++++++++++++++------------- 5 files changed, 101 insertions(+), 70 deletions(-) diff --git a/src/fs.rs b/src/fs.rs index 1d73009..a9e5ffd 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -269,12 +269,16 @@ impl Drop for Open { // `None`, but in that case `self.path` would also be // `None`. let sq = self.sq.as_ref().unwrap(); - let result = sq.cancel_op(op_index, path, |submission| unsafe { - submission.cancel_op(op_index); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = sq.cancel_op( + op_index, + || path, + |submission| unsafe { + submission.cancel_op(op_index); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }, + ); if let Err(err) = result { log::error!( "dropped a10::Open before completion, attempt to cancel failed: {err}" @@ -811,12 +815,16 @@ impl Drop for CreateDir { if let Some(path) = self.path.take() { match self.state { OpState::Running(op_index) => { - let result = self.sq.cancel_op(op_index, path, |submission| unsafe { - submission.cancel_op(op_index); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = self.sq.cancel_op( + op_index, + || path, + |submission| unsafe { + submission.cancel_op(op_index); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }, + ); if let Err(err) = result { log::error!("dropped a10::CreateDir before completion, attempt to cancel failed: {err}"); } @@ -909,14 +917,16 @@ impl Drop for Rename { match self.state { OpState::Running(op_index) => { - let result = - self.sq - .cancel_op(op_index, Box::from((from, to)), |submission| unsafe { - submission.cancel_op(op_index); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = self.sq.cancel_op( + op_index, + || Box::from((from, to)), + |submission| unsafe { + submission.cancel_op(op_index); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }, + ); if let Err(err) = result { log::error!("dropped a10::CreateDir before completion, attempt to cancel failed: {err}"); } @@ -1016,12 +1026,16 @@ impl Drop for Delete { if let Some(path) = self.path.take() { match self.state { OpState::Running(op_index) => { - let result = self.sq.cancel_op(op_index, path, |submission| unsafe { - submission.cancel_op(op_index); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = self.sq.cancel_op( + op_index, + || path, + |submission| unsafe { + submission.cancel_op(op_index); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }, + ); if let Err(err) = result { log::error!("dropped a10::CreateDir before completion, attempt to cancel failed: {err}"); } diff --git a/src/lib.rs b/src/lib.rs index 5211b30..2ce365e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -807,26 +807,28 @@ impl SubmissionQueue { /// Mark the operation with `op_index` as dropped, attempting to cancel it. /// - /// Because the kernel still has access to the `resources`, we might have to - /// do some trickery to delay the deallocation of `resources` and making the + /// Because the kernel still has access to the resources, we might have to + /// do some trickery to delay the deallocation of resources and making the /// queued operation slot available again. /// /// When the operation is still in progress we attempt to cancel it using /// submission created by `cancel`. If the operation has completed it will - /// just drop `resources` and make the slot available again. + /// just drop resources (using `create_drop_waker`) and make the slot + /// available again. /// /// # Notes /// /// `cancel` should most likely use [`Submission::no_completion_event`] - pub(crate) fn cancel_op( + pub(crate) fn cancel_op( &self, op_index: OpIndex, - resources: T, + create_drop_waker: R, cancel: F, ) -> Result<(), QueueFull> where + R: FnOnce() -> D, + D: DropWake, F: FnOnce(&mut Submission), - T: DropWake, { log::trace!(op_index = op_index.0; "canceling operation"); if let Some(operation) = self.shared.queued_ops.get(op_index.0) { @@ -835,13 +837,13 @@ impl SubmissionQueue { if op.no_more_events() { // Easy path, the operation has already been completed. *operation = None; - // Unlock defore dropping `resources`, which might take a + // Unlock defore dropping `create_drop_waker`, which might take a // while. drop(operation); self.shared.op_indices.make_available(op_index.0); // We can safely drop the resources. - drop(resources); + drop(create_drop_waker); return Ok(()); } @@ -856,10 +858,10 @@ impl SubmissionQueue { // until the kernel is done with the operation. // // We achieve 1 by creating a special waker that just drops the - // resources in `resources`. - let waker = if needs_drop::() { + // resources (created by `create_drop_waker`). + let waker = if needs_drop::() { // SAFETY: we're not going to clone the `waker`. - Some(unsafe { drop_task_waker(resources) }) + Some(unsafe { drop_task_waker(create_drop_waker()) }) } else { // Of course if we don't need to drop `T`, then we don't // have to use a special waker. But we still don't want to diff --git a/src/op.rs b/src/op.rs index b5d2e30..1b58317 100644 --- a/src/op.rs +++ b/src/op.rs @@ -1229,8 +1229,11 @@ macro_rules! op_future { $crate::op::OpState::Running(op_index) => { // Use a different type for the `DropWake` // implementation. - $( let resources = $drop_wake::from(resources); )? - let result = self.fd.sq.cancel_op(op_index, resources, |submission| unsafe { + let drop_resource = || { + $( let resources = $drop_wake::from(resources); )? + resources + }; + let result = self.fd.sq.cancel_op(op_index, drop_resource, |submission| unsafe { submission.cancel_op(op_index); // We'll get a canceled completion event if we succeeded, which // is sufficient to cleanup the operation. @@ -1586,7 +1589,7 @@ macro_rules! op_async_iter { impl<$lifetime, D: $crate::fd::Descriptor> std::ops::Drop for $name<$lifetime, D> { fn drop(&mut self) { if let $crate::op::OpState::Running(op_index) = self.state { - let result = self.fd.sq.cancel_op(op_index, (), |submission| unsafe { + let result = self.fd.sq.cancel_op(op_index, || (), |submission| unsafe { submission.cancel_op(op_index); // We'll get a canceled completion event if we succeeded, which // is sufficient to cleanup the operation. diff --git a/src/poll.rs b/src/poll.rs index 0eca564..ecf2a2a 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -91,13 +91,17 @@ impl<'sq> Cancel for OneshotPoll<'sq> { impl<'sq> Drop for OneshotPoll<'sq> { fn drop(&mut self) { if let OpState::Running(op_index) = self.state { - let result = self.sq.cancel_op(op_index, (), |submission| unsafe { - submission.remove_poll(op_index); - submission.set_async(); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = self.sq.cancel_op( + op_index, + || (), + |submission| unsafe { + submission.remove_poll(op_index); + submission.set_async(); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }, + ); if let Err(err) = result { log::error!( "dropped a10::OneshotPoll before completion, attempt to cancel failed: {err}" @@ -217,12 +221,16 @@ impl<'sq> Cancel for MultishotPoll<'sq> { impl<'sq> Drop for MultishotPoll<'sq> { fn drop(&mut self) { if let OpState::Running(op_index) = self.state { - let result = self.sq.cancel_op(op_index, (), |submission| unsafe { - submission.remove_poll(op_index); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = self.sq.cancel_op( + op_index, + || (), + |submission| unsafe { + submission.remove_poll(op_index); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }, + ); if let Err(err) = result { log::error!( "dropped a10::MultishotPoll before canceling it, attempt to cancel failed: {err}" diff --git a/src/process.rs b/src/process.rs index 120cd6f..5cfbc5b 100644 --- a/src/process.rs +++ b/src/process.rs @@ -125,12 +125,16 @@ impl Drop for WaitId { // Only drop the signal `info` field once we know the // operation has finished, otherwise the kernel might write // into memory we have deallocated. - let result = self.sq.cancel_op(op_index, info, |submission| unsafe { - submission.cancel_op(op_index); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = self.sq.cancel_op( + op_index, + || info, + |submission| unsafe { + submission.cancel_op(op_index); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }, + ); if let Err(err) = result { log::error!("dropped a10::WaitId before canceling it, attempt to cancel failed: {err}"); } @@ -432,7 +436,7 @@ impl Drop for ToSignalsDirect { let signals = unsafe { ManuallyDrop::take(&mut self.signals) }; let result = self.signals.fd.sq.cancel_op( op_index, - Box::from((signals, direct_fd)), + || Box::from((signals, direct_fd)), |submission| unsafe { submission.cancel_op(op_index); // We'll get a canceled completion event if we succeeded, which @@ -589,16 +593,16 @@ impl ReceiveSignals { // deallocated. // SAFETY: we're in the `Drop` implementation, so `self.info` can't // be used anymore making it safe to take ownership. - let result = - self.signals - .fd - .sq - .cancel_op(op_index, signal_info, |submission| unsafe { - submission.cancel_op(op_index); - // We'll get a canceled completion event if we succeeded, which - // is sufficient to cleanup the operation. - submission.no_completion_event(); - }); + let result = self.signals.fd.sq.cancel_op( + op_index, + || signal_info, + |submission| unsafe { + submission.cancel_op(op_index); + // We'll get a canceled completion event if we succeeded, which + // is sufficient to cleanup the operation. + submission.no_completion_event(); + }, + ); if let Err(err) = result { log::error!( "dropped a10::ReceiveSignals before canceling it, attempt to cancel failed: {err}"