Skip to content

Commit

Permalink
Memory optimization for long-term stored objects
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Dec 19, 2023
1 parent fdc8772 commit 2c65837
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
8 changes: 6 additions & 2 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ pub async fn handle_submit(
submit_dir: &submit_dir,
};

let small_job_desc = job_desc.clone_without_large_data();

let new_tasks: anyhow::Result<NewTasksMessage> = {
match job_desc.clone() {
match job_desc {
JobDescription::Array {
ids,
entries,
Expand All @@ -76,18 +78,20 @@ pub async fn handle_submit(
};

let job = Job::new(
job_desc,
small_job_desc,
job_id,
tako_base_id,
name,
max_fails,
log.clone(),
submit_dir,
);

let job_detail = job.make_job_detail(Some(&TaskSelector {
id_selector: TaskIdSelector::All,
status_selector: TaskStatusSelector::All,
}));

state_ref.get_mut().add_job(job);

if let Some(log) = log {
Expand Down
41 changes: 41 additions & 0 deletions crates/hyperqueue/src/transfer/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,37 @@ pub struct TaskDescription {
pub crash_limit: u32,
}

impl TaskDescription {
pub fn clone_without_large_data(&self) -> TaskDescription {
TaskDescription {
program: self.program.clone_without_large_data(),
resources: self.resources.clone(),
pin_mode: self.pin_mode.clone(),
task_dir: self.task_dir,
time_limit: self.time_limit.clone(),
priority: self.priority,
crash_limit: self.crash_limit,
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TaskWithDependencies {
pub id: JobTaskId,
pub task_desc: TaskDescription,
pub dependencies: Vec<JobTaskId>,
}

impl TaskWithDependencies {
pub fn clone_without_large_data(&self) -> TaskWithDependencies {
TaskWithDependencies {
id: self.id,
task_desc: self.task_desc.clone_without_large_data(),
dependencies: self.dependencies.clone(),
}
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum JobDescription {
Expand All @@ -104,6 +128,23 @@ impl JobDescription {
JobDescription::Graph { tasks } => tasks.len() as JobTaskCount,
}
}

pub fn clone_without_large_data(&self) -> JobDescription {
match self {
JobDescription::Array {
ids,
entries: _,
task_desc,
} => JobDescription::Array {
ids: ids.clone(),
entries: None, // Forget entries!
task_desc: task_desc.clone_without_large_data(),
},
JobDescription::Graph { tasks } => JobDescription::Graph {
tasks: tasks.iter().map(|t| t.clone_without_large_data()).collect(),
},
}
}
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
25 changes: 25 additions & 0 deletions crates/tako/src/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,28 @@ pub struct ProgramDefinition {
#[serde(default)]
pub cwd: PathBuf,
}

fn shortened_bstring(str: &BString) -> BString {
if str.len() < 256 {
str.clone()
} else {
format!("<{} bytes>", str.len()).into()
}
}

impl ProgramDefinition {
pub fn clone_without_large_data(&self) -> ProgramDefinition {
ProgramDefinition {
args: self.args.iter().take(128).map(shortened_bstring).collect(),
env: self
.env
.iter()
.map(|(k, v)| (shortened_bstring(k), shortened_bstring(v)))
.collect(),
stdout: self.stdout.clone(),
stderr: self.stderr.clone(),
stdin: Vec::new(), // Forget stdin
cwd: self.cwd.clone(),
}
}
}

0 comments on commit 2c65837

Please sign in to comment.