diff --git a/README.md b/README.md index a6049b9..579efa5 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,10 @@ [codecov]: https://codecov.io/gh/memflow/mfio/branch/main/graph/badge.svg?token=IJ1K4QPAIM [codecov-link]: https://codecov.io/gh/memflow/mfio -## Async completion I/O with non-sequential results +## Async completion I/O research with non-sequential results -mfio is memflow's async completion based I/O base. It aims to make the following aspects of an -I/O chain as simple as possible: +mfio is memflow's research async completion based I/O base. It aims to make the following aspects +of an I/O chain as simple as possible: 1. Async 2. Automatic batching (vectoring) diff --git a/mfio/src/lib.rs b/mfio/src/lib.rs index 2b9b6e4..3b993c1 100644 --- a/mfio/src/lib.rs +++ b/mfio/src/lib.rs @@ -1,9 +1,9 @@ //! # mfio //! -//! ## Async completion I/O with non-sequential results +//! ## Research async completion I/O with non-sequential results //! -//! mfio is memflow's async completion based I/O base. It aims to make the following aspects of an -//! I/O chain as simple as possible: +//! mfio is memflow's research async completion based I/O base. It aims to make the following +//! aspects of an I/O chain as simple as possible: //! //! 1. Async //! 2. Automatic batching (vectoring) @@ -23,6 +23,12 @@ //! works really well with sparse files, albeit differs from the typical "stop as soon as an error //! occurs" model. //! +//! Lack of color is not true sync/async mix, instead, mfio is designed to expose minimal set of +//! data for invoking a built-in runtime, with handles of daisy chaining mfio on top of another +//! runtime. The end result is that mfio is able to provide sync wrappers that efficiently poll +//! async operations to completion, while staying runtime agnostic. We found that a single (unix) +//! file descriptor or (windows) handle is sufficient to connect multiple async runtimes together. +//! //! ### Examples //! //! Read primitive values: @@ -132,8 +138,16 @@ //! //! ## Safety //! -//! mfio can invoke UB in safe code if `mem::forget` is run on a packet stream and data sent to the -//! stream gets reused. +//! By default mfio is conservative and does not enable invoking undefined behavior. However, with +//! a custom opt-in config switch, enabled by passing `--cfg mfio_assume_linear_types` to the rust +//! compiler, mfio is able to provide significant performance improvements, at the cost of +//! potential for invoking UB in safe code. +//! +//! With `mfio_assume_linear_types` config enabled, mfio wrappers will prefer storing data on the +//! stack, and if a future waiting for I/O operations to complete is cancelled, a `panic!` may get +//! triggered. Moreover, if a future waiting for I/O operations to complete gets forgotten using +//! `mem::forget`, undefined behavior may be invoked, because use-after-(stack)-free safeguards are +//! discarded. //! //! ### Safety examples //! @@ -252,8 +266,8 @@ pub mod heap; pub(crate) mod multistack; pub mod packet; mod poller; -//pub mod stdeq; -//pub mod traits; +pub mod stdeq; +pub mod traits; pub mod util; pub use tarc; diff --git a/mfio/src/packet.rs b/mfio/src/packet.rs index 3de4a93..aba289b 100644 --- a/mfio/src/packet.rs +++ b/mfio/src/packet.rs @@ -106,7 +106,7 @@ use core::mem::MaybeUninit; use core::num::NonZeroI32; use core::pin::Pin; use core::ptr::NonNull; -use core::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; +use core::sync::atomic::*; use core::task::{Context, Poll}; use futures::stream::Stream; use parking_lot::Mutex; @@ -209,7 +209,7 @@ impl BoundPacketView { pub unsafe fn forget(mut self) { let pkt = ManuallyDrop::take(&mut self.view); - let waker = pkt.pkt().on_output(); + let waker = pkt.pkt().on_output(None); debug_assert!(waker.is_none()); @@ -251,7 +251,7 @@ impl BoundPacketView { let pkt = ManuallyDrop::take(&mut self.view); let error = error.map(IntError::into_int_err); - let waker = pkt.pkt().on_output(); + let waker = pkt.pkt().on_output(error.map(|v| (self.view.start, v))); if let Some(output) = self.output.take() { (output.vtbl.output)(output, pkt, error); @@ -294,7 +294,7 @@ impl BoundPacketView { mut self, input: Perms::ReverseDataType, ) -> TransferredPacket { - if let Some(vtable) = self.view.pkt().vtbl { + if let Some(vtable) = self.view.pkt().vtbl.vtbl() { (vtable.transfer_data_fn())(&mut self.view, input); } else { Perms::transfer_data_simple(&mut self.view, input); @@ -502,17 +502,72 @@ impl AsRef> for FullPacket { } } +#[repr(C)] +pub struct RefPacket<'a, Perms: PacketPerms> { + header: Packet, + data: PackedLenData, + // mut because it is more constrained than const. + _phantom: PhantomData<&'a mut u8>, +} + +impl AsRef> for RefPacket<'_, Perms> { + fn as_ref(&self) -> &Packet { + &self.header + } +} + #[repr(C, packed)] struct PackedLenData { len: usize, data: T, } +#[repr(usize)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)] +enum PacketVtblTag { + SimpleDirect = 0, + SimpleIndirect = 1, + Complex, +} + +#[derive(Clone, Copy)] +union PacketVtblRef { + tag: usize, + vtbl: &'static Perms, +} + +impl core::fmt::Debug for PacketVtblRef { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + match self.tag() { + PacketVtblTag::Complex => core::fmt::Debug::fmt(unsafe { self.vtbl }, f), + v => core::fmt::Debug::fmt(&v, f), + } + } +} + +impl PacketVtblRef { + fn tag(&self) -> PacketVtblTag { + match unsafe { self.tag } { + 0 => PacketVtblTag::SimpleDirect, + 1 => PacketVtblTag::SimpleIndirect, + _ => PacketVtblTag::Complex, + } + } + + fn vtbl(&self) -> Option<&'static Perms> { + if self.tag() == PacketVtblTag::Complex { + unsafe { Some(self.vtbl) } + } else { + None + } + } +} + #[repr(C)] #[derive(Debug)] pub struct Packet { /// If `None`, then we have a len + byte buffer after the header - vtbl: Option<&'static Perms>, + vtbl: PacketVtblRef, /// Number of packet fragments currently in-flight (-1), and waker flags. /// /// The waker flags are encoded in the 2 highest bits. The left-most bit is a "start writing" @@ -572,6 +627,14 @@ pub struct Packet { rc_and_flags: AtomicUsize, /// Waker to be triggered, upon `rc` dropping down to 0. waker: UnsafeCell>, + /// What was the smallest position that resulted in an error. + /// + /// This value is initialized to !0, and upon each errored packet segment, is minned + /// atomically. Upon I/O is complete, this allows the caller to check for the size of the + /// contiguous memory region being successfully processed without gaps. + error_clamp: AtomicU64, + /// Note that this may be raced against so it should not be relied as "the minimum error". + min_error: AtomicI32, // data afterwards } @@ -609,7 +672,13 @@ impl<'a, Perms: PacketPerms> Future for &'a Packet { } impl Packet { - unsafe fn on_output(&self) -> Option { + unsafe fn on_output(&self, error: Option<(u64, NonZeroI32)>) -> Option { + if let Some((start, error)) = error { + if self.error_clamp.fetch_min(start, Ordering::AcqRel) < start { + self.min_error.store(error.into(), Ordering::Relaxed); + } + } + let loaded = self.rc_and_flags.fetch_sub(1, Ordering::AcqRel); // Do nothing, because we are either: @@ -631,9 +700,13 @@ impl Packet { pub unsafe fn new_unchecked(data: T) -> FullPacket { FullPacket { header: Packet { - vtbl: None, + vtbl: PacketVtblRef { + tag: PacketVtblTag::SimpleDirect as _, + }, rc_and_flags: AtomicUsize::new(0), waker: UnsafeCell::new(MaybeUninit::uninit()), + error_clamp: (!0u64).into(), + min_error: 0.into(), }, data: PackedLenData { len: core::mem::size_of::(), @@ -649,19 +722,62 @@ impl Packet { pub unsafe fn as_simple_mut(&mut self) -> &mut FullPacket, Perms> { &mut *(self as *mut Self).cast() } + + pub fn simple_data_ptr(&self) -> *const u8 { + match self.vtbl.tag() { + PacketVtblTag::SimpleDirect => { + let simple = unsafe { self.as_simple() }; + let data = unsafe { (simple as *const FullPacket<_, _>).add(1) }.cast::(); + data + } + PacketVtblTag::SimpleIndirect => { + let simple = unsafe { self.as_simple() }; + let data = + unsafe { (simple as *const FullPacket<_, _>).add(1) }.cast::<*const u8>(); + unsafe { *data } + } + PacketVtblTag::Complex => panic!("simple_data_ptr called on complex Packet"), + } + } + + pub fn min_error(&self) -> Option { + NonZeroI32::new(self.min_error.load(Ordering::Relaxed)).map(Error::from_int_err) + } + + pub fn error_clamp(&self) -> u64 { + self.error_clamp.load(Ordering::Relaxed) + } + + pub fn err_on_zero(&self) -> Result<(), Error> { + if self.error_clamp() > 0 { + Ok(()) + } else { + Err(self.min_error().expect("No error when error_clamp is 0")) + } + } + + pub fn err_any(&self) -> Result<(), Error> { + if let Some(err) = self.min_error() { + Err(err) + } else { + Ok(()) + } + } } -impl Packet { - pub fn new_uninit() -> FullPacket, Read> { +impl Packet { + pub fn new_uninit() -> FullPacket, Write> { unsafe { Self::new_unchecked(MaybeUninit::uninit()) } } +} - pub fn new_buf(len: usize) -> BaseArc> { +impl Packet { + pub fn new_buf(len: usize) -> BaseArc> { // TODO: feature gate this use std::alloc::Layout; - let size = core::mem::size_of::, Read>>() + len; - let align = core::mem::align_of::, Read>>(); + let size = core::mem::size_of::, Perms>>() + len; + let align = core::mem::align_of::, Perms>>(); // SAFETY: we are creating a packet that is sufficient for our needed amount of data. // In addition, we do not need a cleanup routine, because, because the only object that @@ -674,12 +790,16 @@ impl Packet { // SAFETY: we are initializing the packet here, hence the transmute is valid unsafe { // first we need to write the packet data to make it valid - (packet.exclusive_ptr().unwrap().as_ptr() as *mut FullPacket, Read>) + (packet.exclusive_ptr().unwrap().as_ptr() as *mut FullPacket, Perms>) .write(FullPacket { header: Packet { - vtbl: None, + vtbl: PacketVtblRef { + tag: PacketVtblTag::SimpleDirect as _, + }, rc_and_flags: AtomicUsize::new(0), waker: UnsafeCell::new(MaybeUninit::uninit()), + error_clamp: (!0u64).into(), + min_error: 0.into(), }, data: PackedLenData { len, @@ -692,6 +812,34 @@ impl Packet { } } +impl Packet { + pub fn from_slice(buf: &[u8]) -> BaseArc> { + let pkt = Self::new_buf(buf.len()); + unsafe { + core::ptr::copy_nonoverlapping( + buf.as_ptr(), + pkt.simple_data_ptr().cast_mut(), + buf.len(), + ) + }; + pkt + } +} + +impl Packet { + pub fn from_slice(buf: &[u8]) -> BaseArc> { + let pkt = Self::new_buf(buf.len()); + unsafe { + core::ptr::copy_nonoverlapping( + buf.as_ptr(), + pkt.simple_data_ptr().cast_mut(), + buf.len(), + ) + }; + pkt + } +} + /// Represents types that can be made opaque. /// /// This trait enables optimal object storage outcomes, based on the promises and assumptions the @@ -766,6 +914,37 @@ pub trait OpaqueStore { fn stack_opaque<'a>(stack: &'a mut Self::StackReq<'a>) -> Self::Opaque<'a>; } +impl OpaqueStore for BaseArc> { + type ConstHdr = Packet; + type Opaque<'a> = PacketView<'a, Perms> where Self: 'a; + // TODO: cfg switch this to a stack based obj. + type StackReq<'a> = Self where Self: 'a; + type HeapReq = Self where Self: 'static; + + fn stack<'a>(self) -> Self::StackReq<'a> + where + Self: 'a, + { + self + } + + fn heap(self) -> Self::HeapReq + where + Self: 'static, + { + self + } + + fn stack_hdr<'a>(stack: &'a Self::StackReq<'a>) -> &'a Self::ConstHdr { + stack + } + + fn stack_opaque<'a>(stack: &'a mut Self::StackReq<'a>) -> Self::Opaque<'a> { + // TODO: deal with tags + PacketView::from_arc_ref(stack, 0) + } +} + impl OpaqueStore for FullPacket { type ConstHdr = Packet; type Opaque<'a> = PacketView<'a, Perms> where Self: 'a; @@ -837,8 +1016,50 @@ impl OpaqueStore for BaseArc> { } } -impl From>> for PacketView<'static, Perms> { - fn from(pkt: BaseArc>) -> Self { +impl OpaqueStore for RefPacket<'static, Perms> { + type ConstHdr = Packet; + type Opaque<'a> = PacketView<'a, Perms> where Self: 'a; + type StackReq<'a> = Self; + type HeapReq = BaseArc; + + fn stack<'a>(self) -> Self::StackReq<'a> + where + Self: 'a, + { + self + } + + fn heap(self) -> Self::HeapReq + where + Self: 'static, + { + self.into() + } + + fn stack_hdr<'a>(stack: &'a Self::StackReq<'a>) -> &'a Self::ConstHdr { + stack.as_ref() + } + + fn stack_opaque<'a>(stack: &'a mut Self::StackReq<'a>) -> Self::Opaque<'a> { + // TODO: deal with tags + PacketView::from_arc_ref( + // SAFETY: we know the header is at the start of the fullpacket struct, therefore + // reinterpreting the reference is okay. + unsafe { &*(stack as *mut Self).cast() }, + 0, + ) + } +} + +impl From>> for PacketView<'static, Perms> { + fn from(pkt: BaseArc>) -> Self { + // TODO: deal with tags + Self::from_arc(pkt, 0) + } +} + +impl>, Perms: PacketPerms> From> for PacketView<'static, Perms> { + fn from(pkt: BaseArc) -> Self { // TODO: deal with tags Self::from_arc(pkt.transpose().into_base().unwrap(), 0) } @@ -854,6 +1075,83 @@ impl<'a, Perms: PacketPerms, T> PacketStore<'a, Perms> for T where { } +trait AnyBytes {} + +impl AnyBytes for u8 {} +impl AnyBytes for MaybeUninit {} + +pub trait IntoPacket<'a, Perms: PacketPerms> { + type Target: PacketStore<'a, Perms>; + type SyncHandle; + + /// Converts `Self` into a packet. + fn into_packet(self) -> (Self::Target, Self::SyncHandle); + + /// Sync data back from the packet. + /// + /// The blanket implementation of this method is a no-op, however, some types may need to have + /// the I/O results be synchronized back from the packets that were sent out (this is due to + /// necessary heap indirections needed to appease the borrow checker). Therefore, any I/O + /// abstraction should call this function before returning data back to the user. + fn sync_back(_hdr: &::ConstHdr, _handle: Self::SyncHandle) {} +} + +impl<'a, T: PacketStore<'a, Perms>, Perms: PacketPerms> IntoPacket<'a, Perms> for T { + type Target = Self; + type SyncHandle = (); + + fn into_packet(self) -> (Self, ()) { + (self, ()) + } +} + +impl<'a, 'b: 'a> IntoPacket<'a, Read> for &'b [u8] { + type Target = BaseArc>; + type SyncHandle = (); + + fn into_packet(self) -> (Self::Target, ()) { + (Packet::::from_slice(self), ()) + } +} + +impl<'a, 'b: 'a, T: AnyBytes> IntoPacket<'a, Write> for &'b mut [T] { + type Target = BaseArc>; + type SyncHandle = Self; + + fn into_packet(self) -> (Self::Target, Self) { + (Packet::::new_buf(self.len()), self) + } + + fn sync_back(hdr: &::ConstHdr, handle: Self::SyncHandle) { + unsafe { + core::ptr::copy_nonoverlapping( + hdr.simple_data_ptr(), + handle.as_mut_ptr().cast(), + core::cmp::min(handle.len(), hdr.error_clamp() as usize), + ); + } + } +} + +impl<'a, 'b: 'a> IntoPacket<'a, ReadWrite> for &'b mut [u8] { + type Target = BaseArc>; + type SyncHandle = Self; + + fn into_packet(self) -> (Self::Target, Self) { + (Packet::::from_slice(&*self), self) + } + + fn sync_back(hdr: &::ConstHdr, handle: Self::SyncHandle) { + unsafe { + core::ptr::copy_nonoverlapping( + hdr.simple_data_ptr(), + handle.as_mut_ptr(), + core::cmp::min(handle.len(), hdr.error_clamp() as usize), + ); + } + } +} + use cglue::prelude::v1::*; #[cglue_trait] @@ -901,6 +1199,14 @@ pub struct IoFut<'a, T, Perms: PacketPerms, Param, Packet: PacketStore<'a, Perms _phantom: PhantomData, } +impl<'a, T: PacketIo, Perms: PacketPerms, Param, Pkt: PacketStore<'a, Perms>> + IoFut<'a, T, Perms, Param, Pkt> +{ + pub fn hdr(&self) -> &Pkt::ConstHdr { + Pkt::stack_hdr(unsafe { &*self.pkt.get() }) + } +} + impl<'a, T: PacketIo, Perms: PacketPerms, Param, Pkt: PacketStore<'a, Perms>> Future for IoFut<'a, T, Perms, Param, Pkt> { @@ -980,7 +1286,7 @@ pub trait PacketPerms: 'static + core::fmt::Debug + Clone + Copy { fn len_fn(&self) -> LenFn; fn len(packet: &Packet) -> u64 { - if let Some(vtbl) = packet.vtbl { + if let Some(vtbl) = packet.vtbl.vtbl() { unsafe { (vtbl.len_fn())(packet) } } else { let simple = unsafe { packet.as_simple() }; @@ -991,7 +1297,7 @@ pub trait PacketPerms: 'static + core::fmt::Debug + Clone + Copy { fn alloc_fn(&self) -> AllocFn; unsafe fn alloced_simple(packet: BoundPacketView) -> Self::Alloced; fn try_alloc(packet: BoundPacketView, alignment: usize) -> MaybeAlloced { - if let Some(vtbl) = packet.view.pkt().vtbl { + if let Some(vtbl) = packet.view.pkt().vtbl.vtbl() { let mut view = ManuallyDrop::new(packet); let mut out = MaybeUninit::uninit(); let ret = unsafe { (vtbl.alloc_fn())(&mut view, alignment, &mut out) }; @@ -1001,8 +1307,7 @@ pub trait PacketPerms: 'static + core::fmt::Debug + Clone + Copy { Err(ManuallyDrop::into_inner(view)) } } else { - let simple = unsafe { packet.view.pkt().as_simple() }; - let data = unsafe { (simple as *const FullPacket<_, _>).add(1) }.cast::(); + let data = packet.view.pkt().simple_data_ptr(); if data.align_offset(alignment) == 0 { Ok(unsafe { Self::alloced_simple(packet) }) } else { @@ -1014,7 +1319,7 @@ pub trait PacketPerms: 'static + core::fmt::Debug + Clone + Copy { fn transfer_data_fn(&self) -> TransferDataFn; unsafe fn transfer_data_simple(packet: &mut PacketView, input: Self::ReverseDataType); unsafe fn transfer_data(packet: &mut PacketView, input: Self::ReverseDataType) { - if let Some(vtbl) = packet.pkt().vtbl { + if let Some(vtbl) = packet.pkt().vtbl.vtbl() { (vtbl.transfer_data_fn())(packet, input) } else { Self::transfer_data_simple(packet, input) @@ -1058,11 +1363,7 @@ impl PacketPerms for ReadWrite { } unsafe fn alloced_simple(packet: BoundPacketView) -> Self::Alloced { - let simple = unsafe { packet.view.pkt().as_simple() }; - let data = (simple as *const FullPacket<_, _>) - .add(1) - .cast::() - .cast_mut(); + let data = packet.view.pkt().simple_data_ptr().cast_mut(); ReadWritePacketObj { buffer: packet, alloced_packet: data, @@ -1071,10 +1372,7 @@ impl PacketPerms for ReadWrite { unsafe fn transfer_data_simple(view: &mut PacketView, data: *mut ()) { let simple = unsafe { view.pkt().as_simple() }; - let src = (simple as *const FullPacket<_, _>) - .add(1) - .cast::() - .cast_mut(); + let src = view.pkt().simple_data_ptr().cast_mut(); // TODO: does this operation even make sense? core::ptr::swap_nonoverlapping(src, data.cast(), simple.data.len); } @@ -1116,11 +1414,7 @@ impl PacketPerms for Write { } unsafe fn alloced_simple(packet: BoundPacketView) -> Self::Alloced { - let simple = unsafe { packet.view.pkt().as_simple() }; - let data = (simple as *const FullPacket<_, _>) - .add(1) - .cast::>() - .cast_mut(); + let data = packet.view.pkt().simple_data_ptr().cast_mut().cast(); WritePacketObj { buffer: packet, alloced_packet: data, @@ -1128,8 +1422,8 @@ impl PacketPerms for Write { } unsafe fn transfer_data_simple(view: &mut PacketView, data: *const ()) { - let simple = unsafe { view.pkt_mut().as_simple_mut() }; - let dst = (simple as *mut FullPacket<_, _>).add(1).cast::(); + let simple = unsafe { view.pkt().as_simple() }; + let dst = view.pkt().simple_data_ptr().cast_mut(); core::ptr::copy(data.cast(), dst, simple.data.len); } } @@ -1170,8 +1464,7 @@ impl PacketPerms for Read { } unsafe fn alloced_simple(packet: BoundPacketView) -> Self::Alloced { - let simple = unsafe { packet.view.pkt().as_simple() }; - let data = (simple as *const FullPacket<_, _>).add(1).cast::(); + let data = packet.view.pkt().simple_data_ptr().cast(); ReadPacketObj { buffer: packet, alloced_packet: data, @@ -1180,7 +1473,7 @@ impl PacketPerms for Read { unsafe fn transfer_data_simple(view: &mut PacketView, data: *mut ()) { let simple = unsafe { view.pkt().as_simple() }; - let src = (simple as *const FullPacket<_, _>).add(1).cast::(); + let src = view.pkt().simple_data_ptr(); core::ptr::copy(src, data.cast(), simple.data.len); } } diff --git a/mfio/src/stdeq.rs b/mfio/src/stdeq.rs index 939ca83..175278d 100644 --- a/mfio/src/stdeq.rs +++ b/mfio/src/stdeq.rs @@ -3,13 +3,10 @@ use crate as mfio; use crate::packet::*; use crate::traits::*; -use crate::util::PosShift; -use cglue::task::FastCWaker; +use crate::util::{PosShift, UsizeMath}; use core::future::Future; -use core::marker::PhantomData; use core::pin::Pin; use core::task::{Context, Poll}; -use futures::Stream; use mfio_derive::*; use parking_lot::Mutex; use std::io; @@ -38,18 +35,18 @@ impl> PosShift for Param { } pub trait AsyncRead: IoRead { - fn read<'a>(&'a self, buf: &'a mut [u8]) -> AsyncIoFut<'a, Self, Write, Param>; + fn read<'a>(&'a self, buf: &'a mut [u8]) -> AsyncIoFut<'a, Self, Write, Param, &'a mut [u8]>; fn read_to_end<'a>(&'a self, buf: &'a mut Vec) -> StdReadToEndFut<'a, Self, Param>; } impl + StreamPos, Param: 'static + Copy> AsyncRead for T { - fn read<'a>(&'a self, buf: &'a mut [u8]) -> AsyncIoFut<'a, Self, Write, Param> { + fn read<'a>(&'a self, buf: &'a mut [u8]) -> AsyncIoFut<'a, Self, Write, Param, &'a mut [u8]> { + let len = buf.len(); + let (pkt, sync) = <&'a mut [u8] as IntoPacket>::into_packet(buf); AsyncIoFut { - io: self, - pos: self.get_pos(), - len: buf.len(), - state: AsyncIoFutState::NewId(buf.into(), self.new_id()), - _phantom: PhantomData, + len, + fut: self.io(self.get_pos(), pkt), + sync: Some(sync), } } @@ -62,13 +59,13 @@ impl + StreamPos, Param: 'static + Copy> AsyncRead> AsyncRead for T { - fn read<'a>(&'a self, buf: &'a mut [u8]) -> AsyncIoFut<'a, Self, Write, NoPos> { + fn read<'a>(&'a self, buf: &'a mut [u8]) -> AsyncIoFut<'a, Self, Write, NoPos, &'a mut [u8]> { + let len = buf.len(); + let (pkt, sync) = <&'a mut [u8] as IntoPacket>::into_packet(buf); AsyncIoFut { - io: self, - pos: NoPos::new(), - len: buf.len(), - state: AsyncIoFutState::NewId(buf.into(), self.new_id()), - _phantom: PhantomData, + len, + fut: self.io(NoPos::new(), pkt), + sync: Some(sync), } } @@ -81,116 +78,62 @@ impl> AsyncRead for T { } pub trait AsyncWrite: IoWrite { - fn write<'a>(&'a self, buf: &'a [u8]) -> AsyncIoFut<'a, Self, Read, Param>; + fn write<'a>(&'a self, buf: &'a [u8]) -> AsyncIoFut<'a, Self, Read, Param, &'a [u8]>; } impl + StreamPos, Param: Copy> AsyncWrite for T { - fn write<'a>(&'a self, buf: &'a [u8]) -> AsyncIoFut<'a, Self, Read, Param> { + fn write<'a>(&'a self, buf: &'a [u8]) -> AsyncIoFut<'a, Self, Read, Param, &'a [u8]> { + let len = buf.len(); + let (pkt, sync) = buf.into_packet(); AsyncIoFut { - io: self, - pos: self.get_pos(), - len: buf.len(), - state: AsyncIoFutState::NewId(buf.into(), self.new_id()), - _phantom: PhantomData, + len, + fut: self.io(self.get_pos(), pkt), + sync: Some(sync), } } } impl> AsyncWrite for T { - fn write<'a>(&'a self, buf: &'a [u8]) -> AsyncIoFut<'a, Self, Read, NoPos> { + fn write<'a>(&'a self, buf: &'a [u8]) -> AsyncIoFut<'a, Self, Read, NoPos, &'a [u8]> { + let len = buf.len(); + let (pkt, sync) = buf.into_packet(); AsyncIoFut { - io: self, - pos: NoPos::new(), - len: buf.len(), - state: AsyncIoFutState::NewId(buf.into(), self.new_id()), - _phantom: PhantomData, + len, + fut: self.io(NoPos::new(), pkt), + sync: Some(sync), } } } -pub struct AsyncIoFut<'a, Io: PacketIo, Perms: PacketPerms, Param> { - io: *const Io, - pos: Param, +pub struct AsyncIoFut< + 'a, + Io: PacketIo, + Perms: PacketPerms, + Param: 'a, + Obj: IntoPacket<'a, Perms>, +> { + fut: IoFut<'a, Io, Perms, Param, Obj::Target>, + sync: Option, len: usize, - state: AsyncIoFutState<'a, Io, Perms, Param>, - _phantom: PhantomData<&'a mut [u8]>, } -pub enum AsyncIoFutState<'a, Io: PacketIo, Perms: PacketPerms, Param: 'a> { - NewId(Packet<'a, Perms>, NewIdFut<'a, Io, Perms, Param>), - Read( - Option, - Option, - as Future>::Output, - ), - Finished, -} - -impl<'a, Io: PacketIo, Perms: PacketPerms, Param: PosShift> Future - for AsyncIoFut<'a, Io, Perms, Param> +impl<'a, Io: PacketIo, Perms: PacketPerms, Param, Obj: IntoPacket<'a, Perms>> Future + for AsyncIoFut<'a, Io, Perms, Param, Obj> { type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = unsafe { self.get_unchecked_mut() }; - loop { - match &mut this.state { - AsyncIoFutState::NewId(_, new_id) => { - let new_id = unsafe { Pin::new_unchecked(new_id) }; - - if let Poll::Ready(id) = new_id.poll(cx) { - let prev = core::mem::replace( - &mut this.state, - AsyncIoFutState::Read(None, None, id), - ); - match (prev, &this.state) { - ( - AsyncIoFutState::NewId(packet, _), - AsyncIoFutState::Read(_, _, id), - ) => { - unsafe { Pin::new_unchecked(id) } - .send_io(this.pos.copy_pos(), packet); - } - _ => unreachable!(), - } - } else { - break Poll::Pending; - } - } - AsyncIoFutState::Read(failed_pos, err, id) => { - match unsafe { Pin::new_unchecked(&mut *id) }.poll_next(cx) { - Poll::Ready(Some((pkt, err))) => { - // We failed, thus cap the output length, - // but we still need to complete outstanding reads. - if err.is_some() { - let new_end = pkt.end(); - let end = failed_pos.get_or_insert(new_end); - *end = core::cmp::min(*end, new_end); - } - } - Poll::Ready(None) => { - let out = failed_pos.unwrap_or(this.len); - - if let Some(err) = err.take() { - if out == 0 { - break Poll::Ready(Err(err)); - } - } - - // SAFETY: there are no more shared references to io. - this.pos.add_pos(out, unsafe { &*this.io }); + let fut = unsafe { Pin::new_unchecked(&mut this.fut) }; - break Poll::Ready(Ok(out)); - } - _ => { - break Poll::Pending; - } - } - } - AsyncIoFutState::Finished => unreachable!(), - } - } + fut.poll(cx).map(|_| { + let hdr = this.fut.hdr(); + // TODO: put this after error checking + Obj::sync_back(hdr, this.sync.take().unwrap()); + // TODO: actual error checking + Ok(core::cmp::min(hdr.error_clamp() as usize, this.len)) + }) } } @@ -208,11 +151,11 @@ impl<'a, Io: PacketIo, Param: PosShift> Future let this = unsafe { self.get_unchecked_mut() }; match unsafe { Pin::new_unchecked(&mut this.fut) }.poll(cx) { - Poll::Ready(Some(r)) => { + Poll::Ready(Ok(r)) => { Param::add_io_pos(this.io, r); Poll::Ready(Ok(())) } - Poll::Ready(None) => Poll::Ready(Err(io::ErrorKind::Other.into())), + Poll::Ready(Err(_)) => Poll::Ready(Err(io::ErrorKind::Other.into())), //Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Pending => Poll::Pending, } @@ -319,8 +262,8 @@ impl From for Seekable { impl, Perms: PacketPerms, Param> PacketIo for Seekable { - fn try_new_id<'a>(&'a self, context: &mut FastCWaker) -> Option> { - self.handle.try_new_id(context) + fn send_io(&self, param: Param, view: BoundPacketView) { + self.handle.send_io(param, view) } } @@ -353,8 +296,8 @@ impl From for FakeSeek { } impl, Perms: PacketPerms, Param> PacketIo for FakeSeek { - fn try_new_id<'a>(&'a self, context: &mut FastCWaker) -> Option> { - self.handle.try_new_id(context) + fn send_io(&self, param: Param, view: BoundPacketView) { + self.handle.send_io(param, view) } } diff --git a/mfio/src/traits.rs b/mfio/src/traits.rs index 5a21e6d..833a0c7 100644 --- a/mfio/src/traits.rs +++ b/mfio/src/traits.rs @@ -4,36 +4,37 @@ use crate::backend::IoBackend; use crate::error::Error; use crate::util::{CopyPos, UsizeMath}; use bytemuck::Pod; -use cglue::prelude::v1::*; use core::future::Future; -use core::mem::ManuallyDrop; use core::mem::MaybeUninit; use core::pin::Pin; use core::task::{Context, Poll}; -use futures::Stream; pub trait IoRead: PacketIo { - fn read_raw<'a>( + fn read_raw<'a, T: PacketStore<'a, Write>>( &'a self, pos: Pos, - packet: impl Into>, - ) -> IoFut<'a, Self, Write, Pos> { + packet: T, + ) -> IoFut<'a, Self, Write, Pos, T> { self.io(pos, packet) } - fn read_all<'a>( + fn read_all<'a, T: IntoPacket<'a, Write>>( &'a self, pos: Pos, - packet: impl Into>, - ) -> IoFullFut<'a, Self, Write, Pos> { - IoFullFut::NewId(pos, packet.into(), self.new_id()) + packet: T, + ) -> IoFullFut<'a, Self, Write, Pos, T> { + let (packet, sync) = packet.into_packet(); + IoFullFut { + fut: self.io(pos, packet), + sync: Some(sync), + } } fn read_into<'a, T: Pod>( &'a self, pos: Pos, data: &'a mut MaybeUninit, - ) -> IoFullFut<'a, Self, Write, Pos> { + ) -> IoFullFut<'a, Self, Write, Pos, &'a mut [MaybeUninit]> { let buf = unsafe { core::slice::from_raw_parts_mut( data as *mut MaybeUninit as *mut MaybeUninit, @@ -43,21 +44,40 @@ pub trait IoRead: PacketIo { self.read_all(pos, buf) } - /// # Notes - /// - /// This function may break rust stacked borrows rules. If you wish to not do that, please use - /// [`read_into`](Self::read_into) function. - /// - /// This may be fixed once const generics are able to instantiate `[u8; mem::size_of::()]`. fn read(&self, pos: Pos) -> IoReadFut { - IoReadFut::NewId(pos, self.new_id()) + let pkt = Packet::::new_uninit::(); + IoReadFut(self.io(pos, pkt)) } - fn read_to_end<'a>(&'a self, pos: Pos, buf: &'a mut Vec) -> ReadToEndFut<'a, Self, Pos> { + fn read_to_end<'a>(&'a self, pos: Pos, buf: &'a mut Vec) -> ReadToEndFut<'a, Self, Pos> + where + Pos: CopyPos, + { + let start_len = buf.len(); + let start_cap = buf.capacity(); + + // Reserve enough for 32 bytes of data initially + if start_cap - start_len < 32 { + buf.reserve(32 - (start_cap - start_len)); + } + + // Issue a read + let data = buf.as_mut_ptr() as *mut MaybeUninit; + // SAFETY: the data here is uninitialized, and we are getting exclusive access + // to it. + let data = unsafe { + core::slice::from_raw_parts_mut(data.add(start_len), buf.capacity() - start_len) + }; + + let fut = Some(data.into_packet()).map(|(pkt, sync)| (self.io(pos.copy_pos(), pkt), sync)); + ReadToEndFut { + io: self, pos, buf, - state: ReadToEndFutState::NewId(self.new_id()), + fut, + start_len, + start_cap, } } } @@ -65,23 +85,27 @@ pub trait IoRead: PacketIo { impl IoRead for T where T: PacketIo {} pub trait IoWrite: PacketIo { - fn write_raw<'a>( + fn write_raw<'a, T: PacketStore<'a, Read>>( &'a self, pos: Pos, - packet: impl Into>, - ) -> IoFut<'a, Self, Read, Pos> { + packet: T, + ) -> IoFut<'a, Self, Read, Pos, T> { self.io(pos, packet) } - fn write_all<'a>( + fn write_all<'a, T: IntoPacket<'a, Read>>( &'a self, pos: Pos, - packet: impl Into>, - ) -> IoFullFut<'a, Self, Read, Pos> { - IoFullFut::NewId(pos, packet.into(), self.new_id()) + packet: T, + ) -> IoFullFut<'a, Self, Read, Pos, T> { + let (packet, sync) = packet.into_packet(); + IoFullFut { + fut: self.io(pos, packet), + sync: Some(sync), + } } - fn write<'a, T>(&'a self, pos: Pos, data: &'a T) -> IoFullFut<'a, Self, Read, Pos> { + fn write<'a, T>(&'a self, pos: Pos, data: &'a T) -> IoFullFut<'a, Self, Read, Pos, &'a [u8]> { let buf = unsafe { core::slice::from_raw_parts(data as *const T as *const u8, core::mem::size_of::()) }; @@ -91,106 +115,89 @@ pub trait IoWrite: PacketIo { impl IoWrite for T where T: PacketIo {} -pub enum IoFullFut<'a, Io: PacketIo, Perms: PacketPerms, Param: 'a> { - NewId(Param, Packet, NewIdFut<'a, Io, Perms, Param>), - Read( - Option, - ManuallyDrop< as Future>::Output>, - ), - Finished, +pub struct IoFullFut< + 'a, + Io: PacketIo, + Perms: PacketPerms, + Param: 'a, + Obj: IntoPacket<'a, Perms>, +> { + fut: IoFut<'a, Io, Perms, Param, Obj::Target>, + sync: Option, } -impl<'a, Io: PacketIo, Perms: PacketPerms, Param> Future - for IoFullFut<'a, Io, Perms, Param> +impl<'a, Io: PacketIo, Perms: PacketPerms, Param, Obj: IntoPacket<'a, Perms>> Future + for IoFullFut<'a, Io, Perms, Param, Obj> { type Output = Result<(), Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = unsafe { self.get_unchecked_mut() }; - loop { - match this { - Self::NewId(_, _, alloc) => { - let alloc = unsafe { Pin::new_unchecked(alloc) }; - - if let Poll::Ready(id) = alloc.poll(cx) { - let prev = - core::mem::replace(this, Self::Read(None, ManuallyDrop::new(id))); - match (prev, &mut *this) { - (Self::NewId(param, packet, _), Self::Read(_, id)) => { - unsafe { Pin::new_unchecked(&**id) }.send_io(param, packet) - } - _ => unreachable!(), - } - // Poll again to force processing of the stream - continue; - } else { - break Poll::Pending; - } - } - Self::Read(err, id) => match unsafe { Pin::new_unchecked(&mut **id) }.poll_next(cx) - { - Poll::Ready(None) => { - let err = err.take(); - unsafe { ManuallyDrop::drop(id) }; - *this = Self::Finished; - break Poll::Ready(err.map(Err).unwrap_or(Ok(()))); - } - Poll::Ready(Some((_, nerr))) => { - if let Some(nerr) = nerr { - *err = Some(nerr); - } - continue; - } - _ => break Poll::Pending, - }, - Self::Finished => unreachable!(), - } - } + let fut = unsafe { Pin::new_unchecked(&mut this.fut) }; + + fut.poll(cx).map(|_| { + let hdr = this.fut.hdr(); + // TODO: put this after error checking + Obj::sync_back(hdr, this.sync.take().unwrap()); + hdr.err_on_zero() + }) } } +type UninitSlice<'a> = &'a mut [MaybeUninit]; + pub struct ReadToEndFut<'a, Io: PacketIo, Param> { + io: &'a Io, pos: Param, buf: &'a mut Vec, - state: ReadToEndFutState<'a, Io, Param>, -} - -pub enum ReadToEndFutState<'a, Io: PacketIo, Param: 'a> { - NewId(NewIdFut<'a, Io, Write, Param>), - // TODO: change this to a struct - Read( - usize, - usize, - Option, - usize, - Option, - ManuallyDrop< as Future>::Output>, - ), - Finished, + fut: Option<( + IoFut<'a, Io, Write, Param, as IntoPacket<'a, Write>>::Target>, + as IntoPacket<'a, Write>>::SyncHandle, + )>, + start_len: usize, + start_cap: usize, } impl<'a, Io: PacketIo, Param: CopyPos + UsizeMath> Future for ReadToEndFut<'a, Io, Param> { - type Output = Option; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = unsafe { self.get_unchecked_mut() }; loop { - match &mut this.state { - ReadToEndFutState::NewId(alloc) => { - let alloc = unsafe { Pin::new_unchecked(alloc) }; - - if let Poll::Ready(stream) = alloc.poll(cx) { - let start_len = this.buf.len(); - let start_cap = this.buf.capacity(); - - // Reserve enough for 32 bytes of data initially - if start_cap - start_len < 32 { - this.buf.reserve(32 - (start_cap - start_len)); - } + let (fut, _) = this.fut.as_mut().expect("Poll called in invalid state"); + let fut = unsafe { Pin::new_unchecked(fut) }; + + match fut.poll(cx) { + Poll::Ready(()) => { + // TODO: check into safety of this. We are technically unpinning a previously + // pinned object. + let (fut, sync) = this.fut.take().unwrap(); + + let hdr = fut.hdr(); + let len = Write::len(hdr); + let clamp = hdr.error_clamp(); + + as IntoPacket<'a, Write>>::sync_back(hdr, sync); + // SAFETY: all these bytes have been successfully read + unsafe { + this.buf + .set_len(this.buf.len() + core::cmp::min(clamp, len) as usize) + }; + + // We reached the end + if clamp < len || clamp == 0 { + let total_len = this.buf.len() - this.start_len; + // TODO: figure out how to extract error on 0 read + break Poll::Ready(Ok(total_len)); + } else { + // Double read size, but cap it to 2MB + let reserve_len = + core::cmp::min(this.buf.capacity() - this.start_cap, 0x20000); + this.buf.reserve(reserve_len); // Issue a read let data = this.buf.as_mut_ptr() as *mut MaybeUninit; @@ -198,200 +205,58 @@ impl<'a, Io: PacketIo, Param: CopyPos + UsizeMath> Future // to it. let data = unsafe { core::slice::from_raw_parts_mut( - data.add(start_len), - this.buf.capacity() - start_len, + data.add(this.buf.len()), + this.buf.capacity() - this.buf.len(), ) }; - this.state = ReadToEndFutState::Read( - start_len, - start_cap, - None, - 0, - None, - ManuallyDrop::new(stream), - ); - - match &mut this.state { - ReadToEndFutState::Read(_, _, _, _, _, stream) => { - unsafe { Pin::new_unchecked(&**stream) } - .send_io(this.pos.copy_pos(), data); - } - _ => unreachable!(), - } - } else { - break Poll::Pending; - } - } - ReadToEndFutState::Read( - start_len, - start_cap, - final_cap, - max_cap, - error, - stream, - ) => { - match unsafe { Pin::new_unchecked(&mut **stream) }.poll_next(cx) { - Poll::Ready(Some((pkt, err))) => { - // We failed, thus cap the buffer length, complete queued I/O, but do not - // perform any further reads. - if err.is_some() { - let new_end = pkt.start() + (this.buf.len() - *start_len); - let end = final_cap.get_or_insert(new_end); - *error = err; - *end = core::cmp::min(*end, new_end); - } else { - let new_end = pkt.end() + (this.buf.len() - *start_len); - *max_cap = core::cmp::max(*max_cap, new_end); - } - } - Poll::Ready(None) => { - // If we got no successful output, cap the capacity to 0 - // so that we don't end up in a deadlock if the backend does no I/O - // processing. - if *max_cap == 0 { - *final_cap = Some(0); - } - // If we read all bytes successfully, grow the buffer and keep going. - // Otherwise, return finished state. - match final_cap { - Some(cap) => { - let cap = core::cmp::min(cap, max_cap); - unsafe { ManuallyDrop::drop(stream) }; - // SAFETY: these bytes have been successfully read - unsafe { this.buf.set_len(*start_len + *cap) }; - this.pos.add_assign(this.buf.len() - *start_len); - if error.is_some() && *cap == 0 { - break Poll::Ready(None); - } else { - break Poll::Ready(Some(*cap)); - } - } - _ => { - // SAFETY: all these bytes have been successfully read - unsafe { this.buf.set_len(this.buf.capacity()) }; - - // Double read size, but cap it to 2MB - let reserve_len = - core::cmp::min(this.buf.capacity() - *start_cap, 0x20000); - this.buf.reserve(reserve_len); - - // Issue a read - let data = this.buf.as_mut_ptr() as *mut MaybeUninit; - // SAFETY: the data here is uninitialized, and we are getting exclusive access - // to it. - let data = unsafe { - core::slice::from_raw_parts_mut( - data.add(this.buf.len()), - this.buf.capacity() - this.buf.len(), - ) - }; - - unsafe { Pin::new_unchecked(&**stream) } - .send_io(this.pos.copy_pos().add(this.buf.len()), data); - } - } - } - _ => break Poll::Pending, + + this.fut = Some(data.into_packet()).map(|(pkt, sync)| { + ( + this.io.io( + this.pos.copy_pos().add(this.buf.len() - this.start_len), + pkt, + ), + sync, + ) + }); } } - ReadToEndFutState::Finished => unreachable!(), + Poll::Pending => break Poll::Pending, } } } } -pub enum IoReadFut<'a, Io: PacketIo, Param: 'a, T> { - NewId(Param, NewIdFut<'a, Io, Write, Param>), - Read( - MaybeUninit, - Option, - ManuallyDrop< as Future>::Output>, - ), - Finished, -} +pub struct IoReadFut<'a, Io: PacketIo, Param: 'a, T: 'a>( + IoFut<'a, Io, Write, Param, FullPacket, Write>>, +); -impl<'a, Io: PacketIo, Param, T> Future for IoReadFut<'a, Io, Param, T> { +impl<'a, Io: PacketIo, Param, T: 'a> Future for IoReadFut<'a, Io, Param, T> { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let f = move || { - let this = unsafe { self.get_unchecked_mut() }; - - loop { - match this { - Self::NewId(_, alloc) => { - let alloc = unsafe { Pin::new_unchecked(alloc) }; - - if let Poll::Ready(stream) = alloc.poll(cx) { - let prev = core::mem::replace( - this, - Self::Read(MaybeUninit::uninit(), None, ManuallyDrop::new(stream)), - ); - match (prev, &mut *this) { - (Self::NewId(param, _), Self::Read(data, _, stream)) => { - //let data = data.get_mut(); - let buf = unsafe { - core::slice::from_raw_parts_mut( - data as *mut MaybeUninit<_> as *mut MaybeUninit, - core::mem::size_of::(), - ) - }; - unsafe { Pin::new_unchecked(&**stream) }.send_io(param, buf) - } - _ => unreachable!(), - } - // Poll again to force processing of the stream - continue; - } else { - break Poll::Pending; - } - } - Self::Read(_, err, stream) => { - match unsafe { Pin::new_unchecked(&mut **stream) }.poll_next(cx) { - Poll::Ready(None) => { - unsafe { ManuallyDrop::drop(stream) }; - let prev = core::mem::replace(this, Self::Finished); - - match prev { - Self::Read(data, err, _) => { - break Poll::Ready(err.map(Err).unwrap_or_else(|| { - Ok(unsafe { - data /*.into_inner()*/ - .assume_init() - }) - })); - } - _ => unreachable!(), - } - } - Poll::Ready(Some((_, Some(e)))) => { - // TODO: what do we do here - *err = Some(e); - continue; - } - Poll::Ready(_) => { - continue; - } - _ => break Poll::Pending, - } - } - Self::Finished => unreachable!(), - } - } - }; - f() + let this = unsafe { self.get_unchecked_mut() }; + + let fut = unsafe { Pin::new_unchecked(&mut this.0) }; + + fut.poll(cx).map(|_| { + let hdr = this.0.hdr(); + hdr.err_on_zero() + .map(|_| unsafe { core::ptr::read(hdr.simple_data_ptr().cast::()) }) + }) } } pub mod sync { use super::*; - #[cglue_trait] + // TODO: figure out how to expose these over cglue + pub trait SyncIoRead: IoRead + IoBackend { fn read_all<'a>( &'a self, pos: Pos, - packet: impl Into>, + packet: impl IntoPacket<'a, Write>, ) -> Result<(), Error> { self.block_on(IoRead::read_all(self, pos, packet)) } @@ -408,21 +273,20 @@ pub mod sync { self.block_on(IoRead::read(self, pos)) } - #[skip_func] fn read_to_end<'a>(&'a self, pos: Pos, buf: &'a mut Vec) -> Option where ReadToEndFut<'a, Self, Pos>: Future>, + Pos: CopyPos, { self.block_on(IoRead::read_to_end(self, pos, buf)) } } - #[cglue_trait] pub trait SyncIoWrite: IoWrite + IoBackend { fn write_all<'a>( &'a self, pos: Pos, - packet: impl Into>, + packet: impl IntoPacket<'a, Read>, ) -> Result<(), Error> { self.block_on(IoWrite::write_all(self, pos, packet)) } diff --git a/mfio/src/util.rs b/mfio/src/util.rs index 43dfc8f..03e0eb4 100644 --- a/mfio/src/util.rs +++ b/mfio/src/util.rs @@ -94,7 +94,8 @@ impl UsizeMath for NoPos { } } -pub(crate) trait CopyPos: Sized { +// FIXME: this trait shouldn't be public +pub trait CopyPos: Sized { fn copy_pos(&self) -> Self; }