Remove resubmit #650

Merged · 3 commits · Dec 15, 2023
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

## Breaking change

* The mechanism for resubmitting tasks has changed. The `resubmit` command was removed;
  see https://it4innovations.github.io/hyperqueue/latest/jobs/failure/ for the replacement.
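
  For illustration, the replacement workflow described in the linked documentation looks
  roughly like this (a sketch only; the job ID and program are placeholders, and the exact
  flags should be checked against the docs):

  ```bash
  # List the IDs of failed tasks from the original job (job 5 is a placeholder)
  hq job task-ids 5 --filter=failed

  # Resubmit just those tasks as a new job running the same program
  hq submit --array=$(hq job task-ids 5 --filter=failed) ./my-program
  ```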

* The output format of the `job info` command in JSON output mode has changed. Note that
  the JSON output mode is still unstable.
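
  JSON mode is selected with the global output flag, for example (a sketch assuming the
  `--output-mode` flag and an existing job with ID 5):

  ```bash
  # Print job details in the (still unstable) JSON output mode
  hq --output-mode=json job info 5
  ```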

14 changes: 1 addition & 13 deletions crates/hyperqueue/src/bin/hq.rs
@@ -16,8 +16,7 @@ use hyperqueue::client::commands::job::{
use hyperqueue::client::commands::log::command_log;
use hyperqueue::client::commands::server::command_server;
use hyperqueue::client::commands::submit::{
resubmit_computation, submit_computation, submit_computation_from_job_file, JobResubmitOpts,
JobSubmitFileOpts, JobSubmitOpts,
submit_computation, submit_computation_from_job_file, JobSubmitFileOpts, JobSubmitOpts,
};
use hyperqueue::client::commands::wait::{wait_for_jobs, wait_for_jobs_with_progress};
use hyperqueue::client::commands::worker::{
@@ -130,14 +129,6 @@ async fn command_job_delete(gsettings: &GlobalSettings, opts: JobForgetOpts) ->
forget_job(gsettings, &mut connection, opts).await
}

async fn command_job_resubmit(
gsettings: &GlobalSettings,
opts: JobResubmitOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
resubmit_computation(gsettings, &mut connection, opts).await
}

async fn command_job_task_ids(
gsettings: &GlobalSettings,
opts: JobTaskIdsOpts,
@@ -431,9 +422,6 @@ async fn main() -> hyperqueue::Result<()> {
SubCommand::Job(JobOpts {
subcmd: JobCommand::Forget(opts),
}) => command_job_delete(&gsettings, opts).await,
SubCommand::Job(JobOpts {
subcmd: JobCommand::Resubmit(opts),
}) => command_job_resubmit(&gsettings, opts).await,
SubCommand::Job(JobOpts {
subcmd: JobCommand::Wait(opts),
}) => command_job_wait(&gsettings, opts).await,
36 changes: 2 additions & 34 deletions crates/hyperqueue/src/client/commands/submit/command.rs
@@ -22,9 +22,7 @@ use super::directives::parse_hq_directives;
use crate::client::commands::submit::directives::parse_hq_directives_from_file;
use crate::client::commands::wait::{wait_for_jobs, wait_for_jobs_with_progress};
use crate::client::globalsettings::GlobalSettings;
use crate::client::job::get_worker_map;
use crate::client::resources::{parse_allocation_request, parse_resource_request};
use crate::client::status::Status;
use crate::common::arraydef::IntArray;
use crate::common::cli::OptsWithMatches;
use crate::common::parser2::{all_consuming, CharParser, ParseError};
@@ -37,8 +35,8 @@ use crate::common::utils::str::pluralize;
use crate::common::utils::time::parse_human_time;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDescription, PinMode, ResubmitRequest, SubmitRequest,
TaskDescription, ToClientMessage,
FromClientMessage, IdSelector, JobDescription, PinMode, SubmitRequest, TaskDescription,
ToClientMessage,
};
use crate::{rpc_call, JobTaskCount, Map};

@@ -844,36 +842,6 @@ fn check_suspicious_options(opts: &JobSubmitOpts, task_count: u32) -> anyhow::Re
Ok(())
}

#[derive(Parser)]
pub struct JobResubmitOpts {
/// Job that should be resubmitted
job_id: u32,

/// Resubmit only tasks with the given states.
/// You can use multiple states separated by a comma.
#[arg(long, value_delimiter(','), value_enum)]
filter: Vec<Status>,
}

pub async fn resubmit_computation(
gsettings: &GlobalSettings,
session: &mut ClientSession,
opts: JobResubmitOpts,
) -> anyhow::Result<()> {
let message = FromClientMessage::Resubmit(ResubmitRequest {
job_id: opts.job_id.into(),
filter: opts.filter,
});
let response =
rpc_call!(session.connection(), message, ToClientMessage::SubmitResponse(r) => r).await?;
gsettings.printer().print_job_detail(
vec![response.job],
get_worker_map(session).await?,
&response.server_uid,
);
Ok(())
}

fn validate_name(name: String) -> anyhow::Result<String> {
match name {
name if name.contains('\n') || name.contains('\t') => {
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/submit/mod.rs
@@ -4,6 +4,6 @@ pub mod directives;
mod jobfile;

pub use command::SubmitJobConfOpts;
pub use command::{resubmit_computation, submit_computation, JobResubmitOpts, JobSubmitOpts};
pub use command::{submit_computation, JobSubmitOpts};

pub use jobfile::{submit_computation_from_job_file, JobSubmitFileOpts};
5 changes: 1 addition & 4 deletions crates/hyperqueue/src/common/cli.rs
@@ -13,7 +13,7 @@ use crate::client::commands::job::{
};
use crate::client::commands::log::LogOpts;
use crate::client::commands::server::ServerOpts;
use crate::client::commands::submit::{JobResubmitOpts, JobSubmitFileOpts, JobSubmitOpts};
use crate::client::commands::submit::{JobSubmitFileOpts, JobSubmitOpts};
use crate::client::commands::worker::{WorkerFilter, WorkerStartOpts};
use crate::client::output::outputs::Outputs;
use crate::client::status::Status;
@@ -299,7 +299,6 @@ pub enum JobCommand {
Info(JobInfoOpts),
/// Cancel a specific job.
/// This will cancel all tasks, stopping them from being computed.
/// The job will still remain in the server's memory, and you will be able to resubmit it later.
Cancel(JobCancelOpts),
/// Forget a specific job.
/// This will remove the job from the server's memory, forgetting it completely and reducing
@@ -314,8 +313,6 @@ pub enum JobCommand {
Submit(JobSubmitOpts),
/// Submit a job through a job definition file
SubmitFile(JobSubmitFileOpts),
/// Resubmits tasks of a job
Resubmit(JobResubmitOpts),
/// Waits until a job is finished
Wait(JobWaitOpts),
/// Interactively observe the execution of a job
3 changes: 0 additions & 3 deletions crates/hyperqueue/src/server/client/mod.rs
@@ -92,9 +92,6 @@ pub async fn client_rpc_loop<
submit::handle_submit(&state_ref, &tako_ref, msg).await
}
FromClientMessage::JobInfo(msg) => compute_job_info(&state_ref, &msg.selector),
FromClientMessage::Resubmit(msg) => {
submit::handle_resubmit(&state_ref, &tako_ref, msg).await
}
FromClientMessage::Stop => {
end_flag.notify_one();
break;
87 changes: 4 additions & 83 deletions crates/hyperqueue/src/server/client/submit.rs
@@ -14,7 +14,6 @@ use tako::gateway::{
use tako::program::ProgramDefinition;
use tako::TaskId;

use crate::client::status::get_task_status;
use crate::common::arraydef::IntArray;
use crate::common::env::{HQ_ENTRY, HQ_JOB_ID, HQ_SUBMIT_DIR, HQ_TASK_ID};
use crate::common::placeholders::{
@@ -25,8 +24,8 @@ use crate::server::rpc::Backend;
use crate::server::state::{State, StateRef};
use crate::stream::server::control::StreamServerControlMessage;
use crate::transfer::messages::{
JobDescription, ResubmitRequest, SubmitRequest, SubmitResponse, TaskBody, TaskDescription,
TaskIdSelector, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
JobDescription, SubmitRequest, SubmitResponse, TaskBody, TaskDescription, TaskIdSelector,
TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
};
use crate::{JobId, JobTaskId, Priority, TakoTaskId};

@@ -141,84 +140,6 @@ fn prepare_job(request: &mut SubmitRequest, state: &mut State) -> (JobId, TakoTa
(job_id, state.new_task_id(task_count))
}

pub async fn handle_resubmit(
state_ref: &StateRef,
tako_ref: &Backend,
message: ResubmitRequest,
) -> ToClientMessage {
let msg_submit: SubmitRequest = {
let state = state_ref.get_mut();
let job = state.get_job(message.job_id);

if let Some(job) = job {
match job.job_desc {
JobDescription::Array { .. } => {}
_ => {
return ToClientMessage::Error(
"Resubmit is not supported for this job".to_string(),
)
}
}

if job.log.is_some() {
return ToClientMessage::Error(
"Resubmit is not currently supported when output streaming (`--log`) is used"
.to_string(),
);
}

let job_desc = if !message.filter.is_empty() {
match &job.job_desc {
JobDescription::Array {
task_desc,
entries,
ids: _,
} => {
let mut ids: Vec<u32> = job
.tasks
.values()
.filter_map(|v| {
if message.filter.contains(&get_task_status(&v.state)) {
Some(v.task_id.as_num())
} else {
None
}
})
.collect();

if ids.is_empty() {
return ToClientMessage::Error(
"Filtered task(s) are empty, can't submit empty job".to_string(),
);
}

ids.sort_unstable();
JobDescription::Array {
ids: IntArray::from_sorted_ids(ids.into_iter()),
entries: entries.clone(),
task_desc: task_desc.clone(),
}
}
_ => unimplemented!(),
}
} else {
job.job_desc.clone()
};

SubmitRequest {
job_desc,
name: job.name.clone(),
max_fails: job.max_fails,
submit_dir: std::env::current_dir().expect("Cannot get current working directory"),
log: None, // TODO: Reuse log configuration
}
} else {
return ToClientMessage::Error("Invalid job_id".to_string());
}
};
handle_submit(state_ref, tako_ref, msg_submit).await
}

async fn start_log_streaming(tako_ref: &Backend, job_id: JobId, path: PathBuf) {
let (sender, receiver) = oneshot::channel();
tako_ref.send_stream_control(StreamServerControlMessage::RegisterStream {
@@ -299,10 +220,10 @@ fn build_tasks_array(
Some(entries) => ids
.iter()
.zip(tako_base_id..)
.zip(entries.iter())
.zip(entries)
.map(|((task_id, tako_id), entry)| {
build_task_conf(
serialize_task_body(&ctx, task_id.into(), Some(entry.clone()), &task_desc),
serialize_task_body(&ctx, task_id.into(), Some(entry), &task_desc),
tako_id,
)
})
7 changes: 0 additions & 7 deletions crates/hyperqueue/src/transfer/messages.rs
@@ -22,7 +22,6 @@ use tako::worker::WorkerConfiguration;
#[derive(Serialize, Deserialize, Debug)]
pub enum FromClientMessage {
Submit(SubmitRequest),
Resubmit(ResubmitRequest),
Cancel(CancelRequest),
ForgetJob(ForgetJobRequest),
JobDetail(JobDetailRequest),
@@ -147,12 +146,6 @@ pub struct TaskSelector {
pub status_selector: TaskStatusSelector,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ResubmitRequest {
pub job_id: JobId,
pub filter: Vec<Status>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct CancelRequest {
pub selector: IdSelector,
8 changes: 6 additions & 2 deletions crates/pyhq/python/hyperqueue/client.py
@@ -83,7 +83,9 @@ def submit(self, job: Job) -> SubmittedJob:
raise Exception("Submitted job must have at least a single task")

job_id = self.connection.submit_job(job_desc)
logging.info(f"Submitted job {job_id} with {task_count} {pluralize('task', task_count)}")
logging.info(
f"Submitted job {job_id} with {task_count} {pluralize('task', task_count)}"
)
return SubmittedJob(job=job, id=job_id)

def wait_for_jobs(self, jobs: Sequence[SubmittedJob], raise_on_error=True) -> bool:
@@ -93,7 +95,9 @@ def wait_for_jobs(self, jobs: Sequence[SubmittedJob], raise_on_error=True) -> bo
job_ids_str = ",".join(str(id) for id in job_ids)
if len(jobs) > 1:
job_ids_str = "{" + job_ids_str + "}"
logging.info(f"Waiting for {pluralize('job', len(jobs))} {job_ids_str} to finish")
logging.info(
f"Waiting for {pluralize('job', len(jobs))} {job_ids_str} to finish"
)

callback = create_progress_callback()

4 changes: 3 additions & 1 deletion crates/pyhq/python/hyperqueue/ffi/protocol.py
@@ -7,7 +7,9 @@

class ResourceRequest:
n_nodes: int = 0
resources: Dict[str, Union[int, float, str]] = dataclasses.field(default_factory=dict)
resources: Dict[str, Union[int, float, str]] = dataclasses.field(
default_factory=dict
)
min_time: Optional[float] = None

def __init__(
6 changes: 5 additions & 1 deletion crates/pyhq/python/hyperqueue/job.py
@@ -31,7 +31,11 @@ def __init__(
self.tasks: List[Task] = []
self.task_map: Dict[TaskId, Task] = {}
self.max_fails = max_fails
self.default_workdir = Path(default_workdir).resolve() if default_workdir is not None else default_workdir
self.default_workdir = (
Path(default_workdir).resolve()
if default_workdir is not None
else default_workdir
)
self.default_env = default_env or {}

def task_by_id(self, id: TaskId) -> Optional[Task]:
8 changes: 6 additions & 2 deletions crates/pyhq/python/hyperqueue/output.py
@@ -40,9 +40,13 @@ def default_stderr() -> str:

# TODO: how to resolve TASK_ID in the context of some other task?
class Output:
def __init__(self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None):
def __init__(
self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None
):
if filepath and extension:
raise ValidationException("Parameters `filepath` and `extension` are mutually exclusive")
raise ValidationException(
"Parameters `filepath` and `extension` are mutually exclusive"
)

self.name = name
self.filepath = filepath
12 changes: 9 additions & 3 deletions crates/pyhq/python/hyperqueue/task/function/wrapper.py
@@ -9,7 +9,9 @@ class CloudWrapper:
Wraps a callable so that cloudpickle is used to pickle it, caching the pickle.
"""

def __init__(self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT_PROTOCOL):
def __init__(
self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT_PROTOCOL
):
if fn is None:
if pickled_fn is None:
raise ValueError("Pass at least one of `fn` and `pickled_fn`")
@@ -25,7 +27,9 @@ def __init__(self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT
self.pickled_fn = pickled_fn
self.cache = cache
self.protocol = protocol
self.__doc__ = "CloudWrapper for {!r}. Original doc:\n\n{}".format(self.fn, self.fn.__doc__)
self.__doc__ = "CloudWrapper for {!r}. Original doc:\n\n{}".format(
self.fn, self.fn.__doc__
)
if hasattr(self.fn, "__name__"):
self.__name__ = self.fn.__name__

@@ -52,7 +56,9 @@ def _get_pickled_fn(self):
return pfn

def __call__(self, *args, **kwargs):
logging.debug(f"Running function {self.fn} using args {args} and kwargs {kwargs}")
logging.debug(
f"Running function {self.fn} using args {args} and kwargs {kwargs}"
)
return self.fn(*args, **kwargs)

def __reduce__(self):
4 changes: 3 additions & 1 deletion crates/pyhq/python/hyperqueue/task/program.py
@@ -95,6 +95,8 @@ def get_task_outputs(task: ExternalProgram) -> Dict[str, Output]:
outputs = gather_outputs(task.args) + gather_outputs(task.env)
for output in outputs:
if output.name in output_map:
raise ValidationException(f"Output `{output.name}` has been defined multiple times")
raise ValidationException(
f"Output `{output.name}` has been defined multiple times"
)
output_map[output.name] = output
return output_map
Binary file modified docs/imgs/cheatsheet.png