diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d319acea..ae4cc54a3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 # Dev

 ## Breaking change

+* The output format of the `job info` command with JSON output mode has been changed. Note that the JSON output mode is still unstable.

diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs
index e9d2cd0de..8a9186e31 100644
--- a/crates/hyperqueue/src/bin/hq.rs
+++ b/crates/hyperqueue/src/bin/hq.rs
@@ -11,7 +11,7 @@ use hyperqueue::client::commands::autoalloc::command_autoalloc;
 use hyperqueue::client::commands::event::command_event_log;
 use hyperqueue::client::commands::job::{
     cancel_job, forget_job, output_job_cat, output_job_detail, output_job_list, output_job_summary,
-    JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts,
+    JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, JobTaskIdsOpts,
 };
 use hyperqueue::client::commands::log::command_log;
 use hyperqueue::client::commands::server::command_server;
@@ -32,7 +32,8 @@ use hyperqueue::client::output::outputs::{Output, Outputs};
 use hyperqueue::client::output::quiet::Quiet;
 use hyperqueue::client::status::Status;
 use hyperqueue::client::task::{
-    output_job_task_info, output_job_task_list, TaskCommand, TaskInfoOpts, TaskListOpts, TaskOpts,
+    output_job_task_ids, output_job_task_info, output_job_task_list, TaskCommand, TaskInfoOpts,
+    TaskListOpts, TaskOpts,
 };
 use hyperqueue::common::cli::{
     get_task_id_selector, get_task_selector, ColorPolicy, CommonOpts, GenerateCompletionOpts,
@@ -137,6 +138,14 @@ async fn command_job_resubmit(
     resubmit_computation(gsettings, &mut connection, opts).await
 }

+async fn command_job_task_ids(
+    gsettings: &GlobalSettings,
+    opts: JobTaskIdsOpts,
+) -> anyhow::Result<()> {
+    let mut connection = get_client_session(gsettings.server_directory()).await?;
+    output_job_task_ids(gsettings, &mut connection, opts).await
+}
+
 async fn command_job_wait(gsettings: &GlobalSettings, opts: JobWaitOpts) -> anyhow::Result<()> {
     let mut connection = get_client_session(gsettings.server_directory()).await?;
     wait_for_jobs(gsettings, &mut connection, opts.selector).await
@@ -155,7 +164,7 @@ async fn command_job_progress(
         ToClientMessage::JobInfoResponse(r) => r
     )
     .await?;
-    wait_for_jobs_with_progress(&mut session, response.jobs).await
+    wait_for_jobs_with_progress(&mut session, &response.jobs).await
 }

 async fn command_task_list(gsettings: &GlobalSettings, opts: TaskListOpts) -> anyhow::Result<()> {
@@ -431,6 +440,9 @@ async fn main() -> hyperqueue::Result<()> {
         SubCommand::Job(JobOpts {
             subcmd: JobCommand::Progress(opts),
         }) => command_job_progress(&gsettings, opts).await,
+        SubCommand::Job(JobOpts {
+            subcmd: JobCommand::TaskIds(opts),
+        }) => command_job_task_ids(&gsettings, opts).await,
         SubCommand::Task(TaskOpts {
             subcmd: TaskCommand::List(opts),
         }) => command_task_list(&gsettings, opts).await,
diff --git a/crates/hyperqueue/src/client/commands/job.rs b/crates/hyperqueue/src/client/commands/job.rs
index 8b8c5903e..77246334e 100644
--- a/crates/hyperqueue/src/client/commands/job.rs
+++ b/crates/hyperqueue/src/client/commands/job.rs
@@ -8,13 +8,12 @@ use crate::client::status::{job_status, Status};
 use crate::common::cli::{parse_last_all_range, parse_last_range, TaskSelectorArg};
 use crate::common::utils::str::pluralize;
 use crate::rpc_call;
-use crate::transfer::connection::{ClientConnection, ClientSession};
+use crate::transfer::connection::ClientSession;
 use crate::transfer::messages::{
    CancelJobResponse, CancelRequest, ForgetJobRequest, FromClientMessage, IdSelector, JobDetail,
    JobDetailRequest, JobInfoRequest, TaskIdSelector, TaskSelector, TaskStatusSelector,
    ToClientMessage,
 };
-use crate::JobId;

 #[derive(Parser)]
 pub struct JobListOpts {
@@ -59,6 +58,18 @@ pub struct JobForgetOpts {
     pub filter: Vec<CompletedJobStatus>,
 }

+#[derive(Parser)]
+pub struct JobTaskIdsOpts {
+    /// Single ID, ID range or `last` to display the most recently submitted job
+    #[arg(value_parser = parse_last_all_range)]
+    pub selector: IdSelector,
+
+    /// Select only tasks with the given state(s).
+    /// You can use multiple states separated by a comma.
+    #[arg(long, value_delimiter(','), value_enum)]
+    pub filter: Vec<Status>,
+}
+
 #[derive(clap::ValueEnum, Clone, Debug)]
 pub enum CompletedJobStatus {
     Finished,
@@ -95,28 +106,6 @@ pub struct JobCatOpts {
     pub stream: OutputStream,
 }

-pub async fn get_last_job_id(connection: &mut ClientConnection) -> crate::Result<Option<JobId>> {
-    let message = FromClientMessage::JobInfo(JobInfoRequest {
-        selector: IdSelector::LastN(1),
-    });
-    let response = rpc_call!(connection, message, ToClientMessage::JobInfoResponse(r) => r).await?;
-
-    Ok(response.jobs.last().map(|job| job.id))
-}
-
-pub async fn get_job_ids(connection: &mut ClientConnection) -> crate::Result<Option<Vec<JobId>>> {
-    let message = FromClientMessage::JobInfo(JobInfoRequest {
-        selector: IdSelector::All,
-    });
-    let response = rpc_call!(connection, message, ToClientMessage::JobInfoResponse(r) => r).await?;
-
-    let mut ids: Vec<JobId> = Vec::new();
-    for job in response.jobs {
-        ids.push(job.id);
-    }
-    Ok(Option::from(ids))
-}
-
 pub async fn output_job_list(
     gsettings: &GlobalSettings,
     session: &mut ClientSession,
diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs
index e61cf2f2b..2e2e5aad8 100644
--- a/crates/hyperqueue/src/client/commands/submit/command.rs
+++ b/crates/hyperqueue/src/client/commands/submit/command.rs
@@ -1,3 +1,4 @@
+use std::collections::BTreeSet;
 use std::io::{BufRead, Read};
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
@@ -246,14 +247,14 @@ pub struct SubmitJobConfOpts {
     // Parameters for creating array jobs
     /// Create a task array where a task will be created for each line of the given file.
     /// The corresponding line will be passed to the task in environment variable `HQ_ENTRY`.
-    #[arg(long, conflicts_with("array"), value_hint = clap::ValueHint::FilePath)]
+    #[arg(long, value_hint = clap::ValueHint::FilePath)]
     each_line: Option<PathBuf>,

     /// Create a task array where a task will be created for each item of a JSON array stored in
     /// the given file.
     /// The corresponding item from the array will be passed as a JSON string to the task in
     /// environment variable `HQ_ENTRY`.
-    #[arg(long, conflicts_with("array"), conflicts_with("each_line"), value_hint = clap::ValueHint::FilePath)]
+    #[arg(long, conflicts_with("each_line"), value_hint = clap::ValueHint::FilePath)]
     from_json: Option<PathBuf>,

     /// Create a task array where a task will be created for each number in the specified number range.
@@ -681,13 +682,13 @@ pub(crate) async fn send_submit_request(
         )
         .await?;
     } else if progress {
-        wait_for_jobs_with_progress(session, vec![info]).await?;
+        wait_for_jobs_with_progress(session, &[info]).await?;
     }
     Ok(())
 }

 fn get_ids_and_entries(opts: &JobSubmitOpts) -> anyhow::Result<(IntArray, Option<Vec<String>>)> {
-    let entries = if let Some(ref filename) = opts.conf.each_line {
+    let mut entries = if let Some(ref filename) = opts.conf.each_line {
         Some(read_lines(filename)?)
     } else if let Some(ref filename) = opts.conf.from_json {
         Some(make_entries_from_json(filename)?)
@@ -695,10 +696,30 @@ fn get_ids_and_entries(opts: &JobSubmitOpts) -> anyhow::Result<(IntArray, Option
         None
     };

-    let ids = if let Some(ref entries) = entries {
+    let ids = if let Some(array) = &opts.conf.array {
+        if let Some(es) = entries {
+            let id_set: BTreeSet<u32> = array
+                .iter()
+                .filter(|id| (*id as usize) < es.len())
+                .collect();
+            entries = Some(
+                es.into_iter()
+                    .enumerate()
+                    .filter_map(|(id, value)| {
+                        if id_set.contains(&(id as u32)) {
+                            Some(value)
+                        } else {
+                            None
+                        }
+                    })
+                    .collect(),
+            );
+            IntArray::from_sorted_ids(id_set.into_iter())
+        } else {
+            array.clone()
+        }
+    } else if let Some(ref entries) = entries {
         IntArray::from_range(0, entries.len() as JobTaskCount)
-    } else if let Some(ref array) = opts.conf.array {
-        array.clone()
     } else {
         IntArray::from_id(0)
     };
diff --git a/crates/hyperqueue/src/client/commands/wait.rs b/crates/hyperqueue/src/client/commands/wait.rs
index 95c693f72..b0d62c9ab 100644
--- a/crates/hyperqueue/src/client/commands/wait.rs
+++ b/crates/hyperqueue/src/client/commands/wait.rs
@@ -1,3 +1,4 @@
+use std::collections::BTreeSet;
 use std::io::Write;
 use std::time::{Duration, SystemTime};
 use tokio::time::sleep;
@@ -17,7 +18,7 @@ use crate::transfer::messages::{
     FromClientMessage, IdSelector, JobDetailRequest, JobInfo, JobInfoRequest, TaskIdSelector,
     TaskSelector, TaskStatusSelector, ToClientMessage, WaitForJobsRequest,
 };
-use crate::{rpc_call, JobId, JobTaskCount, Set};
+use crate::{rpc_call, JobId, JobTaskCount};
 use colored::Colorize;

 pub async fn wait_for_jobs(
@@ -72,18 +73,23 @@ pub async fn wait_for_jobs(

 pub async fn wait_for_jobs_with_progress(
     session: &mut ClientSession,
-    mut jobs: Vec<JobInfo>,
+    jobs: &[JobInfo],
 ) -> anyhow::Result<()> {
-    jobs.retain(|info| !is_terminated(info));
-
-    if jobs.is_empty() {
+    if jobs.iter().all(is_terminated) {
         log::warn!("There are no jobs to wait for");
     } else {
-        let total_tasks: JobTaskCount = jobs.iter().map(|info| info.n_tasks).sum();
-        let mut remaining_job_ids: Set<JobId> = jobs.into_iter().map(|info| info.id).collect();
+        let total_tasks: JobTaskCount = jobs
+            .iter()
+            .filter(|info| !is_terminated(info))
+            .map(|info| info.n_tasks)
+            .sum();
+        let mut remaining_job_ids: BTreeSet<JobId> = jobs
+            .iter()
+            .filter(|info| !is_terminated(info))
+            .map(|info| info.id)
+            .collect();

         let total_jobs = remaining_job_ids.len();
-
         log::info!(
             "Waiting for {} {} with {} {}",
             total_jobs,
@@ -95,15 +101,14 @@ pub async fn wait_for_jobs_with_progress(
         let mut counters = JobTaskCounters::default();

         loop {
-            let ids_ref = &mut remaining_job_ids;
             let response = rpc_call!(
                 session.connection(),
                 FromClientMessage::JobInfo(JobInfoRequest {
-                    selector: IdSelector::Specific(IntArray::from_ids(ids_ref.iter().map(|&id| id.into()).collect())),
+                    selector: IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.iter().map(|x| x.as_num()))),
                 }),
                 ToClientMessage::JobInfoResponse(r) => r
             )
-                .await?;
+            .await?;

             let mut current_counters = counters;
             for job in &response.jobs {
diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs
index a602d1a62..f0bac4bd7 100644
--- a/crates/hyperqueue/src/client/output/cli.rs
+++ b/crates/hyperqueue/src/client/output/cli.rs
@@ -35,6 +35,7 @@ use crate::client::output::common::{
 };
 use crate::client::output::json::format_datetime;
 use crate::client::output::Verbosity;
+use crate::common::arraydef::IntArray;
 use crate::common::utils::str::{pluralize, select_plural, truncate_middle};
 use crate::worker::start::WORKER_EXTRA_PROCESS_PID;
 use anyhow::Error;
@@ -424,6 +425,16 @@ impl Output for CliOutput {
         );
     }

+    fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) {
+        if job_task_ids.len() == 1 {
+            println!("{}", job_task_ids[0].1);
+        } else {
+            for (job_id, array) in &job_task_ids {
+                println!("{}: {}", job_id, array);
+            }
+        }
+    }
+
     fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize) {
         let job_count = jobs.len();
         let rows: Vec<_> = jobs
diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs
index cacc8cae7..e5c0cf093 100644
--- a/crates/hyperqueue/src/client/output/json.rs
+++ b/crates/hyperqueue/src/client/output/json.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::path::Path;
 use std::time::Duration;

@@ -17,6 +18,7 @@ use crate::client::job::WorkerMap;
 use crate::client::output::common::{group_jobs_by_status, resolve_task_paths, TaskToPathsMap};
 use crate::client::output::outputs::{Output, OutputStream};
 use crate::client::output::Verbosity;
+use crate::common::arraydef::IntArray;
 use crate::common::manager::info::{GetManagerInfo, ManagerType};
 use crate::server::autoalloc::{Allocation, AllocationState, QueueId};
 use crate::server::job::{JobTaskInfo, JobTaskState, StartedTaskData};
@@ -199,6 +201,14 @@ impl Output for JsonOutput {
         self.print(format_tasks(tasks, map));
     }

+    fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) {
+        let map: HashMap<JobId, Vec<u32>> = job_task_ids
+            .into_iter()
+            .map(|(key, value)| (key, value.iter().collect()))
+            .collect();
+        self.print(json!(map));
+    }
+
     fn print_summary(&self, filename: &Path, summary: Summary) {
         let json = json!({
             "filename": filename,
diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs
index 685506bad..002213d86 100644
--- a/crates/hyperqueue/src/client/output/outputs.rs
+++ b/crates/hyperqueue/src/client/output/outputs.rs
@@ -10,6 +10,7 @@ use std::path::Path;

 use crate::client::output::common::TaskToPathsMap;
 use crate::client::output::Verbosity;
+use crate::common::arraydef::IntArray;
 use crate::server::job::JobTaskInfo;
 use crate::JobId;
 use core::time::Duration;
@@ -77,6 +78,7 @@ pub trait Output {
         server_uid: &str,
         verbosity: Verbosity,
     );
+    fn print_task_ids(&self, jobs_task_id: Vec<(JobId, IntArray)>);

     // Log
     fn print_summary(&self, filename: &Path, summary: Summary);
diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs
index 6465f0c7a..36833c128 100644
--- a/crates/hyperqueue/src/client/output/quiet.rs
+++ b/crates/hyperqueue/src/client/output/quiet.rs
@@ -13,6 +13,7 @@ use crate::client::output::common::{
 };
 use crate::client::output::outputs::{Output, OutputStream};
 use crate::client::status::{job_status, Status};
+use crate::common::arraydef::IntArray;
 use crate::server::autoalloc::Allocation;
 use crate::server::job::JobTaskInfo;
 use crate::stream::reader::logfile::Summary;
@@ -130,6 +131,8 @@ impl Output for Quiet {
     ) {
     }

+    fn print_task_ids(&self, _job_task_ids: Vec<(JobId, IntArray)>) {}
+
     // Log
     fn print_summary(&self, _filename: &Path, _summary: Summary) {}
diff --git a/crates/hyperqueue/src/client/task.rs b/crates/hyperqueue/src/client/task.rs
index 44f529884..53a7ff035 100644
--- a/crates/hyperqueue/src/client/task.rs
+++ b/crates/hyperqueue/src/client/task.rs
@@ -1,14 +1,16 @@
+use crate::client::commands::job::JobTaskIdsOpts;
 use crate::client::globalsettings::GlobalSettings;
 use crate::client::job::get_worker_map;
 use crate::client::output::{Verbosity, VerbosityFlag};
 use crate::common::arraydef::IntArray;
 use crate::common::cli::{parse_last_range, parse_last_single_id, TaskSelectorArg};
-use crate::rpc_call;
+use crate::common::error::HqError;
 use crate::transfer::connection::ClientSession;
 use crate::transfer::messages::{
     FromClientMessage, IdSelector, JobDetailRequest, SingleIdSelector, TaskIdSelector,
     TaskSelector, TaskStatusSelector, ToClientMessage,
 };
+use crate::{rpc_call, JobId};

 #[derive(clap::Parser)]
 pub struct TaskOpts {
@@ -127,3 +129,44 @@ pub async fn output_job_task_info(

     Ok(())
 }
+
+pub async fn output_job_task_ids(
+    gsettings: &GlobalSettings,
+    session: &mut ClientSession,
+    opts: JobTaskIdsOpts,
+) -> anyhow::Result<()> {
+    let message = FromClientMessage::JobDetail(JobDetailRequest {
+        job_id_selector: opts.selector,
+        task_selector: Some(TaskSelector {
+            id_selector: TaskIdSelector::All,
+            status_selector: if opts.filter.is_empty() {
+                TaskStatusSelector::All
+            } else {
+                TaskStatusSelector::Specific(opts.filter)
+            },
+        }),
+    });
+    let response =
+        rpc_call!(session.connection(), message, ToClientMessage::JobDetailResponse(r) => r)
+            .await?;
+    let mut job_task_ids: Vec<_> = response
+        .details
+        .iter()
+        .map(|(job_id, detail)| {
+            Ok((
+                *job_id,
+                IntArray::from_sorted_ids(
+                    detail
+                        .as_ref()
+                        .ok_or_else(|| HqError::GenericError("Job Id not found".to_string()))?
+                        .tasks
+                        .iter()
+                        .map(|info| info.task_id.as_num()),
+                ),
+            ))
+        })
+        .collect::<crate::Result<Vec<_>>>()?;
+    job_task_ids.sort_unstable_by_key(|x| x.0);
+    gsettings.printer().print_task_ids(job_task_ids);
+    Ok(())
+}
diff --git a/crates/hyperqueue/src/common/arraydef.rs b/crates/hyperqueue/src/common/arraydef.rs
index 6f51b7544..98a4056d0 100644
--- a/crates/hyperqueue/src/common/arraydef.rs
+++ b/crates/hyperqueue/src/common/arraydef.rs
@@ -38,19 +38,22 @@ impl IntArray {
         IntArray { ranges }
     }

-    pub fn from_ids(ids: Vec<u32>) -> IntArray {
+    pub fn from_sorted_ids(ids: impl Iterator<Item = u32>) -> IntArray {
         let mut ranges: Vec<IntRange> = Vec::new();
+        let mut last_id = None;
         for id in ids {
-            if let Some(pos) = ranges.iter().position(|x| id == (x.start + x.count)) {
-                ranges[pos].count += 1;
+            debug_assert!(last_id.map(|last_id| last_id < id).unwrap_or(true));
+            if last_id.map(|last_id| last_id + 1 == id).unwrap_or(false) {
+                ranges.last_mut().unwrap().count += 1;
             } else {
                 ranges.push(IntRange::new(id, 1, 1));
             }
+            last_id = Some(id);
         }
         IntArray { ranges }
     }

     pub fn from_id(id: u32) -> IntArray {
-        Self::from_ids(vec![id])
+        Self::from_sorted_ids([id].into_iter())
     }

     pub fn from_range(start: u32, count: u32) -> Self {
@@ -89,22 +92,19 @@ impl FromStr for IntArray {

 impl fmt::Display for IntArray {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let mut str = String::new();
-        for x in &self.ranges {
+        for (idx, x) in self.ranges.iter().enumerate() {
+            if idx > 0 {
+                write!(f, ",")?;
+            }
             if x.count == 1 {
-                str.push_str(&format!("{}, ", x.start));
+                write!(f, "{}", x.start)?;
             } else if x.step == 1 {
-                str.push_str(&format!("{}-{}, ", x.start, x.start + x.count - 1));
+                write!(f, "{}-{}", x.start, x.start + x.count - 1)?;
             } else {
-                str.push_str(&format!(
-                    "{}-{}:{}, ",
-                    x.start,
-                    x.start + x.count - 1,
-                    x.step
-                ));
+                write!(f, "{}-{}:{}", x.start, x.start + x.count - 1, x.step)?;
             }
         }
-        write!(f, "{}", &str[0..str.len() - 2])
+        Ok(())
     }
 }
diff --git a/crates/hyperqueue/src/common/cli.rs b/crates/hyperqueue/src/common/cli.rs
index ec3b6236e..a5e6b992a 100644
--- a/crates/hyperqueue/src/common/cli.rs
+++ b/crates/hyperqueue/src/common/cli.rs
@@ -9,7 +9,7 @@ use tako::WorkerId;

 use crate::client::commands::autoalloc::AutoAllocOpts;
 use crate::client::commands::event::EventLogOpts;
 use crate::client::commands::job::{
-    JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts,
+    JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, JobTaskIdsOpts,
 };
 use crate::client::commands::log::LogOpts;
 use crate::client::commands::server::ServerOpts;
@@ -320,6 +320,8 @@ pub enum JobCommand {
     Wait(JobWaitOpts),
     /// Interactively observe the execution of a job
     Progress(JobProgressOpts),
+    /// Print task IDs for the given job
+    TaskIds(JobTaskIdsOpts),
 }

 #[derive(Parser)]
diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index 18c0466d7..1c6d7a9a3 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -194,7 +194,7 @@ pub async fn handle_resubmit(
     ids.sort_unstable();

     JobDescription::Array {
-        ids: IntArray::from_ids(ids),
+        ids: IntArray::from_sorted_ids(ids.into_iter()),
         entries: entries.clone(),
         task_desc: task_desc.clone(),
     }
diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs
index 55ccc2234..d63cbdcf3 100644
--- a/crates/pyhq/src/client/job.rs
+++ b/crates/pyhq/src/client/job.rs
@@ -233,11 +233,10 @@ pub fn wait_for_jobs_impl(
     let mut response: JobInfoResponse;

     loop {
-        let selector = IdSelector::Specific(IntArray::from_ids(
-            remaining_job_ids.iter().copied().collect(),
-        ));
+        let selector =
+            IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.iter().copied()));

-        response = hyperqueue::rpc_call!(
+        response = rpc_call!(
             ctx.session.connection(),
             FromClientMessage::JobInfo(JobInfoRequest {
                 selector,
@@ -311,7 +310,7 @@ pub fn get_failed_tasks_impl(
 ) -> PyResult {
     run_future(async move {
         let message = FromClientMessage::JobDetail(JobDetailRequest {
-            job_id_selector: IdSelector::Specific(IntArray::from_ids(job_ids)),
+            job_id_selector: IdSelector::Specific(IntArray::from_sorted_ids(job_ids.into_iter())),
             task_selector: Some(TaskSelector {
                 id_selector: TaskIdSelector::All,
                 status_selector: TaskStatusSelector::Specific(vec![Status::Failed]),
diff --git a/docs/jobs/arrays.md b/docs/jobs/arrays.md
index d4024ef62..11381b8db 100644
--- a/docs/jobs/arrays.md
+++ b/docs/jobs/arrays.md
@@ -100,3 +100,19 @@ If `items.json` contained this content:
 ```
 then HyperQueue would create two tasks, one with `HQ_ENTRY` set to `{"batch_size": 4, "learning_rate": 0.01}` and
 the other with `HQ_ENTRY` set to `{"batch_size": 8, "learning_rate": 0.001}`.
+
+### Combining `--each-line`/`--from-json` with `--array`
+
+The `--each-line` and `--from-json` options can be combined with the `--array` option.
+In that case, only the selected subset of lines/JSON items will be submitted.
+If `--array` contains an ID that exceeds the number of lines in the file (or the number of elements in the JSON
+array), then that ID is silently removed.
+
+For example:
+
+```commandline
+$ hq submit --each-line input.txt --array "2, 8-10"
+```
+
+If `input.txt` has enough lines, this creates a task array with four tasks: one for the 3rd line of the file and
+three for the 9th-11th lines (note that the first line has ID 0). It works analogously for `--from-json`.
\ No newline at end of file
diff --git a/docs/jobs/failure.md b/docs/jobs/failure.md
index 26ad1a686..61637a927 100644
--- a/docs/jobs/failure.md
+++ b/docs/jobs/failure.md
@@ -1,29 +1,42 @@
 In distributed systems, failure is inevitable.
 This sections describes how HyperQueue handles various types of failures and how can you affect its behavior.

-## Resubmitting jobs
-When a job fails or is canceled, you might want to submit it again, without the need to pass all the original
-parameters. You can achieve this using **resubmit**:
+## Resubmitting array jobs
+When a job fails or is canceled, you can submit it again.
+However, in the case of [task arrays](arrays.md), different tasks may end in different states, and often we want to
+recompute only the tasks with a specific status (e.g. failed tasks).

-```bash
-$ hq job resubmit <job-id>
+With the following combination of commands, you can recompute only the failed tasks. Let us assume that we want to
+recompute all failed tasks of job 5:
+
+```commandline
+$ hq submit --array=`hq job task-ids 5 --filter=failed` ./my-computation
+```
+It works as follows: the command `hq job task-ids 5 --filter=failed` returns the IDs of the failed tasks of job `5`,
+and we pass them to the `--array` parameter, which creates tasks only for the given IDs.
+
+If we want to recompute all failed and all canceled tasks, we can do it as follows:
+
+```commandline
+$ hq submit --array=`hq job task-ids 5 --filter=failed,canceled` ./my-computation
 ```
-It wil create a new job that has the same configuration as the job with the entered job id.

+Note that this also works with `--each-line` or `--from-json`, e.g.:
+
+```commandline
+# Original computation
+$ hq submit --each-line=input.txt ./my-computation

-This is especially useful for [task arrays](arrays.md). By default, `resubmit` will submit all tasks of the original job;
-however, you can specify only a subset of tasks based on their [state](jobs.md#task-state):
-```bash
-$ hq job resubmit <job-id> --status=failed,canceled
+# Resubmitting failed tasks
+$ hq submit --each-line=input.txt --array=`hq job task-ids last --filter=failed` ./my-computation
 ```
-Using this command you can resubmit e.g. only the tasks that have failed, without the need to recompute all tasks of
-a large task array.

 ## Task restart
-Sometimes a worker might crash while it is executing some task. In that case the server will reschedule that task to a
-different worker and the task will begin executing from the beginning.
+
+Sometimes a worker might crash while it is executing some task. In that case the server will automatically
+reschedule that task to a different worker and the task will begin executing from the beginning.

 In order to let the executed application know that the same task is being executed repeatedly, HyperQueue assigns each
 execution a separate **Instance ID**. It is a 32b non-negative number that identifies each (re-)execution of a task.
@@ -43,7 +56,7 @@ You can change this behavior with the `--max-fails=<X>` option of the `submit` command.
 If specified, once more tasks than `X` tasks fail, the rest of the job's tasks that were not completed yet
 will be canceled. For example:

-```bash
+```commandline
 $ hq submit --array 1-1000 --max-fails 5 ...
 ```
 This will create a task array with `1000` tasks.
 Once `5` or more tasks fail, the remaining uncompleted tasks of the job
diff --git a/tests/test_entries.py b/tests/test_entries.py
index a9929d4de..99c9199df 100644
--- a/tests/test_entries.py
+++ b/tests/test_entries.py
@@ -110,3 +110,67 @@ def test_entries_invalid_from_json_entry(hq_env: HqEnv):
             "echo $HQ_ENTRY",
         ]
     )
+
+
+def test_each_line_with_array(hq_env: HqEnv):
+    hq_env.start_server()
+    hq_env.start_worker(cpus=2)
+
+    with open("input", "w") as f:
+        f.write("One\nTwo\nThree\nFour\nFive\nSix\nSeven")
+
+    hq_env.command(
+        [
+            "submit",
+            "--each-line=input",
+            "--array",
+            "2-4, 6",
+            "--",
+            "bash",
+            "-c",
+            "echo $HQ_ENTRY,$HQ_TASK_ID",
+        ]
+    )
+    wait_for_job_state(hq_env, 1, "FINISHED")
+
+    for i, test in enumerate([None, None, "Three,2\n", "Four,3\n", "Five,4\n", None, "Seven,6\n"]):
+        filename = default_task_output(job_id=1, task_id=i)
+        if test is None:
+            assert not os.path.exists(filename)
+        else:
+            check_file_contents(filename, test)
+
+    table = hq_env.command(["job", "info", "1"], as_table=True)
+    assert table.get_row_value("State").split("\n")[-1] == "FINISHED (4)"
+
+
+def test_json_with_array(hq_env: HqEnv):
+    hq_env.start_server()
+    hq_env.start_worker(cpus=2)
+
+    with open("input", "w") as f:
+        f.write('["One", "Two", "Three", "Four", "Five", "Six", "Seven"]')
+
+    hq_env.command(
+        [
+            "submit",
+            "--from-json=input",
+            "--array",
+            "2-3, 5, 6, 7, 1000",
+            "--",
+            "bash",
+            "-c",
+            "echo $HQ_ENTRY,$HQ_TASK_ID",
+        ]
+    )
+    wait_for_job_state(hq_env, 1, "FINISHED")
+
+    for i, test in enumerate([None, None, '"Three",2\n', '"Four",3\n', None, '"Six",5\n', '"Seven",6\n']):
+        filename = default_task_output(job_id=1, task_id=i)
+        if test is None:
+            assert not os.path.exists(filename)
+        else:
+            check_file_contents(filename, test)
+
+    table = hq_env.command(["job", "info", "1"], as_table=True)
+    assert table.get_row_value("State").split("\n")[-1] == "FINISHED (4)"
diff --git a/tests/test_job.py b/tests/test_job.py
index d00adec63..c8aaddcc7 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -702,10 +702,10 @@ def test_job_resubmit_with_status(hq_env: HqEnv):
     wait_for_job_state(hq_env, 1, "FAILED")

     table = hq_env.command(["job", "resubmit", "1", "--filter=failed"], as_table=True)
-    table.check_row_value("Tasks", "4; Ids: 4-6, 8")
+    table.check_row_value("Tasks", "4; Ids: 4-6,8")

     table = hq_env.command(["job", "resubmit", "1", "--filter=finished"], as_table=True)
-    table.check_row_value("Tasks", "3; Ids: 3, 7, 9")
+    table.check_row_value("Tasks", "3; Ids: 3,7,9")


 def test_job_resubmit_all(hq_env: HqEnv):
@@ -715,7 +715,7 @@ def test_job_resubmit_all(hq_env: HqEnv):
     wait_for_job_state(hq_env, 1, "FINISHED")

     table = hq_env.command(["job", "resubmit", "1"], as_table=True)
-    table.check_row_value("Tasks", "3; Ids: 2, 7, 9")
+    table.check_row_value("Tasks", "3; Ids: 2,7,9")


 def test_job_resubmit_empty(hq_env: HqEnv):
@@ -1476,3 +1476,31 @@ def check_child_process_exited(hq_env: HqEnv, stop_fn: Callable[[subprocess.Popen], None]):
     parent, child = pids
     wait_for_pid_exit(parent)
     wait_for_pid_exit(child)
+
+
+def test_job_task_ids(hq_env: HqEnv):
+    hq_env.start_server()
+    hq_env.command(
+        [
+            "submit",
+            "--array=2,7,9,20-30",
+            "--",
+            "python",
+            "-c",
+            "import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']",
+        ]
+    )
+    hq_env.start_workers(1, cpus=1)
+    wait_for_job_state(hq_env, 1, "FAILED")
+
+    result = hq_env.command(["job", "task-ids", "1"])
+    assert result == "2,7,9,20-30\n"
+
+    result = hq_env.command(["job", "task-ids", "1", "--filter", "finished"])
+    assert result == "2,7,9,20-24,29-30\n"
+
+    result = hq_env.command(["job", "task-ids", "1", "--filter", "failed"])
+    assert result == "25-28\n"
+
+    result = hq_env.command(["job", "task-ids", "1", "--filter", "canceled"])
+    assert result == "\n"
diff --git a/tests/test_jobfile.py b/tests/test_jobfile.py
index c7be6d4cd..569dfddbf 100644
--- a/tests/test_jobfile.py
+++ b/tests/test_jobfile.py
@@ -219,7 +219,7 @@ def test_job_file_array(hq_env: HqEnv, tmp_path):
 """)
     hq_env.command(["job", "submit-file", "job.toml"])
     r = hq_env.command(["job", "info", "1"], as_table=True)
-    r.check_row_value("Tasks", "7; Ids: 2, 10-14, 120")
+    r.check_row_value("Tasks", "7; Ids: 2,10-14,120")


 def test_job_file_fail_mixing_array_and_tasks(hq_env: HqEnv, tmp_path):
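
Reviewer note (not part of the patch): the heart of this change is the new `IntArray::from_sorted_ids`, which coalesces a strictly ascending stream of IDs into `start`/`count` ranges so that `Display` prints compact forms such as `2,7,9,20-30`. Below is a minimal standalone sketch of that coalescing logic; the `Range` struct and free `from_sorted_ids` function are local stand-ins for illustration, not the crate's actual `IntRange`/`IntArray` types.

```rust
// Stand-in for the patch's `IntRange` (step omitted: `from_sorted_ids`
// only ever produces step-1 ranges, as in the diff above).
struct Range {
    start: u32,
    count: u32,
}

// Mirrors the new `IntArray::from_sorted_ids`: IDs must arrive in strictly
// ascending order; a consecutive ID extends the last range, a gap opens a new one.
fn from_sorted_ids(ids: impl Iterator<Item = u32>) -> Vec<Range> {
    let mut ranges: Vec<Range> = Vec::new();
    let mut last_id = None;
    for id in ids {
        debug_assert!(last_id.map(|last| last < id).unwrap_or(true));
        if last_id.map(|last| last + 1 == id).unwrap_or(false) {
            ranges.last_mut().unwrap().count += 1;
        } else {
            ranges.push(Range { start: id, count: 1 });
        }
        last_id = Some(id);
    }
    ranges
}

fn main() {
    // The IDs used by `test_job_task_ids` above: 2, 7, 9, 20-30.
    let ids = [2u32, 7, 9].into_iter().chain(20..=30);
    let parts: Vec<String> = from_sorted_ids(ids)
        .iter()
        .map(|r| {
            if r.count == 1 {
                r.start.to_string()
            } else {
                format!("{}-{}", r.start, r.start + r.count - 1)
            }
        })
        .collect();
    assert_eq!(parts.join(","), "2,7,9,20-30");
    println!("{}", parts.join(","));
}
```

This also explains why the call sites switched from `from_ids(Vec<u32>)` to `from_sorted_ids`: the old version scanned all existing ranges for every ID, while the new one only inspects the previous ID, at the cost of requiring sorted input (enforced by the `debug_assert!` and by the `BTreeSet`/`sort_unstable` call sites in the patch).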