diff --git a/examples/h1-server-multishot.rs b/examples/h1-server-multishot.rs index 3493672..8df8b8e 100644 --- a/examples/h1-server-multishot.rs +++ b/examples/h1-server-multishot.rs @@ -1,55 +1,56 @@ -use nuclei::*; -use std::net::TcpListener; +#[cfg(target_os = "linux")] +fn main() -> anyhow::Result<()> { + use nuclei::*; + use std::net::TcpListener; -use futures::stream::StreamExt; -use anyhow::Result; -use async_dup::Arc; + use futures::stream::StreamExt; + use anyhow::Result; + use async_dup::Arc; -use futures::prelude::*; -use http_types::{Request, Response, StatusCode}; + use futures::prelude::*; + use http_types::{Request, Response, StatusCode}; -///////////////////////////////////////////////////////////////////////// -////// NOTE: This example can only be run by IO_URING backend. -////// If you try to use epoll, it will not compile. -////// Reason is: Multishot based IO is only part of io_uring backend. -///////////////////////////////////////////////////////////////////////// + ///////////////////////////////////////////////////////////////////////// + ////// NOTE: This example can only be run by IO_URING backend. + ////// If you try to use epoll, it will not compile. + ////// Reason is: Multishot based IO is only part of io_uring backend. + ///////////////////////////////////////////////////////////////////////// -static DATA: &'static str = include_str!("data/quark-gluon-plasma"); + static DATA: &'static str = include_str!("data/quark-gluon-plasma"); -/// Serves a request and returns a response. -async fn serve(req: Request) -> http_types::Result { - // println!("Serving {}", req.url()); + /// Serves a request and returns a response. + async fn serve(req: Request) -> http_types::Result { + // println!("Serving {}", req.url()); - let mut res = Response::new(StatusCode::Ok); - res.insert_header("Content-Type", "text/plain"); - res.set_body(DATA); - Ok(res) -} - -/// Listens for incoming connections and serves them. -async fn listen(listener: Handle) -> Result<()> { - // Format the full host address. - let host = format!("http://{}", listener.get_ref().local_addr()?); - println!("Listening on {}", host); - - let mut streams = listener.accept_multi().await?; - - while let Some(stream) = streams.next().await { - // Spawn a background task serving this connection. - let stream = Arc::new(stream); - spawn(async move { - if let Err(err) = async_h1::accept(stream, serve).await { - println!("Connection error: {:#?}", err); - } - }) - .detach(); + let mut res = Response::new(StatusCode::Ok); + res.insert_header("Content-Type", "text/plain"); + res.set_body(DATA); + Ok(res) } - Ok(()) -} + /// Listens for incoming connections and serves them. + async fn listen(listener: Handle) -> Result<()> { + // Format the full host address. + let host = format!("http://{}", listener.get_ref().local_addr()?); + println!("Listening on {}", host); + + let mut streams = listener.accept_multi().await?; + + while let Some(stream) = streams.next().await { + // Spawn a background task serving this connection. + let stream = Arc::new(stream); + spawn(async move { + if let Err(err) = async_h1::accept(stream, serve).await { + println!("Connection error: {:#?}", err); + } + }) + .detach(); + } + + Ok(()) + } -fn main() -> Result<()> { spawn_blocking(|| drive(future::pending::<()>())).detach(); block_on(async { @@ -59,3 +60,8 @@ fn main() -> Result<()> { Ok(()) }) } + +#[cfg(target_os = "macos")] +fn main() { + panic!("This example can only be run by IO_URING backend."); +} \ No newline at end of file diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..304cd5e --- /dev/null +++ b/src/config.rs @@ -0,0 +1,72 @@ + +/// +/// Nuclei's proactor configuration. +#[derive(Clone, Debug, Default)] +pub struct NucleiConfig { + /// **IO_URING Configuration** allows you to configure [io_uring](https://unixism.net/loti/what_is_io_uring.html) backend. + pub iouring: IoUringConfiguration +} + +/// **IO_URING Configuration** +/// 5.19+ Kernel is required to operate IO_URING. +#[derive(Clone, Debug)] +pub struct IoUringConfiguration { + /// Allowed entries in both submission and completion queues. + /// + /// **[default]**: By default this value is 2048. + pub queue_len: u32, + /// SQPOLL kernel thread wake up interval. + /// + /// Before version 5.11 of the Linux kernel, to successfully use this feature, the application + /// must register a set of files to be used for IO through io_uring_register(2) using the + /// `IORING_REGISTER_FILES` opcode. Failure to do so will result in submitted IO being errored + /// with EBADF. The presence of this feature can be detected by the `IORING_FEAT_SQPOLL_NONFIXED` + /// feature flag. In version 5.11 and later, it is no longer necessary to register files to use + /// this feature. 5.11 also allows using this as non-root, if the user has the CAP_SYS_NICE + /// capability. In 5.13 this requirement was also relaxed, and no special privileges are needed + /// for SQPOLL in newer kernels. Certain stable kernels older than 5.13 may also support + /// unprivileged SQPOLL. + /// + /// Decreasing this will put more pressure to kernel, increases cpu usage. + /// Increasing it will slow down completion push rate from kernel. + /// This config is in milliseconds. + /// + /// **[default]**: If [None] then SQPOLL will be disabled. + pub sqpoll_wake_interval: Option, + + /// Get and/or set the limit for number of io_uring worker threads per NUMA + /// node. `per_numa_bounded_worker_count` holds the limit for bounded workers, + /// which process I/O operations expected to be bound in time, + /// that is I/O on regular files or block devices. Passing `0` does not change + /// the current limit. + /// + /// **[default]**: If [None] then Nuclei default value of will be used `256`. + /// + /// If [None] passed, by default, the amount of bounded IO workers is limited to how + /// many SQ entries the ring was setup with, or 4 times the number of + /// online CPUs in the system, whichever is smaller. + pub per_numa_bounded_worker_count: Option, + + /// This `per_numa_unbounded_worker_count` holds the limit for unbounded workers, + /// which carry out I/O operations that can never complete, for instance I/O + /// on sockets. Passing `0` does not change the current limit. + /// + /// **[default]**: If [None] then Nuclei default value of will be used `512`. + /// + /// If [None] passed unbounded workers will be limited by the process task limit, + /// as indicated by the rlimit [RLIMIT_NPROC](https://man7.org/linux/man-pages/man2/getrlimit.2.html) limit. + pub per_numa_unbounded_worker_count: Option, + + // XXX: `redrive_kthread_wake` = bool, syncs queue changes so kernel threads got awakened. increased cpu usage. +} + +impl Default for IoUringConfiguration { + fn default() -> Self { + Self { + queue_len: 1 << 11, + sqpoll_wake_interval: Some(2), + per_numa_bounded_worker_count: Some(1 << 8), + per_numa_unbounded_worker_count: Some(1 << 9), + } + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index f15c018..eaeeae2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,8 @@ mod submission_handler; mod sys; mod utils; mod waker; +/// Nuclei's configuration options reside here. +pub mod config; #[cfg(not(any( target_os = "linux", // epoll, iouring diff --git a/src/proactor.rs b/src/proactor.rs index 5c1c91b..7c51553 100644 --- a/src/proactor.rs +++ b/src/proactor.rs @@ -1,12 +1,15 @@ use std::task::{Context, Poll}; use std::time::Duration; use std::{future::Future, io}; +use std::ops::DerefMut; -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; +use crate::config::NucleiConfig; use super::syscore::*; use super::waker::*; use crate::spawn_blocking; +use crate::sys::IoBackend; pub use super::handle::*; @@ -16,13 +19,26 @@ pub struct Proactor(SysProactor); unsafe impl Send for Proactor {} unsafe impl Sync for Proactor {} +static mut PROACTOR: OnceCell = OnceCell::new(); + impl Proactor { /// Returns a reference to the proactor. pub fn get() -> &'static Proactor { - static PROACTOR: Lazy = - Lazy::new(|| Proactor(SysProactor::new().expect("cannot initialize poll backend"))); + unsafe { + &PROACTOR.get_or_init(|| { + Proactor(SysProactor::new(NucleiConfig::default()).expect("cannot initialize IO backend")) + }) + } + } - &PROACTOR + /// Builds a proactor instance with given config and returns a reference to it. + pub fn with_config(config: NucleiConfig) -> &'static Proactor { + unsafe { + let proactor = Proactor(SysProactor::new(config.clone()).expect("cannot initialize IO backend")); + PROACTOR.set(proactor); + let proactor = Proactor(SysProactor::new(config).expect("cannot initialize IO backend")); + &PROACTOR.get_or_init(|| proactor) + } } /// Wakes the thread waiting on proactor. @@ -35,14 +51,25 @@ impl Proactor { self.0.wait(max_event_size, duration) } + /// Get the IO backend that is used with Nuclei's proactor. + pub fn backend() -> IoBackend { + BACKEND + } + /// Get underlying proactor instance. pub(crate) fn inner(&self) -> &SysProactor { &self.0 } + + #[cfg(all(feature = "iouring", target_os = "linux"))] + /// Get IO_URING backend probes + pub fn ring_params(&self) -> &rustix_uring::Parameters { + unsafe { IO_URING.as_ref().unwrap().params() } + } } /// -/// IO driver that drives event systems +/// IO driver that drives underlying event systems pub fn drive(future: impl Future) -> T { let p = Proactor::get(); let waker = waker_fn(move || { @@ -69,4 +96,4 @@ pub fn drive(future: impl Future) -> T { // let _duration = Duration::from_millis(1); let _ = driver.as_mut().poll(cx); } -} +} \ No newline at end of file diff --git a/src/sys.rs b/src/sys.rs index 3add5d4..56fdb19 100644 --- a/src/sys.rs +++ b/src/sys.rs @@ -1,3 +1,14 @@ +/// +/// Backends that are possible to use with Nuclei +pub enum IoBackend { + /// BSD-like backend + Kqueue, + /// Linux backend + Epoll, + /// Linux backend + IoUring +} + #[cfg(unix)] fn check_err(res: libc::c_int) -> Result { if res == -1 { diff --git a/src/syscore/bsd/kqueue.rs b/src/syscore/bsd/kqueue.rs index 96ab6fa..d71f66b 100644 --- a/src/syscore/bsd/kqueue.rs +++ b/src/syscore/bsd/kqueue.rs @@ -28,6 +28,7 @@ macro_rules! syscall { use socket2::SockAddr; use std::mem; use std::os::unix::net::SocketAddr as UnixSocketAddr; +use crate::config::NucleiConfig; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -151,7 +152,7 @@ pub struct SysProactor { } impl SysProactor { - pub(crate) fn new() -> io::Result { + pub(crate) fn new(config: NucleiConfig) -> io::Result { let kqueue_fd = kqueue()?; syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?; let (read_stream, write_stream) = UnixStream::pair()?; diff --git a/src/syscore/bsd/mod.rs b/src/syscore/bsd/mod.rs index c2b1c42..ab3ec73 100644 --- a/src/syscore/bsd/mod.rs +++ b/src/syscore/bsd/mod.rs @@ -7,3 +7,5 @@ pub(crate) use fs::*; pub(crate) use kqueue::*; pub(crate) use processor::*; + +pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::Kqueue; \ No newline at end of file diff --git a/src/syscore/linux/epoll/epoll.rs b/src/syscore/linux/epoll/epoll.rs index d35aaca..381a8ac 100644 --- a/src/syscore/linux/epoll/epoll.rs +++ b/src/syscore/linux/epoll/epoll.rs @@ -28,6 +28,7 @@ macro_rules! syscall { use socket2::SockAddr; use std::mem; use std::os::unix::net::SocketAddr as UnixSocketAddr; +use crate::config::NucleiConfig; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -148,7 +149,7 @@ pub struct SysProactor { } impl SysProactor { - pub(crate) fn new() -> io::Result { + pub(crate) fn new(config: NucleiConfig) -> io::Result { let epoll_fd: i32 = epoll_create1()?; let event_fd: i32 = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?; let event_fd_raw = event_fd; diff --git a/src/syscore/linux/epoll/mod.rs b/src/syscore/linux/epoll/mod.rs index 9f8136c..2911440 100644 --- a/src/syscore/linux/epoll/mod.rs +++ b/src/syscore/linux/epoll/mod.rs @@ -7,3 +7,5 @@ pub(crate) use epoll::*; pub(crate) use fs::*; pub(crate) use processor::*; + +pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::Epoll; \ No newline at end of file diff --git a/src/syscore/linux/iouring/iouring.rs b/src/syscore/linux/iouring/iouring.rs index 5678406..525fd81 100644 --- a/src/syscore/linux/iouring/iouring.rs +++ b/src/syscore/linux/iouring/iouring.rs @@ -33,6 +33,7 @@ use crossbeam_channel::{Receiver, Sender, unbounded}; use rustix::io_uring::IoringOp; use rustix_uring::{CompletionQueue, IoUring, SubmissionQueue, Submitter, squeue::Entry as SQEntry, cqueue::Entry as CQEntry}; use rustix_uring::cqueue::{more, sock_nonempty}; +use crate::config::NucleiConfig; fn max_len() -> usize { // The maximum read limit on most posix-like systems is `SSIZE_MAX`, @@ -136,8 +137,6 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result //// uring impl /////////////////// -const QUEUE_LEN: u32 = 1 << 11; - pub struct SysProactor { sq: TTas>, cq: TTas>, @@ -152,26 +151,32 @@ pub type RingTypes = ( Submitter<'static>, ); -static mut IO_URING: Option = None; +pub(crate) static mut IO_URING: Option = None; impl SysProactor { - pub(crate) fn new() -> io::Result { + pub(crate) fn new(config: NucleiConfig) -> io::Result { unsafe { - let ring = IoUring::builder() - .setup_sqpoll(2) - .build(QUEUE_LEN) + let mut rb = IoUring::builder(); + config.iouring.sqpoll_wake_interval.map(|e| rb.setup_sqpoll(e)); + let mut ring = rb.build(config.iouring.queue_len) .expect("nuclei: uring can't be initialized"); IO_URING = Some(ring); let (submitter, sq, cq) = IO_URING.as_mut().unwrap().split(); - submitter.register_iowq_max_workers(&mut [QUEUE_LEN*8, 16])?; + + match (config.iouring.per_numa_bounded_worker_count, config.iouring.per_numa_unbounded_worker_count) { + (Some(bw), Some(ubw)) => submitter.register_iowq_max_workers(&mut [bw, ubw])?, + (None, Some(ubw)) => submitter.register_iowq_max_workers(&mut [0, ubw])?, + (Some(bw), None) => submitter.register_iowq_max_workers(&mut [bw, 0])?, + (None, None) => submitter.register_iowq_max_workers(&mut [0, 0])?, + } Ok(SysProactor { sq: TTas::new(sq), cq: TTas::new(cq), sbmt: TTas::new(submitter), - submitters: TTas::new(HashMap::with_capacity(QUEUE_LEN as usize)), + submitters: TTas::new(HashMap::with_capacity(config.iouring.queue_len as usize)), submitter_id: AtomicU64::default(), }) } diff --git a/src/syscore/linux/iouring/mod.rs b/src/syscore/linux/iouring/mod.rs index b4fa7e7..71b7efb 100644 --- a/src/syscore/linux/iouring/mod.rs +++ b/src/syscore/linux/iouring/mod.rs @@ -9,3 +9,5 @@ pub(crate) use fs::*; pub(crate) use iouring::*; pub(crate) use nethandle::*; pub(crate) use processor::*; + +pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::IoUring; \ No newline at end of file