Skip to content

Commit

Permalink
WIP soundness 4
Browse files Browse the repository at this point in the history
  • Loading branch information
h33p committed Oct 5, 2023
1 parent e6cd5a6 commit d2ff03f
Show file tree
Hide file tree
Showing 11 changed files with 770 additions and 634 deletions.
116 changes: 46 additions & 70 deletions mfio/benches/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use core::future::IntoFuture;
use core::mem::MaybeUninit;
use criterion::async_executor::*;
use criterion::measurement::Measurement;
use criterion::*;
use futures::{pin_mut, StreamExt};
use futures::StreamExt;
use mfio::backend::*;
use mfio::packet::*;
use mfio::io::*;
use std::cell::RefCell;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -86,41 +85,6 @@ impl AsyncExecutor3 for TokioRuntime {
}
}

fn allocations(c: &mut Criterion) {
let mut group = c.benchmark_group("Allocations");

let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic);

group.plot_config(plot_config);

for size in [1, 4, 16, 64, 256, 1024, 4096, 16384, 65536] {
group.throughput(Throughput::Elements(size as u64));

group.bench_function(BenchmarkId::new("alloc", size), move |b| {
b.to_async(PollsterExecutor)
.iter_custom(move |iters| async move {
let mut scope = SampleIo::default();
Null::run_with_mut(&mut scope, |scope| async move {
let mut elapsed = Duration::default();
for _ in 0..iters {
let streams = (0..size)
.into_iter()
.map(|_| PacketIo::<Write, _>::new_id(scope))
.collect::<Vec<_>>();
let futures = futures::future::join_all(streams);

let start = Instant::now();
black_box(futures.await);
elapsed += start.elapsed();
}
elapsed
})
.await
})
});
}
}

