Skip to content

Commit

Permalink
Removed some very old and obsolete code from Tako
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Oct 7, 2024
1 parent a2fa062 commit dcb1be6
Show file tree
Hide file tree
Showing 14 changed files with 32 additions and 194 deletions.
4 changes: 0 additions & 4 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,6 @@ fn build_tasks_array(
resources: task_desc.resources.clone(),
n_outputs: 0,
time_limit: task_desc.time_limit,
keep: false,
observe: true,
priority: task_desc.priority,
crash_limit: task_desc.crash_limit,
}],
Expand Down Expand Up @@ -362,8 +360,6 @@ fn build_tasks_graph(
n_outputs: 0,
time_limit: task.time_limit,
priority: task.priority,
keep: false,
observe: true,
crash_limit: task.crash_limit,
});
index
Expand Down
9 changes: 1 addition & 8 deletions crates/tako/benches/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,7 @@ pub fn create_task(id: TaskId) -> Task {
n_outputs: 0,
crash_limit: 5,
};
Task::new(
id,
Default::default(),
Rc::new(conf),
Default::default(),
false,
false,
)
Task::new(id, Default::default(), Rc::new(conf), Default::default())
}
pub fn create_worker(id: u64) -> Worker {
Worker::new(
Expand Down
16 changes: 0 additions & 16 deletions crates/tako/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,6 @@ pub struct SharedTaskConfiguration {
#[serde(default)]
pub priority: Priority,

#[serde(default)]
pub keep: bool,

#[serde(default)]
pub observe: bool,

pub crash_limit: u32,
}

Expand All @@ -136,11 +130,6 @@ pub struct NewTasksMessage {
pub adjust_instance_id: Map<TaskId, InstanceId>,
}

#[derive(Deserialize, Serialize, Debug)]
pub struct ObserveTasksMessage {
pub tasks: Vec<TaskId>,
}

#[derive(Deserialize, Serialize, Debug)]
pub struct TaskInfoRequest {
pub tasks: Vec<TaskId>, // If empty, then all tasks are assumed
Expand Down Expand Up @@ -190,7 +179,6 @@ pub struct NewWorkerQuery {
#[serde(tag = "op")]
pub enum FromGatewayMessage {
NewTasks(NewTasksMessage),
ObserveTasks(ObserveTasksMessage),
CancelTasks(CancelTasks),
GetTaskInfo(TaskInfoRequest),
ServerInfo,
Expand Down Expand Up @@ -234,10 +222,6 @@ impl Serialize for TaskState {
}
}

/* User can receive this updates when task is registered with "observe flag"
Note: Error state is NOT there, it is sent separately as TaskFail,
because task fail is received even without "observe" flag.
*/
#[derive(Serialize, Deserialize, Debug)]
pub struct TaskUpdate {
pub id: TaskId,
Expand Down
43 changes: 8 additions & 35 deletions crates/tako/src/internal/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ use tokio::sync::mpsc::UnboundedSender;

use crate::gateway::{
CancelTasksResponse, FromGatewayMessage, NewTasksMessage, NewTasksResponse,
SharedTaskConfiguration, TaskInfo, TaskState, TaskUpdate, TasksInfoResponse, ToGatewayMessage,
SharedTaskConfiguration, TaskInfo, TaskState, TasksInfoResponse, ToGatewayMessage,
};
use crate::internal::common::resources::request::ResourceRequestEntry;
use crate::internal::messages::worker::ToWorkerMessage;
use crate::internal::scheduler::query::compute_new_worker_query;
use crate::internal::server::comm::{Comm, CommSender, CommSenderRef};
use crate::internal::server::core::{Core, CoreRef};
use crate::internal::server::reactor::{on_cancel_tasks, on_new_tasks, on_set_observe_flag};
use crate::internal::server::reactor::{on_cancel_tasks, on_new_tasks};
use crate::internal::server::task::{Task, TaskConfiguration, TaskInput, TaskRuntimeState};
//use crate::internal::transfer::transport::make_protocol_builder;
use crate::internal::common::resources::request::ResourceRequestEntry;
use crate::internal::scheduler::query::compute_new_worker_query;
use std::rc::Rc;
use thin_vec::ThinVec;

Expand Down Expand Up @@ -107,30 +106,6 @@ pub(crate) async fn process_client_message(
message: FromGatewayMessage,
) -> Option<String> {
match message {
FromGatewayMessage::ObserveTasks(msg) => {
let mut core = core_ref.get_mut();
let mut comm = comm_ref.get_mut();
for task_id in msg.tasks {
log::debug!("Client start observing task={}", task_id);
if !on_set_observe_flag(&mut core, &mut *comm, task_id, true) {
log::debug!(
"Client ask for observing of invalid (old?) task={}",
task_id
);
client_sender
.send(ToGatewayMessage::TaskUpdate(TaskUpdate {
id: task_id,
state: if core.is_used_task_id(task_id) {
TaskState::Finished
} else {
TaskState::Invalid
},
}))
.unwrap();
};
}
None
}
FromGatewayMessage::NewTasks(msg) => handle_new_tasks(
&mut core_ref.get_mut(),
&mut comm_ref.get_mut(),
Expand Down Expand Up @@ -243,14 +218,12 @@ fn handle_new_tasks(
.into_iter()
.map(|c| {
assert_eq!(c.n_outputs, 0); // TODO: Implementation for more outputs
let keep = c.keep;
let observe = c.observe;
(Rc::new(create_task_configuration(core, c)), keep, observe)
Rc::new(create_task_configuration(core, c))
})
.collect();

for cfg in &configurations {
if let Err(e) = cfg.0.resources.validate() {
if let Err(e) = cfg.resources.validate() {
return Some(format!("Invalid task request {e:?}"));
}
}
Expand All @@ -264,13 +237,13 @@ fn handle_new_tasks(
if idx >= configurations.len() {
return Some(format!("Invalid configuration index {idx}"));
}
let (conf, keep, observe) = &configurations[idx];
let conf = &configurations[idx];
let inputs: ThinVec<_> = task
.task_deps
.iter()
.map(|&task_id| TaskInput::new_task_dependency(task_id))
.collect();
let task = Task::new(task.id, inputs, conf.clone(), task.body, *keep, *observe);
let task = Task::new(task.id, inputs, conf.clone(), task.body);
tasks.push(task);
}
if !msg.adjust_instance_id.is_empty() {
Expand Down
42 changes: 2 additions & 40 deletions crates/tako/src/internal/server/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,7 @@ pub(crate) fn on_task_running(
}
};

if task.is_observed() {
comm.send_client_task_started(task_id, task.instance_id, worker_ids, context);
}
comm.send_client_task_started(task_id, task.instance_id, worker_ids, context);
}
}

Expand Down Expand Up @@ -302,10 +300,7 @@ pub(crate) fn on_task_finished(

task.state = TaskRuntimeState::Finished(FinishInfo {});
comm.ask_for_scheduling();

if task.is_observed() {
comm.send_client_task_finished(task.id);
}
comm.send_client_task_finished(task.id);
} else {
log::debug!("Unknown task finished id={}", msg.id);
return;
Expand Down Expand Up @@ -409,30 +404,6 @@ pub(crate) fn on_steal_response(
}
}

#[cfg(test)] // The current version of HQ does not use it, it is now used only in tests
pub(crate) fn on_reset_keep_flag(core: &mut Core, comm: &mut impl Comm, task_id: TaskId) {
let task = core.get_task_mut(task_id);
task.set_keep_flag(false);
remove_task_if_possible(core, comm, task_id);
}

pub(crate) fn on_set_observe_flag(
core: &mut Core,
comm: &mut impl Comm,
task_id: TaskId,
value: bool,
) -> bool {
if let Some(task) = core.find_task_mut(task_id) {
if value && task.is_finished() {
comm.send_client_task_finished(task_id);
}
task.set_observed_flag(value);
true
} else {
false
}
}

fn fail_task_helper(
core: &mut Core,
comm: &mut impl Comm,
Expand Down Expand Up @@ -529,7 +500,6 @@ pub(crate) fn on_cancel_tasks(
continue;
}

let mut remove = false;
{
let (tasks, workers) = core.split_tasks_workers_mut();
let task = tasks.get_task(task_id);
Expand Down Expand Up @@ -564,17 +534,10 @@ pub(crate) fn on_cancel_tasks(
running_ids.entry(from_id).or_default().push(task_id);
}
TaskRuntimeState::Finished(_) => {
if task.is_keeped() {
remove = true;
}
already_finished.push(task_id);
}
};
}
if remove {
core.get_task_mut(task_id).set_keep_flag(false);
remove_task_if_possible(core, comm, task_id);
}
}

for &task_id in &to_unregister {
Expand Down Expand Up @@ -609,7 +572,6 @@ fn remove_task_if_possible(core: &mut Core, _comm: &mut impl Comm, task_id: Task
if !core.get_task(task_id).is_removable() {
return;
}

match core.remove_task(task_id) {
TaskRuntimeState::Finished(_finfo) => { /* Ok */ }
_ => unreachable!(),
Expand Down
33 changes: 3 additions & 30 deletions crates/tako/src/internal/server/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,10 @@ impl fmt::Debug for TaskRuntimeState {
bitflags::bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TaskFlags: u32 {
const KEEP = 0b00000001;
const OBSERVE = 0b00000010;

const FRESH = 0b00000100;
const FRESH = 0b00000001;

// This is utilized inside scheduler, it has no meaning between scheduler calls
const TAKE = 0b00001000;
const TAKE = 0b00000010;
}
}

Expand Down Expand Up @@ -129,14 +126,10 @@ impl Task {
inputs: ThinVec<TaskInput>,
configuration: Rc<TaskConfiguration>,
body: Box<[u8]>,
keep: bool,
observe: bool,
) -> Self {
log::debug!("New task {} {:?}", id, &configuration.resources);

let mut flags = TaskFlags::empty();
flags.set(TaskFlags::KEEP, keep);
flags.set(TaskFlags::OBSERVE, observe);
flags.set(TaskFlags::FRESH, true);

Self {
Expand Down Expand Up @@ -211,36 +204,16 @@ impl Task {
&self.consumers
}

#[inline]
pub(crate) fn set_keep_flag(&mut self, value: bool) {
self.flags.set(TaskFlags::KEEP, value);
}

#[inline]
pub(crate) fn set_take_flag(&mut self, value: bool) {
self.flags.set(TaskFlags::TAKE, value);
}

#[inline]
pub(crate) fn set_observed_flag(&mut self, value: bool) {
self.flags.set(TaskFlags::OBSERVE, value);
}

#[inline]
pub(crate) fn set_fresh_flag(&mut self, value: bool) {
self.flags.set(TaskFlags::FRESH, value);
}

#[inline]
pub(crate) fn is_observed(&self) -> bool {
self.flags.contains(TaskFlags::OBSERVE)
}

#[inline]
pub(crate) fn is_keeped(&self) -> bool {
self.flags.contains(TaskFlags::KEEP)
}

#[inline]
pub(crate) fn is_fresh(&self) -> bool {
self.flags.contains(TaskFlags::FRESH)
Expand All @@ -253,7 +226,7 @@ impl Task {

#[inline]
pub(crate) fn is_removable(&self) -> bool {
self.consumers.is_empty() && !self.is_keeped() && self.is_finished()
self.consumers.is_empty() && self.is_finished()
}

pub(crate) fn collect_consumers(&self, taskmap: &TaskMap) -> Set<TaskId> {
Expand Down
8 changes: 1 addition & 7 deletions crates/tako/src/internal/tests/integration/test_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ async fn test_submit_simple_task_ok() {
let ids = handler
.submit(
GraphBuilder::default()
.task(
TaskConfigBuilder::default()
.args(simple_args(&["/bin/hostname"]))
.keep(true),
)
.task(TaskConfigBuilder::default().args(simple_args(&["/bin/hostname"])))
.simple_task(&["/bin/hostname"])
.task(
TaskConfigBuilder::default()
Expand Down Expand Up @@ -134,8 +130,6 @@ async fn test_cancel_error_task() {

let response = cancel(&mut handle, &[1]).await;
assert_eq!(response.already_finished, vec![1].to_ids());

assert!(handle.wait(&[1]).await.get(1).is_invalid());
})
.await;
}
Expand Down
18 changes: 2 additions & 16 deletions crates/tako/src/internal/tests/integration/utils/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,14 @@ pub async fn wait_for_task_start<T: Into<TaskId>>(
#[derive(Debug)]
pub enum TaskResult {
Update(TaskState),
Fail {
cancelled_tasks: Vec<TaskId>,
info: TaskFailInfo,
},
Fail { info: TaskFailInfo },
}

impl TaskResult {
pub fn is_finished(&self) -> bool {
matches!(self, TaskResult::Update(TaskState::Finished))
}

pub fn is_invalid(&self) -> bool {
matches!(self, TaskResult::Update(TaskState::Invalid))
}

pub fn is_failed(&self) -> bool {
matches!(self, TaskResult::Fail { .. })
}
Expand All @@ -118,10 +111,6 @@ impl TaskWaitResult {
self.events.iter().any(|v| v.is_failed())
}

pub fn is_invalid(&self) -> bool {
self.events.iter().any(|v| v.is_invalid())
}

pub fn assert_error_message(&self, needle: &str) {
for event in &self.events {
if let TaskResult::Fail { info, .. } = event {
Expand Down Expand Up @@ -188,10 +177,7 @@ pub async fn wait_for_tasks<T: Into<TaskId>>(
.tasks
.entry(msg.id)
.or_default()
.add(TaskResult::Fail {
cancelled_tasks: msg.cancelled_tasks,
info: msg.info,
});
.add(TaskResult::Fail { info: msg.info });
}
ToGatewayMessage::Error(msg) => panic!(
"Received error message {:?} while waiting for tasks",
Expand Down
Loading

0 comments on commit dcb1be6

Please sign in to comment.