Skip to content

Commit

Permalink
Networking improvements (#26)
Browse files Browse the repository at this point in the history
* Rework networking a bit, add some functions

* Update to new lints
  • Loading branch information
MarcusGrass authored May 5, 2024
1 parent 9961f0a commit 2d573ad
Show file tree
Hide file tree
Showing 33 changed files with 1,483 additions and 91 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions Notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# General dev-notes

Running multi-arch is difficult.

## Qemu

`qemu-aarch64` version 9.0.0 [seems to have an issue](https://gitlab.com/qemu-project/qemu/-/issues/2326).
Use a previous version, 0.7.2 works, manifests as VDSO-image address-alignment being zero, which
causes a div-by-zero.
1 change: 0 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ At present, the minimal WM builds statically pie-linked at `790K`.
both with and without an allocator/threading.



## License

This project and any contributions are licensed under [MPL-2.0](LICENSE).
Expand Down
2 changes: 1 addition & 1 deletion rusl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ alloc = []
integration-test = []

[dependencies]
linux-rust-bindings = { version = "0.1.1", features = ["all"] }
linux-rust-bindings = { version = "0.1.3", features = ["all"] }
sc = "0.2.7"

[dev-dependencies]
2 changes: 1 addition & 1 deletion rusl/src/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn futex_wait(
val,
timeout
.as_ref()
.map_or_else(core::ptr::null, |ts| ts as *const TimeSpec),
.map_or_else(core::ptr::null, core::ptr::from_ref::<TimeSpec>),
0,
0
)
Expand Down
194 changes: 183 additions & 11 deletions rusl/src/io_uring/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ use crate::io_uring::{
io_uring_enter, io_uring_register_buffers, io_uring_register_files,
io_uring_register_io_slices, io_uring_setup, setup_io_uring,
};
use crate::network::{bind, connect, listen, socket};
use crate::platform::{
AddressFamily, Fd, IoSlice, IoSliceMut, IoUring, IoUringCompletionQueueEntry,
IoUringEnterFlags, IoUringParamFlags, IoUringParams, IoUringSQEFlags,
IoUringSubmissionQueueEntry, Mode, NonNegativeI32, OpenFlags, RenameFlags, SocketAddress,
IoUringSubmissionQueueEntry, Mode, OpenFlags, PollAddMultiFlags, PollEvents, RenameFlags,
SocketFlags, SocketOptions, SocketType, StatxFlags, StatxMask, TimeSpec, STDERR, STDIN, STDOUT,
};
use crate::string::unix_str::UnixStr;
use crate::time::clock_get_monotonic_time;
use crate::unistd::{close, open, open_mode, read, stat, unlink, unlink_flags, UnlinkFlags};
use crate::unistd::{close, open, open_mode, read, stat, unlink_flags, UnlinkFlags};

#[test]
fn uring_setup() {
Expand Down Expand Up @@ -364,7 +363,10 @@ fn uring_single_socket() {
}

#[test]
fn uring_single_accept() {
#[cfg(feature = "alloc")]
#[allow(clippy::cast_sign_loss, clippy::too_many_lines)]
fn uring_unix_accept_send_recv() {
use crate::network::{bind_unix, connect_unix, listen, socket};
let Some(mut uring) = setup_ignore_enosys(8, IoUringParamFlags::empty()) else {
return;
};
Expand All @@ -376,20 +378,25 @@ fn uring_single_accept() {
.unwrap();
let sock_path =
unsafe { UnixStr::from_str_unchecked("test-files/io_uring/test-sock-accept\0") };
let addr = SocketAddress::try_from_unix(sock_path).unwrap();
let addr = crate::platform::SocketAddressUnix::try_from_unix(sock_path).unwrap();
// Ensure socket doesn't exist before test
if let Err(e) = stat(sock_path) {
assert_eq!(Errno::ENOENT, e.code.unwrap());
} else {
unlink(sock_path).unwrap();
crate::unistd::unlink(sock_path).unwrap();
}
bind(server_socket, &addr).unwrap();
listen(server_socket, NonNegativeI32::comptime_checked_new(100)).unwrap();
bind_unix(server_socket, &addr).unwrap();
listen(
server_socket,
crate::platform::NonNegativeI32::comptime_checked_new(100),
)
.unwrap();
let user_data = 10011;
let entry = unsafe {
IoUringSubmissionQueueEntry::new_accept(
IoUringSubmissionQueueEntry::new_accept_unix(
server_socket,
&addr,
core::ptr::null_mut(),
core::ptr::null_mut(),
SocketFlags::SOCK_CLOEXEC | SocketFlags::SOCK_NONBLOCK,
user_data,
// Run as async since we know we won't be able to connect yet
Expand All @@ -408,11 +415,142 @@ fn uring_single_accept() {
0,
)
.unwrap();
connect(conn_sock, &addr).unwrap();
connect_unix(conn_sock, &addr).unwrap();
io_uring_enter(uring.fd, 0, 1, IoUringEnterFlags::IORING_ENTER_GETEVENTS).unwrap();
let cqe = uring.get_next_cqe().unwrap();
assert_eq!(user_data, cqe.0.user_data, "Bad user data in cqe {cqe:?}");
assert!(cqe.0.res >= 0, "Failed res for cqe: {cqe:?}");
let client_to_server_socket = Fd::try_new(cqe.0.res).unwrap();
let next_slot = uring.get_next_sqe_slot().unwrap();
let msg_content = b"Ping!";
let msg_fd = open(
unix_lit!("test-files/io_uring/uring_recv_send_pass_fd.txt"),
OpenFlags::O_RDONLY,
)
.unwrap();
let mut file_buf_orig = [0u8; 64];
read(msg_fd, &mut file_buf_orig).unwrap();
let msg_fds = [msg_fd];
unsafe {
let io = [IoSlice::new(msg_content)];
let msg = crate::platform::MsgHdrBorrow::create_send(
None,
&io,
Some(crate::platform::ControlMessageSend::ScmRights(&msg_fds)),
);
*next_slot = IoUringSubmissionQueueEntry::new_sendmsg(
conn_sock,
&msg,
0,
99,
IoUringSQEFlags::empty(),
);
uring.flush_submission_queue();
io_uring_enter(uring.fd, 1, 1, IoUringEnterFlags::IORING_ENTER_GETEVENTS).unwrap();
let cqe = uring.get_next_cqe().unwrap();
assert_eq!(msg_content.len(), cqe.0.res as usize);
assert_eq!(99, cqe.0.user_data);
let next_slot = uring.get_next_sqe_slot().unwrap();
let mut buf = [0u8; 32];
let mut ctrl_buf = [0u8; 126];
let mut iov = [IoSliceMut::new(&mut buf)];
let mut recv_hdr =
crate::platform::MsgHdrBorrow::create_recv(&mut iov, Some(&mut ctrl_buf));
*next_slot = IoUringSubmissionQueueEntry::new_recvmsg(
client_to_server_socket,
core::ptr::addr_of_mut!(recv_hdr).cast(),
0,
999,
IoUringSQEFlags::empty(),
);
uring.flush_submission_queue();
io_uring_enter(uring.fd, 1, 1, IoUringEnterFlags::IORING_ENTER_GETEVENTS).unwrap();
let cqe = uring.get_next_cqe().unwrap();
assert!(cqe.0.res > 0, "Got error result {}", cqe.0.res);
assert_eq!(msg_content.len(), cqe.0.res as usize);
assert_eq!(b"Ping!", &buf[..5]);
let mut ctrl = recv_hdr.control_messages();
let fd = ctrl.next().unwrap();
assert!(ctrl.next().is_none());
match fd {
crate::platform::ControlMessageSend::ScmRights(fds) => {
assert_eq!(1, fds.len());
assert!(msg_fd < fds[0]);
// Since it's the same fd a reseek is necessary here
let off = crate::unistd::lseek(fds[0], 0, crate::unistd::Whence::SET).unwrap();
assert_eq!(0, off);
let mut file_buf_passed = [0u8; 64];
read(fds[0], &mut file_buf_passed).unwrap();
assert_eq!(file_buf_orig, file_buf_passed);
let expect = b"Text content!\n";
assert_eq!(expect, &file_buf_passed[..14]);
}
}
}
}

#[test]
#[cfg(feature = "alloc")]
#[allow(clippy::cast_sign_loss, clippy::too_many_lines)]
fn uring_tcp_accept() {
use crate::network::{bind_inet, listen, socket};
const FIFTEEN: crate::platform::NonNegativeI32 =
crate::platform::NonNegativeI32::comptime_checked_new(15);
let Some(mut uring) = setup_ignore_enosys(8, IoUringParamFlags::empty()) else {
return;
};
// Dynamically assign port
let mut addr = crate::platform::SocketAddressInet::new([0, 0, 0, 0], 0);
let srv_sock = socket(
AddressFamily::AF_INET,
SocketOptions::new(SocketType::SOCK_STREAM, SocketFlags::SOCK_CLOEXEC),
6,
)
.unwrap();
bind_inet(srv_sock, &addr).unwrap();
// Get port
let sockname = crate::network::get_inet_sock_name(srv_sock).unwrap();
addr.0.sin_port = sockname.0.sin_port;
listen(srv_sock, FIFTEEN).unwrap();
let user_data = 10012;
let entry = unsafe {
IoUringSubmissionQueueEntry::new_accept_inet(
srv_sock,
core::ptr::null_mut(),
core::ptr::null_mut(),
SocketFlags::SOCK_CLOEXEC | SocketFlags::SOCK_NONBLOCK,
user_data,
// Run as async since we know we won't be able to connect yet
IoUringSQEFlags::IOSQE_ASYNC,
)
};
// We actually have to handle this async since we're on a single thread and accept will block
// for a connect
let next_slot = uring.get_next_sqe_slot().unwrap();
unsafe { next_slot.write(entry) }
uring.flush_submission_queue();
io_uring_enter(uring.fd, 1, 0, IoUringEnterFlags::empty()).unwrap();
let conn_sock = socket(
AddressFamily::AF_INET,
SocketOptions::new(SocketType::SOCK_STREAM, SocketFlags::SOCK_CLOEXEC),
6,
)
.unwrap();
crate::network::connect_inet(conn_sock, &addr).unwrap();
io_uring_enter(uring.fd, 0, 1, IoUringEnterFlags::IORING_ENTER_GETEVENTS).unwrap();
let cqe = uring.get_next_cqe().unwrap();
assert_eq!(user_data, cqe.0.user_data, "Bad user data in cqe {cqe:?}");
assert!(cqe.0.res >= 0, "Failed res for cqe: {cqe:?}");
let server_to_client_socket = Fd::try_new(cqe.0.res).unwrap();
let msg_content = b"Ping!";
crate::unistd::write(server_to_client_socket, msg_content).unwrap();
let mut ret_buf = [0u8; 5];
crate::unistd::read(conn_sock, &mut ret_buf).unwrap();
assert_eq!(&ret_buf, msg_content);
// Todo: Better cleanup on test failures
let _ = crate::unistd::close(server_to_client_socket);
let _ = crate::unistd::close(conn_sock);
let _ = crate::unistd::close(srv_sock);
}

fn write_await_single_entry(
Expand Down Expand Up @@ -781,3 +919,37 @@ fn uring_multi_linked_crud() {
}
}
}

#[test]
fn poll_add() {
let Some(mut uring) = setup_ignore_enosys(8, IoUringParamFlags::empty()) else {
return;
};
let poll_file_path = unix_lit!("test-files/io_uring/tmp_uring_poll_add_1.txt");
let poll_fd = open_mode(
poll_file_path,
OpenFlags::O_CREAT | OpenFlags::O_RDWR,
Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH,
)
.unwrap();
unsafe {
let sqe = uring.get_next_sqe_slot().unwrap();
*sqe = IoUringSubmissionQueueEntry::new_poll_add(
poll_fd,
PollEvents::POLLIN | PollEvents::POLLOUT,
PollAddMultiFlags::empty(),
0,
IoUringSQEFlags::empty(),
);
}
uring.flush_submission_queue();
assert_eq!(
1,
io_uring_enter(uring.fd, 1, 1, IoUringEnterFlags::IORING_ENTER_GETEVENTS).unwrap()
);
let next = uring.get_next_cqe().unwrap();
assert_eq!(
next.0.res,
i32::from((PollEvents::POLLIN | PollEvents::POLLOUT).0)
);
}
14 changes: 10 additions & 4 deletions rusl/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
pub use accept::accept;
pub use bind::bind;
pub use connect::connect;
pub use accept::{accept_inet, accept_unix};
pub use bind::{bind_inet, bind_unix};
pub use connect::{connect_inet, connect_unix};
pub use listen::listen;
pub use socket::socket;

#[cfg(feature = "alloc")]
pub use socket::{get_inet_sock_name, get_unix_sock_name, recvmsg, sendmsg, socket};
#[cfg(not(feature = "alloc"))]
pub use socket::{get_inet_sock_name, get_unix_sock_name, socket};

mod accept;
mod bind;
mod connect;
mod listen;
mod socket;
#[cfg(all(test, feature = "alloc"))]
mod test;
56 changes: 47 additions & 9 deletions rusl/src/network/accept.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,57 @@
use core::mem::MaybeUninit;
use sc::syscall;

use crate::platform::{Fd, SocketArg, SocketFlags};
use crate::platform::{Fd, SocketAddressInet, SocketAddressUnix, SocketArgUnix, SocketFlags};
use crate::Result;

/// Accept a new connection and set flags on the new connection's `Fd`
/// Accept a new unix-connection and set flags on the new connection's `Fd`
/// Accepted flags are 0, `SOCK_NONBLOCK` an `SOCK_CLOEXEC`
/// The `socket_address` is the peer address, if applicable
/// See [Linux documentation for more details](https://man7.org/linux/man-pages/man2/accept.2.html)
/// # Errors
/// See above
#[inline]
pub fn accept(sock_fd: Fd, socket_address: Option<&SocketArg>, flags: SocketFlags) -> Result<Fd> {
let (addr, addr_len) = socket_address.map_or((0, 0), |addr| {
(core::ptr::addr_of!(addr.addr) as usize, addr.addr_len)
});
let res = unsafe { syscall!(ACCEPT4, sock_fd.0, addr, addr_len, flags.0) };
Fd::coerce_from_register(res, "`ACCEPT4` syscall failed")
pub fn accept_unix(sock_fd: Fd, flags: SocketFlags) -> Result<(Fd, SocketArgUnix)> {
let mut addr = MaybeUninit::zeroed();
let mut addr_len = core::mem::size_of::<SocketAddressUnix>();
let res = unsafe {
syscall!(
ACCEPT4,
sock_fd.0,
core::ptr::addr_of_mut!(addr),
core::ptr::addr_of_mut!(addr_len),
flags.0
)
};
let fd = Fd::coerce_from_register(res, "`ACCEPT4` syscall failed")?;
unsafe {
Ok((
fd,
SocketArgUnix {
addr: addr.assume_init(),
addr_len,
},
))
}
}

/// Accept a new tcp-connection and set flags on the new connection's `Fd`
/// Accepted flags are 0, `SOCK_NONBLOCK` an `SOCK_CLOEXEC`
/// See [Linux documentation for more details](https://man7.org/linux/man-pages/man2/accept.2.html)
/// # Errors
/// See above
#[inline]
pub fn accept_inet(sock_fd: Fd, flags: SocketFlags) -> Result<(Fd, SocketAddressInet)> {
let mut addr = MaybeUninit::zeroed();
let mut addr_len = core::mem::size_of::<SocketAddressUnix>();
let res = unsafe {
syscall!(
ACCEPT4,
sock_fd.0,
core::ptr::addr_of_mut!(addr),
core::ptr::addr_of_mut!(addr_len),
flags.0
)
};
let fd = Fd::coerce_from_register(res, "`ACCEPT4` syscall failed")?;
unsafe { Ok((fd, addr.assume_init())) }
}
Loading

0 comments on commit 2d573ad

Please sign in to comment.