diff --git a/src/rust/inetstack/protocols/arp/peer.rs b/src/rust/inetstack/protocols/arp/peer.rs index 2c22b3072..9fbcbf477 100644 --- a/src/rust/inetstack/protocols/arp/peer.rs +++ b/src/rust/inetstack/protocols/arp/peer.rs @@ -169,9 +169,9 @@ impl SharedArpPeer { /// Background task that cleans up the ARP cache from time to time. async fn background(&mut self) { + let yielder: Yielder = Yielder::new(); loop { - let yielder: Yielder = Yielder::new(); - match self.runtime.get_timer().wait(Self::ARP_CLEANUP_TIMEOUT, yielder).await { + match self.runtime.get_timer().wait(Self::ARP_CLEANUP_TIMEOUT, &yielder).await { Ok(()) => continue, Err(_) => break, } @@ -276,13 +276,13 @@ impl SharedArpPeer { // > The frequency of the ARP request is very close to one per // > second, the maximum suggested by [RFC1122]. let result = { + let yielder: Yielder = Yielder::new(); for i in 0..self.arp_config.get_retry_count() + 1 { self.transport.transmit(Box::new(msg.clone())); - let yielder: Yielder = Yielder::new(); let timer = self .runtime .get_timer() - .wait(self.arp_config.get_request_timeout(), yielder); + .wait(self.arp_config.get_request_timeout(), &yielder); match arp_response.with_timeout(timer).await { Ok(link_addr) => { diff --git a/src/rust/inetstack/protocols/icmpv4/peer.rs b/src/rust/inetstack/protocols/icmpv4/peer.rs index f6f25e22d..51f661cb6 100644 --- a/src/rust/inetstack/protocols/icmpv4/peer.rs +++ b/src/rust/inetstack/protocols/icmpv4/peer.rs @@ -276,7 +276,7 @@ impl SharedIcmpv4Peer { }; let yielder: Yielder = Yielder::new(); let clock_ref: SharedTimer = self.runtime.get_timer(); - let timer = clock_ref.wait(timeout, yielder); + let timer = clock_ref.wait(timeout, &yielder); match rx.fuse().with_timeout(timer).await? { // Request completed successfully. Ok(_) => Ok(self.runtime.get_now() - t0), diff --git a/src/rust/inetstack/protocols/tcp/active_open.rs b/src/rust/inetstack/protocols/tcp/active_open.rs index 75d19136f..9a156e080 100644 --- a/src/rust/inetstack/protocols/tcp/active_open.rs +++ b/src/rust/inetstack/protocols/tcp/active_open.rs @@ -47,6 +47,7 @@ use crate::{ scheduler::{ TaskHandle, Yielder, + YielderHandle, }, }; use ::libc::{ @@ -78,6 +79,7 @@ pub struct ActiveOpenSocket { arp: SharedArpPeer, result: AsyncValue, Fail>>, handle: Option, + yielder_handle: YielderHandle, } #[derive(Clone)] @@ -98,6 +100,8 @@ impl SharedActiveOpenSocket { local_link_addr: MacAddress, arp: SharedArpPeer, ) -> Result { + let yielder: Yielder = Yielder::new(); + let yielder_handle: YielderHandle = yielder.get_handle(); let mut me: Self = Self(SharedObject::>::new(ActiveOpenSocket:: { local_isn, local, @@ -109,11 +113,12 @@ impl SharedActiveOpenSocket { arp, result: AsyncValue::, Fail>>::default(), handle: None, + yielder_handle, })); let handle: TaskHandle = runtime.insert_background_coroutine( "Inetstack::TCP::activeopen::background", - Box::pin(me.clone().background()), + Box::pin(me.clone().background(yielder)), )?; me.handle = Some(handle); // TODO: Add fast path here when remote is already in the ARP cache (and subtract one retry). @@ -226,13 +231,14 @@ impl SharedActiveOpenSocket { None, ); self.result.set(Ok(cb)); + self.yielder_handle.wake_with(Ok(())); let handle: TaskHandle = self.handle.take().expect("We should have allocated a background task"); if let Err(e) = self.runtime.remove_background_coroutine(&handle) { panic!("Failed to remove active open coroutine (error={:?}", e); } } - async fn background(mut self) { + async fn background(mut self, yielder: Yielder) { let handshake_retries: usize = self.tcp_config.get_handshake_retries(); let handshake_timeout = self.tcp_config.get_handshake_timeout(); for _ in 0..handshake_retries { @@ -266,8 +272,7 @@ impl SharedActiveOpenSocket { }; self.transport.transmit(Box::new(segment)); let clock_ref: SharedTimer = self.runtime.get_timer(); - let yielder: Yielder = Yielder::new(); - if let Err(e) = clock_ref.wait(handshake_timeout, yielder).await { + if let Err(e) = clock_ref.wait(handshake_timeout, &yielder).await { self.result.set(Err(e)); return; } diff --git a/src/rust/inetstack/protocols/tcp/established/background/acknowledger.rs b/src/rust/inetstack/protocols/tcp/established/background/acknowledger.rs index be1eb9de8..1d2bb515e 100644 --- a/src/rust/inetstack/protocols/tcp/established/background/acknowledger.rs +++ b/src/rust/inetstack/protocols/tcp/established/background/acknowledger.rs @@ -17,7 +17,7 @@ use ::futures::future::{ }; use ::std::time::Instant; -pub async fn acknowledger(mut cb: SharedControlBlock) -> Result { +pub async fn acknowledger(mut cb: SharedControlBlock, yielder: Yielder) -> Result { loop { // TODO: Implement TCP delayed ACKs, subject to restrictions from RFC 1122 // - TCP should implement a delayed ACK @@ -33,9 +33,8 @@ pub async fn acknowledger(mut cb: SharedControlBlock) -> Resu futures::pin_mut!(ack_deadline_changed); let clock_ref: SharedTimer = cb.get_timer(); - let yielder: Yielder = Yielder::new(); let ack_future = match deadline { - Some(t) => Either::Left(clock_ref.wait_until(t, yielder).fuse()), + Some(t) => Either::Left(clock_ref.wait_until(t, &yielder).fuse()), None => Either::Right(future::pending()), }; futures::pin_mut!(ack_future); @@ -43,6 +42,11 @@ pub async fn acknowledger(mut cb: SharedControlBlock) -> Resu futures::select_biased! { _ = ack_deadline_changed => continue, _ = ack_future => { + match cb.get_ack_deadline().get() { + Some(timeout) if timeout > cb.get_now() => continue, + None => continue, + _ => {}, + } cb.send_ack(); }, } diff --git a/src/rust/inetstack/protocols/tcp/established/background/mod.rs b/src/rust/inetstack/protocols/tcp/established/background/mod.rs index 7532b41f6..ef99869a3 100644 --- a/src/rust/inetstack/protocols/tcp/established/background/mod.rs +++ b/src/rust/inetstack/protocols/tcp/established/background/mod.rs @@ -13,24 +13,24 @@ use self::{ use crate::{ inetstack::protocols::tcp::established::ctrlblk::SharedControlBlock, runtime::QDesc, + scheduler::Yielder, }; use ::futures::{ channel::mpsc, FutureExt, }; -pub async fn background( - cb: SharedControlBlock, - fd: QDesc, - _dead_socket_tx: mpsc::UnboundedSender, -) { - let acknowledger = acknowledger(cb.clone()).fuse(); +pub async fn background(cb: SharedControlBlock, _dead_socket_tx: mpsc::UnboundedSender) { + let yielder_acknowledger: Yielder = Yielder::new(); + let acknowledger = acknowledger(cb.clone(), yielder_acknowledger).fuse(); futures::pin_mut!(acknowledger); - let retransmitter = retransmitter(cb.clone()).fuse(); + let yielder_retransmitter: Yielder = Yielder::new(); + let retransmitter = retransmitter(cb.clone(), yielder_retransmitter).fuse(); futures::pin_mut!(retransmitter); - let sender = sender(cb.clone()).fuse(); + let yielder_sender: Yielder = Yielder::new(); + let sender = sender(cb.clone(), yielder_sender).fuse(); futures::pin_mut!(sender); let r = futures::select_biased! { @@ -38,7 +38,7 @@ pub async fn background( r = retransmitter => r, r = sender => r, }; - error!("Connection (fd {:?}) terminated: {:?}", fd, r); + error!("Connection terminated: {:?}", r); // TODO Properly clean up Peer state for this connection. // dead_socket_tx diff --git a/src/rust/inetstack/protocols/tcp/established/background/retransmitter.rs b/src/rust/inetstack/protocols/tcp/established/background/retransmitter.rs index 5c331c53c..948698dfb 100644 --- a/src/rust/inetstack/protocols/tcp/established/background/retransmitter.rs +++ b/src/rust/inetstack/protocols/tcp/established/background/retransmitter.rs @@ -20,7 +20,7 @@ use ::std::time::{ Instant, }; -pub async fn retransmitter(mut cb: SharedControlBlock) -> Result { +pub async fn retransmitter(mut cb: SharedControlBlock, yielder: Yielder) -> Result { loop { // Pin future for timeout retransmission. let mut rtx_deadline_watched: SharedWatchedValue> = cb.watch_retransmit_deadline(); @@ -29,9 +29,8 @@ pub async fn retransmitter(mut cb: SharedControlBlock) -> Res let rtx_deadline_changed = rtx_deadline_watched.watch(rtx_yielder).fuse(); futures::pin_mut!(rtx_deadline_changed); let clock_ref: SharedTimer = cb.get_timer(); - let yielder: Yielder = Yielder::new(); let rtx_future = match rtx_deadline { - Some(t) => Either::Left(clock_ref.wait_until(t, yielder).fuse()), + Some(t) => Either::Left(clock_ref.wait_until(t, &yielder).fuse()), None => Either::Right(future::pending()), }; futures::pin_mut!(rtx_future); @@ -52,11 +51,17 @@ pub async fn retransmitter(mut cb: SharedControlBlock) -> Res } futures::pin_mut!(rtx_fast_retransmit_changed); + // Since these futures all share a single waker bit, they are all woken whenever one of them triggers. futures::select_biased! { _ = rtx_deadline_changed => continue, _ = rtx_fast_retransmit_changed => continue, _ = rtx_future => { - trace!("Retransmission Timer Expired"); + match cb.get_retransmit_deadline() { + Some(timeout) if timeout > cb.get_now() => continue, + None => continue, + _ => {}, + } + // Notify congestion control about RTO. // TODO: Is this the best place for this? // TODO: Why call into ControlBlock to get SND.UNA when congestion_control_on_rto() has access to it? diff --git a/src/rust/inetstack/protocols/tcp/established/background/sender.rs b/src/rust/inetstack/protocols/tcp/established/background/sender.rs index 9cb5cf07e..377efc1ff 100644 --- a/src/rust/inetstack/protocols/tcp/established/background/sender.rs +++ b/src/rust/inetstack/protocols/tcp/established/background/sender.rs @@ -24,7 +24,7 @@ use ::std::{ time::Duration, }; -pub async fn sender(mut cb: SharedControlBlock) -> Result { +pub async fn sender(mut cb: SharedControlBlock, yielder: Yielder) -> Result { 'top: loop { // First, check to see if there's any unsent data. // TODO: Change this to just look at the unsent queue to see if it is empty or not. @@ -85,12 +85,11 @@ pub async fn sender(mut cb: SharedControlBlock) -> Result continue 'top, - _ = clock_ref.wait(timeout, yielder).fuse() => { + _ = clock_ref.wait(timeout, &yielder).fuse() => { timeout *= 2; } } diff --git a/src/rust/inetstack/protocols/tcp/established/mod.rs b/src/rust/inetstack/protocols/tcp/established/mod.rs index 599503193..7fc3d77fa 100644 --- a/src/rust/inetstack/protocols/tcp/established/mod.rs +++ b/src/rust/inetstack/protocols/tcp/established/mod.rs @@ -46,14 +46,13 @@ pub struct EstablishedSocket { impl EstablishedSocket { pub fn new( cb: SharedControlBlock, - qd: QDesc, dead_socket_tx: mpsc::UnboundedSender, mut runtime: SharedDemiRuntime, ) -> Result { // TODO: Maybe add the queue descriptor here. let handle: TaskHandle = runtime.insert_background_coroutine( "Inetstack::TCP::established::background", - Box::pin(background::background(cb.clone(), qd, dead_socket_tx)), + Box::pin(background::background(cb.clone(), dead_socket_tx)), )?; Ok(Self { cb, diff --git a/src/rust/inetstack/protocols/tcp/passive_open.rs b/src/rust/inetstack/protocols/tcp/passive_open.rs index 2a7f62d5a..09adb85dc 100644 --- a/src/rust/inetstack/protocols/tcp/passive_open.rs +++ b/src/rust/inetstack/protocols/tcp/passive_open.rs @@ -48,6 +48,7 @@ use crate::{ scheduler::{ TaskHandle, Yielder, + YielderHandle, }, }; use ::libc::{ @@ -80,6 +81,7 @@ struct InflightAccept { remote_window_scale: Option, mss: usize, handle: TaskHandle, + yielder_handle: YielderHandle, } #[derive(Default)] @@ -212,7 +214,8 @@ impl SharedPassiveSocket { local_window_scale, remote_window_scale ); - if let Some(inflight) = self.inflight.remove(&remote) { + if let Some(mut inflight) = self.inflight.remove(&remote) { + inflight.yielder_handle.wake_with(Ok(())); if let Err(e) = self.runtime.remove_background_coroutine(&inflight.handle) { panic!("Failed to remove inflight accept (error={:?})", e); } @@ -259,7 +262,9 @@ impl SharedPassiveSocket { let local: SocketAddrV4 = self.local.clone(); let local_isn = self.isn_generator.generate(&local, &remote); let remote_isn = header.seq_num; - let future = self.clone().background(remote, remote_isn, local_isn); + let yielder: Yielder = Yielder::new(); + let yielder_handle: YielderHandle = yielder.get_handle(); + let future = self.clone().background(remote, remote_isn, local_isn, yielder); let handle: TaskHandle = self .runtime .insert_background_coroutine("Inetstack::TCP::passiveopen::background", Box::pin(future))?; @@ -286,12 +291,13 @@ impl SharedPassiveSocket { remote_window_scale, mss, handle, + yielder_handle, }; self.inflight.insert(remote, accept); Ok(()) } - async fn background(mut self, remote: SocketAddrV4, remote_isn: SeqNumber, local_isn: SeqNumber) { + async fn background(mut self, remote: SocketAddrV4, remote_isn: SeqNumber, local_isn: SeqNumber, yielder: Yielder) { let handshake_retries: usize = self.tcp_config.get_handshake_retries(); let handshake_timeout: Duration = self.tcp_config.get_handshake_timeout(); @@ -327,8 +333,7 @@ impl SharedPassiveSocket { }; self.transport.transmit(Box::new(segment)); let clock_ref: SharedTimer = self.runtime.get_timer(); - let yielder: Yielder = Yielder::new(); - if let Err(e) = clock_ref.wait(handshake_timeout, yielder).await { + if let Err(e) = clock_ref.wait(handshake_timeout, &yielder).await { self.ready.push_err(e); return; } diff --git a/src/rust/inetstack/protocols/tcp/peer.rs b/src/rust/inetstack/protocols/tcp/peer.rs index eb399fccc..6277fa50c 100644 --- a/src/rust/inetstack/protocols/tcp/peer.rs +++ b/src/rust/inetstack/protocols/tcp/peer.rs @@ -277,7 +277,7 @@ impl SharedTcpPeer { let new_qd: QDesc = self.runtime.alloc_queue::>(new_queue.clone()); // Set up established socket data structure. let established: EstablishedSocket = - EstablishedSocket::new(cb, new_qd, self.dead_socket_tx.clone(), self.runtime.clone())?; + EstablishedSocket::new(cb, self.dead_socket_tx.clone(), self.runtime.clone())?; let local: SocketAddrV4 = established.cb.get_local(); let remote: SocketAddrV4 = established.cb.get_remote(); // Set the socket in the new queue to established @@ -358,7 +358,6 @@ impl SharedTcpPeer { let cb: SharedControlBlock = socket.get_result(yielder).await?; let new_socket = Socket::Established(EstablishedSocket::new( cb, - qd, self.dead_socket_tx.clone(), self.runtime.clone(), )?); diff --git a/src/rust/runtime/timer.rs b/src/rust/runtime/timer.rs index c6c58696c..fa2d9ab7d 100644 --- a/src/rust/runtime/timer.rs +++ b/src/rust/runtime/timer.rs @@ -86,12 +86,12 @@ impl SharedTimer { self.now } - pub async fn wait(self, timeout: Duration, yielder: Yielder) -> Result<(), Fail> { + pub async fn wait(self, timeout: Duration, yielder: &Yielder) -> Result<(), Fail> { let now: Instant = self.now; - self.wait_until(now + timeout, yielder).await + self.wait_until(now + timeout, &yielder).await } - pub async fn wait_until(mut self, expiry: Instant, yielder: Yielder) -> Result<(), Fail> { + pub async fn wait_until(mut self, expiry: Instant, yielder: &Yielder) -> Result<(), Fail> { let entry = TimerQueueEntry { expiry, yielder: yielder.get_handle(), @@ -206,7 +206,7 @@ mod tests { let timer_ref: SharedTimer = timer.clone(); let yielder: Yielder = Yielder::new(); - let wait_future1 = timer_ref.wait(Duration::from_secs(2), yielder); + let wait_future1 = timer_ref.wait(Duration::from_secs(2), &yielder); futures::pin_mut!(wait_future1); crate::ensure_eq!(Future::poll(Pin::new(&mut wait_future1), &mut ctx).is_pending(), true); @@ -217,7 +217,7 @@ mod tests { let timer_ref2: SharedTimer = timer.clone(); let yielder2: Yielder = Yielder::new(); crate::ensure_eq!(Future::poll(Pin::new(&mut wait_future1), &mut ctx).is_pending(), true); - let wait_future2 = timer_ref2.wait(Duration::from_secs(1), yielder2); + let wait_future2 = timer_ref2.wait(Duration::from_secs(1), &yielder2); futures::pin_mut!(wait_future2); crate::ensure_eq!(Future::poll(Pin::new(&mut wait_future1), &mut ctx).is_pending(), true); diff --git a/src/rust/scheduler/handle.rs b/src/rust/scheduler/handle.rs index 9adc92cc2..18c27d816 100644 --- a/src/rust/scheduler/handle.rs +++ b/src/rust/scheduler/handle.rs @@ -87,9 +87,7 @@ impl YielderHandle { "wake_with(): already scheduled, overwriting result (old={:?})", old_result ); - } - - if let Some(waker) = self.waker_handle.borrow_mut().take() { + } else if let Some(waker) = self.waker_handle.borrow_mut().take() { waker.wake(); } } diff --git a/src/rust/scheduler/scheduler.rs b/src/rust/scheduler/scheduler.rs index 8fbc516eb..7dd1205e2 100644 --- a/src/rust/scheduler/scheduler.rs +++ b/src/rust/scheduler/scheduler.rs @@ -209,7 +209,10 @@ impl Scheduler { // Get the pinned ref. let pinned_ptr = { let pin_slab_index: usize = Scheduler::get_pin_slab_index(waker_page_index, waker_page_offset); - let pinned_ref: Pin<&mut Box> = self.tasks.get_pin_mut(pin_slab_index).unwrap(); + let pinned_ref: Pin<&mut Box> = self + .tasks + .get_pin_mut(pin_slab_index) + .expect(format!("Invalid offset: {:?}", pin_slab_index).as_str()); let pinned_ptr = unsafe { Pin::into_inner_unchecked(pinned_ref) as *mut _ }; pinned_ptr };