Skip to content

Commit

Permalink
Different packet size type support
Browse files Browse the repository at this point in the history
  • Loading branch information
h33p committed Dec 26, 2023
1 parent d6f56f5 commit 6237920
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 135 deletions.
12 changes: 6 additions & 6 deletions mfio-netfs/src/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,25 +69,25 @@ impl IntoOp for Write {
}
}

struct ShardedPacket<T: Splittable<u64>> {
shards: BTreeMap<u64, T>,
struct ShardedPacket<T: Splittable> {
shards: BTreeMap<T::Bounds, T>,
}

impl<T: Splittable<u64>> From<T> for ShardedPacket<T> {
impl<T: Splittable> From<T> for ShardedPacket<T> {
fn from(pkt: T) -> Self {
Self {
shards: std::iter::once((0, pkt)).collect(),
shards: std::iter::once((Default::default(), pkt)).collect(),
}
}
}

impl<T: Splittable<u64>> ShardedPacket<T> {
impl<T: Splittable> ShardedPacket<T> {
fn is_empty(&self) -> bool {
// TODO: do this or self.len() == 0?
self.shards.is_empty()
}

fn extract(&mut self, idx: u64, len: u64) -> Option<T> {
fn extract(&mut self, idx: T::Bounds, len: T::Bounds) -> Option<T> {
let (&shard_idx, _) = self.shards.range(..=idx).next_back()?;
let mut shard = self.shards.remove(&shard_idx)?;

Expand Down
4 changes: 2 additions & 2 deletions mfio-rt/src/native/impls/io_uring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Operation {
match self {
Operation::FileRead(pkt, buf) => match res {
Ok(read) if (read as u64) < pkt.len() => {
let (left, right) = pkt.split_at(read as u64);
let (left, right) = pkt.split_at(read);
if let Err(pkt) = left {
assert!(!buf.0.is_null());
let buf = unsafe { &*buf.0 };
Expand All @@ -111,7 +111,7 @@ impl Operation {
},
Operation::FileWrite(pkt, _) => match res {
Ok(read) if (read as u64) < pkt.len() => {
let (left, right) = pkt.split_at(read as u64);
let (left, right) = pkt.split_at(read);
deferred_pkts.ok(left);
right.error(io_err(State::Nop));
}
Expand Down
4 changes: 2 additions & 2 deletions mfio-rt/src/native/impls/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl OperationMode {
match self {
Self::FileRead(pkt, buf) => match res {
Ok(read) if (read as u64) < pkt.len() => {
let (left, right) = pkt.split_at(read as u64);
let (left, right) = pkt.split_at(read);
if let Err(pkt) = left {
assert!(!buf.0.is_null());
let buf = unsafe { &*buf.0 };
Expand All @@ -299,7 +299,7 @@ impl OperationMode {
},
Self::FileWrite(pkt, _) => match res {
Ok(read) if (read as u64) < pkt.len() => {
let (left, right) = pkt.split_at(read as u64);
let (left, right) = pkt.split_at(read);
deferred_pkts.ok(left);
right.error(io_err(State::Nop));
}
Expand Down
4 changes: 2 additions & 2 deletions mfio-rt/src/native/impls/mio/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl FileInner {
}
break;
} else if l > 0 {
let (a, b) = pkt.split_at(l as _);
let (a, b) = pkt.split_at(l);
if let Err(pkt) = a {
let _ = unsafe {
pkt.transfer_data(self.tmp_buf.as_mut_ptr().cast())
Expand Down Expand Up @@ -270,7 +270,7 @@ impl FileInner {
if l == len as usize {
break;
} else if l > 0 {
pkt = pkt.split_at(l as _).1;
pkt = pkt.split_at(l).1;
} else {
pkt.error(io_err(State::Nop));
break;
Expand Down
8 changes: 4 additions & 4 deletions mfio-rt/src/native/impls/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl<Handle: IoHandle + Send + Sync + 'static> From<BaseArc<IoInner<Handle>>>
match buf.try_alloc() {
Ok(mut alloced) => match copy_buf(&mut alloced[..]) {
Ok(read) if (read as u64) < alloced.len() => {
let (_, right) = alloced.split_at(read as _);
let (_, right) = alloced.split_at(read);
right.error(io_err(State::Nop));
}
Err(e) => alloced.error(io_err(e.kind().into())),
Expand All @@ -284,7 +284,7 @@ impl<Handle: IoHandle + Send + Sync + 'static> From<BaseArc<IoInner<Handle>>>
}
match copy_buf(&mut tmp_buf[..(buf.len() as usize)]) {
Ok(read) if (read as u64) < buf.len() => {
let (left, right) = buf.split_at(read as u64);
let (left, right) = buf.split_at(read);
let _ = unsafe {
left.transfer_data(tmp_buf.as_ptr().cast())
};
Expand Down Expand Up @@ -321,7 +321,7 @@ impl<Handle: IoHandle + Send + Sync + 'static> From<BaseArc<IoInner<Handle>>>
let alloced: ReadPacketObj = alloced;
match file.write_at(&alloced[..], pos) {
Ok(written) if (written as u64) < alloced.len() => {
let (_, right) = alloced.split_at(written as u64);
let (_, right) = alloced.split_at(written);
right.error(io_err(State::Nop));
}
Err(e) => alloced.error(io_err(e.kind().into())),
Expand All @@ -342,7 +342,7 @@ impl<Handle: IoHandle + Send + Sync + 'static> From<BaseArc<IoInner<Handle>>>
};
match file.write_at(tmp_buf, pos) {
Ok(written) if (written as u64) < buf.len() => {
let (_, right) = buf.split_at(written as u64);
let (_, right) = buf.split_at(written);
right.error(io_err(State::Nop));
}
Err(e) => buf.error(io_err(e.kind().into())),
Expand Down
8 changes: 4 additions & 4 deletions mfio-rt/src/util/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl StreamBuf {
let spare_len = core::cmp::min(spare.len(), self.read_cached);

if (spare_len as u64) < packet.len() {
let (a, b) = packet.split_at(spare_len as u64);
let (a, b) = packet.split_at(spare_len);
let transferred = unsafe { a.transfer_data(spare.as_mut_ptr().cast()) };
self.read_buf.release(transferred.len() as usize);
self.read_cached -= transferred.len() as usize;
Expand Down Expand Up @@ -365,7 +365,7 @@ impl StreamBuf {
let packet = if len as u64 >= packet.len() {
packet
} else {
let (a, b) = packet.split_at(len as u64);
let (a, b) = packet.split_at(len);
self.read_ops2.push_front(b);
a
};
Expand Down Expand Up @@ -575,7 +575,7 @@ impl StreamBuf {
self.read_ops1.push_front(Err(pkt));
break;
} else if (spare_len as u64) < pkt.len() {
let (a, b) = pkt.split_at(spare_len as u64);
let (a, b) = pkt.split_at(spare_len);
self.read_buf.reserve(spare_len);
self.read_ops2.push_back(Err(a));
pkt = b;
Expand Down Expand Up @@ -650,7 +650,7 @@ impl StreamBuf {
*queued = Some(pkt);
break;
} else if (spare_len as u64) < pkt.len() {
let (a, b) = pkt.split_at(spare_len as u64);
let (a, b) = pkt.split_at(spare_len);
let pkt = unsafe { a.transfer_data(spare.as_mut_ptr().cast()) };
self.write_buf.reserve(spare_len);
transferred.push_back(pkt);
Expand Down
2 changes: 2 additions & 0 deletions mfio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ mfio-derive = { version = "0.1", path = "../mfio-derive" }
http = { version = "0.2", optional = true }
log = "0.4"
rangemap = "1"
num = { version = "0.4", default-features = false }
atomic-traits = { version = "0.3", default-features = false }
# This is only needed when std feature is disabled, but we can't do negative bounds
spin = "0.9"

Expand Down
Loading

0 comments on commit 6237920

Please sign in to comment.