Skip to content

Commit

Permalink
Command "resubmit" removed
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Dec 15, 2023
1 parent bfa67e1 commit f618f4e
Show file tree
Hide file tree
Showing 15 changed files with 41 additions and 209 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Breaking change

* The mechanism for resubmitting tasks has been changed. The `resubmit` command was removed; see the manual for its replacement.

* The output format of the `job info` command with JSON output mode has been changed. Note that
the JSON output mode is still unstable.

Expand Down
14 changes: 1 addition & 13 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use hyperqueue::client::commands::job::{
use hyperqueue::client::commands::log::command_log;
use hyperqueue::client::commands::server::command_server;
use hyperqueue::client::commands::submit::{
resubmit_computation, submit_computation, submit_computation_from_job_file, JobResubmitOpts,
JobSubmitFileOpts, JobSubmitOpts,
submit_computation, submit_computation_from_job_file, JobSubmitFileOpts, JobSubmitOpts,
};
use hyperqueue::client::commands::wait::{wait_for_jobs, wait_for_jobs_with_progress};
use hyperqueue::client::commands::worker::{
Expand Down Expand Up @@ -130,14 +129,6 @@ async fn command_job_delete(gsettings: &GlobalSettings, opts: JobForgetOpts) ->
forget_job(gsettings, &mut connection, opts).await
}

async fn command_job_resubmit(
gsettings: &GlobalSettings,
opts: JobResubmitOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
resubmit_computation(gsettings, &mut connection, opts).await
}

async fn command_job_task_ids(
gsettings: &GlobalSettings,
opts: JobTaskIdsOpts,
Expand Down Expand Up @@ -431,9 +422,6 @@ async fn main() -> hyperqueue::Result<()> {
SubCommand::Job(JobOpts {
subcmd: JobCommand::Forget(opts),
}) => command_job_delete(&gsettings, opts).await,
SubCommand::Job(JobOpts {
subcmd: JobCommand::Resubmit(opts),
}) => command_job_resubmit(&gsettings, opts).await,
SubCommand::Job(JobOpts {
subcmd: JobCommand::Wait(opts),
}) => command_job_wait(&gsettings, opts).await,
Expand Down
36 changes: 2 additions & 34 deletions crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use super::directives::parse_hq_directives;
use crate::client::commands::submit::directives::parse_hq_directives_from_file;
use crate::client::commands::wait::{wait_for_jobs, wait_for_jobs_with_progress};
use crate::client::globalsettings::GlobalSettings;
use crate::client::job::get_worker_map;
use crate::client::resources::{parse_allocation_request, parse_resource_request};
use crate::client::status::Status;
use crate::common::arraydef::IntArray;
use crate::common::cli::OptsWithMatches;
use crate::common::parser2::{all_consuming, CharParser, ParseError};
Expand All @@ -37,8 +35,8 @@ use crate::common::utils::str::pluralize;
use crate::common::utils::time::parse_human_time;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDescription, PinMode, ResubmitRequest, SubmitRequest,
TaskDescription, ToClientMessage,
FromClientMessage, IdSelector, JobDescription, PinMode, SubmitRequest, TaskDescription,
ToClientMessage,
};
use crate::{rpc_call, JobTaskCount, Map};

Expand Down Expand Up @@ -844,36 +842,6 @@ fn check_suspicious_options(opts: &JobSubmitOpts, task_count: u32) -> anyhow::Re
Ok(())
}

#[derive(Parser)]
pub struct JobResubmitOpts {
/// Job that should be resubmitted
job_id: u32,

/// Resubmit only tasks with the given states.
/// You can use multiple states separated by a comma.
#[arg(long, value_delimiter(','), value_enum)]
filter: Vec<Status>,
}

pub async fn resubmit_computation(
gsettings: &GlobalSettings,
session: &mut ClientSession,
opts: JobResubmitOpts,
) -> anyhow::Result<()> {
let message = FromClientMessage::Resubmit(ResubmitRequest {
job_id: opts.job_id.into(),
filter: opts.filter,
});
let response =
rpc_call!(session.connection(), message, ToClientMessage::SubmitResponse(r) => r).await?;
gsettings.printer().print_job_detail(
vec![response.job],
get_worker_map(session).await?,
&response.server_uid,
);
Ok(())
}

fn validate_name(name: String) -> anyhow::Result<String> {
match name {
name if name.contains('\n') || name.contains('\t') => {
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/submit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ pub mod directives;
mod jobfile;

pub use command::SubmitJobConfOpts;
pub use command::{resubmit_computation, submit_computation, JobResubmitOpts, JobSubmitOpts};
pub use command::{submit_computation, JobSubmitOpts};

pub use jobfile::{submit_computation_from_job_file, JobSubmitFileOpts};
5 changes: 1 addition & 4 deletions crates/hyperqueue/src/common/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::client::commands::job::{
};
use crate::client::commands::log::LogOpts;
use crate::client::commands::server::ServerOpts;
use crate::client::commands::submit::{JobResubmitOpts, JobSubmitFileOpts, JobSubmitOpts};
use crate::client::commands::submit::{JobSubmitFileOpts, JobSubmitOpts};
use crate::client::commands::worker::{WorkerFilter, WorkerStartOpts};
use crate::client::output::outputs::Outputs;
use crate::client::status::Status;
Expand Down Expand Up @@ -299,7 +299,6 @@ pub enum JobCommand {
Info(JobInfoOpts),
/// Cancel a specific job.
/// This will cancel all tasks, stopping them from being computed.
/// The job will still remain in the server's memory, and you will be able to resubmit it later.
Cancel(JobCancelOpts),
/// Forget a specific job.
/// This will remove the job from the server's memory, forgetting it completely and reducing
Expand All @@ -314,8 +313,6 @@ pub enum JobCommand {
Submit(JobSubmitOpts),
/// Submit a job through a job definition file
SubmitFile(JobSubmitFileOpts),
/// Resubmits tasks of a job
Resubmit(JobResubmitOpts),
/// Waits until a job is finished
Wait(JobWaitOpts),
/// Interactively observe the execution of a job
Expand Down
3 changes: 0 additions & 3 deletions crates/hyperqueue/src/server/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ pub async fn client_rpc_loop<
submit::handle_submit(&state_ref, &tako_ref, msg).await
}
FromClientMessage::JobInfo(msg) => compute_job_info(&state_ref, &msg.selector),
FromClientMessage::Resubmit(msg) => {
submit::handle_resubmit(&state_ref, &tako_ref, msg).await
}
FromClientMessage::Stop => {
end_flag.notify_one();
break;
Expand Down
83 changes: 2 additions & 81 deletions crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tako::gateway::{
use tako::program::ProgramDefinition;
use tako::TaskId;

use crate::client::status::get_task_status;
use crate::common::arraydef::IntArray;
use crate::common::env::{HQ_ENTRY, HQ_JOB_ID, HQ_SUBMIT_DIR, HQ_TASK_ID};
use crate::common::placeholders::{
Expand All @@ -25,8 +24,8 @@ use crate::server::rpc::Backend;
use crate::server::state::{State, StateRef};
use crate::stream::server::control::StreamServerControlMessage;
use crate::transfer::messages::{
JobDescription, ResubmitRequest, SubmitRequest, SubmitResponse, TaskBody, TaskDescription,
TaskIdSelector, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
JobDescription, SubmitRequest, SubmitResponse, TaskBody, TaskDescription, TaskIdSelector,
TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
};
use crate::{JobId, JobTaskId, Priority, TakoTaskId};

Expand Down Expand Up @@ -141,84 +140,6 @@ fn prepare_job(request: &mut SubmitRequest, state: &mut State) -> (JobId, TakoTa
(job_id, state.new_task_id(task_count))
}

pub async fn handle_resubmit(
state_ref: &StateRef,
tako_ref: &Backend,
message: ResubmitRequest,
) -> ToClientMessage {
let msg_submit: SubmitRequest = {
let state = state_ref.get_mut();
let job = state.get_job(message.job_id);

if let Some(job) = job {
match job.job_desc {
JobDescription::Array { .. } => {}
_ => {
return ToClientMessage::Error(
"Resubmit is not supported for this job".to_string(),
)
}
}

if job.log.is_some() {
return ToClientMessage::Error(
"Resubmit is not currently supported when output streaming (`--log`) is used"
.to_string(),
);
}

let job_desc = if !message.filter.is_empty() {
match &job.job_desc {
JobDescription::Array {
task_desc,
entries,
ids: _,
} => {
let mut ids: Vec<u32> = job
.tasks
.values()
.filter_map(|v| {
if message.filter.contains(&get_task_status(&v.state)) {
Some(v.task_id.as_num())
} else {
None
}
})
.collect();

if ids.is_empty() {
return ToClientMessage::Error(
"Filtered task(s) are empty, can't submit empty job".to_string(),
);
}

ids.sort_unstable();
JobDescription::Array {
ids: IntArray::from_sorted_ids(ids.into_iter()),
entries: entries.clone(),
task_desc: task_desc.clone(),
}
}
_ => unimplemented!(),
}
} else {
job.job_desc.clone()
};

SubmitRequest {
job_desc,
name: job.name.clone(),
max_fails: job.max_fails,
submit_dir: std::env::current_dir().expect("Cannot get current working directory"),
log: None, // TODO: Reuse log configuration
}
} else {
return ToClientMessage::Error("Invalid job_id".to_string());
}
};
handle_submit(state_ref, tako_ref, msg_submit).await
}

async fn start_log_streaming(tako_ref: &Backend, job_id: JobId, path: PathBuf) {
let (sender, receiver) = oneshot::channel();
tako_ref.send_stream_control(StreamServerControlMessage::RegisterStream {
Expand Down
7 changes: 0 additions & 7 deletions crates/hyperqueue/src/transfer/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use tako::worker::WorkerConfiguration;
#[derive(Serialize, Deserialize, Debug)]
pub enum FromClientMessage {
Submit(SubmitRequest),
Resubmit(ResubmitRequest),
Cancel(CancelRequest),
ForgetJob(ForgetJobRequest),
JobDetail(JobDetailRequest),
Expand Down Expand Up @@ -147,12 +146,6 @@ pub struct TaskSelector {
pub status_selector: TaskStatusSelector,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ResubmitRequest {
pub job_id: JobId,
pub filter: Vec<Status>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct CancelRequest {
pub selector: IdSelector,
Expand Down
8 changes: 6 additions & 2 deletions crates/pyhq/python/hyperqueue/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def submit(self, job: Job) -> SubmittedJob:
raise Exception("Submitted job must have at least a single task")

job_id = self.connection.submit_job(job_desc)
logging.info(f"Submitted job {job_id} with {task_count} {pluralize('task', task_count)}")
logging.info(
f"Submitted job {job_id} with {task_count} {pluralize('task', task_count)}"
)
return SubmittedJob(job=job, id=job_id)

def wait_for_jobs(self, jobs: Sequence[SubmittedJob], raise_on_error=True) -> bool:
Expand All @@ -93,7 +95,9 @@ def wait_for_jobs(self, jobs: Sequence[SubmittedJob], raise_on_error=True) -> bo
job_ids_str = ",".join(str(id) for id in job_ids)
if len(jobs) > 1:
job_ids_str = "{" + job_ids_str + "}"
logging.info(f"Waiting for {pluralize('job', len(jobs))} {job_ids_str} to finish")
logging.info(
f"Waiting for {pluralize('job', len(jobs))} {job_ids_str} to finish"
)

callback = create_progress_callback()

Expand Down
4 changes: 3 additions & 1 deletion crates/pyhq/python/hyperqueue/ffi/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

class ResourceRequest:
n_nodes: int = 0
resources: Dict[str, Union[int, float, str]] = dataclasses.field(default_factory=dict)
resources: Dict[str, Union[int, float, str]] = dataclasses.field(
default_factory=dict
)
min_time: Optional[float] = None

def __init__(
Expand Down
6 changes: 5 additions & 1 deletion crates/pyhq/python/hyperqueue/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ def __init__(
self.tasks: List[Task] = []
self.task_map: Dict[TaskId, Task] = {}
self.max_fails = max_fails
self.default_workdir = Path(default_workdir).resolve() if default_workdir is not None else default_workdir
self.default_workdir = (
Path(default_workdir).resolve()
if default_workdir is not None
else default_workdir
)
self.default_env = default_env or {}

def task_by_id(self, id: TaskId) -> Optional[Task]:
Expand Down
8 changes: 6 additions & 2 deletions crates/pyhq/python/hyperqueue/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ def default_stderr() -> str:

# TODO: how to resolve TASK_ID in the context of some other task?
class Output:
def __init__(self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None):
def __init__(
self, name: str, filepath: Optional[str] = None, extension: Optional[str] = None
):
if filepath and extension:
raise ValidationException("Parameters `filepath` and `extension` are mutually exclusive")
raise ValidationException(
"Parameters `filepath` and `extension` are mutually exclusive"
)

self.name = name
self.filepath = filepath
Expand Down
12 changes: 9 additions & 3 deletions crates/pyhq/python/hyperqueue/task/function/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ class CloudWrapper:
Wraps a callable so that cloudpickle is used to pickle it, caching the pickle.
"""

def __init__(self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT_PROTOCOL):
def __init__(
self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT_PROTOCOL
):
if fn is None:
if pickled_fn is None:
raise ValueError("Pass at least one of `fn` and `pickled_fn`")
Expand All @@ -25,7 +27,9 @@ def __init__(self, fn, pickled_fn=None, cache=True, protocol=cloudpickle.DEFAULT
self.pickled_fn = pickled_fn
self.cache = cache
self.protocol = protocol
self.__doc__ = "CloudWrapper for {!r}. Original doc:\n\n{}".format(self.fn, self.fn.__doc__)
self.__doc__ = "CloudWrapper for {!r}. Original doc:\n\n{}".format(
self.fn, self.fn.__doc__
)
if hasattr(self.fn, "__name__"):
self.__name__ = self.fn.__name__

Expand All @@ -52,7 +56,9 @@ def _get_pickled_fn(self):
return pfn

def __call__(self, *args, **kwargs):
logging.debug(f"Running function {self.fn} using args {args} and kwargs {kwargs}")
logging.debug(
f"Running function {self.fn} using args {args} and kwargs {kwargs}"
)
return self.fn(*args, **kwargs)

def __reduce__(self):
Expand Down
4 changes: 3 additions & 1 deletion crates/pyhq/python/hyperqueue/task/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ def get_task_outputs(task: ExternalProgram) -> Dict[str, Output]:
outputs = gather_outputs(task.args) + gather_outputs(task.env)
for output in outputs:
if output.name in output_map:
raise ValidationException(f"Output `{output.name}` has been defined multiple times")
raise ValidationException(
f"Output `{output.name}` has been defined multiple times"
)
output_map[output.name] = output
return output_map
Loading

0 comments on commit f618f4e

Please sign in to comment.