Skip to content

Commit

Permalink
Add virtual runtime (FS only)
Browse files Browse the repository at this point in the history
  • Loading branch information
h33p committed Nov 11, 2023
1 parent c5065f4 commit 8506d88
Show file tree
Hide file tree
Showing 13 changed files with 2,051 additions and 475 deletions.
14 changes: 9 additions & 5 deletions mfio-netfs/src/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,11 @@ impl DirHandle for NetworkFsDir {
///
/// This function accepts an absolute or relative path to a file for reading. If the path is
/// relative, it is opened relative to this `DirHandle`.
fn open_file<P: AsRef<Path>>(&self, path: P, options: OpenOptions) -> Self::OpenFileFuture<'_> {
fn open_file<'a, P: AsRef<Path> + ?Sized>(
&'a self,
path: &'a P,
options: OpenOptions,
) -> Self::OpenFileFuture<'a> {
OpenFileOp::make_future(
self,
FsRequest::OpenFile {
Expand All @@ -678,7 +682,7 @@ impl DirHandle for NetworkFsDir {
///
/// This function accepts an absolute or relative path to a directory for reading. If the path
/// is relative, it is opened relative to this `DirHandle`.
fn open_dir<P: AsRef<Path>>(&self, path: P) -> Self::OpenDirFuture<'_> {
fn open_dir<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::OpenDirFuture<'a> {
OpenDirOp::make_future(
self,
FsRequest::OpenDir {
Expand All @@ -687,7 +691,7 @@ impl DirHandle for NetworkFsDir {
)
}

fn metadata<P: AsRef<Path>>(&self, path: P) -> Self::MetadataFuture<'_> {
fn metadata<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::MetadataFuture<'a> {
MetadataOp::make_future(
self,
FsRequest::Metadata {
Expand All @@ -699,8 +703,8 @@ impl DirHandle for NetworkFsDir {
/// Do an operation.
///
/// This function performs an operation from the [`DirOp`](DirOp) enum.
fn do_op<P: AsRef<Path>>(&self, operation: DirOp<P>) -> Self::OpFuture<'_> {
OpOp::make_future(self, FsRequest::DirOp(operation.as_path().into_string()))
fn do_op<'a, P: AsRef<Path> + ?Sized>(&'a self, operation: DirOp<&'a P>) -> Self::OpFuture<'a> {
OpOp::make_future(self, FsRequest::DirOp(operation.into_string()))
}
}

Expand Down
33 changes: 23 additions & 10 deletions mfio-netfs/src/net/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ async fn run_server(stream: NativeTcpStream, fs: &NativeRt) {
}
FsRequest::OpenDir { path } => {
trace!("Open dir {path}");
let dir_id = match dh.open_dir(path).await {
let dir_id = match dh.open_dir(&path).await {
Ok(dir) => {
let dir_id = dir_handles
.borrow_mut()
Expand Down Expand Up @@ -507,15 +507,15 @@ async fn run_server(stream: NativeTcpStream, fs: &NativeRt) {
FsRequest::Metadata { path } => {
trace!("Metadata {path}");
let metadata = dh
.metadata(path)
.metadata(&path)
.await
.map_err(Error::into_int_err);
FsResponse::Metadata { metadata }
}
FsRequest::DirOp(op) => {
trace!("Do dir op");
FsResponse::DirOp(
dh.do_op(op)
dh.do_op(op.as_path())
.await
.map_err(Error::into_int_err)
.err(),
Expand Down Expand Up @@ -568,12 +568,13 @@ async fn run_server(stream: NativeTcpStream, fs: &NativeRt) {
l1.await;
}

pub fn single_client_server(addr: SocketAddr) -> (std::thread::JoinHandle<()>, SocketAddr) {
fn single_client_server_with(
addr: SocketAddr,
fs: NativeRt,
) -> (std::thread::JoinHandle<()>, SocketAddr) {
let (tx, rx) = flume::bounded(1);

let ret = std::thread::spawn(move || {
let fs = NativeRt::default();

fs.block_on(async {
let mut listener = fs.bind(addr).await.unwrap();
let _ = tx.send_async(listener.local_addr().unwrap()).await;
Expand All @@ -589,6 +590,10 @@ pub fn single_client_server(addr: SocketAddr) -> (std::thread::JoinHandle<()>, S
(ret, addr)
}

pub fn single_client_server(addr: SocketAddr) -> (std::thread::JoinHandle<()>, SocketAddr) {
single_client_server_with(addr, NativeRt::default())
}

pub async fn server_bind(fs: &NativeRt, bind_addr: SocketAddr) {
let listener = fs.bind(bind_addr).await.unwrap();
server(fs, listener).await
Expand Down Expand Up @@ -663,13 +668,18 @@ mod tests {
server.join().unwrap();
}

mfio_rt::test_suite!(tests, |closure| {
mfio_rt::test_suite!(tests, |test_name, closure| {
let _ = ::env_logger::builder().is_test(true).try_init();
use super::{single_client_server, NetworkFs, SocketAddr};
use super::{single_client_server_with, NetworkFs, SocketAddr};
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let (server, addr) = single_client_server(addr);

let mut rt = mfio_rt::NativeRt::default();
let dir = TempDir::new(test_name).unwrap();
rt.set_cwd(dir.path().to_path_buf());
let (server, addr) = single_client_server_with(addr, rt);

let rt = mfio_rt::NativeRt::default();

let mut rt = NetworkFs::with_fs(addr, rt.into(), true).unwrap();

let fs = staticify(&mut rt);
Expand All @@ -681,7 +691,10 @@ mod tests {
fs.block_on(func(fs))
}

run(fs, closure);
run(fs, move |rt| {
let run = TestRun::new(rt, dir);
closure(run)
});

core::mem::drop(rt);

Expand Down
19 changes: 11 additions & 8 deletions mfio-rt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ required-features = ["mfio/tokio", "mfio/async-io"]

[dependencies]
mfio = { version = "0.1", path = "../mfio", default-features = false }
futures = { version = "0.3", default-features = false }
once_cell = { version = "1", default-features = false, optional = true }
futures = { version = "0.3", default-features = false, features = ["async-await"] }
once_cell = { version = "1", default-features = false }
log = "0.4"
serde = { version = "1", features = ["derive", "alloc"], default-features = false }

typed-path = { version = "0.7", default-features = false }

# native rt deps
tracing = { version = "0.1", optional = true }
flume = { version = "0.10", optional = true }
parking_lot = { version = "0.12", optional = true }
Expand All @@ -32,6 +35,7 @@ pathdiff = { version = "0.2", optional = true }
async-semaphore = { version = "1", optional = true }
slab = { version = "0.4", default-features = false }


[target.'cfg(windows)'.dependencies]
windows = { version = "0.51", features = ["Win32_System_IO", "Win32_Foundation", "Win32_System_WindowsProgramming", "Win32_Storage_FileSystem", "Win32_Networking_WinSock"] }
force-send-sync = "1"
Expand Down Expand Up @@ -60,18 +64,17 @@ async-semaphore = "1"
[target.'cfg(not(miri))'.dev-dependencies]
tokio = { version = "1.24", features = ["rt", "rt-multi-thread", "fs", "io-util"] }

#[target.'cfg(unix)'.dev-dependencies]
#nuclei = "0.2"

[target.'cfg(target_os = "linux")'.dev-dependencies]
rio = "0.9"
# We need git version to compile on alpine
glommio = { version = "0.8", git = "https://github.com/DataDog/glommio", rev = "517326bb2b63b6f6ddcf5deec7a283ee510f44df" }

[features]
default = ["mio", "io-uring", "iocp", "native", "std"]
native = ["once_cell", "oneshot", "parking_lot", "flume", "tracing", "std"]
std = ["mfio/std", "once_cell/std", "once_cell/parking_lot"]
default = ["mio", "io-uring", "iocp", "native", "std", "virt"]
native = ["oneshot", "parking_lot", "flume", "tracing", "std"]
virt = []
virt-sync = []
std = ["mfio/std", "once_cell/std"]
# technically iocp depends on native, but let's be in-line with other backends
iocp = []
test_suite = ["tempdir", "pathdiff", "async-semaphore"]
Expand Down
35 changes: 33 additions & 2 deletions mfio-rt/src/__doctest.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#[cfg(all(any(miri, test, feature = "virt"), not(feature = "native")))]
use crate::virt::VirtRt;
#[cfg(feature = "native")]
use crate::NativeRt;
#[cfg(feature = "native")]
#[cfg(any(miri, test, feature = "native", feature = "virt"))]
use core::future::Future;
#[cfg(feature = "native")]
#[cfg(any(miri, test, feature = "native", feature = "virt"))]
use mfio::backend::IoBackend;

#[cfg(feature = "native")]
Expand All @@ -16,3 +18,32 @@ pub fn run_each<'a, Func: Fn(&'a NativeRt) -> F, F: Future>(func: Func) {
}
}
}

#[cfg(all(any(miri, test, feature = "virt"), not(feature = "native")))]
pub fn run_each<'a, Func: Fn(&'a VirtRt) -> F, F: Future>(func: Func) {
use crate::{DirHandle, Fs, OpenOptions};
use mfio::traits::*;

const FILES: &[(&str, &str)] = &[
("Cargo.toml", include_str!("../Cargo.toml")),
("src/lib.rs", include_str!("lib.rs")),
];

let fs = &VirtRt::new();

fs.block_on(async {
let cd = fs.current_dir();
cd.create_dir("src").await.unwrap();
for (p, data) in FILES {
let fh = cd
.open_file(p, OpenOptions::new().create_new(true).write(true))
.await
.unwrap();
fh.write_all(0, data.as_bytes()).await.unwrap();
}
});

// SAFETY: there isn't. The doctests shouldn't move the fs handle though.
let fs: &'a VirtRt = unsafe { &(*(fs as *const _)) };
fs.block_on(func(fs));
}
Loading

0 comments on commit 8506d88

Please sign in to comment.