From 7d4bce56758d0d654820ce57bba63a7595c7f9c7 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Fri, 8 Dec 2023 11:12:25 +0100 Subject: [PATCH 1/6] hq job task-ids + some refactoring --- crates/hyperqueue/src/bin/hq.rs | 18 ++++++-- crates/hyperqueue/src/client/commands/job.rs | 12 +++++ .../src/client/commands/submit/command.rs | 2 +- crates/hyperqueue/src/client/commands/wait.rs | 27 ++++++----- crates/hyperqueue/src/client/output/cli.rs | 7 +++ crates/hyperqueue/src/client/output/json.rs | 10 +++++ .../hyperqueue/src/client/output/outputs.rs | 2 + crates/hyperqueue/src/client/output/quiet.rs | 3 ++ crates/hyperqueue/src/client/task.rs | 45 ++++++++++++++++++- crates/hyperqueue/src/common/arraydef.rs | 17 ++++--- crates/hyperqueue/src/common/cli.rs | 4 +- crates/hyperqueue/src/server/client/submit.rs | 2 +- tests/test_job.py | 19 ++++++++ 13 files changed, 145 insertions(+), 23 deletions(-) diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs index e9d2cd0de..8a9186e31 100644 --- a/crates/hyperqueue/src/bin/hq.rs +++ b/crates/hyperqueue/src/bin/hq.rs @@ -11,7 +11,7 @@ use hyperqueue::client::commands::autoalloc::command_autoalloc; use hyperqueue::client::commands::event::command_event_log; use hyperqueue::client::commands::job::{ cancel_job, forget_job, output_job_cat, output_job_detail, output_job_list, output_job_summary, - JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, + JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, JobTaskIdsOpts, }; use hyperqueue::client::commands::log::command_log; use hyperqueue::client::commands::server::command_server; @@ -32,7 +32,8 @@ use hyperqueue::client::output::outputs::{Output, Outputs}; use hyperqueue::client::output::quiet::Quiet; use hyperqueue::client::status::Status; use hyperqueue::client::task::{ - output_job_task_info, output_job_task_list, TaskCommand, TaskInfoOpts, TaskListOpts, TaskOpts, + output_job_task_ids, output_job_task_info, output_job_task_list, 
TaskCommand, TaskInfoOpts, + TaskListOpts, TaskOpts, }; use hyperqueue::common::cli::{ get_task_id_selector, get_task_selector, ColorPolicy, CommonOpts, GenerateCompletionOpts, @@ -137,6 +138,14 @@ async fn command_job_resubmit( resubmit_computation(gsettings, &mut connection, opts).await } +async fn command_job_task_ids( + gsettings: &GlobalSettings, + opts: JobTaskIdsOpts, +) -> anyhow::Result<()> { + let mut connection = get_client_session(gsettings.server_directory()).await?; + output_job_task_ids(gsettings, &mut connection, opts).await +} + async fn command_job_wait(gsettings: &GlobalSettings, opts: JobWaitOpts) -> anyhow::Result<()> { let mut connection = get_client_session(gsettings.server_directory()).await?; wait_for_jobs(gsettings, &mut connection, opts.selector).await @@ -155,7 +164,7 @@ async fn command_job_progress( ToClientMessage::JobInfoResponse(r) => r ) .await?; - wait_for_jobs_with_progress(&mut session, response.jobs).await + wait_for_jobs_with_progress(&mut session, &response.jobs).await } async fn command_task_list(gsettings: &GlobalSettings, opts: TaskListOpts) -> anyhow::Result<()> { @@ -431,6 +440,9 @@ async fn main() -> hyperqueue::Result<()> { SubCommand::Job(JobOpts { subcmd: JobCommand::Progress(opts), }) => command_job_progress(&gsettings, opts).await, + SubCommand::Job(JobOpts { + subcmd: JobCommand::TaskIds(opts), + }) => command_job_task_ids(&gsettings, opts).await, SubCommand::Task(TaskOpts { subcmd: TaskCommand::List(opts), }) => command_task_list(&gsettings, opts).await, diff --git a/crates/hyperqueue/src/client/commands/job.rs b/crates/hyperqueue/src/client/commands/job.rs index 8b8c5903e..df48c1eaf 100644 --- a/crates/hyperqueue/src/client/commands/job.rs +++ b/crates/hyperqueue/src/client/commands/job.rs @@ -59,6 +59,18 @@ pub struct JobForgetOpts { pub filter: Vec, } +#[derive(Parser)] +pub struct JobTaskIdsOpts { + /// Single ID, ID range or `last` to display the most recently submitted job + #[arg(value_parser = 
parse_last_all_range)] + pub selector: IdSelector, + + /// Select only tasks with given state(s) + /// You can use multiple states separated by a comma. + #[arg(long, value_delimiter(','), value_enum)] + pub filter: Vec, +} + #[derive(clap::ValueEnum, Clone, Debug)] pub enum CompletedJobStatus { Finished, diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs index e61cf2f2b..24c739020 100644 --- a/crates/hyperqueue/src/client/commands/submit/command.rs +++ b/crates/hyperqueue/src/client/commands/submit/command.rs @@ -681,7 +681,7 @@ pub(crate) async fn send_submit_request( ) .await?; } else if progress { - wait_for_jobs_with_progress(session, vec![info]).await?; + wait_for_jobs_with_progress(session, &[info]).await?; } Ok(()) } diff --git a/crates/hyperqueue/src/client/commands/wait.rs b/crates/hyperqueue/src/client/commands/wait.rs index 95c693f72..434c1b6c5 100644 --- a/crates/hyperqueue/src/client/commands/wait.rs +++ b/crates/hyperqueue/src/client/commands/wait.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::io::Write; use std::time::{Duration, SystemTime}; use tokio::time::sleep; @@ -17,7 +18,7 @@ use crate::transfer::messages::{ FromClientMessage, IdSelector, JobDetailRequest, JobInfo, JobInfoRequest, TaskIdSelector, TaskSelector, TaskStatusSelector, ToClientMessage, WaitForJobsRequest, }; -use crate::{rpc_call, JobId, JobTaskCount, Set}; +use crate::{rpc_call, JobId, JobTaskCount}; use colored::Colorize; pub async fn wait_for_jobs( @@ -72,18 +73,23 @@ pub async fn wait_for_jobs( pub async fn wait_for_jobs_with_progress( session: &mut ClientSession, - mut jobs: Vec, + jobs: &[JobInfo], ) -> anyhow::Result<()> { - jobs.retain(|info| !is_terminated(info)); - - if jobs.is_empty() { + if jobs.iter().all(is_terminated) { log::warn!("There are no jobs to wait for"); } else { - let total_tasks: JobTaskCount = jobs.iter().map(|info| info.n_tasks).sum(); - let mut remaining_job_ids: 
Set = jobs.into_iter().map(|info| info.id).collect(); + let total_tasks: JobTaskCount = jobs + .iter() + .filter(|info| !is_terminated(info)) + .map(|info| info.n_tasks) + .sum(); + let mut remaining_job_ids: BTreeSet = jobs + .iter() + .filter(|info| !is_terminated(info)) + .map(|info| info.id) + .collect(); let total_jobs = remaining_job_ids.len(); - log::info!( "Waiting for {} {} with {} {}", total_jobs, @@ -95,15 +101,14 @@ pub async fn wait_for_jobs_with_progress( let mut counters = JobTaskCounters::default(); loop { - let ids_ref = &mut remaining_job_ids; let response = rpc_call!( session.connection(), FromClientMessage::JobInfo(JobInfoRequest { - selector: IdSelector::Specific(IntArray::from_ids(ids_ref.iter().map(|&id| id.into()).collect())), + selector: IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().map(|x| x.as_num()))), }), ToClientMessage::JobInfoResponse(r) => r ) - .await?; + .await?; let mut current_counters = counters; for job in &response.jobs { diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index a602d1a62..b23447bfc 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -35,6 +35,7 @@ use crate::client::output::common::{ }; use crate::client::output::json::format_datetime; use crate::client::output::Verbosity; +use crate::common::arraydef::IntArray; use crate::common::utils::str::{pluralize, select_plural, truncate_middle}; use crate::worker::start::WORKER_EXTRA_PROCESS_PID; use anyhow::Error; @@ -424,6 +425,12 @@ impl Output for CliOutput { ); } + fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) { + for (_, array) in &job_task_ids { + println!("{}", array); + } + } + fn print_job_list(&self, jobs: Vec, total_jobs: usize) { let job_count = jobs.len(); let rows: Vec<_> = jobs diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index cacc8cae7..e5c0cf093 100644 
--- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::path::Path; use std::time::Duration; @@ -17,6 +18,7 @@ use crate::client::job::WorkerMap; use crate::client::output::common::{group_jobs_by_status, resolve_task_paths, TaskToPathsMap}; use crate::client::output::outputs::{Output, OutputStream}; use crate::client::output::Verbosity; +use crate::common::arraydef::IntArray; use crate::common::manager::info::{GetManagerInfo, ManagerType}; use crate::server::autoalloc::{Allocation, AllocationState, QueueId}; use crate::server::job::{JobTaskInfo, JobTaskState, StartedTaskData}; @@ -199,6 +201,14 @@ impl Output for JsonOutput { self.print(format_tasks(tasks, map)); } + fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) { + let map: HashMap> = job_task_ids + .into_iter() + .map(|(key, value)| (key, value.iter().collect())) + .collect(); + self.print(json!(map)); + } + fn print_summary(&self, filename: &Path, summary: Summary) { let json = json!({ "filename": filename, diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs index 685506bad..002213d86 100644 --- a/crates/hyperqueue/src/client/output/outputs.rs +++ b/crates/hyperqueue/src/client/output/outputs.rs @@ -10,6 +10,7 @@ use std::path::Path; use crate::client::output::common::TaskToPathsMap; use crate::client::output::Verbosity; +use crate::common::arraydef::IntArray; use crate::server::job::JobTaskInfo; use crate::JobId; use core::time::Duration; @@ -77,6 +78,7 @@ pub trait Output { server_uid: &str, verbosity: Verbosity, ); + fn print_task_ids(&self, jobs_task_id: Vec<(JobId, IntArray)>); // Log fn print_summary(&self, filename: &Path, summary: Summary); diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs index 6465f0c7a..36833c128 100644 --- a/crates/hyperqueue/src/client/output/quiet.rs +++ 
b/crates/hyperqueue/src/client/output/quiet.rs @@ -13,6 +13,7 @@ use crate::client::output::common::{ }; use crate::client::output::outputs::{Output, OutputStream}; use crate::client::status::{job_status, Status}; +use crate::common::arraydef::IntArray; use crate::server::autoalloc::Allocation; use crate::server::job::JobTaskInfo; use crate::stream::reader::logfile::Summary; @@ -130,6 +131,8 @@ impl Output for Quiet { ) { } + fn print_task_ids(&self, _job_task_ids: Vec<(JobId, IntArray)>) {} + // Log fn print_summary(&self, _filename: &Path, _summary: Summary) {} diff --git a/crates/hyperqueue/src/client/task.rs b/crates/hyperqueue/src/client/task.rs index 44f529884..954b304bd 100644 --- a/crates/hyperqueue/src/client/task.rs +++ b/crates/hyperqueue/src/client/task.rs @@ -1,14 +1,16 @@ +use crate::client::commands::job::JobTaskIdsOpts; use crate::client::globalsettings::GlobalSettings; use crate::client::job::get_worker_map; use crate::client::output::{Verbosity, VerbosityFlag}; use crate::common::arraydef::IntArray; use crate::common::cli::{parse_last_range, parse_last_single_id, TaskSelectorArg}; -use crate::rpc_call; +use crate::common::error::HqError; use crate::transfer::connection::ClientSession; use crate::transfer::messages::{ FromClientMessage, IdSelector, JobDetailRequest, SingleIdSelector, TaskIdSelector, TaskSelector, TaskStatusSelector, ToClientMessage, }; +use crate::{rpc_call, JobId}; #[derive(clap::Parser)] pub struct TaskOpts { @@ -127,3 +129,44 @@ pub async fn output_job_task_info( Ok(()) } + +pub async fn output_job_task_ids( + gsettings: &GlobalSettings, + session: &mut ClientSession, + opts: JobTaskIdsOpts, +) -> anyhow::Result<()> { + let message = FromClientMessage::JobDetail(JobDetailRequest { + job_id_selector: opts.selector, + task_selector: Some(TaskSelector { + id_selector: TaskIdSelector::All, + status_selector: if opts.filter.is_empty() { + TaskStatusSelector::All + } else { + TaskStatusSelector::Specific(opts.filter) + }, + }), + }); 
+ let response = + rpc_call!(session.connection(), message, ToClientMessage::JobDetailResponse(r) => r) + .await?; + let mut job_task_ids: Vec<_> = response + .details + .iter() + .map(|(job_id, detail)| { + Ok(( + *job_id, + IntArray::from_ids( + detail + .as_ref() + .ok_or_else(|| HqError::GenericError("Job Id not found".to_string()))? + .tasks + .iter() + .map(|info| info.task_id.as_num()), + ), + )) + }) + .collect::>>()?; + job_task_ids.sort_unstable_by_key(|x| x.0); + gsettings.printer().print_task_ids(job_task_ids); + Ok(()) +} diff --git a/crates/hyperqueue/src/common/arraydef.rs b/crates/hyperqueue/src/common/arraydef.rs index 6f51b7544..6a6b75a33 100644 --- a/crates/hyperqueue/src/common/arraydef.rs +++ b/crates/hyperqueue/src/common/arraydef.rs @@ -38,19 +38,22 @@ impl IntArray { IntArray { ranges } } - pub fn from_ids(ids: Vec) -> IntArray { + // ids has to be sorted! + pub fn from_ids(ids: impl Iterator) -> IntArray { let mut ranges: Vec = Vec::new(); + let mut last_id = None; for id in ids { - if let Some(pos) = ranges.iter().position(|x| id == (x.start + x.count)) { - ranges[pos].count += 1; + if last_id.map(|last_id| last_id + 1 == id).unwrap_or(false) { + ranges.last_mut().unwrap().count += 1; } else { ranges.push(IntRange::new(id, 1, 1)); } + last_id = Some(id) } IntArray { ranges } } pub fn from_id(id: u32) -> IntArray { - Self::from_ids(vec![id]) + Self::from_ids([id].iter().copied()) } pub fn from_range(start: u32, count: u32) -> Self { @@ -104,7 +107,11 @@ impl fmt::Display for IntArray { )); } } - write!(f, "{}", &str[0..str.len() - 2]) + if str.len() >= 2 { + write!(f, "{}", &str[0..str.len() - 2]) + } else { + Ok(()) + } } } diff --git a/crates/hyperqueue/src/common/cli.rs b/crates/hyperqueue/src/common/cli.rs index ec3b6236e..8788c1c56 100644 --- a/crates/hyperqueue/src/common/cli.rs +++ b/crates/hyperqueue/src/common/cli.rs @@ -9,7 +9,7 @@ use tako::WorkerId; use crate::client::commands::autoalloc::AutoAllocOpts; use 
crate::client::commands::event::EventLogOpts; use crate::client::commands::job::{ - JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, + JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, JobTaskIdsOpts, }; use crate::client::commands::log::LogOpts; use crate::client::commands::server::ServerOpts; @@ -320,6 +320,8 @@ pub enum JobCommand { Wait(JobWaitOpts), /// Interactively observe the execution of a job Progress(JobProgressOpts), + /// Print tasks Ids for given job + TaskIds(JobTaskIdsOpts), } #[derive(Parser)] diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs index 18c0466d7..84d4a2f78 100644 --- a/crates/hyperqueue/src/server/client/submit.rs +++ b/crates/hyperqueue/src/server/client/submit.rs @@ -194,7 +194,7 @@ pub async fn handle_resubmit( ids.sort_unstable(); JobDescription::Array { - ids: IntArray::from_ids(ids), + ids: IntArray::from_ids(ids.iter().copied()), entries: entries.clone(), task_desc: task_desc.clone(), } diff --git a/tests/test_job.py b/tests/test_job.py index d00adec63..6de8f5f0d 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1476,3 +1476,22 @@ def check_child_process_exited(hq_env: HqEnv, stop_fn: Callable[[subprocess.Pope parent, child = pids wait_for_pid_exit(parent) wait_for_pid_exit(child) + +def test_job_task_ids(hq_env: HqEnv): + hq_env.start_server() + hq_env.command(["submit", "--array=2,7,9,20-30", "--", "python", "-c", "import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']"]) + hq_env.start_workers(1, cpus=1) + wait_for_job_state(hq_env, 1, "FAILED") + + result = hq_env.command(["job", "task-ids", "1"]) + assert result == "2, 7, 9, 20-30\n" + + result = hq_env.command(["job", "task-ids", "1", "--filter", "finished"]) + assert result == "2, 7, 9, 20-24, 29-30\n" + + result = hq_env.command(["job", "task-ids", "1", "--filter", "failed"]) + assert result == "25-28\n" + + result = hq_env.command(["job", 
"task-ids", "1", "--filter", "canceled"]) + assert result == "\n" + From eb8ed2fc567438005a275072182c1430c4844523 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Fri, 8 Dec 2023 16:13:51 +0100 Subject: [PATCH 2/6] Allow combining --array and --each-line/--from-json --- .../src/client/commands/submit/command.rs | 33 ++++++++-- tests/test_entries.py | 62 +++++++++++++++++++ 2 files changed, 89 insertions(+), 6 deletions(-) diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs index 24c739020..f30341e18 100644 --- a/crates/hyperqueue/src/client/commands/submit/command.rs +++ b/crates/hyperqueue/src/client/commands/submit/command.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::io::{BufRead, Read}; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -246,14 +247,14 @@ pub struct SubmitJobConfOpts { // Parameters for creating array jobs /// Create a task array where a task will be created for each line of the given file. /// The corresponding line will be passed to the task in environment variable `HQ_ENTRY`. - #[arg(long, conflicts_with("array"), value_hint = clap::ValueHint::FilePath)] + #[arg(long, value_hint = clap::ValueHint::FilePath)] each_line: Option, /// Create a task array where a task will be created for each item of a JSON array stored in /// the given file. /// The corresponding item from the array will be passed as a JSON string to the task in /// environment variable `HQ_ENTRY`. - #[arg(long, conflicts_with("array"), conflicts_with("each_line"), value_hint = clap::ValueHint::FilePath)] + #[arg(long, conflicts_with("each_line"), value_hint = clap::ValueHint::FilePath)] from_json: Option, /// Create a task array where a task will be created for each number in the specified number range. 
@@ -687,7 +688,7 @@ pub(crate) async fn send_submit_request( } fn get_ids_and_entries(opts: &JobSubmitOpts) -> anyhow::Result<(IntArray, Option>)> { - let entries = if let Some(ref filename) = opts.conf.each_line { + let mut entries = if let Some(ref filename) = opts.conf.each_line { Some(read_lines(filename)?) } else if let Some(ref filename) = opts.conf.from_json { Some(make_entries_from_json(filename)?) @@ -695,10 +696,30 @@ fn get_ids_and_entries(opts: &JobSubmitOpts) -> anyhow::Result<(IntArray, Option None }; - let ids = if let Some(ref entries) = entries { + let ids = if let Some(array) = &opts.conf.array { + if let Some(es) = entries { + let id_set: BTreeSet = array + .iter() + .filter(|id| (*id as usize) < es.len()) + .collect(); + entries = Some( + es.into_iter() + .enumerate() + .filter_map(|(id, value)| { + if id_set.contains(&(id as u32)) { + Some(value) + } else { + None + } + }) + .collect(), + ); + IntArray::from_ids(id_set.iter().copied()) + } else { + array.clone() + } + } else if let Some(ref entries) = entries { IntArray::from_range(0, entries.len() as JobTaskCount) - } else if let Some(ref array) = opts.conf.array { - array.clone() } else { IntArray::from_id(0) }; diff --git a/tests/test_entries.py b/tests/test_entries.py index a9929d4de..ca338e2dc 100644 --- a/tests/test_entries.py +++ b/tests/test_entries.py @@ -110,3 +110,65 @@ def test_entries_invalid_from_json_entry(hq_env: HqEnv): "echo $HQ_ENTRY", ] ) + + +def test_each_line_with_array(hq_env: HqEnv): + hq_env.start_server() + hq_env.start_worker(cpus=2) + + with open("input", "w") as f: + f.write("One\nTwo\nThree\nFour\nFive\nSix\nSeven") + + hq_env.command( + [ + "submit", + "--each-line=input", + "--array", "2-4, 6", + "--", + "bash", + "-c", + "echo $HQ_ENTRY,$HQ_TASK_ID", + ] + ) + wait_for_job_state(hq_env, 1, "FINISHED") + + for i, test in enumerate([None, None, "Three,2\n", "Four,3\n", "Five,4\n", None, "Seven,6\n"]): + filename = default_task_output(job_id=1, task_id=i) + if 
test is None: + assert not os.path.exists(filename) + else: + check_file_contents(filename, test) + + table = hq_env.command(["job", "info", "1"], as_table=True) + assert table.get_row_value("State").split("\n")[-1] == "FINISHED (4)" + + +def test_json_with_array(hq_env: HqEnv): + hq_env.start_server() + hq_env.start_worker(cpus=2) + + with open("input", "w") as f: + f.write('["One", "Two", "Three", "Four", "Five", "Six", "Seven"]') + + hq_env.command( + [ + "submit", + "--from-json=input", + "--array", "2-3, 5, 6, 7, 1000", + "--", + "bash", + "-c", + "echo $HQ_ENTRY,$HQ_TASK_ID", + ] + ) + wait_for_job_state(hq_env, 1, "FINISHED") + + for i, test in enumerate([None, None, '"Three",2\n', '"Four",3\n', None, '"Six",5\n', '"Seven",6\n']): + filename = default_task_output(job_id=1, task_id=i) + if test is None: + assert not os.path.exists(filename) + else: + check_file_contents(filename, test) + + table = hq_env.command(["job", "info", "1"], as_table=True) + assert table.get_row_value("State").split("\n")[-1] == "FINISHED (4)" From 86c8b05a66b9190042b18a99786b5f7a25cf1c7b Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Fri, 8 Dec 2023 16:48:27 +0100 Subject: [PATCH 3/6] Docs updated --- docs/jobs/arrays.md | 16 ++++++++++++++++ docs/jobs/failure.md | 43 ++++++++++++++++++++++++++++--------------- 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/docs/jobs/arrays.md b/docs/jobs/arrays.md index d4024ef62..88addab09 100644 --- a/docs/jobs/arrays.md +++ b/docs/jobs/arrays.md @@ -100,3 +100,19 @@ If `items.json` contained this content: ``` then HyperQueue would create two tasks, one with `HQ_ENTRY` set to `{"batch_size": 4, "learning_rate": 0.01}` and the other with `HQ_ENTRY` set to `{"batch_size": 8, "learning_rate": 0.001}`. + +### Combining with `--each-line`/`--from-json` with `--array` + +Option `--each-line` or `--from-json` can be combined with option `--array`. +In such case, only a subset of lines/json will be submited. 
+If `--array` defines an ID that exceeds the number of lines in the file (or the number of elements in JSON), then the ID is silently removed. + + +For example: + +```commandline +$ hq submit --each-line input.txt --array "2, 8-10" +``` + +If `input.txt` has sufficiently many lines then it will create array job with four tasks. One for 3rd line of file and three tasks for 9th-11th line +(note that first line has id 0). It analogously works for `--from-json`. \ No newline at end of file diff --git a/docs/jobs/failure.md b/docs/jobs/failure.md index 26ad1a686..2522afd8c 100644 --- a/docs/jobs/failure.md +++ b/docs/jobs/failure.md @@ -1,29 +1,42 @@ In distributed systems, failure is inevitable. This sections describes how HyperQueue handles various types of failures and how can you affect its behavior. -## Resubmitting jobs -When a job fails or is canceled, you might want to submit it again, without the need to pass all the original -parameters. You can achieve this using **resubmit**: +## Resubmitting array jobs +When a job fails or is canceled, you can submit it again. +However, in case of [task arrays](arrays.md), different tasks may end in different states, and often we want to +recompute only tasks with a specific status (e.g. failed tasks). -```bash -$ hq job resubmit +By following combination of commands you may recompute only failed tasks. Let us assume that we want to recompute +all failed jobs in job 5: + +```commandline +$ hq submit --array=`hq job task-ids 5 --filter=failed` ./my-computation +``` +It works as follows: Command `hq job task-ids 5 --filter=failed` returns IDs of failed jobs of job `5`, and we set +it to `--array` parameter that starts only tasks for given IDs. 
+ +If we want to recompute all failed tasks and all canceled tasks we can do it as follows: + +```commandline +$ hq submit --array=`hq job task-ids 5 --filter=failed,canceled` ./my-computation ``` -It wil create a new job that has the same configuration as the job with the entered job id. +Note that it also works with `--each-line` or `--from-json`, i.e.: + +```commandline +# Original computation +$ hq submit --each-line=input.txt ./my-computation -This is especially useful for [task arrays](arrays.md). By default, `resubmit` will submit all tasks of the original job; -however, you can specify only a subset of tasks based on their [state](jobs.md#task-state): -```bash -$ hq job resubmit --status=failed,canceled +# Resubmitting failed jobs +$ hq submit --each-line=input.txt --array=`hq job task-ids last --filter=failed` ./my-computation ``` -Using this command you can resubmit e.g. only the tasks that have failed, without the need to recompute all tasks of -a large task array. ## Task restart -Sometimes a worker might crash while it is executing some task. In that case the server will reschedule that task to a -different worker and the task will begin executing from the beginning. + +Sometimes a worker might crash while it is executing some task. In that case the server will automatically +reschedule that task to a different worker and the task will begin executing from the beginning. In order to let the executed application know that the same task is being executed repeatedly, HyperQueue assigns each execution a separate **Instance ID**. It is a 32b non-negative number that identifies each (re-)execution of a task. @@ -43,7 +56,7 @@ You can change this behavior with the `--max-fails=` option of the `submit` c If specified, once more tasks than `X` tasks fail, the rest of the job's tasks that were not completed yet will be canceled. For example: -```bash +```commandline $ hq submit --array 1-1000 --max-fails 5 ... ``` This will create a task array with `1000` tasks. 
Once `5` or more tasks fail, the remaining uncompleted tasks of the job From d69325be2ce346ccbba2121951ce693e15342c1c Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Fri, 8 Dec 2023 16:52:16 +0100 Subject: [PATCH 4/6] Fixed Python API --- crates/pyhq/src/client/job.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs index 55ccc2234..d4a751bd6 100644 --- a/crates/pyhq/src/client/job.rs +++ b/crates/pyhq/src/client/job.rs @@ -233,9 +233,8 @@ pub fn wait_for_jobs_impl( let mut response: JobInfoResponse; loop { - let selector = IdSelector::Specific(IntArray::from_ids( - remaining_job_ids.iter().copied().collect(), - )); + let selector = + IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().copied())); response = hyperqueue::rpc_call!( ctx.session.connection(), @@ -311,7 +310,7 @@ pub fn get_failed_tasks_impl( ) -> PyResult { run_future(async move { let message = FromClientMessage::JobDetail(JobDetailRequest { - job_id_selector: IdSelector::Specific(IntArray::from_ids(job_ids)), + job_id_selector: IdSelector::Specific(IntArray::from_ids(job_ids.into_iter())), task_selector: Some(TaskSelector { id_selector: TaskIdSelector::All, status_selector: TaskStatusSelector::Specific(vec![Status::Failed]), From 81f768f50171911f35d3bc8ae4faab5249704e13 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Fri, 8 Dec 2023 16:54:18 +0100 Subject: [PATCH 5/6] Removed some old unused code --- CHANGELOG.md | 1 + crates/hyperqueue/src/client/commands/job.rs | 25 +------------------- 2 files changed, 2 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d319acea..ae4cc54a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Dev ## Breaking change + * The output format of the `job info` command with JSON output mode has been changed. Note that the JSON output mode is still unstable. 
diff --git a/crates/hyperqueue/src/client/commands/job.rs b/crates/hyperqueue/src/client/commands/job.rs index df48c1eaf..77246334e 100644 --- a/crates/hyperqueue/src/client/commands/job.rs +++ b/crates/hyperqueue/src/client/commands/job.rs @@ -8,13 +8,12 @@ use crate::client::status::{job_status, Status}; use crate::common::cli::{parse_last_all_range, parse_last_range, TaskSelectorArg}; use crate::common::utils::str::pluralize; use crate::rpc_call; -use crate::transfer::connection::{ClientConnection, ClientSession}; +use crate::transfer::connection::ClientSession; use crate::transfer::messages::{ CancelJobResponse, CancelRequest, ForgetJobRequest, FromClientMessage, IdSelector, JobDetail, JobDetailRequest, JobInfoRequest, TaskIdSelector, TaskSelector, TaskStatusSelector, ToClientMessage, }; -use crate::JobId; #[derive(Parser)] pub struct JobListOpts { @@ -107,28 +106,6 @@ pub struct JobCatOpts { pub stream: OutputStream, } -pub async fn get_last_job_id(connection: &mut ClientConnection) -> crate::Result> { - let message = FromClientMessage::JobInfo(JobInfoRequest { - selector: IdSelector::LastN(1), - }); - let response = rpc_call!(connection, message, ToClientMessage::JobInfoResponse(r) => r).await?; - - Ok(response.jobs.last().map(|job| job.id)) -} - -pub async fn get_job_ids(connection: &mut ClientConnection) -> crate::Result>> { - let message = FromClientMessage::JobInfo(JobInfoRequest { - selector: IdSelector::All, - }); - let response = rpc_call!(connection, message, ToClientMessage::JobInfoResponse(r) => r).await?; - - let mut ids: Vec = Vec::new(); - for job in response.jobs { - ids.push(job.id); - } - Ok(Option::from(ids)) -} - pub async fn output_job_list( gsettings: &GlobalSettings, session: &mut ClientSession, From 41cb62aeaad39410106a56280e72dad930103f22 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Tue, 12 Dec 2023 13:10:29 +0100 Subject: [PATCH 6/6] Code review updates --- .../src/client/commands/submit/command.rs | 2 +- 
crates/hyperqueue/src/client/commands/wait.rs | 2 +- crates/hyperqueue/src/client/output/cli.rs | 8 +++-- crates/hyperqueue/src/client/task.rs | 2 +- crates/hyperqueue/src/common/arraydef.rs | 31 +++++++------------ crates/hyperqueue/src/common/cli.rs | 2 +- crates/hyperqueue/src/server/client/submit.rs | 2 +- crates/pyhq/src/client/job.rs | 6 ++-- docs/jobs/arrays.md | 2 +- docs/jobs/failure.md | 2 +- tests/test_entries.py | 6 ++-- tests/test_job.py | 23 +++++++++----- tests/test_jobfile.py | 2 +- 13 files changed, 49 insertions(+), 41 deletions(-) diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs index f30341e18..2e2e5aad8 100644 --- a/crates/hyperqueue/src/client/commands/submit/command.rs +++ b/crates/hyperqueue/src/client/commands/submit/command.rs @@ -714,7 +714,7 @@ fn get_ids_and_entries(opts: &JobSubmitOpts) -> anyhow::Result<(IntArray, Option }) .collect(), ); - IntArray::from_ids(id_set.iter().copied()) + IntArray::from_sorted_ids(id_set.into_iter()) } else { array.clone() } diff --git a/crates/hyperqueue/src/client/commands/wait.rs b/crates/hyperqueue/src/client/commands/wait.rs index 434c1b6c5..b0d62c9ab 100644 --- a/crates/hyperqueue/src/client/commands/wait.rs +++ b/crates/hyperqueue/src/client/commands/wait.rs @@ -104,7 +104,7 @@ pub async fn wait_for_jobs_with_progress( let response = rpc_call!( session.connection(), FromClientMessage::JobInfo(JobInfoRequest { - selector: IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().map(|x| x.as_num()))), + selector: IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.iter().map(|x| x.as_num()))), }), ToClientMessage::JobInfoResponse(r) => r ) diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index b23447bfc..f0bac4bd7 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -426,8 +426,12 @@ impl Output 
for CliOutput { } fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) { - for (_, array) in &job_task_ids { - println!("{}", array); + if job_task_ids.len() == 1 { + println!("{}", job_task_ids[0].1); + } else { + for (job_id, array) in &job_task_ids { + println!("{}: {}", job_id, array); + } } } diff --git a/crates/hyperqueue/src/client/task.rs b/crates/hyperqueue/src/client/task.rs index 954b304bd..53a7ff035 100644 --- a/crates/hyperqueue/src/client/task.rs +++ b/crates/hyperqueue/src/client/task.rs @@ -155,7 +155,7 @@ pub async fn output_job_task_ids( .map(|(job_id, detail)| { Ok(( *job_id, - IntArray::from_ids( + IntArray::from_sorted_ids( detail .as_ref() .ok_or_else(|| HqError::GenericError("Job Id not found".to_string()))? diff --git a/crates/hyperqueue/src/common/arraydef.rs b/crates/hyperqueue/src/common/arraydef.rs index 6a6b75a33..98a4056d0 100644 --- a/crates/hyperqueue/src/common/arraydef.rs +++ b/crates/hyperqueue/src/common/arraydef.rs @@ -38,22 +38,22 @@ impl IntArray { IntArray { ranges } } - // ids has to be sorted! 
- pub fn from_ids(ids: impl Iterator<Item = u32>) -> IntArray { + pub fn from_sorted_ids(ids: impl Iterator<Item = u32>) -> IntArray { let mut ranges: Vec<IntRange> = Vec::new(); let mut last_id = None; for id in ids { + debug_assert!(last_id.map(|last_id| last_id < id).unwrap_or(true)); if last_id.map(|last_id| last_id + 1 == id).unwrap_or(false) { ranges.last_mut().unwrap().count += 1; } else { ranges.push(IntRange::new(id, 1, 1)); } - last_id = Some(id) + last_id = Some(id); } IntArray { ranges } } pub fn from_id(id: u32) -> IntArray { - Self::from_ids([id].iter().copied()) + Self::from_sorted_ids([id].into_iter()) } pub fn from_range(start: u32, count: u32) -> Self { @@ -92,26 +92,19 @@ impl FromStr for IntArray { impl fmt::Display for IntArray { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut str = String::new(); - for x in &self.ranges { + for (idx, x) in self.ranges.iter().enumerate() { + if idx > 0 { + write!(f, ",")?; + } if x.count == 1 { - str.push_str(&format!("{}, ", x.start)); + write!(f, "{}", x.start)?; } else if x.step == 1 { - str.push_str(&format!("{}-{}, ", x.start, x.start + x.count - 1)); + write!(f, "{}-{}", x.start, x.start + x.count - 1)?; } else { - str.push_str(&format!( - "{}-{}:{}, ", - x.start, - x.start + x.count - 1, - x.step - )); + write!(f, "{}-{}:{}", x.start, x.start + x.count - 1, x.step)?; } } - if str.len() >= 2 { - write!(f, "{}", &str[0..str.len() - 2]) - } else { - Ok(()) - } + Ok(()) } } diff --git a/crates/hyperqueue/src/common/cli.rs b/crates/hyperqueue/src/common/cli.rs index 8788c1c56..a5e6b992a 100644 --- a/crates/hyperqueue/src/common/cli.rs +++ b/crates/hyperqueue/src/common/cli.rs @@ -320,7 +320,7 @@ pub enum JobCommand { Wait(JobWaitOpts), /// Interactively observe the execution of a job Progress(JobProgressOpts), - /// Print tasks Ids for given job + /// Print task Ids for given job TaskIds(JobTaskIdsOpts), } diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs index
84d4a2f78..1c6d7a9a3 100644 --- a/crates/hyperqueue/src/server/client/submit.rs +++ b/crates/hyperqueue/src/server/client/submit.rs @@ -194,7 +194,7 @@ pub async fn handle_resubmit( ids.sort_unstable(); JobDescription::Array { - ids: IntArray::from_ids(ids.iter().copied()), + ids: IntArray::from_sorted_ids(ids.into_iter()), entries: entries.clone(), task_desc: task_desc.clone(), } diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs index d4a751bd6..d63cbdcf3 100644 --- a/crates/pyhq/src/client/job.rs +++ b/crates/pyhq/src/client/job.rs @@ -234,9 +234,9 @@ pub fn wait_for_jobs_impl( loop { let selector = - IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().copied())); + IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.iter().copied())); - response = hyperqueue::rpc_call!( + response = rpc_call!( ctx.session.connection(), FromClientMessage::JobInfo(JobInfoRequest { selector, @@ -310,7 +310,7 @@ pub fn get_failed_tasks_impl( ) -> PyResult { run_future(async move { let message = FromClientMessage::JobDetail(JobDetailRequest { - job_id_selector: IdSelector::Specific(IntArray::from_ids(job_ids.into_iter())), + job_id_selector: IdSelector::Specific(IntArray::from_sorted_ids(job_ids.into_iter())), task_selector: Some(TaskSelector { id_selector: TaskIdSelector::All, status_selector: TaskStatusSelector::Specific(vec![Status::Failed]), diff --git a/docs/jobs/arrays.md b/docs/jobs/arrays.md index 88addab09..11381b8db 100644 --- a/docs/jobs/arrays.md +++ b/docs/jobs/arrays.md @@ -104,7 +104,7 @@ and the other with `HQ_ENTRY` set to `{"batch_size": 8, "learning_rate": 0.001}` ### Combining with `--each-line`/`--from-json` with `--array` Option `--each-line` or `--from-json` can be combined with option `--array`. -In such case, only a subset of lines/json will be submited. +In such case, only a subset of lines/json will be submitted. 
If `--array` defines an ID that exceeds the number of lines in the file (or the number of elements in JSON), then the ID is silently removed. diff --git a/docs/jobs/failure.md b/docs/jobs/failure.md index 2522afd8c..61637a927 100644 --- a/docs/jobs/failure.md +++ b/docs/jobs/failure.md @@ -7,7 +7,7 @@ However, in case of [task arrays](arrays.md), different tasks may end in differe recompute only tasks with a specific status (e.g. failed tasks). By following combination of commands you may recompute only failed tasks. Let us assume that we want to recompute -all failed jobs in job 5: +all failed tasks in job 5: ```commandline $ hq submit --array=`hq job task-ids 5 --filter=failed` ./my-computation diff --git a/tests/test_entries.py b/tests/test_entries.py index ca338e2dc..99c9199df 100644 --- a/tests/test_entries.py +++ b/tests/test_entries.py @@ -123,7 +123,8 @@ def test_each_line_with_array(hq_env: HqEnv): [ "submit", "--each-line=input", - "--array", "2-4, 6", + "--array", + "2-4, 6", "--", "bash", "-c", @@ -154,7 +155,8 @@ def test_json_with_array(hq_env: HqEnv): [ "submit", "--from-json=input", - "--array", "2-3, 5, 6, 7, 1000", + "--array", + "2-3, 5, 6, 7, 1000", "--", "bash", "-c", diff --git a/tests/test_job.py b/tests/test_job.py index 6de8f5f0d..c8aaddcc7 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -702,10 +702,10 @@ def test_job_resubmit_with_status(hq_env: HqEnv): wait_for_job_state(hq_env, 1, "FAILED") table = hq_env.command(["job", "resubmit", "1", "--filter=failed"], as_table=True) - table.check_row_value("Tasks", "4; Ids: 4-6, 8") + table.check_row_value("Tasks", "4; Ids: 4-6,8") table = hq_env.command(["job", "resubmit", "1", "--filter=finished"], as_table=True) - table.check_row_value("Tasks", "3; Ids: 3, 7, 9") + table.check_row_value("Tasks", "3; Ids: 3,7,9") def test_job_resubmit_all(hq_env: HqEnv): @@ -715,7 +715,7 @@ def test_job_resubmit_all(hq_env: HqEnv): wait_for_job_state(hq_env, 1, "FINISHED") table = 
hq_env.command(["job", "resubmit", "1"], as_table=True) - table.check_row_value("Tasks", "3; Ids: 2, 7, 9") + table.check_row_value("Tasks", "3; Ids: 2,7,9") def test_job_resubmit_empty(hq_env: HqEnv): @@ -1477,21 +1477,30 @@ def check_child_process_exited(hq_env: HqEnv, stop_fn: Callable[[subprocess.Pope wait_for_pid_exit(parent) wait_for_pid_exit(child) + def test_job_task_ids(hq_env: HqEnv): hq_env.start_server() - hq_env.command(["submit", "--array=2,7,9,20-30", "--", "python", "-c", "import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']"]) + hq_env.command( + [ + "submit", + "--array=2,7,9,20-30", + "--", + "python", + "-c", + "import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']", + ] + ) hq_env.start_workers(1, cpus=1) wait_for_job_state(hq_env, 1, "FAILED") result = hq_env.command(["job", "task-ids", "1"]) - assert result == "2, 7, 9, 20-30\n" + assert result == "2,7,9,20-30\n" result = hq_env.command(["job", "task-ids", "1", "--filter", "finished"]) - assert result == "2, 7, 9, 20-24, 29-30\n" + assert result == "2,7,9,20-24,29-30\n" result = hq_env.command(["job", "task-ids", "1", "--filter", "failed"]) assert result == "25-28\n" result = hq_env.command(["job", "task-ids", "1", "--filter", "canceled"]) assert result == "\n" - diff --git a/tests/test_jobfile.py b/tests/test_jobfile.py index c7be6d4cd..569dfddbf 100644 --- a/tests/test_jobfile.py +++ b/tests/test_jobfile.py @@ -219,7 +219,7 @@ def test_job_file_array(hq_env: HqEnv, tmp_path): """) hq_env.command(["job", "submit-file", "job.toml"]) r = hq_env.command(["job", "info", "1"], as_table=True) - r.check_row_value("Tasks", "7; Ids: 2, 10-14, 120") + r.check_row_value("Tasks", "7; Ids: 2,10-14,120") def test_job_file_fail_mixing_array_and_tasks(hq_env: HqEnv, tmp_path):