diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs index 1c2e1215e..e159053c9 100644 --- a/crates/hyperqueue/src/server/client/submit.rs +++ b/crates/hyperqueue/src/server/client/submit.rs @@ -307,8 +307,6 @@ fn build_tasks_array( resources: task_desc.resources.clone(), n_outputs: 0, time_limit: task_desc.time_limit, - keep: false, - observe: true, priority: task_desc.priority, crash_limit: task_desc.crash_limit, }], @@ -362,8 +360,6 @@ fn build_tasks_graph( n_outputs: 0, time_limit: task.time_limit, priority: task.priority, - keep: false, - observe: true, crash_limit: task.crash_limit, }); index diff --git a/crates/tako/benches/utils/mod.rs b/crates/tako/benches/utils/mod.rs index 07f60be3b..54086dd1a 100644 --- a/crates/tako/benches/utils/mod.rs +++ b/crates/tako/benches/utils/mod.rs @@ -18,14 +18,7 @@ pub fn create_task(id: TaskId) -> Task { n_outputs: 0, crash_limit: 5, }; - Task::new( - id, - Default::default(), - Rc::new(conf), - Default::default(), - false, - false, - ) + Task::new(id, Default::default(), Rc::new(conf), Default::default()) } pub fn create_worker(id: u64) -> Worker { Worker::new( diff --git a/crates/tako/src/gateway.rs b/crates/tako/src/gateway.rs index bc3968888..e210552c5 100644 --- a/crates/tako/src/gateway.rs +++ b/crates/tako/src/gateway.rs @@ -106,12 +106,6 @@ pub struct SharedTaskConfiguration { #[serde(default)] pub priority: Priority, - #[serde(default)] - pub keep: bool, - - #[serde(default)] - pub observe: bool, - pub crash_limit: u32, } @@ -136,11 +130,6 @@ pub struct NewTasksMessage { pub adjust_instance_id: Map, } -#[derive(Deserialize, Serialize, Debug)] -pub struct ObserveTasksMessage { - pub tasks: Vec, -} - #[derive(Deserialize, Serialize, Debug)] pub struct TaskInfoRequest { pub tasks: Vec, // If empty, then all tasks are assumed @@ -190,7 +179,6 @@ pub struct NewWorkerQuery { #[serde(tag = "op")] pub enum FromGatewayMessage { NewTasks(NewTasksMessage), - ObserveTasks(ObserveTasksMessage), CancelTasks(CancelTasks), GetTaskInfo(TaskInfoRequest), ServerInfo, @@ -234,10 +222,6 @@ impl Serialize for TaskState { } } -/* User can receive this updates when task is registered with "observe flag" - Note: Error state is NOT there, it is sent separately as TaskFail, - because task fail is received even without "observe" flag. -*/ #[derive(Serialize, Deserialize, Debug)] pub struct TaskUpdate { pub id: TaskId, diff --git a/crates/tako/src/internal/server/client.rs b/crates/tako/src/internal/server/client.rs index bc308f343..9b273c11c 100644 --- a/crates/tako/src/internal/server/client.rs +++ b/crates/tako/src/internal/server/client.rs @@ -3,16 +3,15 @@ use tokio::sync::mpsc::UnboundedSender; use crate::gateway::{ CancelTasksResponse, FromGatewayMessage, NewTasksMessage, NewTasksResponse, - SharedTaskConfiguration, TaskInfo, TaskState, TaskUpdate, TasksInfoResponse, ToGatewayMessage, + SharedTaskConfiguration, TaskInfo, TaskState, TasksInfoResponse, ToGatewayMessage, }; +use crate::internal::common::resources::request::ResourceRequestEntry; use crate::internal::messages::worker::ToWorkerMessage; +use crate::internal::scheduler::query::compute_new_worker_query; use crate::internal::server::comm::{Comm, CommSender, CommSenderRef}; use crate::internal::server::core::{Core, CoreRef}; -use crate::internal::server::reactor::{on_cancel_tasks, on_new_tasks, on_set_observe_flag}; +use crate::internal::server::reactor::{on_cancel_tasks, on_new_tasks}; use crate::internal::server::task::{Task, TaskConfiguration, TaskInput, TaskRuntimeState}; -//use crate::internal::transfer::transport::make_protocol_builder; -use crate::internal::common::resources::request::ResourceRequestEntry; -use crate::internal::scheduler::query::compute_new_worker_query; use std::rc::Rc; use thin_vec::ThinVec; @@ -107,30 +106,6 @@ pub(crate) async fn process_client_message( message: FromGatewayMessage, ) -> Option { match message { - FromGatewayMessage::ObserveTasks(msg) => { - let mut core = core_ref.get_mut(); - let mut comm = comm_ref.get_mut(); - for task_id in msg.tasks { - log::debug!("Client start observing task={}", task_id); - if !on_set_observe_flag(&mut core, &mut *comm, task_id, true) { - log::debug!( - "Client ask for observing of invalid (old?) task={}", - task_id - ); - client_sender - .send(ToGatewayMessage::TaskUpdate(TaskUpdate { - id: task_id, - state: if core.is_used_task_id(task_id) { - TaskState::Finished - } else { - TaskState::Invalid - }, - })) - .unwrap(); - }; - } - None - } FromGatewayMessage::NewTasks(msg) => handle_new_tasks( &mut core_ref.get_mut(), &mut comm_ref.get_mut(), @@ -243,14 +218,12 @@ fn handle_new_tasks( .into_iter() .map(|c| { assert_eq!(c.n_outputs, 0); // TODO: Implementation for more outputs - let keep = c.keep; - let observe = c.observe; - (Rc::new(create_task_configuration(core, c)), keep, observe) + Rc::new(create_task_configuration(core, c)) }) .collect(); for cfg in &configurations { - if let Err(e) = cfg.0.resources.validate() { + if let Err(e) = cfg.resources.validate() { return Some(format!("Invalid task request {e:?}")); } } @@ -264,13 +237,13 @@ fn handle_new_tasks( if idx >= configurations.len() { return Some(format!("Invalid configuration index {idx}")); } - let (conf, keep, observe) = &configurations[idx]; + let conf = &configurations[idx]; let inputs: ThinVec<_> = task .task_deps .iter() .map(|&task_id| TaskInput::new_task_dependency(task_id)) .collect(); - let task = Task::new(task.id, inputs, conf.clone(), task.body, *keep, *observe); + let task = Task::new(task.id, inputs, conf.clone(), task.body); tasks.push(task); } if !msg.adjust_instance_id.is_empty() { diff --git a/crates/tako/src/internal/server/reactor.rs b/crates/tako/src/internal/server/reactor.rs index 9160c1f37..6d95deb43 100644 --- a/crates/tako/src/internal/server/reactor.rs +++ b/crates/tako/src/internal/server/reactor.rs @@ -241,9 +241,7 @@ pub(crate) fn on_task_running( } }; - if task.is_observed() { - comm.send_client_task_started(task_id, task.instance_id, worker_ids, context); - } + comm.send_client_task_started(task_id, task.instance_id, worker_ids, context); } } @@ -302,10 +300,7 @@ pub(crate) fn on_task_finished( task.state = TaskRuntimeState::Finished(FinishInfo {}); comm.ask_for_scheduling(); - - if task.is_observed() { - comm.send_client_task_finished(task.id); - } + comm.send_client_task_finished(task.id); } else { log::debug!("Unknown task finished id={}", msg.id); return; @@ -409,30 +404,6 @@ pub(crate) fn on_steal_response( } } -#[cfg(test)] // The current version of HQ does not use it, it is now used only in tests -pub(crate) fn on_reset_keep_flag(core: &mut Core, comm: &mut impl Comm, task_id: TaskId) { - let task = core.get_task_mut(task_id); - task.set_keep_flag(false); - remove_task_if_possible(core, comm, task_id); -} - -pub(crate) fn on_set_observe_flag( - core: &mut Core, - comm: &mut impl Comm, - task_id: TaskId, - value: bool, -) -> bool { - if let Some(task) = core.find_task_mut(task_id) { - if value && task.is_finished() { - comm.send_client_task_finished(task_id); - } - task.set_observed_flag(value); - true - } else { - false - } -} - fn fail_task_helper( core: &mut Core, comm: &mut impl Comm, @@ -529,7 +500,6 @@ pub(crate) fn on_cancel_tasks( continue; } - let mut remove = false; { let (tasks, workers) = core.split_tasks_workers_mut(); let task = tasks.get_task(task_id); @@ -564,17 +534,10 @@ pub(crate) fn on_cancel_tasks( running_ids.entry(from_id).or_default().push(task_id); } TaskRuntimeState::Finished(_) => { - if task.is_keeped() { - remove = true; - } already_finished.push(task_id); } }; } - if remove { - core.get_task_mut(task_id).set_keep_flag(false); - remove_task_if_possible(core, comm, task_id); - } } for &task_id in &to_unregister { @@ -609,7 +572,6 @@ fn remove_task_if_possible(core: &mut Core, _comm: &mut impl Comm, task_id: Task if !core.get_task(task_id).is_removable() { return; } - match core.remove_task(task_id) { TaskRuntimeState::Finished(_finfo) => { /* Ok */ } _ => unreachable!(), diff --git a/crates/tako/src/internal/server/task.rs b/crates/tako/src/internal/server/task.rs index ae62c3756..8b3131b23 100644 --- a/crates/tako/src/internal/server/task.rs +++ b/crates/tako/src/internal/server/task.rs @@ -47,13 +47,10 @@ impl fmt::Debug for TaskRuntimeState { bitflags::bitflags! { #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct TaskFlags: u32 { - const KEEP = 0b00000001; - const OBSERVE = 0b00000010; - - const FRESH = 0b00000100; + const FRESH = 0b00000001; // This is utilized inside scheduler, it has no meaning between scheduler calls - const TAKE = 0b00001000; + const TAKE = 0b00000010; } } @@ -129,14 +126,10 @@ impl Task { inputs: ThinVec, configuration: Rc, body: Box<[u8]>, - keep: bool, - observe: bool, ) -> Self { log::debug!("New task {} {:?}", id, &configuration.resources); let mut flags = TaskFlags::empty(); - flags.set(TaskFlags::KEEP, keep); - flags.set(TaskFlags::OBSERVE, observe); flags.set(TaskFlags::FRESH, true); Self { @@ -211,36 +204,16 @@ impl Task { &self.consumers } - #[inline] - pub(crate) fn set_keep_flag(&mut self, value: bool) { - self.flags.set(TaskFlags::KEEP, value); - } - #[inline] pub(crate) fn set_take_flag(&mut self, value: bool) { self.flags.set(TaskFlags::TAKE, value); } - #[inline] - pub(crate) fn set_observed_flag(&mut self, value: bool) { - self.flags.set(TaskFlags::OBSERVE, value); - } - #[inline] pub(crate) fn set_fresh_flag(&mut self, value: bool) { self.flags.set(TaskFlags::FRESH, value); } - #[inline] - pub(crate) fn is_observed(&self) -> bool { - self.flags.contains(TaskFlags::OBSERVE) - } - - #[inline] - pub(crate) fn is_keeped(&self) -> bool { - self.flags.contains(TaskFlags::KEEP) - } - #[inline] pub(crate) fn is_fresh(&self) -> bool { self.flags.contains(TaskFlags::FRESH) @@ -253,7 +226,7 @@ impl Task { #[inline] pub(crate) fn is_removable(&self) -> bool { - self.consumers.is_empty() && !self.is_keeped() && self.is_finished() + self.consumers.is_empty() && self.is_finished() } pub(crate) fn collect_consumers(&self, taskmap: &TaskMap) -> Set { diff --git a/crates/tako/src/internal/tests/integration/test_basic.rs b/crates/tako/src/internal/tests/integration/test_basic.rs index 93189e678..6de0f489b 100644 --- a/crates/tako/src/internal/tests/integration/test_basic.rs +++ b/crates/tako/src/internal/tests/integration/test_basic.rs @@ -27,11 +27,7 @@ async fn test_submit_simple_task_ok() { let ids = handler .submit( GraphBuilder::default() - .task( - TaskConfigBuilder::default() - .args(simple_args(&["/bin/hostname"])) - .keep(true), - ) + .task(TaskConfigBuilder::default().args(simple_args(&["/bin/hostname"]))) .simple_task(&["/bin/hostname"]) .task( TaskConfigBuilder::default() @@ -134,8 +130,6 @@ async fn test_cancel_error_task() { let response = cancel(&mut handle, &[1]).await; assert_eq!(response.already_finished, vec![1].to_ids()); - - assert!(handle.wait(&[1]).await.get(1).is_invalid()); }) .await; } diff --git a/crates/tako/src/internal/tests/integration/utils/api.rs b/crates/tako/src/internal/tests/integration/utils/api.rs index 572bd325f..233ca6f45 100644 --- a/crates/tako/src/internal/tests/integration/utils/api.rs +++ b/crates/tako/src/internal/tests/integration/utils/api.rs @@ -80,10 +80,7 @@ pub async fn wait_for_task_start>( #[derive(Debug)] pub enum TaskResult { Update(TaskState), - Fail { - cancelled_tasks: Vec, - info: TaskFailInfo, - }, + Fail { info: TaskFailInfo }, } impl TaskResult { @@ -91,10 +88,6 @@ impl TaskResult { matches!(self, TaskResult::Update(TaskState::Finished)) } - pub fn is_invalid(&self) -> bool { - matches!(self, TaskResult::Update(TaskState::Invalid)) - } - pub fn is_failed(&self) -> bool { matches!(self, TaskResult::Fail { .. }) } @@ -118,10 +111,6 @@ impl TaskWaitResult { self.events.iter().any(|v| v.is_failed()) } - pub fn is_invalid(&self) -> bool { - self.events.iter().any(|v| v.is_invalid()) - } - pub fn assert_error_message(&self, needle: &str) { for event in &self.events { if let TaskResult::Fail { info, .. } = event { @@ -188,10 +177,7 @@ pub async fn wait_for_tasks>( .tasks .entry(msg.id) .or_default() - .add(TaskResult::Fail { - cancelled_tasks: msg.cancelled_tasks, - info: msg.info, - }); + .add(TaskResult::Fail { info: msg.info }); } ToGatewayMessage::Error(msg) => panic!( "Received error message {:?} while waiting for tasks", diff --git a/crates/tako/src/internal/tests/integration/utils/server.rs b/crates/tako/src/internal/tests/integration/utils/server.rs index 842fdf99d..3c742c2fd 100644 --- a/crates/tako/src/internal/tests/integration/utils/server.rs +++ b/crates/tako/src/internal/tests/integration/utils/server.rs @@ -10,8 +10,8 @@ use tokio::task::{JoinHandle, LocalSet}; use tokio::time::timeout; use crate::gateway::{ - FromGatewayMessage, NewTasksMessage, NewTasksResponse, ObserveTasksMessage, - SharedTaskConfiguration, StopWorkerRequest, TaskConfiguration, ToGatewayMessage, + FromGatewayMessage, NewTasksMessage, NewTasksResponse, SharedTaskConfiguration, + StopWorkerRequest, TaskConfiguration, ToGatewayMessage, }; use crate::internal::common::{Map, Set}; use crate::internal::server::client::process_client_message; @@ -205,10 +205,6 @@ impl ServerHandle { } pub async fn wait + Copy>(&mut self, tasks: &[T]) -> TaskWaitResultMap { - let msg = ObserveTasksMessage { - tasks: tasks.iter().map(|&v| v.into()).collect(), - }; - self.send(FromGatewayMessage::ObserveTasks(msg)).await; timeout(WAIT_TIMEOUT, wait_for_tasks(self, tasks.to_vec())) .await .unwrap() diff --git a/crates/tako/src/internal/tests/integration/utils/task.rs b/crates/tako/src/internal/tests/integration/utils/task.rs index 52b0b69a4..735f43063 100644 --- a/crates/tako/src/internal/tests/integration/utils/task.rs +++ b/crates/tako/src/internal/tests/integration/utils/task.rs @@ -76,8 +76,6 @@ pub fn build_task_def_from_config( ) -> (TaskConfiguration, SharedTaskConfiguration) { let TaskConfig { id, - keep, - observe, time_limit, resources, args, @@ -115,8 +113,6 @@ pub fn build_task_def_from_config( n_outputs: 0, time_limit, priority: 0, - keep, - observe: observe.unwrap_or(true), crash_limit: 5, }; ( @@ -136,11 +132,6 @@ pub struct TaskConfig { #[builder(default)] pub id: Option, - #[builder(default)] - keep: bool, - #[builder(default)] - observe: Option, - #[builder(default)] time_limit: Option, diff --git a/crates/tako/src/internal/tests/integration/utils/worker.rs b/crates/tako/src/internal/tests/integration/utils/worker.rs index 318cda721..9e8bdaf20 100644 --- a/crates/tako/src/internal/tests/integration/utils/worker.rs +++ b/crates/tako/src/internal/tests/integration/utils/worker.rs @@ -91,7 +91,6 @@ pub(super) fn create_worker_configuration( /// This data is available for tests pub struct WorkerHandle { pub workdir: PathBuf, - pub logdir: PathBuf, pub id: WorkerId, control_tx: tokio::sync::mpsc::Sender, } @@ -149,7 +148,6 @@ pub(super) async fn start_worker( let (mut configuration, worker_secret_key) = create_worker_configuration(config); let tmpdir = TempDir::with_prefix("tako")?; let workdir = tmpdir.path().to_path_buf().join("work"); - let logdir = tmpdir.path().to_path_buf().join("logs"); configuration.work_dir.clone_from(&workdir); std::fs::create_dir_all(&configuration.work_dir).unwrap(); @@ -218,7 +216,6 @@ pub(super) async fn start_worker( let handle = WorkerHandle { workdir, - logdir, id: worker_id, control_tx, }; diff --git a/crates/tako/src/internal/tests/test_reactor.rs b/crates/tako/src/internal/tests/test_reactor.rs index 7cfbe0662..1c6bc61b6 100644 --- a/crates/tako/src/internal/tests/test_reactor.rs +++ b/crates/tako/src/internal/tests/test_reactor.rs @@ -11,8 +11,8 @@ use crate::internal::messages::worker::{StealResponse, StealResponseMsg}; use crate::internal::scheduler::state::SchedulerState; use crate::internal::server::core::Core; use crate::internal::server::reactor::{ - on_cancel_tasks, on_new_tasks, on_new_worker, on_remove_worker, on_reset_keep_flag, - on_set_observe_flag, on_steal_response, on_task_error, on_task_finished, on_task_running, + on_cancel_tasks, on_new_tasks, on_new_worker, on_remove_worker, on_steal_response, + on_task_error, on_task_finished, on_task_running, }; use crate::internal::server::task::{Task, TaskRuntimeState}; use crate::internal::server::worker::Worker; @@ -193,12 +193,10 @@ fn test_assignments_and_finish() { let t1 = TaskBuilder::new(11).user_priority(12).outputs(1).build(); let t2 = task(12); - let mut t3 = task_with_deps(13, &[&t1, &t2], 1); - t3.set_keep_flag(true); + let t3 = task_with_deps(13, &[&t1, &t2], 1); let t4 = task(14); let t5 = task(15); - let mut t7 = task_with_deps(17, &[&t4], 1); - t7.set_keep_flag(true); + let t7 = task_with_deps(17, &[&t4], 1); let (id1, id2, id3, id5, id7) = (t1.id, t2.id, t3.id, t5.id, t7.id); @@ -265,6 +263,7 @@ fn test_assignments_and_finish() { check_worker_tasks_exact(&core, 102, &[]); comm.check_need_scheduling(); + assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(15)); comm.emptiness_check(); assert!(core.find_task(15.into()).is_none()); @@ -285,6 +284,7 @@ fn test_assignments_and_finish() { check_worker_tasks_exact(&core, 102, &[]); comm.check_need_scheduling(); + assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(12)); comm.emptiness_check(); assert!(core.find_task(12.into()).is_some()); @@ -307,11 +307,10 @@ fn test_assignments_and_finish() { ToWorkerMessage::ComputeTask(ComputeTaskMsg { id: TaskId(13), .. }) )); + assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(11)); comm.emptiness_check(); core.sanity_check(); - on_set_observe_flag(&mut core, &mut comm, 13.into(), true); - on_task_finished( &mut core, &mut comm, @@ -322,14 +321,9 @@ fn test_assignments_and_finish() { comm.check_need_scheduling(); assert_eq!(comm.take_client_task_finished(1), vec![13].to_ids()); - - comm.emptiness_check(); - - on_reset_keep_flag(&mut core, &mut comm, 13.into()); comm.emptiness_check(); core.sanity_check(); - on_reset_keep_flag(&mut core, &mut comm, 17.into()); comm.emptiness_check(); core.sanity_check(); } @@ -489,6 +483,7 @@ fn finish_task_without_outputs() { TaskFinishedMsg { id: 1.into() }, ); comm.check_need_scheduling(); + assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(1)); comm.emptiness_check(); core.sanity_check(); } @@ -744,7 +739,6 @@ fn test_running_task() { let mut comm = create_test_comm(); - on_set_observe_flag(&mut core, &mut comm, 1.into(), true); comm.emptiness_check(); on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(1)); @@ -752,6 +746,7 @@ fn test_running_task() { comm.emptiness_check(); on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(2)); + assert_eq!(comm.take_client_task_running(1)[0], TaskId::new(2)); comm.emptiness_check(); assert!(matches!( @@ -808,6 +803,7 @@ fn test_finished_before_steal_response() { ); comm.check_need_scheduling(); + assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(1)); comm.emptiness_check(); assert!(!worker_has_task(&core, 101, 1)); @@ -841,6 +837,7 @@ fn test_running_before_steal_response() { let mut comm = create_test_comm(); on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(1)); comm.check_need_scheduling(); + assert_eq!(comm.take_client_task_running(1)[0], TaskId::new(1)); comm.emptiness_check(); assert!(worker_has_task(&core, 101, 1)); diff --git a/crates/tako/src/internal/tests/utils/task.rs b/crates/tako/src/internal/tests/utils/task.rs index d552013d7..65b6766b5 100644 --- a/crates/tako/src/internal/tests/utils/task.rs +++ b/crates/tako/src/internal/tests/utils/task.rs @@ -110,8 +110,6 @@ impl TaskBuilder { crash_limit: self.crash_limit, }), Default::default(), - false, - false, ) } } diff --git a/crates/tako/src/internal/tests/utils/workflows.rs b/crates/tako/src/internal/tests/utils/workflows.rs index 1e5f49b1c..4945debcb 100644 --- a/crates/tako/src/internal/tests/utils/workflows.rs +++ b/crates/tako/src/internal/tests/utils/workflows.rs @@ -16,12 +16,10 @@ pub fn submit_example_1(core: &mut Core) { */ let t1 = task::task(11); - let mut t2 = task::task(12); - t2.set_keep_flag(true); + let t2 = task::task(12); let t3 = task_with_deps(13, &[&t1, &t2], 1); let t4 = task_with_deps(14, &[&t2], 1); - let mut t5 = task_with_deps(15, &[&t3, &t4], 1); - t5.set_keep_flag(true); + let t5 = task_with_deps(15, &[&t3, &t4], 1); let t6 = task_with_deps(16, &[&t3], 1); let t7 = task_with_deps(17, &[&t6], 1); submit_test_tasks(core, vec![t1, t2, t3, t4, t5, t6, t7]);