diff --git a/shotover-proxy/benches/windsock/aws/mod.rs b/shotover-proxy/benches/windsock/aws/mod.rs index 2c693bb2a..6a54e5047 100644 --- a/shotover-proxy/benches/windsock/aws/mod.rs +++ b/shotover-proxy/benches/windsock/aws/mod.rs @@ -279,7 +279,8 @@ impl Ec2InstanceWithShotover { .push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml")) .await; - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel(); + let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel(); tokio::task::spawn(async move { let mut receiver = self .instance @@ -300,7 +301,7 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo if let Level::Error = event.level { tracing::error!("shotover error:\n {event}"); } - if tx.send(event).is_err() { + if event_tx.send(event).is_err() { return } } @@ -308,16 +309,21 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo None => return, } }, - _ = tx.closed() => { + _ = shutdown_rx.recv() => { + // shutdown_tx is dropped, instructing us to shutdown + // we MUST drop self before dropping event_tx to ensure that the Arc clone is dropped before the task indicates that it has terminated. + // Otherwise we may hit a race condition and fail the assertion that there is only one Arc clone alive. + std::mem::drop(self); + std::mem::drop(event_tx); return; }, - }; + } } }); // wait for shotover to startup loop { - let event = rx + let event = event_rx .recv() .await .expect("Shotover shutdown before indicating that it had started"); @@ -328,23 +334,28 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo break; } } - RunningShotover { rx } + + RunningShotover { + shutdown_tx, + event_rx, + } } } pub struct RunningShotover { - rx: tokio::sync::mpsc::UnboundedReceiver, + shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>, + event_rx: tokio::sync::mpsc::UnboundedReceiver, } impl RunningShotover { pub async fn shutdown(mut self) { - while let Ok(event) = self.rx.try_recv() { + // dropping shutdown_tx instructs the task to shutdown causing shotover to be terminated + std::mem::drop(self.shutdown_tx); + + while let Some(event) = self.event_rx.recv().await { if let Level::Warn | Level::Error = event.level { panic!("Received error/warn event from shotover:\n {event}") } } - - // dropping rx instructs the task to shutdown causing shotover to be terminated - std::mem::drop(self.rx) } }