Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backend selector #35

Merged
merged 8 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 48 additions & 42 deletions examples/h1-server-multishot.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,56 @@
use nuclei::*;
use std::net::TcpListener;
#[cfg(target_os = "linux")]
fn main() -> anyhow::Result<()> {
use nuclei::*;
use std::net::TcpListener;

use futures::stream::StreamExt;
use anyhow::Result;
use async_dup::Arc;
use futures::stream::StreamExt;
use anyhow::Result;
use async_dup::Arc;

use futures::prelude::*;
use http_types::{Request, Response, StatusCode};
use futures::prelude::*;
use http_types::{Request, Response, StatusCode};

/////////////////////////////////////////////////////////////////////////
////// NOTE: This example can only be run by IO_URING backend.
////// If you try to use epoll, it will not compile.
////// Reason is: Multishot based IO is only part of io_uring backend.
/////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////
////// NOTE: This example can only be run by IO_URING backend.
////// If you try to use epoll, it will not compile.
////// Reason is: Multishot based IO is only part of io_uring backend.
/////////////////////////////////////////////////////////////////////////


static DATA: &'static str = include_str!("data/quark-gluon-plasma");
static DATA: &'static str = include_str!("data/quark-gluon-plasma");

/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
// println!("Serving {}", req.url());
/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
// println!("Serving {}", req.url());

let mut res = Response::new(StatusCode::Ok);
res.insert_header("Content-Type", "text/plain");
res.set_body(DATA);
Ok(res)
}

/// Listens for incoming connections and serves them.
async fn listen(listener: Handle<TcpListener>) -> Result<()> {
// Format the full host address.
let host = format!("http://{}", listener.get_ref().local_addr()?);
println!("Listening on {}", host);

let mut streams = listener.accept_multi().await?;

while let Some(stream) = streams.next().await {
// Spawn a background task serving this connection.
let stream = Arc::new(stream);
spawn(async move {
if let Err(err) = async_h1::accept(stream, serve).await {
println!("Connection error: {:#?}", err);
}
})
.detach();
let mut res = Response::new(StatusCode::Ok);
res.insert_header("Content-Type", "text/plain");
res.set_body(DATA);
Ok(res)
}

Ok(())
}
/// Listens for incoming connections and serves them.
async fn listen(listener: Handle<TcpListener>) -> Result<()> {
// Format the full host address.
let host = format!("http://{}", listener.get_ref().local_addr()?);
println!("Listening on {}", host);

let mut streams = listener.accept_multi().await?;

while let Some(stream) = streams.next().await {
// Spawn a background task serving this connection.
let stream = Arc::new(stream);
spawn(async move {
if let Err(err) = async_h1::accept(stream, serve).await {
println!("Connection error: {:#?}", err);
}
})
.detach();
}

Ok(())
}

