diff --git a/CHANGELOG.md b/CHANGELOG.md
index ae4cc54a3..5ef7af38f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 ## Breaking change
 
+* The mechanism for resubmitting tasks has changed. The `resubmit` command was removed; see the manual for its replacement.
+
 * The output format of the `job info` command with JSON output mode has been changed. Note that the
   JSON output mode is still unstable.
 
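The replacement referred to above combines two commands that remain after this patch: `hq job task-ids` (note `command_job_task_ids` in the `hq.rs` hunk below) to query which tasks of a job are in a given state, and a regular `hq submit --array` to run exactly those tasks again. A minimal sketch of that workflow, assuming `task-ids` accepts a `--filter` option (like the removed `resubmit` did) and prints an ID selection in a form `--array` understands; the task command `./my-task.sh` is a hypothetical placeholder, so consult the manual for the exact syntax:

```python
# Sketch: resubmit only the failed tasks of job 1, without the removed `resubmit`.
# Assumes `hq` is on PATH and that `task-ids --filter` prints a selection like "4-6,8".
import subprocess

# Ask the server for the IDs of failed tasks of job 1.
failed = subprocess.run(
    ["hq", "job", "task-ids", "1", "--filter", "failed"],
    capture_output=True, text=True, check=True,
).stdout.strip()

if failed:
    # Submit a fresh array job restricted to exactly those task IDs.
    subprocess.run(
        ["hq", "submit", f"--array={failed}", "--", "./my-task.sh"],
        check=True,
    )
```

Unlike the removed server-side `handle_resubmit`, this keeps the state filtering on the client, which is why the dedicated `Resubmit` RPC message can be dropped from the protocol below.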
diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs
index 8a9186e31..77c8879f0 100644
--- a/crates/hyperqueue/src/bin/hq.rs
+++ b/crates/hyperqueue/src/bin/hq.rs
@@ -16,8 +16,7 @@ use hyperqueue::client::commands::job::{
 use hyperqueue::client::commands::log::command_log;
 use hyperqueue::client::commands::server::command_server;
 use hyperqueue::client::commands::submit::{
-    resubmit_computation, submit_computation, submit_computation_from_job_file, JobResubmitOpts,
-    JobSubmitFileOpts, JobSubmitOpts,
+    submit_computation, submit_computation_from_job_file, JobSubmitFileOpts, JobSubmitOpts,
 };
 use hyperqueue::client::commands::wait::{wait_for_jobs, wait_for_jobs_with_progress};
 use hyperqueue::client::commands::worker::{
@@ -130,14 +129,6 @@ async fn command_job_delete(gsettings: &GlobalSettings, opts: JobForgetOpts) ->
     forget_job(gsettings, &mut connection, opts).await
 }
 
-async fn command_job_resubmit(
-    gsettings: &GlobalSettings,
-    opts: JobResubmitOpts,
-) -> anyhow::Result<()> {
-    let mut connection = get_client_session(gsettings.server_directory()).await?;
-    resubmit_computation(gsettings, &mut connection, opts).await
-}
-
 async fn command_job_task_ids(
     gsettings: &GlobalSettings,
     opts: JobTaskIdsOpts,
@@ -431,9 +422,6 @@ async fn main() -> hyperqueue::Result<()> {
         SubCommand::Job(JobOpts {
             subcmd: JobCommand::Forget(opts),
         }) => command_job_delete(&gsettings, opts).await,
-        SubCommand::Job(JobOpts {
-            subcmd: JobCommand::Resubmit(opts),
-        }) => command_job_resubmit(&gsettings, opts).await,
         SubCommand::Job(JobOpts {
             subcmd: JobCommand::Wait(opts),
         }) => command_job_wait(&gsettings, opts).await,
diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs
index 2e2e5aad8..66da43002 100644
--- a/crates/hyperqueue/src/client/commands/submit/command.rs
+++ b/crates/hyperqueue/src/client/commands/submit/command.rs
@@ -22,9 +22,7 @@ use super::directives::parse_hq_directives;
 use crate::client::commands::submit::directives::parse_hq_directives_from_file;
 use crate::client::commands::wait::{wait_for_jobs, wait_for_jobs_with_progress};
 use crate::client::globalsettings::GlobalSettings;
-use crate::client::job::get_worker_map;
 use crate::client::resources::{parse_allocation_request, parse_resource_request};
-use crate::client::status::Status;
 use crate::common::arraydef::IntArray;
 use crate::common::cli::OptsWithMatches;
 use crate::common::parser2::{all_consuming, CharParser, ParseError};
@@ -37,8 +35,8 @@ use crate::common::utils::str::pluralize;
 use crate::common::utils::time::parse_human_time;
 use crate::transfer::connection::ClientSession;
 use crate::transfer::messages::{
-    FromClientMessage, IdSelector, JobDescription, PinMode, ResubmitRequest, SubmitRequest,
-    TaskDescription, ToClientMessage,
+    FromClientMessage, IdSelector, JobDescription, PinMode, SubmitRequest, TaskDescription,
+    ToClientMessage,
 };
 use crate::{rpc_call, JobTaskCount, Map};
@@ -844,36 +842,6 @@ fn check_suspicious_options(opts: &JobSubmitOpts, task_count: u32) -> anyhow::Re
     Ok(())
 }
 
-#[derive(Parser)]
-pub struct JobResubmitOpts {
-    /// Job that should be resubmitted
-    job_id: u32,
-
-    /// Resubmit only tasks with the given states.
-    /// You can use multiple states separated by a comma.
-    #[arg(long, value_delimiter(','), value_enum)]
-    filter: Vec<Status>,
-}
-
-pub async fn resubmit_computation(
-    gsettings: &GlobalSettings,
-    session: &mut ClientSession,
-    opts: JobResubmitOpts,
-) -> anyhow::Result<()> {
-    let message = FromClientMessage::Resubmit(ResubmitRequest {
-        job_id: opts.job_id.into(),
-        filter: opts.filter,
-    });
-    let response =
-        rpc_call!(session.connection(), message, ToClientMessage::SubmitResponse(r) => r).await?;
-    gsettings.printer().print_job_detail(
-        vec![response.job],
-        get_worker_map(session).await?,
-        &response.server_uid,
-    );
-    Ok(())
-}
-
 fn validate_name(name: String) -> anyhow::Result<String> {
     match name {
         name if name.contains('\n') || name.contains('\t') => {
diff --git a/crates/hyperqueue/src/client/commands/submit/mod.rs b/crates/hyperqueue/src/client/commands/submit/mod.rs
index ca2bc7a9b..046399406 100644
--- a/crates/hyperqueue/src/client/commands/submit/mod.rs
+++ b/crates/hyperqueue/src/client/commands/submit/mod.rs
@@ -4,6 +4,6 @@ pub mod directives;
 mod jobfile;
 
 pub use command::SubmitJobConfOpts;
-pub use command::{resubmit_computation, submit_computation, JobResubmitOpts, JobSubmitOpts};
+pub use command::{submit_computation, JobSubmitOpts};
 pub use jobfile::{submit_computation_from_job_file, JobSubmitFileOpts};
diff --git a/crates/hyperqueue/src/common/cli.rs b/crates/hyperqueue/src/common/cli.rs
index a5e6b992a..b14d0e592 100644
--- a/crates/hyperqueue/src/common/cli.rs
+++ b/crates/hyperqueue/src/common/cli.rs
@@ -13,7 +13,7 @@ use crate::client::commands::job::{
 };
 use crate::client::commands::log::LogOpts;
 use crate::client::commands::server::ServerOpts;
-use crate::client::commands::submit::{JobResubmitOpts, JobSubmitFileOpts, JobSubmitOpts};
+use crate::client::commands::submit::{JobSubmitFileOpts, JobSubmitOpts};
 use crate::client::commands::worker::{WorkerFilter, WorkerStartOpts};
 use crate::client::output::outputs::Outputs;
 use crate::client::status::Status;
@@ -299,7 +299,6 @@ pub enum JobCommand {
     Info(JobInfoOpts),
     /// Cancel a specific job.
     /// This will cancel all tasks, stopping them from being computed.
-    /// The job will still remain in the server's memory, and you will be able to resubmit it later.
     Cancel(JobCancelOpts),
     /// Forget a specific job.
     /// This will remove the job from the server's memory, forgetting it completely and reducing
@@ -314,8 +313,6 @@ pub enum JobCommand {
     Submit(JobSubmitOpts),
     /// Submit a job through a job definition file
     SubmitFile(JobSubmitFileOpts),
-    /// Resubmits tasks of a job
-    Resubmit(JobResubmitOpts),
     /// Waits until a job is finished
     Wait(JobWaitOpts),
     /// Interactively observe the execution of a job
diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs
index ae78b503b..64c164850 100644
--- a/crates/hyperqueue/src/server/client/mod.rs
+++ b/crates/hyperqueue/src/server/client/mod.rs
@@ -92,9 +92,6 @@ pub async fn client_rpc_loop<
             submit::handle_submit(&state_ref, &tako_ref, msg).await
         }
         FromClientMessage::JobInfo(msg) => compute_job_info(&state_ref, &msg.selector),
-        FromClientMessage::Resubmit(msg) => {
-            submit::handle_resubmit(&state_ref, &tako_ref, msg).await
-        }
         FromClientMessage::Stop => {
             end_flag.notify_one();
             break;
diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index 1c6d7a9a3..421e54dbd 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -14,7 +14,6 @@ use tako::gateway::{
 use tako::program::ProgramDefinition;
 use tako::TaskId;
 
-use crate::client::status::get_task_status;
 use crate::common::arraydef::IntArray;
 use crate::common::env::{HQ_ENTRY, HQ_JOB_ID, HQ_SUBMIT_DIR, HQ_TASK_ID};
 use crate::common::placeholders::{
@@ -25,8 +24,8 @@ use crate::server::rpc::Backend;
 use crate::server::state::{State, StateRef};
 use crate::stream::server::control::StreamServerControlMessage;
 use crate::transfer::messages::{
-    JobDescription, ResubmitRequest, SubmitRequest, SubmitResponse, TaskBody, TaskDescription,
-    TaskIdSelector, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
+    JobDescription, SubmitRequest, SubmitResponse, TaskBody, TaskDescription, TaskIdSelector,
+    TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
 };
 use crate::{JobId, JobTaskId, Priority, TakoTaskId};
@@ -141,84 +140,6 @@ fn prepare_job(request: &mut SubmitRequest, state: &mut State) -> (JobId, TakoTa
     (job_id, state.new_task_id(task_count))
 }
 
-pub async fn handle_resubmit(
-    state_ref: &StateRef,
-    tako_ref: &Backend,
-    message: ResubmitRequest,
-) -> ToClientMessage {
-    let msg_submit: SubmitRequest = {
-        let state = state_ref.get_mut();
-        let job = state.get_job(message.job_id);
-
-        if let Some(job) = job {
-            match job.job_desc {
-                JobDescription::Array { .. } => {}
-                _ => {
-                    return ToClientMessage::Error(
-                        "Resubmit is not supported for this job".to_string(),
-                    )
-                }
-            }
-
-            if job.log.is_some() {
-                return ToClientMessage::Error(
-                    "Resubmit is not currently supported when output streaming (`--log`) is used"
-                        .to_string(),
-                );
-            }
-
-            let job_desc = if !message.filter.is_empty() {
-                match &job.job_desc {
-                    JobDescription::Array {
-                        task_desc,
-                        entries,
-                        ids: _,
-                    } => {
-                        let mut ids: Vec<u32> = job
-                            .tasks
-                            .values()
-                            .filter_map(|v| {
-                                if message.filter.contains(&get_task_status(&v.state)) {
-                                    Some(v.task_id.as_num())
-                                } else {
-                                    None
-                                }
-                            })
-                            .collect();
-
-                        if ids.is_empty() {
-                            return ToClientMessage::Error(
-                                "Filtered task(s) are empty, can't submit empty job".to_string(),
-                            );
-                        }
-
-                        ids.sort_unstable();
-                        JobDescription::Array {
-                            ids: IntArray::from_sorted_ids(ids.into_iter()),
-                            entries: entries.clone(),
-                            task_desc: task_desc.clone(),
-                        }
-                    }
-                    _ => unimplemented!(),
-                }
-            } else {
-                job.job_desc.clone()
-            };
-
-            SubmitRequest {
-                job_desc,
-                name: job.name.clone(),
-                max_fails: job.max_fails,
-                submit_dir: std::env::current_dir().expect("Cannot get current working directory"),
-                log: None, // TODO: Reuse log configuration
-            }
-        } else {
-            return ToClientMessage::Error("Invalid job_id".to_string());
-        }
-    };
-    handle_submit(state_ref, tako_ref, msg_submit).await
-}
-
 async fn start_log_streaming(tako_ref: &Backend, job_id: JobId, path: PathBuf) {
     let (sender, receiver) = oneshot::channel();
     tako_ref.send_stream_control(StreamServerControlMessage::RegisterStream {
diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs
index fa57e55e0..fe0ed4d18 100644
--- a/crates/hyperqueue/src/transfer/messages.rs
+++ b/crates/hyperqueue/src/transfer/messages.rs
@@ -22,7 +22,6 @@ use tako::worker::WorkerConfiguration;
 #[derive(Serialize, Deserialize, Debug)]
 pub enum FromClientMessage {
     Submit(SubmitRequest),
-    Resubmit(ResubmitRequest),
     Cancel(CancelRequest),
     ForgetJob(ForgetJobRequest),
     JobDetail(JobDetailRequest),
@@ -147,12 +146,6 @@ pub struct TaskSelector {
     pub status_selector: TaskStatusSelector,
 }
 
-#[derive(Serialize, Deserialize, Debug)]
-pub struct ResubmitRequest {
-    pub job_id: JobId,
-    pub filter: Vec<Status>,
-}
-
 #[derive(Serialize, Deserialize, Debug)]
 pub struct CancelRequest {
     pub selector: IdSelector,
diff --git a/crates/pyhq/python/hyperqueue/client.py b/crates/pyhq/python/hyperqueue/client.py
index 619154ef9..3a9aff9e1 100644
--- a/crates/pyhq/python/hyperqueue/client.py
+++ b/crates/pyhq/python/hyperqueue/client.py
@@ -83,7 +83,9 @@ def submit(self, job: Job) -> SubmittedJob:
             raise Exception("Submitted job must have at least a single task")
 
         job_id = self.connection.submit_job(job_desc)
-        logging.info(f"Submitted job {job_id} with {task_count} {pluralize('task', task_count)}")
+        logging.info(
+            f"Submitted job {job_id} with {task_count} {pluralize('task', task_count)}"
+        )
         return SubmittedJob(job=job, id=job_id)
 
     def wait_for_jobs(self, jobs: Sequence[SubmittedJob], raise_on_error=True) -> bool:
@@ -93,7 +95,9 @@ def wait_for_jobs(self, jobs: Sequence[SubmittedJob], raise_on_error=True) -> bo
         job_ids_str = ",".join(str(id) for id in job_ids)
         if len(jobs) > 1:
             job_ids_str = "{" + job_ids_str + "}"
-        logging.info(f"Waiting for {pluralize('job', len(jobs))} {job_ids_str} to finish")
+        logging.info(
+            f"Waiting for {pluralize('job', len(jobs))} {job_ids_str} to finish"
+        )
 
         callback = create_progress_callback()
diff --git a/crates/pyhq/python/hyperqueue/ffi/protocol.py b/crates/pyhq/python/hyperqueue/ffi/protocol.py
index da3e324e5..b1bf6760c 100644
--- a/crates/pyhq/python/hyperqueue/ffi/protocol.py
+++ b/crates/pyhq/python/hyperqueue/ffi/protocol.py
@@ -7,7 +7,9 @@ class ResourceRequest:
     n_nodes: int = 0
-    resources: Dict[str, Union[int, float, str]] = dataclasses.field(default_factory=dict)
+    resources: Dict[str, Union[int, float, str]] = dataclasses.field(
+        default_factory=dict
+    )
     min_time: Optional[float] = None
 
     def __init__(
diff --git a/crates/pyhq/python/hyperqueue/job.py b/crates/pyhq/python/hyperqueue/job.py
index de29f7be5..ef9f89994 100644
--- a/crates/pyhq/python/hyperqueue/job.py
+++ b/crates/pyhq/python/hyperqueue/job.py
@@ -31,7 +31,11 @@ def __init__(
         self.tasks: List[Task] = []
         self.task_map: Dict[TaskId, Task] = {}
         self.max_fails = max_fails
-        self.default_workdir = Path(default_workdir).resolve() if default_workdir is not None else default_workdir
+        self.default_workdir = (
+            Path(default_workdir).resolve()
+            if default_workdir is not None
+            else default_workdir
+        )
         self.default_env = default_env or {}
 
     def task_by_id(self, id: TaskId) -> Optional[Task]:
diff --git a/crates/pyhq/python/hyperqueue/output.py b/crates/pyhq/python/hyperqueue/output.py
index ffa8910ac..4b1265f13 100644
--- a/crates/pyhq/python/hyperqueue/output.py
+++ b/crates/pyhq/python/hyperqueue/output.py
@@ -40,9 +40,13 @@ def default_stderr() -> str:
 
 # TODO: how to resolve TASK_ID in the context of some other task?
 class Output:
-    def __init__(self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None):
+    def __init__(
+        self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None
+    ):
         if filepath and extension:
-            raise ValidationException("Parameters `filepath` and `extension` are mutually exclusive")
+            raise ValidationException(
+                "Parameters `filepath` and `extension` are mutually exclusive"
+            )
 
         self.name = name
         self.filepath = filepath
diff --git a/crates/pyhq/python/hyperqueue/task/function/wrapper.py b/crates/pyhq/python/hyperqueue/task/function/wrapper.py
index e99b26c56..5919c6bd9 100644
--- a/crates/pyhq/python/hyperqueue/task/function/wrapper.py
+++ b/crates/pyhq/python/hyperqueue/task/function/wrapper.py
@@ -9,7 +9,9 @@ class CloudWrapper:
     Wraps a callable so that cloudpickle is used to pickle it, caching the pickle.
     """
 
-    def __init__(self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT_PROTOCOL):
+    def __init__(
+        self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT_PROTOCOL
+    ):
         if fn is None:
             if pickled_fn is None:
                 raise ValueError("Pass at least one of `fn` and `pickled_fn`")
@@ -25,7 +27,9 @@ def __init__(self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT
         self.pickled_fn = pickled_fn
         self.cache = cache
         self.protocol = protocol
-        self.__doc__ = "CloudWrapper for {!r}. Original doc:\n\n{}".format(self.fn, self.fn.__doc__)
+        self.__doc__ = "CloudWrapper for {!r}. Original doc:\n\n{}".format(
+            self.fn, self.fn.__doc__
+        )
         if hasattr(self.fn, "__name__"):
             self.__name__ = self.fn.__name__
@@ -52,7 +56,9 @@ def _get_pickled_fn(self):
         return pfn
 
     def __call__(self, *args, **kwargs):
-        logging.debug(f"Running function {self.fn} using args {args} and kwargs {kwargs}")
+        logging.debug(
+            f"Running function {self.fn} using args {args} and kwargs {kwargs}"
+        )
         return self.fn(*args, **kwargs)
 
     def __reduce__(self):
diff --git a/crates/pyhq/python/hyperqueue/task/program.py b/crates/pyhq/python/hyperqueue/task/program.py
index 48b869bb0..8a8aa22f9 100644
--- a/crates/pyhq/python/hyperqueue/task/program.py
+++ b/crates/pyhq/python/hyperqueue/task/program.py
@@ -95,6 +95,8 @@ def get_task_outputs(task: ExternalProgram) -> Dict[str, Output]:
     outputs = gather_outputs(task.args) + gather_outputs(task.env)
     for output in outputs:
         if output.name in output_map:
-            raise ValidationException(f"Output `{output.name}` has been defined multiple times")
+            raise ValidationException(
+                f"Output `{output.name}` has been defined multiple times"
+            )
         output_map[output.name] = output
 
     return output_map
diff --git a/tests/test_job.py b/tests/test_job.py
index c8aaddcc7..8d7e74eda 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -686,62 +686,6 @@ def test_job_last(hq_env: HqEnv):
     table.check_row_value("ID", "2")
 
 
-def test_job_resubmit_with_status(hq_env: HqEnv):
-    hq_env.start_server()
-    hq_env.command(
-        [
-            "submit",
-            "--array=3-9",
-            "--",
-            "python3",
-            "-c",
-            "import os; assert os.environ['HQ_TASK_ID'] not in ['4', '5', '6', '8']",
-        ]
-    )
-    hq_env.start_workers(2, cpus=1)
-    wait_for_job_state(hq_env, 1, "FAILED")
-
-    table = hq_env.command(["job", "resubmit", "1", "--filter=failed"], as_table=True)
-    table.check_row_value("Tasks", "4; Ids: 4-6,8")
-
-    table = hq_env.command(["job", "resubmit", "1", "--filter=finished"], as_table=True)
-    table.check_row_value("Tasks", "3; Ids: 3,7,9")
-
-
-def test_job_resubmit_all(hq_env: HqEnv):
-    hq_env.start_server()
-    hq_env.command(["submit", "--array=2,7,9", "--", "/bin/hostname"])
-    hq_env.start_workers(2, cpus=1)
-    wait_for_job_state(hq_env, 1, "FINISHED")
-
-    table = hq_env.command(["job", "resubmit", "1"], as_table=True)
-    table.check_row_value("Tasks", "3; Ids: 2,7,9")
-
-
-def test_job_resubmit_empty(hq_env: HqEnv):
-    hq_env.start_server()
-    hq_env.command(["submit", "--array=2,7,9", "--", "/bin/hostname"])
-    hq_env.start_workers(2, cpus=1)
-    wait_for_job_state(hq_env, 1, "FINISHED")
-
-    hq_env.command(
-        ["job", "resubmit", "--filter=canceled", "1"],
-        expect_fail="Filtered task(s) are empty, can't submit empty job",
-    )
-
-
-def test_job_resubmit_with_log(hq_env: HqEnv):
-    hq_env.start_server()
-    hq_env.command(["submit", "--array=1-10", "--log", "foo.bin", "--", "/bin/nonexisting"])
-    hq_env.start_workers(1)
-    wait_for_job_state(hq_env, 1, "FAILED")
-
-    hq_env.command(
-        ["job", "resubmit", "1"],
-        expect_fail="Resubmit is not currently supported when output streaming (`--log`) is used",
-    )
-
-
 def test_job_priority(hq_env: HqEnv, tmp_path):
     hq_env.start_server()
     hq_env.command(