Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New "resubmit" mechanism #649

Merged
merged 6 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use hyperqueue::client::commands::autoalloc::command_autoalloc;
use hyperqueue::client::commands::event::command_event_log;
use hyperqueue::client::commands::job::{
cancel_job, forget_job, output_job_cat, output_job_detail, output_job_list, output_job_summary,
JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts,
JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, JobTaskIdsOpts,
};
use hyperqueue::client::commands::log::command_log;
use hyperqueue::client::commands::server::command_server;
Expand All @@ -32,7 +32,8 @@ use hyperqueue::client::output::outputs::{Output, Outputs};
use hyperqueue::client::output::quiet::Quiet;
use hyperqueue::client::status::Status;
use hyperqueue::client::task::{
output_job_task_info, output_job_task_list, TaskCommand, TaskInfoOpts, TaskListOpts, TaskOpts,
output_job_task_ids, output_job_task_info, output_job_task_list, TaskCommand, TaskInfoOpts,
TaskListOpts, TaskOpts,
};
use hyperqueue::common::cli::{
get_task_id_selector, get_task_selector, ColorPolicy, CommonOpts, GenerateCompletionOpts,
Expand Down Expand Up @@ -137,6 +138,14 @@ async fn command_job_resubmit(
resubmit_computation(gsettings, &mut connection, opts).await
}

async fn command_job_task_ids(
gsettings: &GlobalSettings,
opts: JobTaskIdsOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
output_job_task_ids(gsettings, &mut connection, opts).await
}

async fn command_job_wait(gsettings: &GlobalSettings, opts: JobWaitOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
wait_for_jobs(gsettings, &mut connection, opts.selector).await
Expand All @@ -155,7 +164,7 @@ async fn command_job_progress(
ToClientMessage::JobInfoResponse(r) => r
)
.await?;
wait_for_jobs_with_progress(&mut session, response.jobs).await
wait_for_jobs_with_progress(&mut session, &response.jobs).await
}

async fn command_task_list(gsettings: &GlobalSettings, opts: TaskListOpts) -> anyhow::Result<()> {
Expand Down Expand Up @@ -431,6 +440,9 @@ async fn main() -> hyperqueue::Result<()> {
SubCommand::Job(JobOpts {
subcmd: JobCommand::Progress(opts),
}) => command_job_progress(&gsettings, opts).await,
SubCommand::Job(JobOpts {
subcmd: JobCommand::TaskIds(opts),
}) => command_job_task_ids(&gsettings, opts).await,
SubCommand::Task(TaskOpts {
subcmd: TaskCommand::List(opts),
}) => command_task_list(&gsettings, opts).await,
Expand Down
12 changes: 12 additions & 0 deletions crates/hyperqueue/src/client/commands/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ pub struct JobForgetOpts {
pub filter: Vec<CompletedJobStatus>,
}

#[derive(Parser)]
pub struct JobTaskIdsOpts {
/// Single ID, ID range or `last` to display the most recently submitted job
#[arg(value_parser = parse_last_all_range)]
pub selector: IdSelector,

/// Select only tasks with given state(s)
/// You can use multiple states separated by a comma.
#[arg(long, value_delimiter(','), value_enum)]
pub filter: Vec<Status>,
}

#[derive(clap::ValueEnum, Clone, Debug)]
pub enum CompletedJobStatus {
Finished,
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ pub(crate) async fn send_submit_request(
)
.await?;
} else if progress {
wait_for_jobs_with_progress(session, vec![info]).await?;
wait_for_jobs_with_progress(session, &[info]).await?;
}
Ok(())
}
Expand Down
27 changes: 16 additions & 11 deletions crates/hyperqueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::io::Write;
use std::time::{Duration, SystemTime};
use tokio::time::sleep;
Expand All @@ -17,7 +18,7 @@ use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDetailRequest, JobInfo, JobInfoRequest, TaskIdSelector,
TaskSelector, TaskStatusSelector, ToClientMessage, WaitForJobsRequest,
};
use crate::{rpc_call, JobId, JobTaskCount, Set};
use crate::{rpc_call, JobId, JobTaskCount};
use colored::Colorize;

pub async fn wait_for_jobs(
Expand Down Expand Up @@ -72,18 +73,23 @@ pub async fn wait_for_jobs(

pub async fn wait_for_jobs_with_progress(
session: &mut ClientSession,
mut jobs: Vec<JobInfo>,
jobs: &[JobInfo],
) -> anyhow::Result<()> {
jobs.retain(|info| !is_terminated(info));

if jobs.is_empty() {
if jobs.iter().all(is_terminated) {
log::warn!("There are no jobs to wait for");
} else {
let total_tasks: JobTaskCount = jobs.iter().map(|info| info.n_tasks).sum();
let mut remaining_job_ids: Set<JobId> = jobs.into_iter().map(|info| info.id).collect();
let total_tasks: JobTaskCount = jobs
.iter()
.filter(|info| !is_terminated(info))
.map(|info| info.n_tasks)
.sum();
let mut remaining_job_ids: BTreeSet<JobId> = jobs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason why Set was switched to BTreeSet and why the Vec was changed to a slice?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we need to random removes & sorted iterator for creating IntArray.

.iter()
.filter(|info| !is_terminated(info))
.map(|info| info.id)
.collect();

let total_jobs = remaining_job_ids.len();

log::info!(
"Waiting for {} {} with {} {}",
total_jobs,
Expand All @@ -95,15 +101,14 @@ pub async fn wait_for_jobs_with_progress(
let mut counters = JobTaskCounters::default();

loop {
let ids_ref = &mut remaining_job_ids;
let response = rpc_call!(
session.connection(),
FromClientMessage::JobInfo(JobInfoRequest {
selector: IdSelector::Specific(IntArray::from_ids(ids_ref.iter().map(|&id| id.into()).collect())),
selector: IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().map(|x| x.as_num()))),
}),
ToClientMessage::JobInfoResponse(r) => r
)
.await?;
.await?;

let mut current_counters = counters;
for job in &response.jobs {
Expand Down
7 changes: 7 additions & 0 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::client::output::common::{
};
use crate::client::output::json::format_datetime;
use crate::client::output::Verbosity;
use crate::common::arraydef::IntArray;
use crate::common::utils::str::{pluralize, select_plural, truncate_middle};
use crate::worker::start::WORKER_EXTRA_PROCESS_PID;
use anyhow::Error;
Expand Down Expand Up @@ -424,6 +425,12 @@ impl Output for CliOutput {
);
}

fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) {
for (_, array) in &job_task_ids {
println!("{}", array);
}
}

fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize) {
let job_count = jobs.len();
let rows: Vec<_> = jobs
Expand Down
10 changes: 10 additions & 0 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;

Expand All @@ -17,6 +18,7 @@ use crate::client::job::WorkerMap;
use crate::client::output::common::{group_jobs_by_status, resolve_task_paths, TaskToPathsMap};
use crate::client::output::outputs::{Output, OutputStream};
use crate::client::output::Verbosity;
use crate::common::arraydef::IntArray;
use crate::common::manager::info::{GetManagerInfo, ManagerType};
use crate::server::autoalloc::{Allocation, AllocationState, QueueId};
use crate::server::job::{JobTaskInfo, JobTaskState, StartedTaskData};
Expand Down Expand Up @@ -199,6 +201,14 @@ impl Output for JsonOutput {
self.print(format_tasks(tasks, map));
}

fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) {
let map: HashMap<JobId, Vec<u32>> = job_task_ids
.into_iter()
.map(|(key, value)| (key, value.iter().collect()))
.collect();
self.print(json!(map));
}

fn print_summary(&self, filename: &Path, summary: Summary) {
let json = json!({
"filename": filename,
Expand Down
2 changes: 2 additions & 0 deletions crates/hyperqueue/src/client/output/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::path::Path;

use crate::client::output::common::TaskToPathsMap;
use crate::client::output::Verbosity;
use crate::common::arraydef::IntArray;
use crate::server::job::JobTaskInfo;
use crate::JobId;
use core::time::Duration;
Expand Down Expand Up @@ -77,6 +78,7 @@ pub trait Output {
server_uid: &str,
verbosity: Verbosity,
);
fn print_task_ids(&self, jobs_task_id: Vec<(JobId, IntArray)>);

// Log
fn print_summary(&self, filename: &Path, summary: Summary);
Expand Down
3 changes: 3 additions & 0 deletions crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::client::output::common::{
};
use crate::client::output::outputs::{Output, OutputStream};
use crate::client::status::{job_status, Status};
use crate::common::arraydef::IntArray;
use crate::server::autoalloc::Allocation;
use crate::server::job::JobTaskInfo;
use crate::stream::reader::logfile::Summary;
Expand Down Expand Up @@ -130,6 +131,8 @@ impl Output for Quiet {
) {
}

fn print_task_ids(&self, _job_task_ids: Vec<(JobId, IntArray)>) {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that here we could write one ID per line? This mode should really be renamed from quiet to something like bash-processable :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed it. if there are 2 or more jobs, it prints also job ids.


// Log
fn print_summary(&self, _filename: &Path, _summary: Summary) {}

Expand Down
45 changes: 44 additions & 1 deletion crates/hyperqueue/src/client/task.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use crate::client::commands::job::JobTaskIdsOpts;
use crate::client::globalsettings::GlobalSettings;
use crate::client::job::get_worker_map;
use crate::client::output::{Verbosity, VerbosityFlag};
use crate::common::arraydef::IntArray;
use crate::common::cli::{parse_last_range, parse_last_single_id, TaskSelectorArg};
use crate::rpc_call;
use crate::common::error::HqError;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDetailRequest, SingleIdSelector, TaskIdSelector,
TaskSelector, TaskStatusSelector, ToClientMessage,
};
use crate::{rpc_call, JobId};

#[derive(clap::Parser)]
pub struct TaskOpts {
Expand Down Expand Up @@ -127,3 +129,44 @@ pub async fn output_job_task_info(

Ok(())
}

pub async fn output_job_task_ids(
gsettings: &GlobalSettings,
session: &mut ClientSession,
opts: JobTaskIdsOpts,
) -> anyhow::Result<()> {
let message = FromClientMessage::JobDetail(JobDetailRequest {
job_id_selector: opts.selector,
task_selector: Some(TaskSelector {
id_selector: TaskIdSelector::All,
status_selector: if opts.filter.is_empty() {
TaskStatusSelector::All
} else {
TaskStatusSelector::Specific(opts.filter)
},
}),
});
let response =
rpc_call!(session.connection(), message, ToClientMessage::JobDetailResponse(r) => r)
.await?;
let mut job_task_ids: Vec<_> = response
.details
.iter()
.map(|(job_id, detail)| {
Ok((
*job_id,
IntArray::from_ids(
detail
.as_ref()
.ok_or_else(|| HqError::GenericError("Job Id not found".to_string()))?
.tasks
.iter()
.map(|info| info.task_id.as_num()),
),
))
})
.collect::<crate::Result<Vec<(JobId, IntArray)>>>()?;
job_task_ids.sort_unstable_by_key(|x| x.0);
gsettings.printer().print_task_ids(job_task_ids);
Ok(())
}
17 changes: 12 additions & 5 deletions crates/hyperqueue/src/common/arraydef.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@ impl IntArray {
IntArray { ranges }
}

pub fn from_ids(ids: Vec<u32>) -> IntArray {
// ids has to be sorted!
spirali marked this conversation as resolved.
Show resolved Hide resolved
pub fn from_ids(ids: impl Iterator<Item = u32>) -> IntArray {
let mut ranges: Vec<IntRange> = Vec::new();
let mut last_id = None;
for id in ids {
if let Some(pos) = ranges.iter().position(|x| id == (x.start + x.count)) {
ranges[pos].count += 1;
if last_id.map(|last_id| last_id + 1 == id).unwrap_or(false) {
ranges.last_mut().unwrap().count += 1;
} else {
ranges.push(IntRange::new(id, 1, 1));
}
last_id = Some(id)
spirali marked this conversation as resolved.
Show resolved Hide resolved
}
IntArray { ranges }
}
pub fn from_id(id: u32) -> IntArray {
Self::from_ids(vec![id])
Self::from_ids([id].iter().copied())
}

pub fn from_range(start: u32, count: u32) -> Self {
Expand Down Expand Up @@ -104,7 +107,11 @@ impl fmt::Display for IntArray {
));
}
}
write!(f, "{}", &str[0..str.len() - 2])
if str.len() >= 2 {
spirali marked this conversation as resolved.
Show resolved Hide resolved
write!(f, "{}", &str[0..str.len() - 2])
} else {
Ok(())
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion crates/hyperqueue/src/common/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tako::WorkerId;
use crate::client::commands::autoalloc::AutoAllocOpts;
use crate::client::commands::event::EventLogOpts;
use crate::client::commands::job::{
JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts,
JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, JobTaskIdsOpts,
};
use crate::client::commands::log::LogOpts;
use crate::client::commands::server::ServerOpts;
Expand Down Expand Up @@ -320,6 +320,8 @@ pub enum JobCommand {
Wait(JobWaitOpts),
/// Interactively observe the execution of a job
Progress(JobProgressOpts),
/// Print tasks Ids for given job
spirali marked this conversation as resolved.
Show resolved Hide resolved
TaskIds(JobTaskIdsOpts),
}

#[derive(Parser)]
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub async fn handle_resubmit(

ids.sort_unstable();
JobDescription::Array {
ids: IntArray::from_ids(ids),
ids: IntArray::from_ids(ids.iter().copied()),
spirali marked this conversation as resolved.
Show resolved Hide resolved
entries: entries.clone(),
task_desc: task_desc.clone(),
}
Expand Down
19 changes: 19 additions & 0 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1476,3 +1476,22 @@ def check_child_process_exited(hq_env: HqEnv, stop_fn: Callable[[subprocess.Pope
parent, child = pids
wait_for_pid_exit(parent)
wait_for_pid_exit(child)

def test_job_task_ids(hq_env: HqEnv):
hq_env.start_server()
hq_env.command(["submit", "--array=2,7,9,20-30", "--", "python", "-c", "import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']"])
hq_env.start_workers(1, cpus=1)
wait_for_job_state(hq_env, 1, "FAILED")
Kobzol marked this conversation as resolved.
Show resolved Hide resolved

result = hq_env.command(["job", "task-ids", "1"])
assert result == "2, 7, 9, 20-30\n"

result = hq_env.command(["job", "task-ids", "1", "--filter", "finished"])
assert result == "2, 7, 9, 20-24, 29-30\n"

result = hq_env.command(["job", "task-ids", "1", "--filter", "failed"])
assert result == "25-28\n"

result = hq_env.command(["job", "task-ids", "1", "--filter", "canceled"])
assert result == "\n"