fn main() -> Result<()> {
spawn_blocking(|| drive(future::pending::<()>())).detach();

block_on(async {
Expand All @@ -59,3 +60,8 @@ fn main() -> Result<()> {
Ok(())
})
}

#[cfg(target_os = "macos")]
fn main() {
panic!("This example can only be run by IO_URING backend.");
}
72 changes: 72 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@

///
/// Nuclei's proactor configuration.
#[derive(Clone, Debug, Default)]
pub struct NucleiConfig {
/// **IO_URING Configuration** allows you to configure [io_uring](https://unixism.net/loti/what_is_io_uring.html) backend.
pub iouring: IoUringConfiguration
}

/// **IO_URING Configuration**
/// 5.19+ Kernel is required to operate IO_URING.
#[derive(Clone, Debug)]
pub struct IoUringConfiguration {
/// Allowed entries in both submission and completion queues.
///
/// **[default]**: By default this value is 2048.
pub queue_len: u32,
/// SQPOLL kernel thread wake up interval.
///
/// Before version 5.11 of the Linux kernel, to successfully use this feature, the application
/// must register a set of files to be used for IO through io_uring_register(2) using the
/// `IORING_REGISTER_FILES` opcode. Failure to do so will result in submitted IO being errored
/// with EBADF. The presence of this feature can be detected by the `IORING_FEAT_SQPOLL_NONFIXED`
/// feature flag. In version 5.11 and later, it is no longer necessary to register files to use
/// this feature. 5.11 also allows using this as non-root, if the user has the CAP_SYS_NICE
/// capability. In 5.13 this requirement was also relaxed, and no special privileges are needed
/// for SQPOLL in newer kernels. Certain stable kernels older than 5.13 may also support
/// unprivileged SQPOLL.
///
/// Decreasing this will put more pressure to kernel, increases cpu usage.
/// Increasing it will slow down completion push rate from kernel.
/// This config is in milliseconds.
///
/// **[default]**: If [None] then SQPOLL will be disabled.
pub sqpoll_wake_interval: Option<u32>,

/// Get and/or set the limit for number of io_uring worker threads per NUMA
/// node. `per_numa_bounded_worker_count` holds the limit for bounded workers,
/// which process I/O operations expected to be bound in time,
/// that is I/O on regular files or block devices. Passing `0` does not change
/// the current limit.
///
/// **[default]**: If [None] then Nuclei default value of will be used `256`.
///
/// If [None] passed, by default, the amount of bounded IO workers is limited to how
/// many SQ entries the ring was setup with, or 4 times the number of
/// online CPUs in the system, whichever is smaller.
pub per_numa_bounded_worker_count: Option<u32>,

/// This `per_numa_unbounded_worker_count` holds the limit for unbounded workers,
/// which carry out I/O operations that can never complete, for instance I/O
/// on sockets. Passing `0` does not change the current limit.
///
/// **[default]**: If [None] then Nuclei default value of will be used `512`.
///
/// If [None] passed unbounded workers will be limited by the process task limit,
/// as indicated by the rlimit [RLIMIT_NPROC](https://man7.org/linux/man-pages/man2/getrlimit.2.html) limit.
pub per_numa_unbounded_worker_count: Option<u32>,

// XXX: `redrive_kthread_wake` = bool, syncs queue changes so kernel threads got awakened. increased cpu usage.
}

impl Default for IoUringConfiguration {
fn default() -> Self {
Self {
queue_len: 1 << 11,
sqpoll_wake_interval: Some(2),
per_numa_bounded_worker_count: Some(1 << 8),
per_numa_unbounded_worker_count: Some(1 << 9),
}
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ mod submission_handler;
mod sys;
mod utils;
mod waker;
/// Nuclei's configuration options reside here.
pub mod config;

#[cfg(not(any(
target_os = "linux", // epoll, iouring
Expand Down
39 changes: 33 additions & 6 deletions src/proactor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::task::{Context, Poll};
use std::time::Duration;
use std::{future::Future, io};
use std::ops::DerefMut;

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `std::ops::DerefMut`

use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `Lazy`
use crate::config::NucleiConfig;

use super::syscore::*;
use super::waker::*;
use crate::spawn_blocking;
use crate::sys::IoBackend;

pub use super::handle::*;

Expand All @@ -16,13 +19,26 @@
unsafe impl Send for Proactor {}
unsafe impl Sync for Proactor {}

static mut PROACTOR: OnceCell<Proactor> = OnceCell::new();

impl Proactor {
/// Returns a reference to the proactor.
pub fn get() -> &'static Proactor {
static PROACTOR: Lazy<Proactor> =
Lazy::new(|| Proactor(SysProactor::new().expect("cannot initialize poll backend")));
unsafe {
&PROACTOR.get_or_init(|| {
Proactor(SysProactor::new(NucleiConfig::default()).expect("cannot initialize IO backend"))
})
}
}

&PROACTOR
/// Builds a proactor instance with given config and returns a reference to it.
pub fn with_config(config: NucleiConfig) -> &'static Proactor {
unsafe {
let proactor = Proactor(SysProactor::new(config.clone()).expect("cannot initialize IO backend"));
PROACTOR.set(proactor);

Check warning on line 38 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused `Result` that must be used

Check warning on line 38 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused `Result` that must be used
let proactor = Proactor(SysProactor::new(config).expect("cannot initialize IO backend"));
&PROACTOR.get_or_init(|| proactor)
}
}

/// Wakes the thread waiting on proactor.
Expand All @@ -35,14 +51,25 @@
self.0.wait(max_event_size, duration)
}

/// Get the IO backend that is used with Nuclei's proactor.
pub fn backend() -> IoBackend {
BACKEND
}

/// Get underlying proactor instance.
pub(crate) fn inner(&self) -> &SysProactor {
&self.0
}

#[cfg(all(feature = "iouring", target_os = "linux"))]
/// Get IO_URING backend probes
pub fn ring_params(&self) -> &rustix_uring::Parameters {
unsafe { IO_URING.as_ref().unwrap().params() }
}
}

///
/// IO driver that drives event systems
/// IO driver that drives underlying event systems
pub fn drive<T>(future: impl Future<Output = T>) -> T {
let p = Proactor::get();
let waker = waker_fn(move || {
Expand All @@ -69,4 +96,4 @@
// let _duration = Duration::from_millis(1);
let _ = driver.as_mut().poll(cx);
}
}
}
11 changes: 11 additions & 0 deletions src/sys.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
///
/// Backends that are possible to use with Nuclei
pub enum IoBackend {
/// BSD-like backend
Kqueue,
/// Linux backend
Epoll,
/// Linux backend
IoUring
}

