Skip to content

Commit

Permalink
Optimizations in job submit
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Dec 19, 2023
1 parent 2c65837 commit 3d72136
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 93 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ the JSON output mode is still unstable.
## New features
* Allow setting minimum duration for a task (`min_time` resource value) using the Python API.

* Optimizations related to job submission & long-term memory savings

# v0.17.0

## Breaking change
Expand Down
66 changes: 25 additions & 41 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ use tako::gateway::{
FromGatewayMessage, NewTasksMessage, ResourceRequestVariants, SharedTaskConfiguration,
TaskConfiguration, ToGatewayMessage,
};
use tako::program::ProgramDefinition;
use tako::TaskId;

use crate::common::arraydef::IntArray;
use crate::common::env::{HQ_ENTRY, HQ_JOB_ID, HQ_SUBMIT_DIR, HQ_TASK_ID};
use crate::common::placeholders::{
fill_placeholders_after_submit, fill_placeholders_log, normalize_path,
};
Expand Down Expand Up @@ -43,7 +41,7 @@ pub async fn handle_submit(
let (job_id, tako_base_id) = prepare_job(&mut message, &mut state_ref.get_mut());

let SubmitRequest {
job_desc,
mut job_desc,
name,
max_fails,
submit_dir,
Expand All @@ -56,19 +54,24 @@ pub async fn handle_submit(
submit_dir: &submit_dir,
};

let small_job_desc = job_desc.clone_without_large_data();

let new_tasks: anyhow::Result<NewTasksMessage> = {
match job_desc {
match &mut job_desc {
JobDescription::Array {
ids,
entries,
task_desc,
} => Ok(build_tasks_array(ids, entries, task_desc, job_ctx)),
} => Ok(build_tasks_array(
ids,
std::mem::take(entries),
task_desc,
job_ctx,
)),
JobDescription::Graph { tasks } => build_tasks_graph(tasks, job_ctx),
}
};

job_desc.strip_large_data();

let new_tasks = match new_tasks {
Err(error) => {
state_ref.get_mut().revert_to_job_id(job_id);
Expand All @@ -78,7 +81,7 @@ pub async fn handle_submit(
};

let job = Job::new(
small_job_desc,
job_desc,
job_id,
tako_base_id,
name,
Expand Down Expand Up @@ -154,39 +157,20 @@ async fn start_log_streaming(tako_ref: &Backend, job_id: JobId, path: PathBuf) {
assert!(receiver.await.is_ok());
}

/// Produce the concrete `ProgramDefinition` for a single task by cloning the
/// job-level program and injecting the per-task HQ environment variables
/// (job id, task id, and the submit directory).
fn make_program_def_for_task(
    program_def: &ProgramDefinition,
    task_id: JobTaskId,
    ctx: &JobContext,
) -> ProgramDefinition {
    let mut definition = program_def.clone();
    let env = &mut definition.env;
    // Identify the owning job and this task within it.
    env.insert(HQ_JOB_ID.into(), ctx.job_id.to_string().into());
    env.insert(HQ_TASK_ID.into(), task_id.to_string().into());
    // Expose the directory the job was submitted from (lossy UTF-8 for non-UTF-8 paths).
    env.insert(
        HQ_SUBMIT_DIR.into(),
        BString::from(ctx.submit_dir.to_string_lossy().as_bytes()),
    );
    definition
}

fn serialize_task_body(
ctx: &JobContext,
task_id: JobTaskId,
entry: Option<BString>,
task_desc: &TaskDescription,
) -> Box<[u8]> {
let mut program = make_program_def_for_task(&task_desc.program, task_id, ctx);
if let Some(e) = entry {
program.env.insert(HQ_ENTRY.into(), e);
}
let body_msg = TaskBody {
program,
program: Cow::Borrowed(&task_desc.program),
pin: task_desc.pin_mode.clone(),
task_dir: task_desc.task_dir,
job_id: ctx.job_id,
task_id,
submit_dir: Cow::Borrowed(ctx.submit_dir),
entry,
};
let body = tako::comm::serialize(&body_msg).expect("Could not serialize task body");
// Make sure that `into_boxed_slice` is a no-op.
Expand All @@ -195,9 +179,9 @@ fn serialize_task_body(
}

fn build_tasks_array(
ids: IntArray,
ids: &IntArray,
entries: Option<Vec<BString>>,
task_desc: TaskDescription,
task_desc: &TaskDescription,
ctx: JobContext,
) -> NewTasksMessage {
let tako_base_id = ctx.tako_base_id.as_num();
Expand Down Expand Up @@ -237,7 +221,7 @@ fn build_tasks_array(
NewTasksMessage {
tasks,
shared_data: vec![SharedTaskConfiguration {
resources: task_desc.resources,
resources: task_desc.resources.clone(),
n_outputs: 0,
time_limit: task_desc.time_limit,
keep: false,
Expand All @@ -249,13 +233,13 @@ fn build_tasks_array(
}

fn build_tasks_graph(
tasks: Vec<TaskWithDependencies>,
tasks: &[TaskWithDependencies],
ctx: JobContext,
) -> anyhow::Result<NewTasksMessage> {
let mut job_task_id_to_tako_id: Map<JobTaskId, TaskId> = Map::with_capacity(tasks.len());

let mut tako_id = ctx.tako_base_id.as_num();
for task in &tasks {
for task in tasks {
if job_task_id_to_tako_id
.insert(task.id, tako_id.into())
.is_some()
Expand All @@ -268,7 +252,7 @@ fn build_tasks_graph(
let mut shared_data = vec![];
let mut shared_data_map =
Map::<(Cow<ResourceRequestVariants>, Option<Duration>, Priority), usize>::new();
let mut allocate_shared_data = |task: TaskDescription| -> u32 {
let mut allocate_shared_data = |task: &TaskDescription| -> u32 {
shared_data_map
.get(&(
Cow::Borrowed(&task.resources),
Expand All @@ -287,7 +271,7 @@ fn build_tasks_graph(
index,
);
shared_data.push(SharedTaskConfiguration {
resources: task.resources,
resources: task.resources.clone(),
n_outputs: 0,
time_limit: task.time_limit,
priority: task.priority,
Expand All @@ -302,14 +286,14 @@ fn build_tasks_graph(
let mut task_configs = Vec::with_capacity(tasks.len());
for task in tasks {
let body = serialize_task_body(&ctx, task.id, None, &task.task_desc);
let shared_data_index = allocate_shared_data(task.task_desc);
let shared_data_index = allocate_shared_data(&task.task_desc);

let mut task_deps = Vec::with_capacity(task.dependencies.len());
for dependency in task.dependencies {
if dependency == task.id {
for dependency in &task.dependencies {
if *dependency == task.id {
return Err(anyhow::anyhow!("Task {} depends on itself", task.id));
}
match job_task_id_to_tako_id.get(&dependency) {
match job_task_id_to_tako_id.get(dependency) {
Some(id) => task_deps.push(*id),
None => {
return Err(anyhow::anyhow!(
Expand Down
54 changes: 23 additions & 31 deletions crates/hyperqueue/src/transfer/messages.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chrono::{DateTime, Utc};
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;

use crate::client::status::Status;
use crate::common::arraydef::IntArray;
Expand All @@ -9,7 +10,7 @@ use crate::server::autoalloc::{Allocation, QueueId, QueueInfo};
use crate::server::job::{JobTaskCounters, JobTaskInfo};
use crate::{JobId, JobTaskCount, JobTaskId, Map, WorkerId};
use bstr::BString;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::time::Duration;

use crate::server::event::MonitoringEvent;
Expand Down Expand Up @@ -58,12 +59,14 @@ impl PinMode {
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TaskBody {
pub program: ProgramDefinition,
pub struct TaskBody<'a> {
pub program: Cow<'a, ProgramDefinition>,
pub pin: PinMode,
pub task_dir: bool,
pub job_id: JobId,
pub task_id: JobTaskId,
pub submit_dir: Cow<'a, Path>,
pub entry: Option<BString>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand All @@ -78,16 +81,8 @@ pub struct TaskDescription {
}

impl TaskDescription {
pub fn clone_without_large_data(&self) -> TaskDescription {
TaskDescription {
program: self.program.clone_without_large_data(),
resources: self.resources.clone(),
pin_mode: self.pin_mode.clone(),
task_dir: self.task_dir,
time_limit: self.time_limit.clone(),
priority: self.priority,
crash_limit: self.crash_limit,
}
pub fn strip_large_data(&mut self) {
self.program.strip_large_data();
}
}

Expand All @@ -99,12 +94,8 @@ pub struct TaskWithDependencies {
}

impl TaskWithDependencies {
pub fn clone_without_large_data(&self) -> TaskWithDependencies {
TaskWithDependencies {
id: self.id,
task_desc: self.task_desc.clone_without_large_data(),
dependencies: self.dependencies.clone(),
}
pub fn strip_large_data(&mut self) {
self.task_desc.strip_large_data();
}
}

Expand All @@ -129,21 +120,22 @@ impl JobDescription {
}
}

pub fn clone_without_large_data(&self) -> JobDescription {
pub fn strip_large_data(&mut self) {
match self {
JobDescription::Array {
ids,
entries: _,
ids: _,
entries,
task_desc,
} => JobDescription::Array {
ids: ids.clone(),
entries: None, // Forget entries!
task_desc: task_desc.clone_without_large_data(),
},
JobDescription::Graph { tasks } => JobDescription::Graph {
tasks: tasks.iter().map(|t| t.clone_without_large_data()).collect(),
},
}
} => {
*entries = None;
task_desc.strip_large_data();
}
JobDescription::Graph { tasks } => {
for task in tasks {
task.strip_large_data()
}
}
};
}
}

Expand Down
23 changes: 21 additions & 2 deletions crates/hyperqueue/src/worker/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use tako::launcher::{
use tako::{format_comma_delimited, InstanceId};

use crate::common::env::{
HQ_CPUS, HQ_ERROR_FILENAME, HQ_INSTANCE_ID, HQ_NODE_FILE, HQ_PIN, HQ_SUBMIT_DIR, HQ_TASK_DIR,
HQ_CPUS, HQ_ENTRY, HQ_ERROR_FILENAME, HQ_INSTANCE_ID, HQ_JOB_ID, HQ_NODE_FILE, HQ_PIN,
HQ_SUBMIT_DIR, HQ_TASK_DIR, HQ_TASK_ID,
};
use crate::common::placeholders::{
fill_placeholders_in_paths, CompletePlaceholderCtx, ResolvablePaths,
Expand Down Expand Up @@ -84,13 +85,31 @@ impl TaskLauncher for HqTaskLauncher {

let body: TaskBody = tako::comm::deserialize(launch_ctx.body())?;
let TaskBody {
mut program,
program,
pin: pin_mode,
task_dir,
job_id,
task_id,
submit_dir,
entry,
} = body;

let mut program = program.into_owned();

program
.env
.insert(HQ_JOB_ID.into(), job_id.to_string().into());
program
.env
.insert(HQ_TASK_ID.into(), task_id.to_string().into());
program.env.insert(
HQ_SUBMIT_DIR.into(),
BString::from(submit_dir.to_string_lossy().as_bytes()),
);
if let Some(entry) = entry {
program.env.insert(HQ_ENTRY.into(), entry);
}

pin_program(&mut program, launch_ctx.allocation(), pin_mode, &launch_ctx)?;

let task_dir = if task_dir {
Expand Down
31 changes: 12 additions & 19 deletions crates/tako/src/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,20 @@ pub struct ProgramDefinition {
pub cwd: PathBuf,
}

/// Copy a byte string for logging/archival purposes: short strings are cloned
/// verbatim, while strings of 256 bytes or more are replaced by a small
/// placeholder that records only their length.
fn shortened_bstring(str: &BString) -> BString {
    let len = str.len();
    if len >= 256 {
        format!("<{} bytes>", len).into()
    } else {
        str.clone()
    }
}
const MAX_SHORTENED_BSTRING: usize = 256;
const MAX_SHORTENED_ARGS: usize = 128;

impl ProgramDefinition {
pub fn clone_without_large_data(&self) -> ProgramDefinition {
ProgramDefinition {
args: self.args.iter().take(128).map(shortened_bstring).collect(),
env: self
.env
.iter()
.map(|(k, v)| (shortened_bstring(k), shortened_bstring(v)))
.collect(),
stdout: self.stdout.clone(),
stderr: self.stderr.clone(),
stdin: Vec::new(), // Forget stdin
cwd: self.cwd.clone(),
pub fn strip_large_data(&mut self) {
self.stdin = Vec::new();
if self.args.len() > MAX_SHORTENED_ARGS {
self.args.truncate(MAX_SHORTENED_ARGS);
self.args.shrink_to_fit();
}
for arg in &mut self.args {
if arg.len() > MAX_SHORTENED_BSTRING {
*arg = format!("<{} bytes>", arg.len()).into()
}
}
}
}

0 comments on commit 3d72136

Please sign in to comment.