Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move msg functions to msg module #116

Merged
merged 3 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,9 @@ impl SubmissionQueue {

/// Setup a listener for user space messages.
///
/// The returned [`MsgListener`] iterator will return all messages send
/// using [`SubmissionQueue::try_send_msg`] and
/// [`SubmissionQueue::send_msg`] using the returned `MsgToken`.
/// The returned [`MsgListener`] will return all messages send using
/// [`msg::try_send_msg`] and [`msg::send_msg`] using the returned
/// `MsgToken`.
///
/// # Notes
///
Expand All @@ -503,34 +503,41 @@ impl SubmissionQueue {
/// don't use `MsgToken` after it became invalid. Furthermore to ensure
/// the creation of it succeeds it should be done early in the lifetime of
/// `Ring`.
///
/// This is deprecated, use [`msg::msg_listener`] instead.
#[deprecated(note = "use a10::msg::msg_listener instead")]
pub fn msg_listener(self) -> io::Result<(MsgListener, MsgToken)> {
MsgListener::new(self)
msg::msg_listener(self)
}

/// Try to send a message to iterator listening for message using `MsgToken`.
/// Try to send a message to iterator listening for message using [`MsgToken`].
///
/// This will use the io_uring submission queue to share `data` with the
/// receiving end. This means that it will wake up the thread if it's
/// currently [polling].
///
/// This will fail if the submission queue is currently full. See
/// [`SubmissionQueue::send_msg`] for a version that tries again when the
/// submission queue is full.
/// [`send_msg`] for a version that tries again when the submission queue is
/// full.
///
/// See [`msg_listener`] for examples.
///
/// See [`SubmissionQueue::msg_listener`] for examples.
/// This is deprecated, use [`msg::try_send_msg`] instead.
///
/// [polling]: Ring::poll
/// [`send_msg`]: msg::send_msg
/// [`msg_listener`]: msg::msg_listener
#[deprecated(note = "use a10::msg::try_send_msg instead")]
pub fn try_send_msg(&self, token: MsgToken, data: u32) -> io::Result<()> {
self.add_no_result(|submission| unsafe {
submission.msg(self.shared.ring_fd.as_raw_fd(), (token.0).0 as u64, data, 0);
submission.no_completion_event();
})?;
Ok(())
msg::try_send_msg(self, token, data)
}

/// Send a message to iterator listening for message using `MsgToken`.
/// Send a message to iterator listening for message using [`MsgToken`].
///
/// This is deprecated, use [`msg::send_msg`] instead.
#[deprecated(note = "use a10::msg::send_msg instead")]
pub const fn send_msg<'a>(&'a self, token: MsgToken, data: u32) -> SendMsg<'a> {
SendMsg::new(self, token, data)
msg::send_msg(self, token, data)
}

/// Wait for an event specified in `mask` on the file descriptor `fd`.
Expand All @@ -546,13 +553,15 @@ impl SubmissionQueue {
/// Returns an [`AsyncIterator`] that returns multiple events as specified
/// in `mask` on the file descriptor `fd`.
///
/// This is not the same as calling [`SubmissionQueue::oneshot_poll`] in a
/// This is not the same as calling [`oneshot_poll`] in a
/// loop as this uses a multishot operation, which means only a single
/// operation is created kernel side, making this more efficient.
///
/// This is deprecated, use [`poll::multishot_poll`] instead.
///
/// [`AsyncIterator`]: std::async_iter::AsyncIterator
///
/// [`oneshot_poll`]: poll::oneshot_poll
#[deprecated(note = "use a10::poll::multishot_poll instead")]
pub fn multishot_poll<'a>(&'a self, fd: BorrowedFd, mask: libc::c_int) -> MultishotPoll<'a> {
poll::multishot_poll(self, fd, mask)
Expand Down
90 changes: 66 additions & 24 deletions src/msg.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,53 @@
//! User space messages.
//!
//! To setup a [`MsgListener`] use [`SubmissionQueue::msg_listener`]. It returns
//! the listener as well as a [`MsgToken`], which can be used in
//! [`SubmissionQueue::try_send_msg`] and [`SubmissionQueue::send_msg`] to send
//! a message to the created `MsgListener`.
//! To setup a [`MsgListener`] use [`msg_listener`]. It returns the listener as
//! well as a [`MsgToken`], which can be used in [`try_send_msg`] and
//! [`send_msg`] to send a message to the created `MsgListener`.

