diff --git a/mfio/Cargo.toml b/mfio/Cargo.toml index 04e9bc4..a16796c 100644 --- a/mfio/Cargo.toml +++ b/mfio/Cargo.toml @@ -44,6 +44,7 @@ windows-sys = { version = "0.48", features = ["Win32_System_Threading", "Win32_F [dev-dependencies] tokio = { version = "1.24", features = ["rt", "macros", "rt-multi-thread"] } +smol = "1" criterion = { version = "0.5", git = "https://github.com/h33p/criterion.rs", branch = "tput2", features = ["async_tokio", "async_smol", "async_futures"] } pollster = "0.2" bytemuck = { version = "1", features = ["derive"] } @@ -51,3 +52,4 @@ bytemuck = { version = "1", features = ["derive"] } [features] default = ["std", "http"] std = ["parking_lot"] +cglue-trait = [] diff --git a/mfio/src/backend/fd.rs b/mfio/src/backend/fd.rs index 4943cd9..29e7467 100644 --- a/mfio/src/backend/fd.rs +++ b/mfio/src/backend/fd.rs @@ -1,3 +1,10 @@ +//! File descriptor based waker. +//! +//! [`FdWaker`] allows one to wake a runtime by pushing a write operation to the underlying file +//! descriptor, be it a pipe, eventfd, or anything else that can be pollable for readability. +//! +//! Create a [`FdWakerOwner`] from a [`AsRawFd`] object to allow controlling the waker properties. + use core::mem::ManuallyDrop; use core::sync::atomic::{AtomicU8, Ordering}; use core::task::{RawWaker, RawWakerVTable, Waker}; @@ -6,6 +13,42 @@ use std::io::{ErrorKind, Write}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd}; use tarc::{Arc, BaseArc}; +/// Owner of [`FdWaker`]s. +/// +/// When this type gets dropped, the underlying file descriptor gets closed and released. This +/// effectively breaks all remaining wakers, however, the references to them stay valid. +/// +/// # Examples +/// +/// Poll for the pipe to become readable: +/// +/// ``` +/// # #[cfg(miri)] +/// # fn main() { } +/// # #[cfg(not(miri))] +/// # fn main() { +/// use mfio::backend::fd::FdWakerOwner; +/// use nix::poll::*; +/// +/// let (wake_read, wake_write) = nix::unistd::pipe().unwrap(); +/// +/// let waker_owner = FdWakerOwner::from(wake_write); +/// +/// std::thread::spawn({ +/// let waker = waker_owner.clone().into_waker(); +/// move || { +/// std::thread::sleep(std::time::Duration::from_millis(500)); +/// waker.wake(); +/// } +/// }); +/// +/// let mut fd = [PollFd::new(wake_read, PollFlags::POLLIN)]; +/// assert_ne!(0, poll(&mut fd[..], 5000).unwrap()); +/// +/// // Let's verify that we did indeed get woken up. +/// assert!(fd[0].revents().unwrap().contains(PollFlags::POLLIN)); +/// # } +/// ``` #[repr(transparent)] pub struct FdWakerOwner(FdWaker); diff --git a/mfio/src/backend/integrations/async_io.rs b/mfio/src/backend/integrations/async_io.rs index 1bc40f1..65ac4e8 100644 --- a/mfio/src/backend/integrations/async_io.rs +++ b/mfio/src/backend/integrations/async_io.rs @@ -10,6 +10,40 @@ use std::os::fd::BorrowedFd; use super::super::*; use super::{BorrowingFn, Integration}; +/// async-io integration. +/// +/// Unlike [`Null`], this integration supports backends with polling handles, however, only +/// async-io based runtimes are supported, such as smol and async_std. +/// +/// Internally, this uses async-io's [`Async`] to wait for readiness of the polling FD, which means +/// only unix platforms are supported. +/// +/// # Examples +/// +/// Using the integration with smol: +/// +/// ``` +/// # mod sample { +/// # include!("../../sample.rs"); +/// # } +/// # use sample::SampleIo; +/// use mfio::prelude::v1::*; +/// +/// # #[cfg(all(unix, not(miri)))] +/// smol::block_on(async { +/// let mut handle = SampleIo::new(vec![1, 2, 3, 4]); +/// +/// // Run the integration. Prefer to use `run_with_mut`, so that panics can be avoided. +/// AsyncIo::run_with_mut(&mut handle, |handle| async move { +/// // Read value +/// let val = handle.read(0).await.unwrap(); +/// assert_eq!(1u8, val); +/// }) +/// .await +/// }); +/// # #[cfg(not(all(unix, not(miri))))] +/// # fn main() {} +/// ``` #[derive(Clone, Copy, Default)] pub struct AsyncIo; @@ -37,6 +71,7 @@ enum AsyncIoState<'a, B: IoBackend + ?Sized + 'a, Func, F> { Finished, } +#[doc(hidden)] pub struct AsyncIoImpl<'a, B: LinksIoBackend + 'a, Func, F> { backend: B, state: AsyncIoState<'a, B::Link, Func, F>, diff --git a/mfio/src/backend/integrations/null.rs b/mfio/src/backend/integrations/null.rs index e8e7a18..53f8c2c 100644 --- a/mfio/src/backend/integrations/null.rs +++ b/mfio/src/backend/integrations/null.rs @@ -7,6 +7,34 @@ use super::super::*; use super::{BorrowingFn, Integration}; +/// Minimal integration. +/// +/// This integration works in all async runtimes, however, it does not support the backend's +/// `PollingHandle`. If the backend returns `Some(handle)`, then this integration panics. +/// +/// # Examples +/// +/// Running with `pollster`: +/// +/// ``` +/// # mod sample { +/// # include!("../../sample.rs"); +/// # } +/// # use sample::SampleIo; +/// use mfio::prelude::v1::*; +/// +/// pollster::block_on(async { +/// let mut handle = SampleIo::new(vec![1, 2, 3, 4]); +/// +/// // Run the integration. Prefer to use `run_with_mut`, so that panics can be avoided. +/// Null::run_with_mut(&mut handle, |handle| async move { +/// // Read value +/// let val = handle.read(0).await.unwrap(); +/// assert_eq!(1u8, val); +/// }) +/// .await +/// }); +/// ``` #[derive(Clone, Copy, Default)] pub struct Null; @@ -31,6 +59,7 @@ enum NullState<'a, B: IoBackend + ?Sized + 'a, Func, F> { Finished, } +#[doc(hidden)] pub struct NullImpl<'a, B: LinksIoBackend + 'a, Func, F> { backend: B, state: NullState<'a, B::Link, Func, F>, diff --git a/mfio/src/backend/integrations/tokio.rs b/mfio/src/backend/integrations/tokio.rs index 65c8eb4..b42d5a6 100644 --- a/mfio/src/backend/integrations/tokio.rs +++ b/mfio/src/backend/integrations/tokio.rs @@ -6,6 +6,39 @@ use tokio::io::{unix::AsyncFd, Interest}; use super::super::*; use super::{BorrowingFn, Integration}; +/// Tokio integration. +/// +/// Unlike [`Null`], this integration supports backends with polling handles, however, only tokio +/// runtime is supported. +/// +/// Internally, this uses tokio's [`AsyncFd`] to wait for readiness of the polling handle, which +/// means only unix platforms are supported. +/// +/// # Examples +/// +/// ``` +/// # mod sample { +/// # include!("../../sample.rs"); +/// # } +/// # use sample::SampleIo; +/// use mfio::prelude::v1::*; +/// +/// #[tokio::main] +/// # #[cfg(all(unix, not(miri)))] +/// async fn main() { +/// let mut handle = SampleIo::new(vec![1, 2, 3, 4]); +/// +/// // Run the integration. Prefer to use `run_with_mut`, so that panics can be avoided. +/// Tokio::run_with_mut(&mut handle, |handle| async move { +/// // Read value +/// let val = handle.read(0).await.unwrap(); +/// assert_eq!(1u8, val); +/// }) +/// .await +/// } +/// # #[cfg(not(all(unix, not(miri))))] +/// # fn main() {} +/// ``` #[derive(Clone, Copy, Default)] pub struct Tokio; @@ -42,6 +75,7 @@ enum TokioState<'a, B: IoBackend + ?Sized + 'a, Func, F> { Finished, } +#[doc(hidden)] pub struct TokioImpl<'a, B: LinksIoBackend + 'a, Func, F> { backend: B, state: TokioState<'a, B::Link, Func, F>, diff --git a/mfio/src/backend/mod.rs b/mfio/src/backend/mod.rs index edd6048..d41ee6a 100644 --- a/mfio/src/backend/mod.rs +++ b/mfio/src/backend/mod.rs @@ -37,6 +37,8 @@ pub mod handle; #[cfg(all(windows, feature = "std"))] pub mod windows; +// TODO: rename DefaultHandle to OsHandle, and get rid of Infallible one. + #[cfg(all(unix, feature = "std"))] pub type DefaultHandle = RawFd; #[cfg(all(windows, feature = "std"))] @@ -53,6 +55,12 @@ struct NestedBackend { release: unsafe extern "C" fn(*const ()), } +/// Stores a backend. +/// +/// This type is always stored on backends, and is acquired by users in [`IoBackend::get_backend`]. +/// A backend can only be acquired once at a time, however, it does not matter who does it. +/// +/// Once the backend is acquired, it can be used to drive I/O to completion. #[repr(C)] pub struct BackendContainer { nest: UnsafeCell>, @@ -64,8 +72,16 @@ unsafe impl Send for BackendContainer {} unsafe impl Sync for BackendContainer {} impl BackendContainer { + /// Acquire a backend. + /// + /// This function locks the backend and the returned handle keeps it locked, until the handle + /// gets released. + /// + /// # Panics + /// + /// Panics if the backend has already been acquired. pub fn acquire(&self, wake_flags: Option>) -> BackendHandle { - if self.lock.swap(true, Ordering::Acquire) { + if self.lock.swap(true, Ordering::AcqRel) { panic!("Tried to acquire backend twice!"); } @@ -78,6 +94,12 @@ impl BackendContainer { } } + /// Acquires a backend in nested mode. + /// + /// This function is useful when layered I/O backends are desirable. When polling, first, this + /// backend will be polled, and afterwards, the provided handle. The ordering is consistent + /// with the behavior of first polling the user's future, and then polling the backend. In the + /// end, backends will be peeled off layer by layer, until the innermost backend is reached. pub fn acquire_nested>( &self, mut handle: BackendHandle, @@ -122,6 +144,7 @@ impl BackendContainer { } impl BackendContainer { + /// Creates a new [`DynBackend`] container. pub fn new_dyn + Send + 'static>(backend: T) -> Self { Self { backend: UnsafeCell::new(Box::pin(backend) as Pin + Send>>), @@ -131,6 +154,14 @@ impl BackendContainer { } } +/// Handle to a backend. +/// +/// This handle can be used to drive arbitrary future to completion by attaching a backend to it. +/// This is typically done using [`WithBackend`] that is constructed in +/// [`IoBackendExt::with_backend`]. +/// +/// Usually, the user would want to bypass this type and use [`IoBackendExt::block_on`], or an +/// [`Integration`] equivalent. pub struct BackendHandle<'a, B: ?Sized> { owner: &'a BackendContainer, backend: Pin<&'a mut B>, @@ -166,6 +197,13 @@ impl<'a, B: ?Sized> core::ops::DerefMut for BackendHandle<'a, B> { } } +/// Future combined with a backend. +/// +/// This future can be used to drive arbitrary future to completion by attaching a backend to it. +/// Construct this type using [`IoBackendExt::with_backend`]. +/// +/// Usually, the user would want to bypass this type and use [`IoBackendExt::block_on`], or an +/// [`Integration`] equivalent. pub struct WithBackend<'a, Backend: ?Sized, Fut: ?Sized> { backend: BackendHandle<'a, Backend>, future: Fut, @@ -221,6 +259,12 @@ impl<'a, Backend: Future + ?Sized, Fut: Future + ?Sized> Future for WithBackend< } } +/// Cooperative polling handle. +/// +/// This handle contains a handle and necessary metadata needed to cooperatively drive mfio code to +/// completion. +/// +/// This handle is typically created on the [`IoBackend`] side. pub struct PollingHandle<'a, Handle = DefaultHandle> { pub handle: Handle, pub cur_flags: &'a PollingFlags, @@ -337,6 +381,9 @@ impl PollingFlags { /// /// This trait is implemented at the outer-most stateful object of the I/O context. A `IoBackend` /// has the opportunity to expose efficient ways of driving said backend to completion. +/// +/// Users may want to call methods available on [`IoBackendExt`], instead of the ones on this +/// trait. pub trait IoBackend { type Backend: Future + Send + ?Sized; @@ -362,6 +409,7 @@ pub trait IoBackend { fn get_backend(&self) -> BackendHandle; } +/// Helpers for [`IoBackend`]. pub trait IoBackendExt: IoBackend { /// Builds a composite future that also polls the backend future. /// diff --git a/mfio/src/error.rs b/mfio/src/error.rs index 1db514b..b54644b 100644 --- a/mfio/src/error.rs +++ b/mfio/src/error.rs @@ -26,6 +26,7 @@ impl core::fmt::Display for Code { const HTTP_SHIFT: usize = 399; +/// HTTP 500 error. pub const INTERNAL_ERROR: Code = Code(unsafe { NonZeroU8::new_unchecked((500 - HTTP_SHIFT) as u8) }); @@ -228,6 +229,10 @@ macro_rules! ienum { } ienum! { + /// Describes the error subject. + /// + /// While [`Location`] points to a module where the error originated from, Subject attempts to + /// narrow the error down to the main actor that was involved in the creation of the error. pub enum Subject { Argument, Data, @@ -267,6 +272,9 @@ ienum! { } ienum! { + /// Describes the state of the error subject. + /// + /// State allows to specify what caused the [`Subject`] to fail. pub enum State { Invalid, Unreadable, @@ -305,6 +313,11 @@ ienum! { } ienum! { + /// Describes the error origin. + /// + /// The Origin specifies general location where the error originates from - be it a module, + /// crate, or subsystem. It is not meant to be descriptive in terms of error handling. For + /// better locality, check the [`Subject`]. pub enum Location { Backend, Memory, @@ -322,19 +335,6 @@ ienum! { } } -pub struct ErrorConstLocation(pub Code, pub Subject, pub State); - -impl From> for Error { - fn from(ErrorConstLocation(code, subject, state): ErrorConstLocation) -> Self { - Self { - code, - subject, - state, - location: N.into(), - } - } -} - #[cfg(feature = "std")] impl From for Error { fn from(err: std::io::Error) -> Self { @@ -377,17 +377,15 @@ impl From for State { } } -#[cfg(feature = "std")] -impl From for ErrorConstLocation { - fn from(kind: std::io::ErrorKind) -> Self { - ErrorConstLocation(INTERNAL_ERROR, Subject::Io, State::from(kind)) - } -} - #[cfg(feature = "std")] impl From for Error { fn from(kind: std::io::ErrorKind) -> Self { - ErrorConstLocation::<{ Location::Other as u8 }>::from(kind).into() + Error { + code: INTERNAL_ERROR, + location: Location::Other, + subject: Subject::Io, + state: kind.into(), + } } } @@ -397,6 +395,7 @@ impl From for Error { } } +/// Allows easy specification of different error properties. pub trait ErrorSpecify: Sized { fn code(self, code: Code) -> Self; fn subject(self, subject: Subject) -> Self; diff --git a/mfio/src/futures_compat.rs b/mfio/src/futures_compat.rs index 06d5bcd..1d56856 100644 --- a/mfio/src/futures_compat.rs +++ b/mfio/src/futures_compat.rs @@ -1,14 +1,31 @@ +//! Provides compatibility with `futures` traits. + use crate::io::*; use crate::stdeq::{self, AsyncIoFut}; use crate::util::PosShift; use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; -use futures::io::{AsyncRead, AsyncSeek, AsyncWrite}; +#[cfg(not(mfio_assume_linear_types))] +use futures::io::AsyncRead; +use futures::io::{AsyncSeek, AsyncWrite}; use std::io::{Result, SeekFrom}; +/// Container for intermediate values. +/// +/// Currently, reading and writing is not cancel safe. Meaning, cancelling the I/O operation and +/// issuing a new one would continue the previous operation, and sync the results to the currently +/// provided buffer. Note that the types of operations are handled separately so they do not mix +/// and it is okay to cancel a read to issue a write. +/// +/// If you wish to cancel the operation, do drop the entire `Compat` object. However, be warned +/// that `mfio` may panic, since it does not yet support cancellation at all. +/// +/// Note that at the time of writing, `AsyncRead` is not supported when `mfio_assume_linear_types` +/// config is set. pub struct Compat<'a, Io: ?Sized> { io: &'a Io, + #[cfg(not(mfio_assume_linear_types))] read: Option>, write: Option>, } @@ -25,6 +42,9 @@ pub struct Compat<'a, Io: ?Sized> { /// # } /// # use sample::SampleIo; /// # fn work() -> mfio::error::Result<()> { +/// # mfio::linear_types_switch!( +/// # Linear => { Ok(()) } +/// # Standard => {{ /// use futures::io::{AsyncReadExt, Cursor}; /// use mfio::backend::*; /// use mfio::futures_compat::FuturesCompat; @@ -42,6 +62,38 @@ pub struct Compat<'a, Io: ?Sized> { /// /// Ok(()) /// }) +/// # }} +/// # ) +/// # } +/// # work().unwrap(); +/// ``` +/// +/// Write using futures traits. +/// +/// ```rust +/// # mod sample { +/// # include!("sample.rs"); +/// # } +/// # use sample::SampleIo; +/// # fn work() -> mfio::error::Result<()> { +/// use futures::io::AsyncWriteExt; +/// use mfio::backend::*; +/// use mfio::futures_compat::FuturesCompat; +/// use mfio::stdeq::SeekableRef; +/// use mfio::traits::IoRead; +/// +/// let mut mem = vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]; +/// let handle = SampleIo::new(mem.clone()); +/// +/// handle.block_on(async { +/// let handle = SeekableRef::from(&handle); +/// handle.compat().write_all(&[9, 9, 9]).await?; +/// +/// handle.read_all(0, &mut mem[..5]).await.unwrap(); +/// assert_eq!(&mem[..5], &[9, 9, 9, 2, 3]); +/// +/// Ok(()) +/// }) /// # } /// # work().unwrap(); /// ``` @@ -49,6 +101,7 @@ pub trait FuturesCompat { fn compat(&self) -> Compat { Compat { io: self, + #[cfg(not(mfio_assume_linear_types))] read: None, write: None, } @@ -58,6 +111,10 @@ pub trait FuturesCompat { // StreamPos is needed for all I/O traits, so we use it to make sure rust gives better diagnostics. impl<'a, Io: ?Sized + stdeq::StreamPos> FuturesCompat for Io {} +// Currently we cannot guarantee that the user won't swap the buffer when using linear types. +// FIXME: always allocate an intermediary and sync in `Compat`. This way we could also retain the +// buffer, so that's nice. +#[cfg(not(mfio_assume_linear_types))] impl<'a, Io: ?Sized + stdeq::AsyncRead> AsyncRead for Compat<'a, Io> where u64: PosShift, @@ -68,12 +125,9 @@ where loop { if let Some(read) = this.read.as_mut() { // Update the sync handle. This is how we hack around the lifetimes of input buffer. - #[cfg(not(mfio_assume_linear_types))] - { - // SAFETY: AsyncIoFut will only use the sync object if, and only if the buffer is - // to be written in this poll. - read.sync = Some(unsafe { &mut *(buf as *mut _) }); - } + // SAFETY: AsyncIoFut will only use the sync object if, and only if the buffer is + // to be written in this poll. + read.sync = Some(unsafe { &mut *(buf as *mut _) }); let read = unsafe { Pin::new_unchecked(read) }; diff --git a/mfio/src/io/mod.rs b/mfio/src/io/mod.rs index 8777009..82337b9 100644 --- a/mfio/src/io/mod.rs +++ b/mfio/src/io/mod.rs @@ -100,6 +100,9 @@ pub use opaque::*; /// permission implies the packet holds the data, and that data can be transferred to the internal /// data store of the I/O backend. /// +/// You may want to check [`PacketIoExt`] trait and the [`traits`](crate::traits) module for easier +/// to use abstractions. +/// /// # Example /// /// Request-response handler: @@ -155,7 +158,7 @@ pub use opaque::*; /// assert_eq!(&buf[..10], b"Hiooooo\0\0\0"); /// # }); /// ``` -#[cglue_trait] +#[cfg_attr(feature = "cglue-trait", cglue_trait)] pub trait PacketIo: Sized { /// Send I/O request to the backend. /// @@ -203,13 +206,15 @@ pub trait PacketIo: Sized { fn send_io(&self, param: Param, view: BoundPacketView); } +/// I/O helpers. +/// +/// Use these helpers to simplify the usage of [`PacketIo`]. pub trait PacketIoExt: PacketIo { fn io<'a, T: PacketStore<'a, Perms>>( &'a self, param: Param, packet: T, ) -> IoFut<'a, Self, Perms, Param, T> { - //IoFut::NewId(self, param, packet.stack()) IoFut { pkt: UnsafeCell::new(Some(packet.stack())), initial_state: UnsafeCell::new(Some((self, param))), @@ -223,7 +228,6 @@ pub trait PacketIoExt: PacketIo { packet: T, output: O, ) -> IoToFut<'a, Self, Perms, Param, T, O> { - //IoFut::NewId(self, param, packet.stack()) IoToFut { pkt_out: UnsafeCell::new(Some((packet.stack(), output.stack()))), initial_state: UnsafeCell::new(Some((self, param))), @@ -256,6 +260,10 @@ pub trait PacketIoExt: PacketIo { impl, Perms: PacketPerms, Param> PacketIoExt for T {} +/// Helpers for Stream I/O. +/// +/// This is mainly meant for cases where I/O does not have a position parameter, such as TCP +/// streams. pub trait StreamIoExt: PacketIo { fn stream_io<'a, T: PacketStore<'a, Perms>>( &'a self, @@ -275,6 +283,10 @@ pub trait StreamIoExt: PacketIo { impl, Perms: PacketPerms> StreamIoExt for T {} +/// Describes lack of position. +/// +/// This type is used in streams to signify that I/O is sequential. The convention is that I/O is +/// processed on first-come, first-served basis #[repr(transparent)] #[derive(Clone)] pub struct NoPos(core::marker::PhantomData<()>); @@ -285,6 +297,11 @@ impl NoPos { } } +/// The simplest I/O future. +/// +/// This future will drive an operation on the packet to completion, and then return `Poll::Ready`. +/// +/// To perform more complex actions on partial results, please look at [`IoToFut`]. pub struct IoFut<'a, T: ?Sized, Perms: PacketPerms, Param, Packet: PacketStore<'a, Perms>> { pkt: UnsafeCell>>, initial_state: UnsafeCell>, @@ -336,6 +353,11 @@ impl< } } +/// I/O future with custom actions per returned packet segment. +/// +/// This future allows customizing behavior upon each completed packet segment. This may include +/// logging, storing segments in a collection, or processing them in a stream. Please see +/// appropriate output modules for more details. pub struct IoToFut< 'a, T: ?Sized, diff --git a/mfio/src/io/packet/mod.rs b/mfio/src/io/packet/mod.rs index 26e78f9..43f5920 100644 --- a/mfio/src/io/packet/mod.rs +++ b/mfio/src/io/packet/mod.rs @@ -15,8 +15,6 @@ use core::task::{Context, Poll}; use rangemap::RangeSet; use tarc::BaseArc; -pub type BoxedFuture = Pin + Send>>; - mod output; pub use output::*; mod view; @@ -527,6 +525,11 @@ pub enum PacketVtblTag { Complex, } +/// Describes packet type. +/// +/// If the value within this union is less than [`PacketVtblTag::Complex`], then the packet is +/// simple, and `tag` should be accessed. Meanwhile, any other value implies the packet is a +/// complex one, and `vtable` should be accessed instead. #[derive(Clone, Copy)] pub union PacketVtblRef { pub tag: usize, @@ -1275,6 +1278,10 @@ trait AnyBytes {} impl AnyBytes for u8 {} impl AnyBytes for MaybeUninit {} +/// Object convertable into a packet. +/// +/// This is an extension of [`OpaqueStore`], where an object may have additional synchronization +/// steps taken upon the end of I/O processing. pub trait IntoPacket<'a, Perms: PacketPerms> { type Target: PacketStore<'a, Perms>; type SyncHandle; @@ -1391,12 +1398,16 @@ pub type TransferDataFn = for<'a> unsafe extern "C" fn( ); /// Retrieves total length of the packet. -/// -/// This pub type LenFn = unsafe extern "C" fn(packet: &Packet) -> u64; +/// Packet that may be alloced. +/// +/// `Ok` represents alloced, `Err` represents unalloced (bound packet view). pub type MaybeAlloced = Result<::Alloced, BoundPacketView>; +/// Represents the state where packet is either alloced or transferred. +/// +/// `Ok` represents alloced, `Err` represents transferred. pub type AllocedOrTransferred = Result<::Alloced, TransferredPacket>; /// Describes type constraints on packet operations. @@ -1480,6 +1491,10 @@ pub trait PacketPerms: 'static + core::fmt::Debug + Clone + Copy { } } +/// ReadWrite permissions. +/// +/// The behavior of ReadWrite is not well defined, but for simple buffers, it implies the swapping +/// of data between the 2 buffers. #[repr(C)] #[derive(Clone, Copy)] pub struct ReadWrite { @@ -1534,6 +1549,9 @@ impl PacketPerms for ReadWrite { } } +/// Write permissions. +/// +/// This implies the packet is writeable and may not have valid data beforehand. #[repr(C)] #[derive(Clone, Copy)] pub struct Write { @@ -1592,6 +1610,9 @@ impl PacketPerms for Write { } } +/// Read permissions. +/// +/// This implies this packet contains valid data and it can be read. #[repr(C)] #[derive(Clone, Copy)] pub struct Read { @@ -1645,7 +1666,15 @@ impl PacketPerms for Read { } } +/// Objects that can be split. +/// +/// This trait enables splitting objects into non-overlapping parts. pub trait Splittable: Sized { + /// Splits an object at given position. + /// + /// # Panics + /// + /// This function may panic if len is outside the bounds of the given object. fn split_at(self, len: T) -> (Self, Self); fn len(&self) -> T; fn is_empty(&self) -> bool { @@ -1675,6 +1704,10 @@ impl, B: Splittable> Splittable f } } +/// Object that can be returned with an error. +/// +/// This is meant for all types of packets - allow backend to easily return them to the user with +/// an appropriate error condition attached. pub trait Errorable: Sized { fn error(self, err: Error); } @@ -1688,6 +1721,9 @@ impl Errorable for Result { } } +/// Packet which has been allocated. +/// +/// Allocated packets expose direct access to the underlying buffer. pub trait AllocatedPacket: Splittable + Errorable { type Perms: PacketPerms; type Pointer: Copy; @@ -1695,6 +1731,7 @@ pub trait AllocatedPacket: Splittable + Errorable { fn as_ptr(&self) -> Self::Pointer; } +/// Represents a simple allocated packet with write permissions. #[repr(C)] pub struct ReadWritePacketObj { alloced_packet: *mut u8, @@ -1756,6 +1793,9 @@ impl core::ops::DerefMut for ReadWritePacketObj { } } +/// Represents a simple allocated packet with write permissions. +/// +/// The data inside may not be initialized, therefore, this packet should only be written to. #[repr(C)] pub struct WritePacketObj { alloced_packet: *mut MaybeUninit, @@ -1817,6 +1857,7 @@ impl core::ops::DerefMut for WritePacketObj { } } +/// Represents a simple allocated packet with read permissions. #[repr(C)] pub struct ReadPacketObj { alloced_packet: *const u8, @@ -1870,6 +1911,12 @@ impl core::ops::Deref for ReadPacketObj { } } +/// Represents the state of the packet that has already had data transferred to. +/// +/// This struct is marked as `must_use`, because the intention is for the backend to be explicit +/// about when the packet is dropped and released to the user. Since bound packets may call a +/// function that generates more I/O requests, it is important to drop the packet outside critical +/// sections. #[repr(transparent)] #[must_use = "please handle point of drop intentionally"] pub struct TransferredPacket(BoundPacketView); diff --git a/mfio/src/io/packet/output.rs b/mfio/src/io/packet/output.rs index ae48d77..cd698d7 100644 --- a/mfio/src/io/packet/output.rs +++ b/mfio/src/io/packet/output.rs @@ -16,8 +16,20 @@ use tarc::BaseArc; use super::{OpaqueStore, PacketPerms, PacketView}; use crate::error::Error; +/// Typical output of packets. +/// +/// Whenever a backend returns a packet view to the user, it will have both the view, and optional +/// error case attached. If `Option` is not `None` then the packet should be considered +/// failed. However, note that the given packet view may have been accessed. If it was accessed, +/// the user should still assume the operation has failed. +/// +/// Meanwhile, if the error is `None`, then the packet will have been accessed. If it hasn't, then +/// it's a logic bug of the backend. +/// +/// Access considers allocation (+ transfer of data), or invokation of transfer function. pub type Output<'a, Perms> = (PacketView<'a, Perms>, Option); +/// Represents a reference where final packet views are collected. #[repr(C)] pub struct OutputRef<'a, Perms: PacketPerms> { pub(crate) out: NonNull>, @@ -110,6 +122,9 @@ impl<'a, Perms: PacketPerms, T> OutputStore<'a, Perms> for T where { } +/// Describes an object where packet output is collected. +/// +/// Note that this is an opaque object with data referenced beyond the size of this struct. #[repr(C)] #[derive(Debug)] pub struct PacketOutput { @@ -118,6 +133,7 @@ pub struct PacketOutput { // data afterwards } +/// Describes operations that can be performed on a packet output object. #[repr(C)] #[derive(Debug)] pub struct PacketOutputVtbl { @@ -135,6 +151,40 @@ pub struct PacketOutputVtbl { ), } +/// Invokes a closure on each packet segment. +/// +/// The closure must be const, and reentrable, i.e. `Fn`. +/// +/// # Examples +/// +/// ```rust +/// # mod sample { +/// # include!("../../sample.rs"); +/// # } +/// # use sample::SampleIo; +/// # fn work() -> mfio::error::Result<()> { +/// use mfio::backend::*; +/// use mfio::io::*; +/// +/// let mut mem = vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]; +/// let handle = SampleIo::new(mem.clone()); +/// +/// handle.block_on(async { +/// let (packet, _) = handle.io_to_fn(10, Packet::::new_buf(4), |packet, err| { +/// if err.is_some() { +/// assert_eq!(packet.len(), 1); +/// } else { +/// assert_eq!(packet.len(), 3); +/// } +/// }).await; +/// +/// assert_eq!(packet.simple_contiguous_slice(), Some(&[55, 89, 144][..])); +/// +/// Ok(()) +/// }) +/// # } +/// # work().unwrap(); +/// ``` #[repr(C)] pub struct OutputFunction { hdr: PacketOutput, @@ -214,6 +264,45 @@ unsafe impl<'b, F: Fn(PacketView<'b, Perms>, Option) + Send + Sync, Perms } } +/// Outputs resulting packet views to a stream. +/// +/// Custom backing storage may be supported through [`PushPop`] trait. +/// +/// # Examples +/// +/// ```rust +/// # mod sample { +/// # include!("../../sample.rs"); +/// # } +/// # use sample::SampleIo; +/// # fn work() -> mfio::error::Result<()> { +/// use mfio::backend::*; +/// use mfio::io::*; +/// use core::pin::pin; +/// use futures::stream::StreamExt; +/// +/// let mut mem = vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]; +/// let handle = SampleIo::new(mem.clone()); +/// +/// handle.block_on(async { +/// let mut fut = handle.io_to_stream(10, Packet::::new_buf(4), vec![]); +/// let mut fut = pin!(fut); +/// +/// let stream = fut.as_mut().submit(); +/// +/// while let Some((packet, err)) = (&**stream).next().await { +/// if err.is_some() { +/// assert_eq!(packet.len(), 1); +/// } else { +/// assert_eq!(packet.len(), 3); +/// } +/// } +/// +/// Ok(()) +/// }) +/// # } +/// # work().unwrap(); +/// ``` #[repr(C)] pub struct PacketStream<'a, T, Perms: PacketPerms> { hdr: PacketOutput, @@ -460,6 +549,7 @@ unsafe impl OpaqueStore for PacketStream<'_, T, Perms> { } } +/// Providees a stack/queue mechanism for [`PacketStream`]s. pub trait PushPop { fn push(&mut self, val: T); fn pop(&mut self) -> Option; diff --git a/mfio/src/io/packet/view.rs b/mfio/src/io/packet/view.rs index 238bad0..8ec2554 100644 --- a/mfio/src/io/packet/view.rs +++ b/mfio/src/io/packet/view.rs @@ -9,6 +9,11 @@ use core::ptr::NonNull; use core::sync::atomic::*; use tarc::BaseArc; +/// Bound Packet View. +/// +/// Views may be bound to an output reference before being sent to the I/O backend. If the packet +/// is bound to such output ref, then, upon completion of processing, every resulting packet view +/// is passed to the given output ref. #[repr(C)] pub struct BoundPacketView { pub(crate) view: ManuallyDrop>, @@ -198,6 +203,11 @@ impl BoundPacketView { } } +/// Describes a view to a given packet. +/// +/// A bound packet view is sent to an I/O backend, where it can be split into multiple, strictly +/// non-overlapping views. This structure contains necessary data for describing the bounds of each +/// sub-view. #[repr(C)] pub struct PacketView<'a, Perms: PacketPerms> { pub(crate) pkt: NonNull>, diff --git a/mfio/src/traits.rs b/mfio/src/traits.rs index 6b7ab1c..b7ca073 100644 --- a/mfio/src/traits.rs +++ b/mfio/src/traits.rs @@ -1,3 +1,5 @@ +//! Helper traits + use crate::std_prelude::*; use crate::io::*; @@ -11,7 +13,11 @@ use core::mem::MaybeUninit; use core::pin::Pin; use core::task::{Context, Poll}; +/// I/O read operations. pub trait IoRead: PacketIo { + /// Forwards a read request to the I/O object. + /// + /// This is equivalent to `PacketIo::io`, but disambiguates packet permissions. fn read_raw<'a, T: PacketStore<'a, Write>>( &'a self, pos: Pos, @@ -20,6 +26,7 @@ pub trait IoRead: PacketIo { self.io(pos, packet) } + /// Read all data into the given object. fn read_all<'a, T: IntoPacket<'a, Write>>( &'a self, pos: Pos, @@ -32,6 +39,7 @@ pub trait IoRead: PacketIo { } } + /// Reads data into a `Pod` struct. fn read_into<'a, T: Pod>( &'a self, pos: Pos, @@ -46,11 +54,13 @@ pub trait IoRead: PacketIo { self.read_all(pos, buf) } + /// Reads data into a new `Pod` struct. fn read(&self, pos: Pos) -> IoReadFut { let pkt = FullPacket::<_, Write>::new_uninit(); IoReadFut(self.io(pos, pkt)) } + /// Reads data into given buffer until a gap is reached. fn read_to_end<'a>(&'a self, pos: Pos, buf: &'a mut Vec) -> ReadToEndFut<'a, Self, Pos> where Pos: CopyPos, @@ -86,7 +96,11 @@ pub trait IoRead: PacketIo { impl IoRead for T where T: PacketIo {} +/// I/O write operations. pub trait IoWrite: PacketIo { + /// Forwards a write request to the I/O object. + /// + /// This is equivalent to `PacketIo::io`, but disambiguates packet permissions. fn write_raw<'a, T: PacketStore<'a, Read>>( &'a self, pos: Pos, @@ -95,6 +109,7 @@ pub trait IoWrite: PacketIo { self.io(pos, packet) } + /// Writes all data in the given packet to destination. fn write_all<'a, T: IntoPacket<'a, Read>>( &'a self, pos: Pos, @@ -107,6 +122,7 @@ pub trait IoWrite: PacketIo { } } + /// Writes a pod object into to destination. fn write<'a, T>(&'a self, pos: Pos, data: &'a T) -> IoFullFut<'a, Self, Read, Pos, &'a [u8]> { let buf = unsafe { core::slice::from_raw_parts(data as *const T as *const u8, core::mem::size_of::()) @@ -250,10 +266,15 @@ impl<'a, Io: PacketIo, Param, T: 'a> Future for IoReadFut<'a, Io, } pub mod sync { + //! Synchronous I/O wrappers use super::*; // TODO: figure out how to expose these over cglue + /// Synchronous I/O read operations. + /// + /// This trait simply wraps `PacketIo + IoBackend` types in order to not subject the user to + /// async code. pub trait SyncIoRead: IoRead + IoBackend { fn read_all<'a>( &'a self, @@ -286,6 +307,10 @@ pub mod sync { } } + /// Synchronous I/O write operations. + /// + /// This trait simply wraps `PacketIo + IoBackend` types in order to not subject the user to + /// async code. pub trait SyncIoWrite: IoWrite + IoBackend { fn write_all<'a>( &'a self,