diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs index f30341e18..2e2e5aad8 100644 --- a/crates/hyperqueue/src/client/commands/submit/command.rs +++ b/crates/hyperqueue/src/client/commands/submit/command.rs @@ -714,7 +714,7 @@ fn get_ids_and_entries(opts: &JobSubmitOpts) -> anyhow::Result<(IntArray, Option }) .collect(), ); - IntArray::from_ids(id_set.iter().copied()) + IntArray::from_sorted_ids(id_set.into_iter()) } else { array.clone() } diff --git a/crates/hyperqueue/src/client/commands/wait.rs b/crates/hyperqueue/src/client/commands/wait.rs index 434c1b6c5..b0d62c9ab 100644 --- a/crates/hyperqueue/src/client/commands/wait.rs +++ b/crates/hyperqueue/src/client/commands/wait.rs @@ -104,7 +104,7 @@ pub async fn wait_for_jobs_with_progress( let response = rpc_call!( session.connection(), FromClientMessage::JobInfo(JobInfoRequest { - selector: IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().map(|x| x.as_num()))), + selector: IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.iter().map(|x| x.as_num()))), }), ToClientMessage::JobInfoResponse(r) => r ) diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index b23447bfc..f0bac4bd7 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -426,8 +426,12 @@ impl Output for CliOutput { } fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) { - for (_, array) in &job_task_ids { - println!("{}", array); + if job_task_ids.len() == 1 { + println!("{}", job_task_ids[0].1); + } else { + for (job_id, array) in &job_task_ids { + println!("{}: {}", job_id, array); + } } } diff --git a/crates/hyperqueue/src/client/task.rs b/crates/hyperqueue/src/client/task.rs index 954b304bd..53a7ff035 100644 --- a/crates/hyperqueue/src/client/task.rs +++ b/crates/hyperqueue/src/client/task.rs @@ -155,7 +155,7 @@ pub async fn output_job_task_ids( .map(|(job_id, detail)| { Ok(( *job_id, - IntArray::from_ids( + IntArray::from_sorted_ids( detail .as_ref() .ok_or_else(|| HqError::GenericError("Job Id not found".to_string()))? diff --git a/crates/hyperqueue/src/common/arraydef.rs b/crates/hyperqueue/src/common/arraydef.rs index 6a6b75a33..98a4056d0 100644 --- a/crates/hyperqueue/src/common/arraydef.rs +++ b/crates/hyperqueue/src/common/arraydef.rs @@ -38,22 +38,22 @@ impl IntArray { IntArray { ranges } } - // ids has to be sorted! - pub fn from_ids(ids: impl Iterator) -> IntArray { + pub fn from_sorted_ids(ids: impl Iterator) -> IntArray { let mut ranges: Vec = Vec::new(); let mut last_id = None; for id in ids { + debug_assert!(last_id.map(|last_id| last_id < id).unwrap_or(true)); if last_id.map(|last_id| last_id + 1 == id).unwrap_or(false) { ranges.last_mut().unwrap().count += 1; } else { ranges.push(IntRange::new(id, 1, 1)); } - last_id = Some(id) + last_id = Some(id); } IntArray { ranges } } pub fn from_id(id: u32) -> IntArray { - Self::from_ids([id].iter().copied()) + Self::from_sorted_ids([id].into_iter()) } pub fn from_range(start: u32, count: u32) -> Self { @@ -92,26 +92,19 @@ impl FromStr for IntArray { impl fmt::Display for IntArray { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut str = String::new(); - for x in &self.ranges { + for (idx, x) in self.ranges.iter().enumerate() { + if idx > 0 { + write!(f, ",")?; + } if x.count == 1 { - str.push_str(&format!("{}, ", x.start)); + write!(f, "{}", x.start)?; } else if x.step == 1 { - str.push_str(&format!("{}-{}, ", x.start, x.start + x.count - 1)); + write!(f, "{}-{}", x.start, x.start + x.count - 1)?; } else { - str.push_str(&format!( - "{}-{}:{}, ", - x.start, - x.start + x.count - 1, - x.step - )); + write!(f, "{}-{}:{}", x.start, x.start + x.count - 1, x.step)?; } } - if str.len() >= 2 { - write!(f, "{}", &str[0..str.len() - 2]) - } else { - Ok(()) - } + Ok(()) } } diff --git a/crates/hyperqueue/src/common/cli.rs b/crates/hyperqueue/src/common/cli.rs index 8788c1c56..a5e6b992a 100644 --- a/crates/hyperqueue/src/common/cli.rs +++ b/crates/hyperqueue/src/common/cli.rs @@ -320,7 +320,7 @@ pub enum JobCommand { Wait(JobWaitOpts), /// Interactively observe the execution of a job Progress(JobProgressOpts), - /// Print tasks Ids for given job + /// Print task Ids for given job TaskIds(JobTaskIdsOpts), } diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs index 84d4a2f78..1c6d7a9a3 100644 --- a/crates/hyperqueue/src/server/client/submit.rs +++ b/crates/hyperqueue/src/server/client/submit.rs @@ -194,7 +194,7 @@ pub async fn handle_resubmit( ids.sort_unstable(); JobDescription::Array { - ids: IntArray::from_ids(ids.iter().copied()), + ids: IntArray::from_sorted_ids(ids.into_iter()), entries: entries.clone(), task_desc: task_desc.clone(), } diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs index d4a751bd6..afafddd81 100644 --- a/crates/pyhq/src/client/job.rs +++ b/crates/pyhq/src/client/job.rs @@ -234,9 +234,9 @@ pub fn wait_for_jobs_impl( loop { let selector = - IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().copied())); + IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.into_iter())); - response = hyperqueue::rpc_call!( + response = rpc_call!( ctx.session.connection(), FromClientMessage::JobInfo(JobInfoRequest { selector, @@ -310,7 +310,7 @@ pub fn get_failed_tasks_impl( ) -> PyResult { run_future(async move { let message = FromClientMessage::JobDetail(JobDetailRequest { - job_id_selector: IdSelector::Specific(IntArray::from_ids(job_ids.into_iter())), + job_id_selector: IdSelector::Specific(IntArray::from_sorted_ids(job_ids.into_iter())), task_selector: Some(TaskSelector { id_selector: TaskIdSelector::All, status_selector: TaskStatusSelector::Specific(vec![Status::Failed]), diff --git a/docs/jobs/arrays.md b/docs/jobs/arrays.md index 88addab09..11381b8db 100644 --- a/docs/jobs/arrays.md +++ b/docs/jobs/arrays.md @@ -104,7 +104,7 @@ and the other with `HQ_ENTRY` set to `{"batch_size": 8, "learning_rate": 0.001}` ### Combining with `--each-line`/`--from-json` with `--array` Option `--each-line` or `--from-json` can be combined with option `--array`. -In such case, only a subset of lines/json will be submited. +In such case, only a subset of lines/json will be submitted. If `--array` defines an ID that exceeds the number of lines in the file (or the number of elements in JSON), then the ID is silently removed. diff --git a/docs/jobs/failure.md b/docs/jobs/failure.md index 2522afd8c..61637a927 100644 --- a/docs/jobs/failure.md +++ b/docs/jobs/failure.md @@ -7,7 +7,7 @@ However, in case of [task arrays](arrays.md), different tasks may end in differe recompute only tasks with a specific status (e.g. failed tasks). By following combination of commands you may recompute only failed tasks. Let us assume that we want to recompute -all failed jobs in job 5: +all failed tasks in job 5: ```commandline $ hq submit --array=`hq job task-ids 5 --filter=failed` ./my-computation diff --git a/tests/test_entries.py b/tests/test_entries.py index ca338e2dc..99c9199df 100644 --- a/tests/test_entries.py +++ b/tests/test_entries.py @@ -123,7 +123,8 @@ def test_each_line_with_array(hq_env: HqEnv): [ "submit", "--each-line=input", - "--array", "2-4, 6", + "--array", + "2-4, 6", "--", "bash", "-c", @@ -154,7 +155,8 @@ def test_json_with_array(hq_env: HqEnv): [ "submit", "--from-json=input", - "--array", "2-3, 5, 6, 7, 1000", + "--array", + "2-3, 5, 6, 7, 1000", "--", "bash", "-c", diff --git a/tests/test_job.py b/tests/test_job.py index 6de8f5f0d..c8aaddcc7 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -702,10 +702,10 @@ def test_job_resubmit_with_status(hq_env: HqEnv): wait_for_job_state(hq_env, 1, "FAILED") table = hq_env.command(["job", "resubmit", "1", "--filter=failed"], as_table=True) - table.check_row_value("Tasks", "4; Ids: 4-6, 8") + table.check_row_value("Tasks", "4; Ids: 4-6,8") table = hq_env.command(["job", "resubmit", "1", "--filter=finished"], as_table=True) - table.check_row_value("Tasks", "3; Ids: 3, 7, 9") + table.check_row_value("Tasks", "3; Ids: 3,7,9") def test_job_resubmit_all(hq_env: HqEnv): @@ -715,7 +715,7 @@ def test_job_resubmit_all(hq_env: HqEnv): wait_for_job_state(hq_env, 1, "FINISHED") table = hq_env.command(["job", "resubmit", "1"], as_table=True) - table.check_row_value("Tasks", "3; Ids: 2, 7, 9") + table.check_row_value("Tasks", "3; Ids: 2,7,9") def test_job_resubmit_empty(hq_env: HqEnv): @@ -1477,21 +1477,30 @@ def check_child_process_exited(hq_env: HqEnv, stop_fn: Callable[[subprocess.Pope wait_for_pid_exit(parent) wait_for_pid_exit(child) + def test_job_task_ids(hq_env: HqEnv): hq_env.start_server() - hq_env.command(["submit", "--array=2,7,9,20-30", "--", "python", "-c", "import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']"]) + hq_env.command( + [ + "submit", + "--array=2,7,9,20-30", + "--", + "python", + "-c", + "import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']", + ] + ) hq_env.start_workers(1, cpus=1) wait_for_job_state(hq_env, 1, "FAILED") result = hq_env.command(["job", "task-ids", "1"]) - assert result == "2, 7, 9, 20-30\n" + assert result == "2,7,9,20-30\n" result = hq_env.command(["job", "task-ids", "1", "--filter", "finished"]) - assert result == "2, 7, 9, 20-24, 29-30\n" + assert result == "2,7,9,20-24,29-30\n" result = hq_env.command(["job", "task-ids", "1", "--filter", "failed"]) assert result == "25-28\n" result = hq_env.command(["job", "task-ids", "1", "--filter", "canceled"]) assert result == "\n" - diff --git a/tests/test_jobfile.py b/tests/test_jobfile.py index c7be6d4cd..569dfddbf 100644 --- a/tests/test_jobfile.py +++ b/tests/test_jobfile.py @@ -219,7 +219,7 @@ def test_job_file_array(hq_env: HqEnv, tmp_path): """) hq_env.command(["job", "submit-file", "job.toml"]) r = hq_env.command(["job", "info", "1"], as_table=True) - r.check_row_value("Tasks", "7; Ids: 2, 10-14, 120") + r.check_row_value("Tasks", "7; Ids: 2,10-14,120") def test_job_file_fail_mixing_array_and_tasks(hq_env: HqEnv, tmp_path):