use std::future::Future;
use std::io;
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::task::{self, Poll};

use crate::{OpIndex, SubmissionQueue};

/// Token used to the messages.
///
/// See [`SubmissionQueue::msg_listener`].
/// See [`msg_listener`].
#[derive(Copy, Clone, Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct MsgToken(pub(crate) OpIndex);

/// [`AsyncIterator`] behind [`SubmissionQueue::msg_listener`].
/// Setup a listener for user space messages.
///
/// The returned [`MsgListener`] will return all messages send using
/// [`try_send_msg`] and [`send_msg`] using the returned `MsgToken`.
///
/// # Notes
///
/// This will return an error if too many operations are already queued,
/// this is usually resolved by calling [`Ring::poll`].
///
/// The returned `MsgToken` has an implicitly lifetime linked to
/// `MsgListener`. If `MsgListener` is dropped the `MsgToken` will
/// become invalid.
///
/// Due to the limitations mentioned above it's advised to consider the
/// usefulness of the type severly limited. The returned `MsgListener`
/// iterator should live for the entire lifetime of the `Ring`, to ensure we
/// don't use `MsgToken` after it became invalid. Furthermore to ensure
/// the creation of it succeeds it should be done early in the lifetime of
/// `Ring`.
///
/// [`Ring::poll`]: crate::Ring::poll
#[allow(clippy::module_name_repetitions)]
pub fn msg_listener(sq: SubmissionQueue) -> io::Result<(MsgListener, MsgToken)> {
let op_index = sq.queue_multishot()?;
Ok((MsgListener { sq, op_index }, MsgToken(op_index)))
}

/// [`AsyncIterator`] behind [`msg_listener`].
///
/// [`AsyncIterator`]: std::async_iter::AsyncIterator
#[derive(Debug)]
Expand All @@ -31,12 +59,6 @@ pub struct MsgListener {
}

impl MsgListener {
/// Create a new `MsgListener`.
pub(crate) fn new(sq: SubmissionQueue) -> io::Result<(MsgListener, MsgToken)> {
let op_index = sq.queue_multishot()?;
Ok((MsgListener { sq, op_index }, MsgToken(op_index)))
}

/// This is the same as the `AsyncIterator::poll_next` function, but then
/// available on stable Rust.
pub fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Option<u32>> {
Expand All @@ -63,28 +85,48 @@ impl std::async_iter::AsyncIterator for MsgListener {
}
}

/// [`Future`] behind [`SubmissionQueue::send_msg`].
/// Try to send a message to iterator listening for message using [`MsgToken`].
///
/// This will use the io_uring submission queue to share `data` with the
/// receiving end. This means that it will wake up the thread if it's
/// currently [polling].
///
/// This will fail if the submission queue is currently full. See [`send_msg`]
/// for a version that tries again when the submission queue is full.
///
/// See [`msg_listener`] for examples.
///
/// [polling]: crate::Ring::poll
#[allow(clippy::module_name_repetitions)]
pub fn try_send_msg(sq: &SubmissionQueue, token: MsgToken, data: u32) -> io::Result<()> {
sq.add_no_result(|submission| unsafe {
submission.msg(sq.shared.ring_fd.as_raw_fd(), (token.0).0 as u64, data, 0);
submission.no_completion_event();
})?;
Ok(())
}

/// Send a message to iterator listening for message using [`MsgToken`].
#[allow(clippy::module_name_repetitions)]
pub const fn send_msg<'sq>(sq: &'sq SubmissionQueue, token: MsgToken, data: u32) -> SendMsg<'sq> {
SendMsg { sq, token, data }
}

/// [`Future`] behind [`send_msg`].
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
#[allow(clippy::module_name_repetitions)]
pub struct SendMsg<'a> {
sq: &'a SubmissionQueue,
pub struct SendMsg<'sq> {
sq: &'sq SubmissionQueue,
token: MsgToken,
data: u32,
}

