Skip to content

Commit

Permalink
Add futures trait compatibility
Browse files Browse the repository at this point in the history
h33p committed Dec 5, 2023
1 parent 2941ce7 commit 342d6ed
Showing 2 changed files with 144 additions and 0 deletions.
140 changes: 140 additions & 0 deletions mfio/src/futures_compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use crate::io::*;
use crate::stdeq::{self, AsyncIoFut};
use crate::util::PosShift;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::io::{AsyncRead, AsyncSeek, AsyncWrite};
use std::io::{Result, SeekFrom};

pub struct Compat<'a, Io: ?Sized> {
io: &'a Io,
read: Option<AsyncIoFut<'a, Io, Write, u64, &'a mut [u8]>>,
write: Option<AsyncIoFut<'a, Io, Read, u64, &'a [u8]>>,
}

/// Bridges mfio with futures.
///
/// # Examples
///
/// Read from mfio object through futures traits.
///
/// ```rust
/// # mod sample {
/// # include!("sample.rs");
/// # }
/// # use sample::SampleIo;
/// # fn work() -> mfio::error::Result<()> {
/// use futures::io::{AsyncReadExt, Cursor};
/// use mfio::backend::*;
/// use mfio::futures_compat::FuturesCompat;
/// use mfio::stdeq::SeekableRef;
///
/// let mem = vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144];
/// let handle = SampleIo::new(mem.clone());
///
/// handle.block_on(async {
/// let mut buf = Cursor::new(vec![0; mem.len()]);
///
/// let handle = SeekableRef::from(&handle);
/// futures::io::copy(&mut handle.compat(), &mut buf).await?;
/// assert_eq!(mem, buf.into_inner());
///
/// Ok(())
/// })
/// # }
/// # work().unwrap();
/// ```
pub trait FuturesCompat {
fn compat(&self) -> Compat<Self> {
Compat {
io: self,
read: None,
write: None,
}
}
}

// StreamPos is needed for all I/O traits, so we use it to make sure rust gives better diagnostics.
impl<'a, Io: ?Sized + stdeq::StreamPos<u64>> FuturesCompat for Io {}

Check warning on line 59 in mfio/src/futures_compat.rs

GitHub Actions / clippy

this lifetime isn't used in the impl

warning: this lifetime isn't used in the impl --> mfio/src/futures_compat.rs:59:6 | 59 | impl<'a, Io: ?Sized + stdeq::StreamPos<u64>> FuturesCompat for Io {} | ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes = note: `#[warn(clippy::extra_unused_lifetimes)]` on by default

Check warning on line 59 in mfio/src/futures_compat.rs

GitHub Actions / clippy

this lifetime isn't used in the impl

warning: this lifetime isn't used in the impl --> mfio/src/futures_compat.rs:59:6 | 59 | impl<'a, Io: ?Sized + stdeq::StreamPos<u64>> FuturesCompat for Io {} | ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes = note: `#[warn(clippy::extra_unused_lifetimes)]` on by default

impl<'a, Io: ?Sized + stdeq::AsyncRead<u64>> AsyncRead for Compat<'a, Io>
where
u64: PosShift<Io>,
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
let this = unsafe { self.get_unchecked_mut() };

loop {
if let Some(read) = this.read.as_mut() {
// Update the sync handle. This is how we hack around the lifetimes of input buffer.
#[cfg(not(mfio_assume_linear_types))]
{
assert_eq!(read.sync.as_ref().map(|v| v.len()), Some(buf.len()));
// SAFETY: AsyncIoFut will only use the sync object if, and only if the buffer is
// to be written in this poll.
read.sync = Some(unsafe { &mut *(buf as *mut _) });
}

let read = unsafe { Pin::new_unchecked(read) };

break read.poll(cx).map(|v| {
this.read = None;
v.map_err(|_| std::io::ErrorKind::Other.into())
});
} else {
// SAFETY: on mfio_assume_linear_types, this is unsafe. Without the switch this is
// safe, because the buffer is stored in a sync variable that is only used whenever
// the I/O completes. That is processed in this poll function, and we update the
// sync at every iteration of the loop.
let buf = unsafe { &mut *(buf as *mut _) };
this.read = Some(stdeq::AsyncRead::read(this.io, buf));
}
}
}
}

impl<'a, Io: ?Sized + stdeq::AsyncWrite<u64>> AsyncWrite for Compat<'a, Io>
where
u64: PosShift<Io>,
{
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
let this = unsafe { self.get_unchecked_mut() };

loop {
if let Some(write) = this.write.as_mut() {
let write = unsafe { Pin::new_unchecked(write) };

break write.poll(cx).map(|v| {
this.write = None;
v.map_err(|_| std::io::ErrorKind::Other.into())
});
} else {
// SAFETY: on mfio_assume_linear_types, this is unsafe. Without the switch this is
// safe, because the buffer is transferred to an intermediate one before this
// function returns..
let buf = unsafe { &*(buf as *const _) };
this.write = Some(stdeq::AsyncWrite::write(this.io, buf));
}
}
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
// Completion of every request currently implies we've flushed.
// TODO: improve semantics
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
// We currently imply that we can just close on drop.
// TODO: improve semantics
Poll::Ready(Ok(()))
}
}

impl<'a, Io: ?Sized + stdeq::StreamPos<u64>> AsyncSeek for Compat<'a, Io> {
fn poll_seek(self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
let this = unsafe { self.get_unchecked_mut() };
Poll::Ready(stdeq::std_seek(this.io, pos))
}
}
4 changes: 4 additions & 0 deletions mfio/src/lib.rs
Original file line number Diff line number Diff line change
@@ -181,6 +181,8 @@ pub(crate) mod std_prelude {

pub mod backend;
pub mod error;
#[cfg(feature = "std")]
pub mod futures_compat;
pub mod io;
pub mod stdeq;
pub mod traits;
@@ -193,6 +195,8 @@ pub mod prelude {
pub use crate::backend::integrations::tokio::Tokio;
pub use crate::backend::{Integration, IoBackend, IoBackendExt, Null};
pub use crate::error::*;
#[cfg(feature = "std")]
pub use crate::futures_compat::FuturesCompat;
pub use crate::io::{
FullPacket, IntoPacket, OwnedPacket, Packet, PacketIo, PacketIoExt, PacketView, Read,
RefPacket, VecPacket, Write,

0 comments on commit 342d6ed

Please sign in to comment.