fn singlestream_reads(c: &mut Criterion) {
let mut group = c.benchmark_group("Singlestream reads");

Expand Down Expand Up @@ -149,22 +113,30 @@ fn singlestream_reads(c: &mut Criterion) {
let handle = &RefCell::new(&mut handle);

b.to_async(T::executor()).iter_custom(|iters| async move {
let mut bufs = vec![[MaybeUninit::uninit()]; size];
let bufs = (0..size)
.map(|_| Packet::<Write>::new_buf(1))
.collect::<Vec<_>>();

Null::run_with_mut(*handle.borrow_mut(), |scope| async move {
let mut elapsed = Duration::default();

for _ in 0..iters {
let start = Instant::now();

let stream = scope.new_id().await;
pin_mut!(stream);

for b in &mut bufs {
stream.as_ref().send_io(0, b);
for b in &bufs {
unsafe { b.reset_err() };
let pv = PacketView::from_arc_ref(b, 0);
let bpv = unsafe { pv.bind(None) };
scope.send_io(0, bpv);
}

black_box(stream.count().await);
// We can await for a packet until it has no more active views, and
// then reuse it again at the next iteration of the loop.
black_box(
futures::stream::iter(&bufs)
.for_each(|v| async move { (&**v).await })
.await,
);

elapsed += start.elapsed();
}
Expand Down Expand Up @@ -212,23 +184,28 @@ fn reads(c: &mut Criterion) {
let handle = &RefCell::new(&mut handle);

b.to_async(T::executor()).iter_custom(|iters| async move {
let mut bufs = vec![[MaybeUninit::uninit()]; size];
let bufs = (0..size)
.map(|_| Packet::<Write>::new_buf(1))
.collect::<Vec<_>>();

Null::run_with_mut(*handle.borrow_mut(), |scope| async move {
let mut elapsed = Duration::default();

for _ in 0..iters {
let futures = bufs
.iter_mut()
.map(|b| async { scope.io(0, b) })
.collect::<Vec<_>>();

let futures = futures::future::join_all(futures).await;
let streams = futures::stream::iter(futures).flatten();
let streams = futures::stream::iter(bufs.iter().map(|b| {
unsafe { b.reset_err() };
scope.io(0, b)
}));

let start = Instant::now();

black_box(streams.count().await);
black_box(
streams
.for_each(|v| async move {
v.await;
})
.await,
);

elapsed += start.elapsed();
}
Expand Down Expand Up @@ -278,36 +255,36 @@ fn reads_tasked(c: &mut Criterion) {
let start = Instant::now();

let join_tasks = (0..tasks).map(|_| {
let bufs = (0..size)
.map(|_| Packet::<Write>::new_buf(1))
.collect::<Vec<_>>();

T::spawn({
let subtract = Instant::now();

let mut bufs = vec![[MaybeUninit::uninit()]; size as _];

let mut scope = handle.clone();

async move {
Null::run_with_mut(&mut scope, move |scope| async move {
let mut subtract = subtract.elapsed();

for _ in 0..iters {
let s2 = Instant::now();
let futures = bufs
.iter_mut()
.map(|b| async { scope.io(0, b) })
.collect::<Vec<_>>();

let futures =
futures::future::join_all(futures).await;
let streams =
futures::stream::iter(futures).flatten();
futures::stream::iter(bufs.iter().map(|b| {
unsafe { b.reset_err() };
scope.io(0, b.clone())
}));

subtract += s2.elapsed();

//let start = Instant::now();

black_box(streams.count().await);

//elapsed += start.elapsed();
black_box(
streams
.for_each(|v| async move {
v.await;
})
.await,
);
}

subtract
Expand Down Expand Up @@ -349,6 +326,5 @@ criterion_group! {
reads_tasked,
singlestream_reads,
reads,
allocations
}
criterion_main!(benches);
176 changes: 176 additions & 0 deletions mfio/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use cglue::prelude::v1::*;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll};

mod packet;
pub use packet::*;
mod opaque;
pub use opaque::*;

#[cglue_trait]
pub trait PacketIo<Perms: PacketPerms, Param>: Sized {
// TODO: make this a sink
fn send_io(&self, param: Param, view: BoundPacketView<Perms>);
}

pub trait PacketIoExt<Perms: PacketPerms, Param>: PacketIo<Perms, Param> {
fn io<'a, T: PacketStore<'a, Perms>>(
&'a self,
param: Param,
packet: T,
) -> IoFut<'a, Self, Perms, Param, T> {
//IoFut::NewId(self, param, packet.stack())
IoFut {
pkt: UnsafeCell::new(Some(packet.stack())),
initial_state: Some((self, param)),
_phantom: PhantomData,
}
}

fn io_to<'a, T: PacketStore<'a, Perms>, O: OutputStore<'a, Perms>>(
&'a self,
param: Param,
packet: T,
output: O,
) -> IoToFut<'a, Self, Perms, Param, T, O> {
//IoFut::NewId(self, param, packet.stack())
IoToFut {
pkt_out: UnsafeCell::new(Some((packet.stack(), output.stack()))),
initial_state: Some((self, param)),
_phantom: PhantomData,
}
}
}

impl<T: PacketIo<Perms, Param>, Perms: PacketPerms, Param> PacketIoExt<Perms, Param> for T {}

pub trait StreamIoExt<Perms: PacketPerms>: PacketIo<Perms, NoPos> {
fn stream_io<'a, T: PacketStore<'a, Perms>>(
&'a self,
packet: T,
) -> IoFut<'a, Self, Perms, NoPos, T> {
self.io(NoPos::new(), packet)
}

fn stream_io_to<'a, T: PacketStore<'a, Perms>, O: OutputStore<'a, Perms>>(
&'a self,
packet: T,
output: O,
) -> IoToFut<'a, Self, Perms, NoPos, T, O> {
self.io_to(NoPos::new(), packet, output)
}
}

impl<T: PacketIo<Perms, NoPos>, Perms: PacketPerms> StreamIoExt<Perms> for T {}

#[repr(transparent)]
#[derive(Clone)]
pub struct NoPos(core::marker::PhantomData<()>);

impl NoPos {
pub const fn new() -> Self {
Self(core::marker::PhantomData)
}
}

pub struct IoFut<'a, T, Perms: PacketPerms, Param, Packet: PacketStore<'a, Perms>> {
pkt: UnsafeCell<Option<Packet::StackReq<'a>>>,
initial_state: Option<(&'a T, Param)>,
_phantom: PhantomData<Perms>,
}

impl<'a, T: PacketIo<Perms, Param>, Perms: PacketPerms, Param, Pkt: PacketStore<'a, Perms>> Future
for IoFut<'a, T, Perms, Param, Pkt>
{
type Output = Pkt::StackReq<'a>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let state = unsafe { self.get_unchecked_mut() };

loop {
match state.initial_state.take() {
Some((io, param)) => {
// SAFETY: this packet's existence is tied to 'a lifetime, meaning it will be valid
// throughout 'a.
let pkt: &'a mut Pkt::StackReq<'a> =
unsafe { (*state.pkt.get()).as_mut().unwrap() };
let view: PacketView<'a, Perms> = Pkt::stack_opaque(pkt);
// SAFETY: PacketView's lifetime is a marker, and we are using the marker lifetime to guide
// assumptions about type's validity. A sound implementation would put a 'static object
// here regardless, making the object 'static, while non-'static implementations are out of
// our hand, therefore we assume the caller is giving us correct info.
let bound = unsafe { view.bind(None) };
io.send_io(param, bound)
}
None => {
let pkt: &'a Pkt::StackReq<'a> =
unsafe { (*state.pkt.get()).as_ref().unwrap() };
let mut pkt: &'a Packet<Perms> = Pkt::stack_hdr(pkt);
let pkt = Pin::new(&mut pkt);
break pkt
.poll(cx)
.map(|_| unsafe { (*state.pkt.get()).take().unwrap() });
}
}
}
}
}

pub struct IoToFut<
'a,
T,
Perms: PacketPerms,
Param,
Packet: PacketStore<'a, Perms>,
Output: OutputStore<'a, Perms>,
> {
pkt_out: UnsafeCell<Option<(Packet::StackReq<'a>, Output::StackReq<'a>)>>,
initial_state: Option<(&'a T, Param)>,
_phantom: PhantomData<Perms>,
}

impl<
'a,
T: PacketIo<Perms, Param>,
Perms: PacketPerms,
Param,
Pkt: PacketStore<'a, Perms>,
Out: OutputStore<'a, Perms>,
> Future for IoToFut<'a, T, Perms, Param, Pkt, Out>
{
type Output = (Pkt::StackReq<'a>, Out::StackReq<'a>);

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let state = unsafe { self.get_unchecked_mut() };

loop {
match state.initial_state.take() {
Some((io, param)) => {
// SAFETY: this packet's existence is tied to 'a lifetime, meaning it will be valid
// throughout 'a.
let (pkt, out): &'a mut (Pkt::StackReq<'a>, Out::StackReq<'a>) =
unsafe { (*state.pkt_out.get()).as_mut().unwrap() };
let view: PacketView<'a, Perms> = Pkt::stack_opaque(pkt);
// SAFETY: PacketView's lifetime is a marker, and we are using the marker lifetime to guide
// assumptions about type's validity. A sound implementation would put a 'static object
// here regardless, making the object 'static, while non-'static implementations are out of
// our hand, therefore we assume the caller is giving us correct info.
let bound = unsafe { view.bind(Some(Out::stack_opaque(out))) };
io.send_io(param, bound)
}
None => {
let pkt: &'a Pkt::StackReq<'a> =
unsafe { &(*state.pkt_out.get()).as_ref().unwrap().0 };
let mut pkt: &'a Packet<Perms> = Pkt::stack_hdr(pkt);
let pkt = Pin::new(&mut pkt);
break pkt
.poll(cx)
.map(|_| unsafe { (*state.pkt_out.get()).take().unwrap() });
}
}
}
}
}
Loading

0 comments on commit d2ff03f

Please sign in to comment.