impl<'a> SendMsg<'a> {
/// Create a new `SendMsg`.
pub(crate) const fn new(sq: &'a SubmissionQueue, token: MsgToken, data: u32) -> SendMsg {
SendMsg { sq, token, data }
}
}

impl<'a> Future for SendMsg<'a> {
impl<'sq> Future for SendMsg<'sq> {
type Output = io::Result<()>;

fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.sq.try_send_msg(self.token, self.data) {
match try_send_msg(self.sq, self.token, self.data) {
Ok(()) => Poll::Ready(Ok(())),
Err(_) => {
self.sq.wait_for_submission(ctx.waker().clone());
Expand Down
17 changes: 8 additions & 9 deletions src/poll.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Poll for file descriptor events.
//!
//! To wait for events on a file descriptor use:
//! * [`SubmissionQueue::oneshot_poll`] a [`Future`] returning a single
//! [`PollEvent`].
//! * [`SubmissionQueue::multishot_poll`] an [`AsyncIterator`] returning
//! multiple [`PollEvent`]s.
//! * [`oneshot_poll`] a [`Future`] returning a single [`PollEvent`].
//! * [`multishot_poll`] an [`AsyncIterator`] returning multiple
//! [`PollEvent`]s.
//!
//! [`AsyncIterator`]: std::async_iter::AsyncIterator

Expand Down Expand Up @@ -36,7 +35,7 @@ pub fn oneshot_poll<'sq>(
}
}

/// [`Future`] behind [`SubmissionQueue::oneshot_poll`].
/// [`Future`] behind [`oneshot_poll`].
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
#[allow(clippy::module_name_repetitions)]
Expand Down Expand Up @@ -104,9 +103,9 @@ impl<'sq> Drop for OneshotPoll<'sq> {
/// Returns an [`AsyncIterator`] that returns multiple events as specified
/// in `mask` on the file descriptor `fd`.
///
/// This is not the same as calling [`SubmissionQueue::oneshot_poll`] in a
/// loop as this uses a multishot operation, which means only a single
/// operation is created kernel side, making this more efficient.
/// This is not the same as calling [`oneshot_poll`] in a loop as this uses a
/// multishot operation, which means only a single operation is created kernel
/// side, making this more efficient.
///
/// [`AsyncIterator`]: std::async_iter::AsyncIterator
#[allow(clippy::module_name_repetitions)]
Expand All @@ -121,7 +120,7 @@ pub fn multishot_poll<'sq>(
}
}

/// [`AsyncIterator`] behind [`SubmissionQueue::multishot_poll`].
/// [`AsyncIterator`] behind [`multishot_poll`].
///
/// [`AsyncIterator`]: std::async_iter::AsyncIterator
#[derive(Debug)]
Expand Down
10 changes: 6 additions & 4 deletions tests/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::{Duration, Instant};
use a10::cancel::Cancel;
use a10::fs::OpenOptions;
use a10::io::ReadBufPool;
use a10::msg::{MsgListener, MsgToken, SendMsg};
use a10::msg::{msg_listener, send_msg, try_send_msg, MsgListener, MsgToken, SendMsg};
use a10::poll::{multishot_poll, oneshot_poll, MultishotPoll, OneshotPoll};
use a10::{mem, process, AsyncFd, Config, Ring, SubmissionQueue};

Expand Down Expand Up @@ -279,13 +279,15 @@ fn message_sending() {
is_send::<MsgToken>();
is_sync::<MsgToken>();

let (msg_listener, msg_token) = sq.clone().msg_listener().unwrap();
let (msg_listener, msg_token) = msg_listener(sq.clone()).unwrap();
let mut msg_listener = pin!(msg_listener);
start_mulitshot_op(msg_listener.as_mut());

// Send some messages.
sq.try_send_msg(msg_token, DATA1).unwrap();
waker.block_on(pin!(sq.send_msg(msg_token, DATA2))).unwrap();
try_send_msg(&sq, msg_token, DATA1).unwrap();
waker
.block_on(pin!(send_msg(&sq, msg_token, DATA2)))
.unwrap();

assert_eq!(waker.block_on(next(msg_listener.as_mut())), Some(DATA1));
assert_eq!(waker.block_on(next(msg_listener.as_mut())), Some(DATA2));
Expand Down
Loading