diff --git a/mfio-rt/src/native/impls/io_uring/tcp_stream.rs b/mfio-rt/src/native/impls/io_uring/tcp_stream.rs index 20bf15c..586f809 100644 --- a/mfio-rt/src/native/impls/io_uring/tcp_stream.rs +++ b/mfio-rt/src/native/impls/io_uring/tcp_stream.rs @@ -9,6 +9,7 @@ use core::task::{Context, Poll}; use io_uring::{ opcode, + squeue::Flags, types::{Fd, Fixed}, }; use parking_lot::Mutex; @@ -131,7 +132,18 @@ impl StreamInner { } let queue = self.stream.write_queue(); if !queue.is_empty() { - for queue in queue.chunks(*IOV_MAX) { + // FIXME: investigate why processing more than 3 chunks leads to out-of-order + // transfer of data - OOO does not happen only if we add link flag to the tail as + // well, which is wrong - adding link flag to the tail likely interupts other logic + // slowing us down. Taking 3 chunks seems to work fine, but 2 should be good + // enough. + // + // Performance wise, a better improvement here would be to enable submission of new + // writes, before all chunks complete - this would maximize the throughput. + let target_height = core::cmp::min((queue.len() + *IOV_MAX - 1) / *IOV_MAX, 2); + let mut tailed = false; + for queue in queue.chunks(*IOV_MAX).take(target_height) { + debug_assert!(!tailed); self.in_write += 1; let entry = opcode::Writev::new( Fixed(Key::Stream(idx).key() as _), @@ -140,7 +152,16 @@ impl StreamInner { ) .offset(!0u64) .build(); - push_handle.try_push_op(entry, Operation::StreamWrite(idx)) + + push_handle.try_push_op( + if self.in_write < target_height { + entry.flags(Flags::IO_HARDLINK) + } else { + tailed = true; + entry + }, + Operation::StreamWrite(idx), + ); } } }