diff --git a/glommio/src/io/dma_file_stream.rs b/glommio/src/io/dma_file_stream.rs index e274c41ff..2ab20f092 100644 --- a/glommio/src/io/dma_file_stream.rs +++ b/glommio/src/io/dma_file_stream.rs @@ -17,6 +17,7 @@ use core::task::Waker; use futures_lite::{ future::poll_fn, io::{AsyncRead, AsyncWrite}, + ready, stream::{self, StreamExt}, }; use std::{ @@ -519,7 +520,7 @@ impl DmaStreamReader { /// Allows access to the buffer that holds the current position with no /// extra copy - //_ In order to use this API, one must guarantee that reading the specified + /// In order to use this API, one must guarantee that reading the specified /// length may not cross into a different buffer. Users of this API are /// expected to be aware of their buffer size (selectable in the /// [`DmaStreamReaderBuilder`]) and act accordingly. @@ -562,8 +563,45 @@ impl DmaStreamReader { /// [`AsyncReadExt`]: https://docs.rs/futures-lite/1.11.2/futures_lite/io/trait.AsyncReadExt.html /// [`ReadResult`]: struct.ReadResult.html pub async fn get_buffer_aligned(&mut self, len: u64) -> Result { + poll_fn(|cx| self.poll_get_buffer_aligned(cx, len)).await + } + + /// A variant of [`get_buffer_aligned`] that can be called from a poll + /// function context. + /// + /// Allows access to the buffer that holds the current position with no + /// extra copy + /// In order to use this API, one must guarantee that reading the specified + /// length may not cross into a different buffer. Users of this API are + /// expected to be aware of their buffer size (selectable in the + /// [`DmaStreamReaderBuilder`]) and act accordingly. + /// + /// The buffer is also not released until the returned [`ReadResult`] goes + /// out of scope. So if you plan to keep this alive for a long time this + /// is probably the wrong API. + /// + /// If EOF is hit while reading with this method, the number of bytes in the + /// returned buffer will be less than number requested. + /// + /// Let's say you want to open a file and check if its header is sane: this + /// is a good API for that. + /// + /// But if after such header there is an index that you want to keep in + /// memory, then you are probably better off with one of the methods + /// from [`AsyncReadExt`]. + /// + /// [`get_buffer_aligned`]: Self::get_buffer_aligned + /// [`DmaStreamReader`]: struct.DmaStreamReader.html + /// [`DmaStreamReaderBuilder`]: struct.DmaStreamReaderBuilder.html + /// [`AsyncReadExt`]: https://docs.rs/futures-lite/1.11.2/futures_lite/io/trait.AsyncReadExt.html + /// [`ReadResult`]: struct.ReadResult.html + pub fn poll_get_buffer_aligned( + &mut self, + cx: &mut Context<'_>, + len: u64, + ) -> Poll> { if len == 0 { - return Ok(ReadResult::empty_buffer()); + return Poll::Ready(Ok(ReadResult::empty_buffer())); } let (start_id, end_id, buffer_size) = { @@ -574,15 +612,18 @@ impl DmaStreamReader { }; if start_id != end_id { - return Err(GlommioError::<()>::WouldBlock(ResourceType::File(format!( - "Reading {} bytes from position {} would cross a buffer boundary (Buffer size {})", - len, self.current_pos, buffer_size + return Poll::Ready(Err(GlommioError::<()>::WouldBlock(ResourceType::File( + format!( + "Reading {} bytes from position {} would cross a buffer boundary (Buffer size \ + {})", + len, self.current_pos, buffer_size + ), )))); } - let x = poll_fn(|cx| self.get_buffer(cx, len, start_id)).await?; + let x = ready!(self.poll_get_buffer(cx, len, start_id))?; self.skip(len); - Ok(x) + Poll::Ready(Ok(x)) } fn get_buffer(