Skip to content

Commit

Permalink
Resolve io_uring out of order packets
Browse files Browse the repository at this point in the history
  • Loading branch information
h33p committed Nov 30, 2023
1 parent 0dc8f3f commit 2dc631c
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions mfio-rt/src/native/impls/io_uring/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use core::task::{Context, Poll};

use io_uring::{
opcode,
squeue::Flags,
types::{Fd, Fixed},
};
use parking_lot::Mutex;
Expand Down Expand Up @@ -131,7 +132,18 @@ impl StreamInner {
}
let queue = self.stream.write_queue();
if !queue.is_empty() {
for queue in queue.chunks(*IOV_MAX) {
// FIXME: investigate why processing more than 3 chunks leads to out-of-order
// transfer of data - OOO does not happen only if we add link flag to the tail as
// well, which is wrong - adding link flag to the tail likely interupts other logic
// slowing us down. Taking 3 chunks seems to work fine, but 2 should be good
// enough.
//
// Performance wise, a better improvement here would be to enable submission of new
// writes, before all chunks complete - this would maximize the throughput.
let target_height = core::cmp::min((queue.len() + *IOV_MAX - 1) / *IOV_MAX, 2);
let mut tailed = false;
for queue in queue.chunks(*IOV_MAX).take(target_height) {
debug_assert!(!tailed);
self.in_write += 1;
let entry = opcode::Writev::new(
Fixed(Key::Stream(idx).key() as _),
Expand All @@ -140,7 +152,16 @@ impl StreamInner {
)
.offset(!0u64)
.build();
push_handle.try_push_op(entry, Operation::StreamWrite(idx))

push_handle.try_push_op(
if self.in_write < target_height {
entry.flags(Flags::IO_HARDLINK)
} else {
tailed = true;
entry
},
Operation::StreamWrite(idx),
);
}
}
}
Expand Down

0 comments on commit 2dc631c

Please sign in to comment.