From 827bc9a47fe198ebc0577a449e32f8240d778a78 Mon Sep 17 00:00:00 2001 From: Rain Date: Wed, 27 Nov 2024 22:43:11 +0000 Subject: [PATCH] [nextest-runner] rename run_sender to resp_tx Following up from the previous commit, rename to a request-response model. --- nextest-runner/src/runner.rs | 45 ++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/nextest-runner/src/runner.rs b/nextest-runner/src/runner.rs index 45266280d76..b08e3ae7dec 100644 --- a/nextest-runner/src/runner.rs +++ b/nextest-runner/src/runner.rs @@ -355,7 +355,7 @@ impl<'a> TestRunnerInner<'a> { let mut report_cancel_receiver = std::pin::pin!(report_cancel_receiver); let ((), results) = TokioScope::scope_and_block(move |scope| { - let (run_sender, mut run_receiver) = tokio::sync::mpsc::unbounded_channel(); + let (resp_tx, mut resp_rx) = tokio::sync::mpsc::unbounded_channel(); let (cancellation_sender, _cancel_receiver) = broadcast::channel(1); let exec_cancellation_sender = cancellation_sender.clone(); @@ -365,7 +365,7 @@ impl<'a> TestRunnerInner<'a> { loop { let internal_event = tokio::select! { - internal_event = run_receiver.recv() => { + internal_event = resp_rx.recv() => { match internal_event { Some(event) => InternalEvent::Test(event), None => { @@ -482,8 +482,7 @@ impl<'a> TestRunnerInner<'a> { // Ignore errors here: if there are no receivers to cancel, so // be it. Also note the ordering here: cancelled_ref is set // *before* this is sent. - let _ = req_tx_ref - .send(SignalRequest::Shutdown(req)); + let _ = req_tx_ref.send(SignalRequest::Shutdown(req)); } } } @@ -503,7 +502,7 @@ impl<'a> TestRunnerInner<'a> { // Run setup scripts one by one. for (index, script) in setup_scripts.into_iter().enumerate() { - let this_run_sender = run_sender.clone(); + let this_resp_tx = resp_tx.clone(); let (completion_sender, completion_receiver) = tokio::sync::oneshot::channel(); let script_id = script.id.clone(); @@ -520,7 +519,7 @@ impl<'a> TestRunnerInner<'a> { return; } - let _ = this_run_sender.send(InternalTestEvent::SetupScriptStarted { + let _ = this_resp_tx.send(InternalTestEvent::SetupScriptStarted { script_id: script_id.clone(), config, index, @@ -533,11 +532,11 @@ impl<'a> TestRunnerInner<'a> { }; let (status, env_map) = self - .run_setup_script(packet, &this_run_sender, &mut this_req_rx) + .run_setup_script(packet, &this_resp_tx, &mut this_req_rx) .await; let status = status.into_external(); - let _ = this_run_sender.send(InternalTestEvent::SetupScriptFinished { + let _ = this_resp_tx.send(InternalTestEvent::SetupScriptFinished { script_id, config, index, @@ -572,7 +571,7 @@ impl<'a> TestRunnerInner<'a> { let run_fut = futures::stream::iter(self.test_list.iter_tests()) .map(move |test_instance| { - let this_run_sender = run_sender.clone(); + let this_resp_tx = resp_tx.clone(); let mut cancel_receiver = cancellation_sender.subscribe(); let query = test_instance.to_test_query(); @@ -605,7 +604,7 @@ impl<'a> TestRunnerInner<'a> { test_instance.test_info.filter_match { // Failure to send means the receiver was dropped. - let _ = this_run_sender.send(InternalTestEvent::Skipped { + let _ = this_resp_tx.send(InternalTestEvent::Skipped { test_instance, reason, }); @@ -614,7 +613,7 @@ impl<'a> TestRunnerInner<'a> { // Failure to send means the receiver was dropped. let _ = - this_run_sender.send(InternalTestEvent::Started { test_instance }); + this_resp_tx.send(InternalTestEvent::Started { test_instance }); let mut run_statuses = vec![]; let mut delay = Duration::ZERO; @@ -630,7 +629,7 @@ impl<'a> TestRunnerInner<'a> { // is empty. if retry_data.attempt > 1 { - _ = this_run_sender.send(InternalTestEvent::RetryStarted { + _ = this_resp_tx.send(InternalTestEvent::RetryStarted { test_instance, retry_data, }); @@ -648,7 +647,7 @@ impl<'a> TestRunnerInner<'a> { }; let run_status = self - .run_test(packet, &this_run_sender, &mut this_req_rx) + .run_test(packet, &this_resp_tx, &mut this_req_rx) .await .into_external(retry_data); @@ -664,7 +663,7 @@ impl<'a> TestRunnerInner<'a> { .next() .expect("backoff delay must be non-empty"); - let _ = this_run_sender.send( + let _ = this_resp_tx.send( InternalTestEvent::AttemptFailedWillRetry { test_instance, failure_output: settings.failure_output(), @@ -694,7 +693,7 @@ impl<'a> TestRunnerInner<'a> { // * the test has succeeded, or // * the test has failed and we've run out of retries. // In either case, the test is finished. - let _ = this_run_sender.send(InternalTestEvent::Finished { + let _ = this_resp_tx.send(InternalTestEvent::Finished { test_instance, success_output: settings.success_output(), failure_output: settings.failure_output(), @@ -738,13 +737,13 @@ impl<'a> TestRunnerInner<'a> { async fn run_setup_script( &self, script: SetupScriptPacket<'a>, - run_sender: &UnboundedSender>, + resp_tx: &UnboundedSender>, req_rx: &mut tokio::sync::broadcast::Receiver, ) -> (InternalSetupScriptExecuteStatus, Option) { let mut stopwatch = crate::time::stopwatch(); match self - .run_setup_script_inner(script, &mut stopwatch, run_sender, req_rx) + .run_setup_script_inner(script, &mut stopwatch, resp_tx, req_rx) .await { Ok((status, env_map)) => (status, env_map), @@ -775,7 +774,7 @@ impl<'a> TestRunnerInner<'a> { &self, script: SetupScriptPacket<'a>, stopwatch: &mut StopwatchStart, - run_sender: &UnboundedSender>, + resp_tx: &UnboundedSender>, req_rx: &mut tokio::sync::broadcast::Receiver, ) -> Result<(InternalSetupScriptExecuteStatus, Option), SetupScriptError> { @@ -850,7 +849,7 @@ impl<'a> TestRunnerInner<'a> { }; if !slow_timeout.grace_period.is_zero() { - let _ = run_sender.send(script.slow_event( + let _ = resp_tx.send(script.slow_event( // Pass in the slow timeout period times timeout_hit, since // stopwatch.elapsed() tends to be slightly longer. timeout_hit * slow_timeout.period, @@ -960,14 +959,14 @@ impl<'a> TestRunnerInner<'a> { async fn run_test( &self, test: TestPacket<'a, '_>, - run_sender: &UnboundedSender>, + resp_tx: &UnboundedSender>, req_rx: &mut broadcast::Receiver, ) -> InternalExecuteStatus { let mut stopwatch = crate::time::stopwatch(); let delay_before_start = test.delay_before_start; match self - .run_test_inner(test, &mut stopwatch, run_sender, req_rx) + .run_test_inner(test, &mut stopwatch, resp_tx, req_rx) .await { Ok(run_status) => run_status, @@ -992,7 +991,7 @@ impl<'a> TestRunnerInner<'a> { &self, test: TestPacket<'a, '_>, stopwatch: &mut StopwatchStart, - run_sender: &UnboundedSender>, + resp_tx: &UnboundedSender>, req_rx: &mut broadcast::Receiver, ) -> Result { let ctx = TestExecuteContext { @@ -1060,7 +1059,7 @@ impl<'a> TestRunnerInner<'a> { }; if !slow_timeout.grace_period.is_zero() { - let _ = run_sender.send(test.slow_event( + let _ = resp_tx.send(test.slow_event( // Pass in the slow timeout period times timeout_hit, since // stopwatch.elapsed() tends to be slightly longer. timeout_hit * slow_timeout.period,