diff --git a/Cargo.toml b/Cargo.toml index b7b054b..8209d33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,3 @@ default-members = [ "mfio-netfs", ] -[patch.crates-io] -tracy-client = { git = "https://github.com/h33p/rust_tracy_client", branch = "fibers" } -tracing-tracy = { git = "https://github.com/h33p/rust_tracy_client", branch = "fibers" } diff --git a/mfio-netfs/Cargo.toml b/mfio-netfs/Cargo.toml index f7812b9..2f5e3f4 100644 --- a/mfio-netfs/Cargo.toml +++ b/mfio-netfs/Cargo.toml @@ -4,14 +4,6 @@ version = "0.1.0" rust-version = "1.72" edition = "2021" -[lib] -bench = false - -[[bench]] -name = "main" -harness = false -required-features = ["mfio/tokio"] - [[example]] name = "trace" required-features = ["tokio/net"] diff --git a/mfio-netfs/benches/main.rs b/mfio-netfs/benches/main.rs deleted file mode 100644 index c048680..0000000 --- a/mfio-netfs/benches/main.rs +++ /dev/null @@ -1,330 +0,0 @@ -use core::mem::MaybeUninit; -use criterion::*; -use futures::stream::{FuturesUnordered, StreamExt}; -#[cfg(unix)] -use mfio::backend::integrations::tokio::Tokio; -use mfio::backend::*; -use mfio::traits::*; -use mfio_rt::*; -use rand::prelude::*; -use std::fs::{write, File}; -use std::path::Path; -use std::time::{Duration, Instant}; - -#[no_mangle] -static mut FH: *const mfio::stdeq::Seekable = core::ptr::null(); - -const MB: usize = 0x10000; -const SPARSE: usize = 256 * 4; - -async fn fread( - file: &impl IoRead, - i: usize, - num_chunks: usize, - order: &[usize], - mut b: Vec>, -) -> Vec> { - file.read_all( - (order[i % order.len()] * SPARSE % num_chunks) as u64, - &mut b[..], - ) - .await - .unwrap(); - b -} - -async fn mfio_bench( - fs: &impl Fs, - size: usize, - iters: u64, - order: &[usize], - temp_path: &Path, -) -> Duration { - let mut iters = iters as usize; - - let num_chunks = MB / size; - let bufs = vec![vec![MaybeUninit::uninit(); size]; num_chunks]; - - let file = fs - .open(temp_path, OpenOptions::new().read(true)) - .await - .unwrap(); - - let _elapsed = Duration::default(); - - let mut futures = FuturesUnordered::new(); - let mut i = 0; - - for b in bufs.into_iter().take(iters) { - i += 1; - iters -= 1; - futures.push(fread(&file, i, num_chunks, order, b)); - } - - let now = Instant::now(); - - loop { - futures::select! { - b = futures.next() => { - if let Some(b) = b { - if iters > 0 { - i += 1; - iters -= 1; - futures.push(fread(&file, i, num_chunks, order, b)); - } - } else { - break; - } - }, - } - } - - assert_eq!(iters, 0); - - now.elapsed() -} - -#[tracing::instrument(skip_all)] -fn file_read(c: &mut Criterion) { - env_logger::init(); - - let mut group = c.benchmark_group("File Read"); - - let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - - group.plot_config(plot_config); - - let test_buf = &(0..(MB * SPARSE)) - .map(|i| (i % 256) as u8) - .collect::>(); - let mut temp_path = std::path::PathBuf::from("."); - temp_path.push("mfio-bench"); - let temp_path = &temp_path; - - let sizes = [/* 16, */ 64, 256, 1024, 4096, 16384, 65536]; - - let mut rng = rand::thread_rng(); - let mut order = (0..MB).step_by(sizes[0]).collect::>(); - order.shuffle(&mut rng); - let order = ℴ - - let drop_cache = |path: &Path| { - std::process::Command::new("/usr/bin/env") - .args([ - "dd", - &format!("if={}", path.to_str().unwrap()), - "iflag=nocache", - "count=0", - ]) - .output() - }; - - write(temp_path, test_buf).unwrap(); - - #[cfg(unix)] - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - let addr: std::net::SocketAddr = "127.0.0.1:54321".parse().unwrap(); - - group.bench_function(BenchmarkId::new("tcp-tokio", size), |b| { - b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter_custom(|iters| async move { - //println!("Do {iters}"); - let _num_chunks = MB / size; - //let mut bufs = vec![vec![MaybeUninit::uninit(); size]; num_chunks]; - - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - use tokio::io::{BufReader, BufWriter}; - use tokio::net::{TcpListener, TcpStream}; - - let listener = TcpListener::bind(addr).await.unwrap(); - - let listen = async move { - let (mut client, _) = listener.accept().await.unwrap(); - - let buf = vec![0; size]; - - let (recv, mut send) = client.split(); - let mut recv = BufReader::new(recv); - //let mut send = BufWriter::new(send); - - while let Ok(_v) = recv.read_u8().await { - send.write_all(&buf).await.unwrap(); - } - send.flush().await.unwrap(); - }; - - let listen = tokio::spawn(listen); - - let client = async { - let mut client = TcpStream::connect(addr).await.unwrap(); - let start = Instant::now(); - let (recv, send) = client.split(); - - let mut recv = BufReader::new(recv); - let mut send = BufWriter::new(send); - - let send = async move { - for _ in 0..iters { - send.write_all(&[0u8]).await.unwrap(); - } - let _ = send.flush().await; - }; - - let recv = async move { - let mut buf = vec![0; size]; - for _ in 0..iters { - recv.read_exact(&mut buf).await.unwrap(); - } - }; - - futures::join!(send, recv); - start.elapsed() - }; - - futures::join!(listen, client).1 - }); - }); - } - - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - let addr: std::net::SocketAddr = "127.0.0.1:54321".parse().unwrap(); - - group.bench_function(BenchmarkId::new("mfio-net", size), |b| { - b.iter_custom(|iters| { - drop_cache(temp_path).unwrap(); - - //println!("Create thing"); - - let (server, addr) = mfio_netfs::single_client_server(addr); - let fs = mfio_netfs::NetworkFs::try_new(addr).unwrap(); - - let elapsed = fs.block_on(mfio_bench(&fs, size, iters, order, temp_path)); - - core::mem::drop(fs); - server.join().unwrap(); - - elapsed - }); - }); - } - - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("mfio", size), |b| { - b.iter_custom(|iters| { - drop_cache(temp_path).unwrap(); - - let _elapsed = Duration::default(); - - NativeRt::default().run(|fs| mfio_bench(fs, size, iters, order, temp_path)) - }); - }); - } - - #[cfg(unix)] - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("mfio-tokio", size), |b| { - b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter_custom(|iters| async move { - drop_cache(temp_path).unwrap(); - - let mut fs = NativeRt::default(); - - Tokio::run_with_mut(&mut fs, |fs| mfio_bench(fs, size, iters, order, temp_path)) - .await - }); - }); - } - - for size in sizes { - use tokio::fs::*; - use tokio::io::*; - - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("tokio", size), |b| { - b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter_custom(|mut iters| async move { - let num_chunks = MB / size; - let mut bufs = vec![vec![0u8; size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let mut file = File::open(temp_path).await.unwrap(); - - let mut elapsed = Duration::default(); - - while iters > 0 { - let start = Instant::now(); - - for (i, b) in order.iter().take(iters as _).copied().zip(bufs.iter_mut()) { - file.seek(SeekFrom::Start((i * SPARSE) as u64)) - .await - .unwrap(); - file.read_exact(&mut b[..]).await.unwrap(); - } - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }); - }); - } - - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("std", size), |b| { - b.iter_custom(|mut iters| { - use std::io::{Read, Seek, SeekFrom}; - let num_chunks = MB / size; - let mut bufs = vec![vec![0u8; size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let mut elapsed = Duration::default(); - - let mut file = File::open(temp_path).unwrap(); - - while iters > 0 { - file.rewind().unwrap(); - - let start = Instant::now(); - - for (i, b) in order.iter().take(iters as _).copied().zip(bufs.iter_mut()) { - file.seek(SeekFrom::Start((i * SPARSE) as u64)).unwrap(); - file.read_exact(&mut b[..]).unwrap(); - } - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }); - }); - } -} - -criterion_group! { - name = benches; - config = Criterion::default() - //.plotting_backend(PlottingBackend::Plotters) - .with_plots() - .warm_up_time(std::time::Duration::from_millis(1000)) - .measurement_time(std::time::Duration::from_millis(5000)); - targets = - file_read, -} -criterion_main!(benches); diff --git a/mfio-rt/Cargo.toml b/mfio-rt/Cargo.toml index d5cd406..31edf8e 100644 --- a/mfio-rt/Cargo.toml +++ b/mfio-rt/Cargo.toml @@ -8,14 +8,6 @@ license = "MIT" repository = "https://github.com/memflow/mfio" description = "mfio based async runtime" -[lib] -bench = false - -[[bench]] -name = "main" -harness = false -required-features = ["mfio/tokio", "mfio/async-io"] - [dependencies] mfio = { version = "0.1", path = "../mfio", default-features = false } futures = { version = "0.3", default-features = false, features = ["async-await"] } @@ -56,7 +48,6 @@ rand = "0.8" flume = "0.10" smol = "1" env_logger = "0.10" -tracing-tracy = { version = "0.10", features = ["fibers", "only-localhost"] } tempdir = "0.3" pathdiff = "0.2" async-semaphore = "1" diff --git a/mfio-rt/benches/main.rs b/mfio-rt/benches/main.rs deleted file mode 100644 index 6e93e5e..0000000 --- a/mfio-rt/benches/main.rs +++ /dev/null @@ -1,447 +0,0 @@ -use core::mem::MaybeUninit; -use criterion::async_executor::*; -use criterion::*; -#[cfg(unix)] -use mfio::backend::integrations::{async_io::AsyncIo, tokio::Tokio}; -use mfio::backend::*; -use mfio::traits::*; -use mfio_rt::*; -use rand::prelude::*; -use std::fs::{write, File}; -use std::path::Path; -use std::time::{Duration, Instant}; - -struct PollsterExecutor; - -impl AsyncExecutor for PollsterExecutor { - fn block_on(&self, fut: impl core::future::Future) -> T { - pollster::block_on(fut) - } -} - -#[no_mangle] -static mut FH: *const mfio::stdeq::Seekable = core::ptr::null(); - -fn file_read(c: &mut Criterion) { - env_logger::init(); - - let mut group = c.benchmark_group("File Read"); - - let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); - - group.plot_config(plot_config); - - const MB: usize = 0x10000; - const SPARSE: usize = 256 * 4; - - let test_buf = &(0..(MB * SPARSE)) - .map(|i| (i % 256) as u8) - .collect::>(); - let mut temp_path = std::path::PathBuf::from("."); - temp_path.push("mfio-bench"); - let temp_path = &temp_path; - - let sizes = [/* 16, */ 64, 256, 1024, 4096, 16384, 65536]; - - let mut rng = rand::thread_rng(); - let mut order = (0..MB).step_by(sizes[0]).collect::>(); - order.shuffle(&mut rng); - let order = ℴ - - let drop_cache = |path: &Path| { - std::process::Command::new("/usr/bin/env") - .args([ - "dd", - &format!("if={}", path.to_str().unwrap()), - "iflag=nocache", - "count=0", - ]) - .output() - }; - - write(temp_path, test_buf).unwrap(); - - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("mfio", size), |b| { - b.iter_custom(|mut iters| { - let num_chunks = MB / size; - let mut bufs = vec![vec![MaybeUninit::uninit(); size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let mut elapsed = Duration::default(); - - NativeRt::default().run(|fs| async move { - let file = fs - .open(temp_path, OpenOptions::new().read(true)) - .await - .unwrap(); - unsafe { FH = &file as *const _ }; - - while iters > 0 { - let mut output = vec![]; - output.reserve(num_chunks); - - let start = Instant::now(); - - for (i, b) in order.iter().take(iters as _).copied().zip(bufs.iter_mut()) { - // Issue a direct read @ here, because we want to queue up multiple - // reads and have them all finish concurrently. - let fut = file.read_all((i * SPARSE) as u64, &mut b[..]); - let fut = async move { - fut.await.unwrap(); - }; - output.push(fut); - } - - let _ = futures::future::join_all(output).await; - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }) - }); - }); - } - - #[cfg(unix)] - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("mfio-tokio", size), |b| { - b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter_custom(|mut iters| async move { - let num_chunks = MB / size; - let mut bufs = vec![vec![MaybeUninit::uninit(); size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let mut elapsed = Duration::default(); - - let mut fs = NativeRt::default(); - - Tokio::run_with_mut(&mut fs, |fs| async move { - let file = fs - .open(temp_path, OpenOptions::new().read(true)) - .await - .unwrap(); - unsafe { FH = &file as *const _ }; - - while iters > 0 { - let mut output = vec![]; - output.reserve(num_chunks); - - let start = Instant::now(); - - for (i, b) in - order.iter().take(iters as _).copied().zip(bufs.iter_mut()) - { - // Issue a direct read @ here, because we want to queue up multiple - // reads and have them all finish concurrently. - let fut = file.read_all((i * SPARSE) as u64, &mut b[..]); - output.push(fut); - } - - let _ = futures::future::join_all(output).await; - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }) - .await - }); - }); - } - - #[cfg(unix)] - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("mfio-smol", size), |b| { - b.to_async(SmolExecutor) - .iter_custom(|mut iters| async move { - let num_chunks = MB / size; - let mut bufs = vec![vec![MaybeUninit::uninit(); size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let mut elapsed = Duration::default(); - - let mut fs = NativeRt::default(); - - AsyncIo::run_with_mut(&mut fs, |fs| async move { - let file = fs - .open(temp_path, OpenOptions::new().read(true)) - .await - .unwrap(); - unsafe { FH = &file as *const _ }; - - while iters > 0 { - let mut output = vec![]; - output.reserve(num_chunks); - - let start = Instant::now(); - - for (i, b) in - order.iter().take(iters as _).copied().zip(bufs.iter_mut()) - { - // Issue a direct read @ here, because we want to queue up multiple - // reads and have them all finish concurrently. - let fut = file.read_all((i * SPARSE) as u64, &mut b[..]); - output.push(fut); - } - - let _ = futures::future::join_all(output).await; - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }) - .await - }); - }); - } - #[cfg(target_os = "linux")] - for size in sizes { - use glommio::io::BufferedFile; - use glommio::LocalExecutor; - - struct GlommioExecutor(LocalExecutor); - - impl<'a> AsyncExecutor for &'a GlommioExecutor { - fn block_on(&self, fut: impl core::future::Future) -> T { - self.0.run(fut) - } - } - - let glommio = GlommioExecutor(LocalExecutor::default()); - - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("glommio", size), |b| { - b.to_async(&glommio).iter_custom(|mut iters| async move { - let num_chunks = MB / size; - let mut bufs = vec![vec![0u8; size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let file = &BufferedFile::open(temp_path).await.unwrap(); - - let mut elapsed = Duration::default(); - - while iters > 0 { - let mut output = vec![]; - output.reserve(num_chunks); - - let start = Instant::now(); - - for (i, b) in order.iter().take(iters as _).copied().zip(bufs.iter_mut()) { - let comp = async move { - let res = file.read_at((i * SPARSE) as u64, b.len()).await.unwrap(); - b.copy_from_slice(&res[..]); - }; - output.push(comp); - } - - let _ = futures::future::join_all(output).await; - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }); - }); - } - - /*#[cfg(unix)] - for size in sizes { - use futures::AsyncReadExt; - use futures::AsyncSeekExt; - use nuclei::Handle; - use std::io::SeekFrom; - - struct NucleiExecutor; - - impl AsyncExecutor for NucleiExecutor { - fn block_on(&self, fut: impl core::future::Future) -> T { - nuclei::drive(fut) - } - } - - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("nuclei", size), |b| { - b.to_async(NucleiExecutor) - .iter_custom(|mut iters| async move { - let num_chunks = MB / size; - let mut bufs = vec![vec![0u8; size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let file = File::open(temp_path).unwrap(); - let file = &mut Handle::::new(file).unwrap(); - - let mut elapsed = Duration::default(); - - while iters > 0 { - let start = Instant::now(); - - for (i, b) in order.iter().take(iters as _).copied().zip(bufs.iter_mut()) { - file.seek(SeekFrom::Start((i * SPARSE) as u64)) - .await - .unwrap(); - file.read_exact(&mut b[..]).await.unwrap(); - } - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }); - }); - }*/ - - for size in sizes { - use tokio::fs::*; - use tokio::io::*; - - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("tokio", size), |b| { - b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter_custom(|mut iters| async move { - let num_chunks = MB / size; - let mut bufs = vec![vec![0u8; size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let mut file = File::open(temp_path).await.unwrap(); - - let mut elapsed = Duration::default(); - - while iters > 0 { - let start = Instant::now(); - - for (i, b) in order.iter().take(iters as _).copied().zip(bufs.iter_mut()) { - file.seek(SeekFrom::Start((i * SPARSE) as u64)) - .await - .unwrap(); - file.read_exact(&mut b[..]).await.unwrap(); - } - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }); - }); - } - - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - group.bench_function(BenchmarkId::new("std", size), |b| { - b.to_async(PollsterExecutor) - .iter_custom(|mut iters| async move { - use std::io::{Read, Seek, SeekFrom}; - let num_chunks = MB / size; - let mut bufs = vec![vec![0u8; size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let mut elapsed = Duration::default(); - - let mut file = File::open(temp_path).unwrap(); - - while iters > 0 { - file.rewind().unwrap(); - - let start = Instant::now(); - - for (i, b) in order.iter().take(iters as _).copied().zip(bufs.iter_mut()) { - file.seek(SeekFrom::Start((i * SPARSE) as u64)).unwrap(); - file.read_exact(&mut b[..]).unwrap(); - } - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }); - }); - } - - #[cfg(target_os = "linux")] - for size in sizes { - group.throughput(Throughput::Bytes(size as u64)); - - let ring = &rio::new().unwrap(); - - group.bench_function(BenchmarkId::new("rio", size), |b| { - b.to_async(PollsterExecutor) - .iter_custom(|mut iters| async move { - let num_chunks = MB / size; - let mut bufs = vec![vec![0u8; size]; num_chunks]; - - drop_cache(temp_path).unwrap(); - - let file = File::open(temp_path).unwrap(); - - let mut elapsed = Duration::default(); - - while iters > 0 { - let mut output = vec![]; - output.reserve(num_chunks); - - let start = Instant::now(); - - for (i, b) in order.iter().take(iters as _).copied().zip(bufs.iter_mut()) { - let comp = ring.read_at(&file, b, (i * SPARSE) as u64); - output.push(comp); - } - - for comp in output { - comp.wait().unwrap(); - } - - elapsed += start.elapsed(); - - iters = iters.saturating_sub(num_chunks as _); - } - - elapsed - }); - }); - } -} - -criterion_group! { - name = benches; - config = Criterion::default() - //.plotting_backend(PlottingBackend::Plotters) - .with_plots() - .warm_up_time(std::time::Duration::from_millis(1000)) - .measurement_time(std::time::Duration::from_millis(5000)); - targets = - file_read, -} -criterion_main!(benches);