From 2c658373fdc0a837a5e665e5794808a202387c1d Mon Sep 17 00:00:00 2001
From: Ada Bohm
Date: Mon, 18 Dec 2023 18:28:09 +0100
Subject: [PATCH] Memory optimization for long-term stored objects

---
Review notes (this section is ignored when the patch is applied):

- The line structure of the patch was restored; it had been collapsed onto
  two physical lines and could not be parsed as a unified diff.
- The added `clone_without_large_data` helpers are lossy, so they are now
  documented; the 256/128 limits are named constants and the `str` parameter
  of `shortened_bstring` was renamed to `value`. Hunk headers and the diffstat
  were updated accordingly; the `index` lines still carry the blob ids of the
  original commit.
- NOTE(review): the context lines `anyhow::Result = {` and
  `pub dependencies: Vec,` look like they lost their generic arguments, and
  the `From:` header has no e-mail address. Restore both from the target tree
  before applying.

 crates/hyperqueue/src/server/client/submit.rs | 10 +++-
 crates/hyperqueue/src/transfer/messages.rs    | 50 +++++++++++++++++++
 crates/tako/src/program.rs                    | 45 +++++++++++++++++
 3 files changed, 103 insertions(+), 2 deletions(-)

diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index 0897a61f5..c5cc6de2e 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -56,8 +56,12 @@ pub async fn handle_submit(
         submit_dir: &submit_dir,
     };
 
+    // Only this reduced copy is handed to `Job::new`; the full description is
+    // moved into the `match` below that produces `new_tasks`.
+    let small_job_desc = job_desc.clone_without_large_data();
+
     let new_tasks: anyhow::Result = {
-        match job_desc.clone() {
+        match job_desc {
             JobDescription::Array {
                 ids,
                 entries,
@@ -76,7 +80,7 @@ pub async fn handle_submit(
     };
 
     let job = Job::new(
-        job_desc,
+        small_job_desc,
         job_id,
         tako_base_id,
         name,
@@ -84,10 +88,12 @@ pub async fn handle_submit(
         log.clone(),
         submit_dir,
     );
+
     let job_detail = job.make_job_detail(Some(&TaskSelector {
         id_selector: TaskIdSelector::All,
         status_selector: TaskStatusSelector::All,
     }));
+
     state_ref.get_mut().add_job(job);
 
     if let Some(log) = log {
diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs
index fe0ed4d18..c1635af55 100644
--- a/crates/hyperqueue/src/transfer/messages.rs
+++ b/crates/hyperqueue/src/transfer/messages.rs
@@ -77,6 +77,22 @@ pub struct TaskDescription {
     pub crash_limit: u32,
 }
 
+impl TaskDescription {
+    /// Returns a copy in which `program` is replaced by the result of its own
+    /// `clone_without_large_data`; every other field is copied as is.
+    pub fn clone_without_large_data(&self) -> TaskDescription {
+        TaskDescription {
+            program: self.program.clone_without_large_data(),
+            resources: self.resources.clone(),
+            pin_mode: self.pin_mode.clone(),
+            task_dir: self.task_dir,
+            time_limit: self.time_limit.clone(),
+            priority: self.priority,
+            crash_limit: self.crash_limit,
+        }
+    }
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct TaskWithDependencies {
     pub id: JobTaskId,
@@ -84,6 +100,18 @@ pub struct TaskWithDependencies {
     pub dependencies: Vec,
 }
 
+impl TaskWithDependencies {
+    /// Returns a copy whose `task_desc` is reduced by its own
+    /// `clone_without_large_data`; `id` and `dependencies` are kept.
+    pub fn clone_without_large_data(&self) -> TaskWithDependencies {
+        TaskWithDependencies {
+            id: self.id,
+            task_desc: self.task_desc.clone_without_large_data(),
+            dependencies: self.dependencies.clone(),
+        }
+    }
+}
+
 #[allow(clippy::large_enum_variant)]
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub enum JobDescription {
@@ -104,6 +132,28 @@ impl JobDescription {
             JobDescription::Graph { tasks } => tasks.len() as JobTaskCount,
         }
     }
+
+    /// Returns a reduced copy of this job description.
+    ///
+    /// For `Array`, `ids` is cloned, `entries` is dropped (set to `None`) and
+    /// `task_desc` is reduced by its own `clone_without_large_data`. For
+    /// `Graph`, each task is reduced by its own `clone_without_large_data`.
+    pub fn clone_without_large_data(&self) -> JobDescription {
+        match self {
+            JobDescription::Array {
+                ids,
+                entries: _,
+                task_desc,
+            } => JobDescription::Array {
+                ids: ids.clone(),
+                entries: None, // Forget entries!
+                task_desc: task_desc.clone_without_large_data(),
+            },
+            JobDescription::Graph { tasks } => JobDescription::Graph {
+                tasks: tasks.iter().map(|t| t.clone_without_large_data()).collect(),
+            },
+        }
+    }
 }
 
 #[derive(Serialize, Deserialize, Debug)]
diff --git a/crates/tako/src/program.rs b/crates/tako/src/program.rs
index b773fd9a9..e92afa897 100644
--- a/crates/tako/src/program.rs
+++ b/crates/tako/src/program.rs
@@ -61,3 +61,48 @@ pub struct ProgramDefinition {
     #[serde(default)]
     pub cwd: PathBuf,
 }
+
+/// Length from which `shortened_bstring` replaces a string by a placeholder.
+const SHORTENED_BSTRING_LIMIT: usize = 256;
+
+/// Maximum number of program arguments kept by
+/// `ProgramDefinition::clone_without_large_data`.
+const KEPT_ARGS_LIMIT: usize = 128;
+
+/// Returns a clone of `value` when its length is below
+/// `SHORTENED_BSTRING_LIMIT`; otherwise returns the placeholder `<N bytes>`,
+/// where `N` is the length of `value`.
+fn shortened_bstring(value: &BString) -> BString {
+    if value.len() < SHORTENED_BSTRING_LIMIT {
+        value.clone()
+    } else {
+        format!("<{} bytes>", value.len()).into()
+    }
+}
+
+impl ProgramDefinition {
+    /// Returns a lossy copy of the program definition.
+    ///
+    /// Only the first `KEPT_ARGS_LIMIT` arguments are kept; each kept argument
+    /// and every environment key and value goes through `shortened_bstring`.
+    /// `stdin` is emptied; `stdout`, `stderr` and `cwd` are cloned unchanged.
+    pub fn clone_without_large_data(&self) -> ProgramDefinition {
+        ProgramDefinition {
+            args: self
+                .args
+                .iter()
+                .take(KEPT_ARGS_LIMIT)
+                .map(shortened_bstring)
+                .collect(),
+            env: self
+                .env
+                .iter()
+                .map(|(k, v)| (shortened_bstring(k), shortened_bstring(v)))
+                .collect(),
+            stdout: self.stdout.clone(),
+            stderr: self.stderr.clone(),
+            stdin: Vec::new(), // Forget stdin
+            cwd: self.cwd.clone(),
+        }
+    }
+}