#[cfg(unix)]
fn check_err(res: libc::c_int) -> Result<libc::c_int, std::io::Error> {
if res == -1 {
Expand Down
3 changes: 2 additions & 1 deletion src/syscore/bsd/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ macro_rules! syscall {
use socket2::SockAddr;
use std::mem;
use std::os::unix::net::SocketAddr as UnixSocketAddr;
use crate::config::NucleiConfig;

fn max_len() -> usize {
// The maximum read limit on most posix-like systems is `SSIZE_MAX`,
Expand Down Expand Up @@ -151,7 +152,7 @@ pub struct SysProactor {
}

impl SysProactor {
pub(crate) fn new() -> io::Result<SysProactor> {
pub(crate) fn new(config: NucleiConfig) -> io::Result<SysProactor> {
let kqueue_fd = kqueue()?;
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
let (read_stream, write_stream) = UnixStream::pair()?;
Expand Down
2 changes: 2 additions & 0 deletions src/syscore/bsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub(crate) use fs::*;
pub(crate) use kqueue::*;

pub(crate) use processor::*;

pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::Kqueue;
3 changes: 2 additions & 1 deletion src/syscore/linux/epoll/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ macro_rules! syscall {
use socket2::SockAddr;
use std::mem;
use std::os::unix::net::SocketAddr as UnixSocketAddr;
use crate::config::NucleiConfig;

fn max_len() -> usize {
// The maximum read limit on most posix-like systems is `SSIZE_MAX`,
Expand Down Expand Up @@ -148,7 +149,7 @@ pub struct SysProactor {
}

impl SysProactor {
pub(crate) fn new() -> io::Result<SysProactor> {
pub(crate) fn new(config: NucleiConfig) -> io::Result<SysProactor> {
let epoll_fd: i32 = epoll_create1()?;
let event_fd: i32 = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
let event_fd_raw = event_fd;
Expand Down
2 changes: 2 additions & 0 deletions src/syscore/linux/epoll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub(crate) use epoll::*;
pub(crate) use fs::*;

pub(crate) use processor::*;

pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::Epoll;
23 changes: 14 additions & 9 deletions src/syscore/linux/iouring/iouring.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::mem::MaybeUninit;
use futures::channel::oneshot;

Check warning on line 2 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `futures::channel::oneshot`

Check warning on line 2 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `futures::channel::oneshot`

Check warning on line 2 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `futures::channel::oneshot`

Check warning on line 2 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `futures::channel::oneshot`

Check warning on line 2 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `futures::channel::oneshot`

Check warning on line 2 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `futures::channel::oneshot`

Check warning on line 2 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `futures::channel::oneshot`

Check warning on line 2 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `futures::channel::oneshot`
use lever::sync::prelude::*;
use pin_utils::unsafe_pinned;
use ahash::{HashMap, HashMapExt};
Expand Down Expand Up @@ -33,6 +33,7 @@
use rustix::io_uring::IoringOp;
use rustix_uring::{CompletionQueue, IoUring, SubmissionQueue, Submitter, squeue::Entry as SQEntry, cqueue::Entry as CQEntry};
use rustix_uring::cqueue::{more, sock_nonempty};
use crate::config::NucleiConfig;

fn max_len() -> usize {
// The maximum read limit on most posix-like systems is `SSIZE_MAX`,
Expand Down Expand Up @@ -136,8 +137,6 @@
//// uring impl
///////////////////

const QUEUE_LEN: u32 = 1 << 11;

pub struct SysProactor {
sq: TTas<SubmissionQueue<'static>>,
cq: TTas<CompletionQueue<'static>>,
Expand All @@ -152,26 +151,32 @@
Submitter<'static>,
);

static mut IO_URING: Option<IoUring> = None;
pub(crate) static mut IO_URING: Option<IoUring> = None;

impl SysProactor {
pub(crate) fn new() -> io::Result<SysProactor> {
pub(crate) fn new(config: NucleiConfig) -> io::Result<SysProactor> {
unsafe {
let ring = IoUring::builder()
.setup_sqpoll(2)
.build(QUEUE_LEN)
let mut rb = IoUring::builder();
config.iouring.sqpoll_wake_interval.map(|e| rb.setup_sqpoll(e));
let mut ring = rb.build(config.iouring.queue_len)
.expect("nuclei: uring can't be initialized");

IO_URING = Some(ring);

let (submitter, sq, cq) = IO_URING.as_mut().unwrap().split();
submitter.register_iowq_max_workers(&mut [QUEUE_LEN*8, 16])?;

match (config.iouring.per_numa_bounded_worker_count, config.iouring.per_numa_unbounded_worker_count) {
(Some(bw), Some(ubw)) => submitter.register_iowq_max_workers(&mut [bw, ubw])?,
(None, Some(ubw)) => submitter.register_iowq_max_workers(&mut [0, ubw])?,
(Some(bw), None) => submitter.register_iowq_max_workers(&mut [bw, 0])?,
(None, None) => submitter.register_iowq_max_workers(&mut [0, 0])?,
}

Ok(SysProactor {
sq: TTas::new(sq),
cq: TTas::new(cq),
sbmt: TTas::new(submitter),
submitters: TTas::new(HashMap::with_capacity(QUEUE_LEN as usize)),
submitters: TTas::new(HashMap::with_capacity(config.iouring.queue_len as usize)),
submitter_id: AtomicU64::default(),
})
}
Expand Down
2 changes: 2 additions & 0 deletions src/syscore/linux/iouring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ pub(crate) use fs::*;
pub(crate) use iouring::*;
pub(crate) use nethandle::*;
pub(crate) use processor::*;

pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::IoUring;
Loading