Skip to content

Commit

Permalink
[nextest-runner] remove cancellation broadcast receiver
Browse files Browse the repository at this point in the history
This functionality is now provided by the dispatcher via the request channel.
  • Loading branch information
sunshowers committed Dec 15, 2024
1 parent 258e6c9 commit be10886
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 24 deletions.
8 changes: 3 additions & 5 deletions nextest-runner/src/runner/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use debug_ignore::DebugIgnore;
use quick_junit::ReportUuid;
use std::{collections::BTreeMap, time::Duration};
use tokio::sync::{
broadcast,
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
Expand Down Expand Up @@ -98,7 +97,6 @@ where
signal_handler: &mut SignalHandler,
input_handler: &mut InputHandler,
report_cancel_rx: oneshot::Receiver<()>,
cancellation_sender: broadcast::Sender<()>,
) -> RunnerTaskState {
let mut report_cancel_rx = std::pin::pin!(report_cancel_rx);

Expand Down Expand Up @@ -271,17 +269,17 @@ where
}
HandleEventResponse::Cancel(cancel) => {
// A cancellation notice was received.
let _ = cancellation_sender.send(());
match cancel {
// Some of the branches here don't do anything, but are specified
// for readability.
CancelEvent::Report => {
// An error was produced by the reporter, and cancellation has
// begun.
self.broadcast_request(RunUnitRequest::OtherCancel);
}
CancelEvent::TestFailure => {
// A test failure has caused cancellation to begin. Nothing to
// do here.
// A test failure has caused cancellation to begin.
self.broadcast_request(RunUnitRequest::OtherCancel);
}
CancelEvent::Signal(req) => {
// A signal has caused cancellation to begin. Let all the child
Expand Down
25 changes: 17 additions & 8 deletions nextest-runner/src/runner/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use std::{
use tokio::{
process::Child,
sync::{
broadcast,
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
Expand Down Expand Up @@ -163,7 +162,6 @@ impl<'a> ExecutorContext<'a> {
test_instance: TestInstance<'a>,
settings: TestSettings<'a>,
resp_tx: UnboundedSender<ExecutorEvent<'a>>,
mut cancel_receiver: broadcast::Receiver<()>,
setup_script_data: Arc<SetupScriptExecuteData<'a>>,
) {
debug!(test_name = test_instance.name, "running test");
Expand Down Expand Up @@ -268,7 +266,6 @@ impl<'a> ExecutorContext<'a> {
previous_result,
previous_slow,
delay,
&mut cancel_receiver,
&mut req_rx,
)
.await;
Expand Down Expand Up @@ -464,6 +461,10 @@ impl<'a> ExecutorContext<'a> {
slow_timeout.grace_period
).await;
}
RunUnitRequest::OtherCancel => {
// Ignore non-signal cancellation requests --
// let the script finish.
}
RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
_ = sender.send(script.info_response(
UnitState::Running {
Expand Down Expand Up @@ -706,6 +707,10 @@ impl<'a> ExecutorContext<'a> {
slow_timeout.grace_period,
).await;
}
RunUnitRequest::OtherCancel => {
// Ignore non-signal cancellation requests --
// let the test finish.
}
RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
_ = tx.send(test.info_response(
UnitState::Running {
Expand Down Expand Up @@ -986,7 +991,6 @@ async fn handle_delay_between_attempts<'a>(
previous_result: ExecutionResult,
previous_slow: bool,
delay: Duration,
cancel_receiver: &mut broadcast::Receiver<()>,
req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
) {
let mut sleep = std::pin::pin!(crate::time::pausable_sleep(delay));
Expand All @@ -999,10 +1003,6 @@ async fn handle_delay_between_attempts<'a>(
// The timer has expired.
break;
}
_ = cancel_receiver.recv() => {
// The cancel signal was received.
break;
}
recv = req_rx.recv() => {
let req = recv.expect("req_rx sender is open");

Expand All @@ -1025,6 +1025,11 @@ async fn handle_delay_between_attempts<'a>(
// shutdown.
break;
}
RunUnitRequest::OtherCancel => {
// If a cancellation was requested, break out of the
// loop.
break;
}
RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
let waiting_snapshot = waiting_stopwatch.snapshot();
_ = tx.send(
Expand Down Expand Up @@ -1096,6 +1101,10 @@ async fn detect_fd_leaks<'a>(
RunUnitRequest::Signal(_) => {
// The process is done executing, so signals are moot.
}
RunUnitRequest::OtherCancel => {
// Ignore non-signal cancellation requests -- let the
// unit finish.
}
RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
let snapshot = waiting_stopwatch.snapshot();
let resp = cx.info_response(
Expand Down
14 changes: 3 additions & 11 deletions nextest-runner/src/runner/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use quick_junit::ReportUuid;
use std::{convert::Infallible, fmt, sync::Arc};
use tokio::{
runtime::Runtime,
sync::{broadcast, mpsc::unbounded_channel, oneshot},
sync::{mpsc::unbounded_channel, oneshot},
task::JoinError,
};
use tracing::{debug, warn};
Expand Down Expand Up @@ -280,16 +280,10 @@ impl<'a> TestRunnerInner<'a> {

let ((), results) = TokioScope::scope_and_block(move |scope| {
let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();
let (cancellation_sender, _cancel_receiver) = broadcast::channel(1);

// Run the dispatcher to completion in a task.
let dispatcher_fut = dispatcher_cx_mut.run(
resp_rx,
signal_handler,
input_handler,
report_cancel_rx,
cancellation_sender.clone(),
);
let dispatcher_fut =
dispatcher_cx_mut.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
Expand Down Expand Up @@ -330,7 +324,6 @@ impl<'a> TestRunnerInner<'a> {
TestGroup::Global => None,
TestGroup::Custom(name) => Some(name.clone()),
};
let cancel_rx = cancellation_sender.subscribe();
let resp_tx = resp_tx.clone();
let setup_script_data = setup_script_data.clone();

Expand Down Expand Up @@ -359,7 +352,6 @@ impl<'a> TestRunnerInner<'a> {
test_instance,
settings,
resp_tx.clone(),
cancel_rx,
setup_script_data,
))
})
Expand Down
4 changes: 4 additions & 0 deletions nextest-runner/src/runner/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ impl InternalSetupScriptExecuteStatus<'_> {
#[derive(Clone, Debug)]
pub(super) enum RunUnitRequest<'a> {
Signal(SignalRequest),
/// Non-signal cancellation requests (e.g. test failures) which should cause
/// tests to exit in some states.
OtherCancel,
Query(RunUnitQuery<'a>),
}

Expand All @@ -197,6 +200,7 @@ impl<'a> RunUnitRequest<'a> {
#[cfg(unix)]
Self::Signal(SignalRequest::Continue) => {}
Self::Signal(SignalRequest::Shutdown(_)) => {}
Self::OtherCancel => {}
Self::Query(RunUnitQuery::GetInfo(tx)) => {
// The receiver being dead isn't really important.
_ = tx.send(status.info_response());
Expand Down
4 changes: 4 additions & 0 deletions nextest-runner/src/runner/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ pub(super) async fn terminate_child<'a>(
}
break;
}
RunUnitRequest::OtherCancel => {
// Ignore non-signal cancellation requests (most
// likely another test failed). Let the unit finish.
}
RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
let waiting_snapshot = waiting_stopwatch.snapshot();
_ = sender.send(
Expand Down
4 changes: 4 additions & 0 deletions nextest-runner/src/runner/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ pub(super) async fn terminate_child<'a>(
// immediately -- go to the next step.
break false;
}
RunUnitRequest::OtherCancel => {
// Ignore non-signal cancellation requests (most
// likely another test failed). Let the unit finish.
}
RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
let waiting_snapshot = waiting_stopwatch.snapshot();
_ = sender.send(
Expand Down

0 comments on commit be10886

Please sign in to comment.