Skip to content

Commit

Permalink
add a poll variant to DmaStreamReader::get_buffer_aligned
Browse files Browse the repository at this point in the history
Since `DmaStreamReader` is a stream, we have to play well with other
streams and make our functions easy to use from a polling context.
`get_buffer_aligned` is an async function and, as such, is hard to use
from there (not impossible, just hard).

If we look at the `AsyncRead` trait, one may see that it defines only
poll functions. Async functions are then built on top in the
`AsyncReadExt.` This is done because creating a future from a poll
function is trivial, while the other way around is hard.

Therefore, we create a new function `poll_get_buffer_aligned` and we
reimplement `get_buffer_aligned` as a wrapper around it:

```rust
pub async fn get_buffer_aligned(&mut self, len: u64) -> Result<ReadResult> {
    poll_fn(|cx| self.poll_get_buffer_aligned(cx, len)).await
}
```
  • Loading branch information
HippoBaro committed Nov 4, 2021
1 parent 6df07a6 commit ace65de
Showing 1 changed file with 48 additions and 7 deletions.
55 changes: 48 additions & 7 deletions glommio/src/io/dma_file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use core::task::Waker;
use futures_lite::{
future::poll_fn,
io::{AsyncRead, AsyncWrite},
ready,
stream::{self, StreamExt},
};
use std::{
Expand Down Expand Up @@ -519,7 +520,7 @@ impl DmaStreamReader {

/// Allows access to the buffer that holds the current position with no
/// extra copy
//_ In order to use this API, one must guarantee that reading the specified
/// In order to use this API, one must guarantee that reading the specified
/// length may not cross into a different buffer. Users of this API are
/// expected to be aware of their buffer size (selectable in the
/// [`DmaStreamReaderBuilder`]) and act accordingly.
Expand Down Expand Up @@ -562,8 +563,45 @@ impl DmaStreamReader {
/// [`AsyncReadExt`]: https://docs.rs/futures-lite/1.11.2/futures_lite/io/trait.AsyncReadExt.html
/// [`ReadResult`]: struct.ReadResult.html
pub async fn get_buffer_aligned(&mut self, len: u64) -> Result<ReadResult> {
poll_fn(|cx| self.poll_get_buffer_aligned(cx, len)).await
}

/// A variant of [`get_buffer_aligned`] that can be called from a poll
/// function context.
///
/// Allows access to the buffer that holds the current position with no
/// extra copy
/// In order to use this API, one must guarantee that reading the specified
/// length may not cross into a different buffer. Users of this API are
/// expected to be aware of their buffer size (selectable in the
/// [`DmaStreamReaderBuilder`]) and act accordingly.
///
/// The buffer is also not released until the returned [`ReadResult`] goes
/// out of scope. So if you plan to keep this alive for a long time this
/// is probably the wrong API.
///
/// If EOF is hit while reading with this method, the number of bytes in the
/// returned buffer will be less than number requested.
///
/// Let's say you want to open a file and check if its header is sane: this
/// is a good API for that.
///
/// But if after such header there is an index that you want to keep in
/// memory, then you are probably better off with one of the methods
/// from [`AsyncReadExt`].
///
/// [`get_buffer_aligned`]: Self::get_buffer_aligned
/// [`DmaStreamReader`]: struct.DmaStreamReader.html
/// [`DmaStreamReaderBuilder`]: struct.DmaStreamReaderBuilder.html
/// [`AsyncReadExt`]: https://docs.rs/futures-lite/1.11.2/futures_lite/io/trait.AsyncReadExt.html
/// [`ReadResult`]: struct.ReadResult.html
pub fn poll_get_buffer_aligned(
&mut self,
cx: &mut Context<'_>,
len: u64,
) -> Poll<Result<ReadResult>> {
if len == 0 {
return Ok(ReadResult::empty_buffer());
return Poll::Ready(Ok(ReadResult::empty_buffer()));
}

let (start_id, end_id, buffer_size) = {
Expand All @@ -574,15 +612,18 @@ impl DmaStreamReader {
};

if start_id != end_id {
return Err(GlommioError::<()>::WouldBlock(ResourceType::File(format!(
"Reading {} bytes from position {} would cross a buffer boundary (Buffer size {})",
len, self.current_pos, buffer_size
return Poll::Ready(Err(GlommioError::<()>::WouldBlock(ResourceType::File(
format!(
"Reading {} bytes from position {} would cross a buffer boundary (Buffer size \
{})",
len, self.current_pos, buffer_size
),
))));
}

let x = poll_fn(|cx| self.get_buffer(cx, len, start_id)).await?;
let x = ready!(self.poll_get_buffer(cx, len, start_id))?;
self.skip(len);
Ok(x)
Poll::Ready(Ok(x))
}

fn get_buffer(
Expand Down

0 comments on commit ace65de

Please sign in to comment.