Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New "resubmit" mechanism #649

Merged
merged 6 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Dev

## Breaking change

* The output format of the `job info` command with JSON output mode has been changed. Note that
the JSON output mode is still unstable.

Expand Down
18 changes: 15 additions & 3 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use hyperqueue::client::commands::autoalloc::command_autoalloc;
use hyperqueue::client::commands::event::command_event_log;
use hyperqueue::client::commands::job::{
cancel_job, forget_job, output_job_cat, output_job_detail, output_job_list, output_job_summary,
JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts,
JobCancelOpts, JobCatOpts, JobForgetOpts, JobInfoOpts, JobListOpts, JobTaskIdsOpts,
};
use hyperqueue::client::commands::log::command_log;
use hyperqueue::client::commands::server::command_server;
Expand All @@ -32,7 +32,8 @@ use hyperqueue::client::output::outputs::{Output, Outputs};
use hyperqueue::client::output::quiet::Quiet;
use hyperqueue::client::status::Status;
use hyperqueue::client::task::{
output_job_task_info, output_job_task_list, TaskCommand, TaskInfoOpts, TaskListOpts, TaskOpts,
output_job_task_ids, output_job_task_info, output_job_task_list, TaskCommand, TaskInfoOpts,
TaskListOpts, TaskOpts,
};
use hyperqueue::common::cli::{
get_task_id_selector, get_task_selector, ColorPolicy, CommonOpts, GenerateCompletionOpts,
Expand Down Expand Up @@ -137,6 +138,14 @@ async fn command_job_resubmit(
resubmit_computation(gsettings, &mut connection, opts).await
}

/// Entry point for `JobCommand::TaskIds`.
///
/// Opens a client session against the server directory taken from the global settings and
/// delegates the actual work (including output) to `output_job_task_ids`.
async fn command_job_task_ids(
    gsettings: &GlobalSettings,
    opts: JobTaskIdsOpts,
) -> anyhow::Result<()> {
    let server_dir = gsettings.server_directory();
    let mut session = get_client_session(server_dir).await?;
    output_job_task_ids(gsettings, &mut session, opts).await
}

async fn command_job_wait(gsettings: &GlobalSettings, opts: JobWaitOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
wait_for_jobs(gsettings, &mut connection, opts.selector).await
Expand All @@ -155,7 +164,7 @@ async fn command_job_progress(
ToClientMessage::JobInfoResponse(r) => r
)
.await?;
wait_for_jobs_with_progress(&mut session, response.jobs).await
wait_for_jobs_with_progress(&mut session, &response.jobs).await
}

async fn command_task_list(gsettings: &GlobalSettings, opts: TaskListOpts) -> anyhow::Result<()> {
Expand Down Expand Up @@ -431,6 +440,9 @@ async fn main() -> hyperqueue::Result<()> {
SubCommand::Job(JobOpts {
subcmd: JobCommand::Progress(opts),
}) => command_job_progress(&gsettings, opts).await,
SubCommand::Job(JobOpts {
subcmd: JobCommand::TaskIds(opts),
}) => command_job_task_ids(&gsettings, opts).await,
SubCommand::Task(TaskOpts {
subcmd: TaskCommand::List(opts),
}) => command_task_list(&gsettings, opts).await,
Expand Down
37 changes: 13 additions & 24 deletions crates/hyperqueue/src/client/commands/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ use crate::client::status::{job_status, Status};
use crate::common::cli::{parse_last_all_range, parse_last_range, TaskSelectorArg};
use crate::common::utils::str::pluralize;
use crate::rpc_call;
use crate::transfer::connection::{ClientConnection, ClientSession};
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
CancelJobResponse, CancelRequest, ForgetJobRequest, FromClientMessage, IdSelector, JobDetail,
JobDetailRequest, JobInfoRequest, TaskIdSelector, TaskSelector, TaskStatusSelector,
ToClientMessage,
};
use crate::JobId;

#[derive(Parser)]
pub struct JobListOpts {
Expand Down Expand Up @@ -59,6 +58,18 @@ pub struct JobForgetOpts {
pub filter: Vec<CompletedJobStatus>,
}

// Command-line options for listing the task IDs of selected jobs, optionally restricted to
// tasks in given states.
// These notes are plain `//` comments on purpose: `///` comments on clap items are turned
// into user-visible help text.
#[derive(Parser)]
pub struct JobTaskIdsOpts {
    // NOTE(review): the value is parsed by `parse_last_all_range`; presumably `all` is accepted
    // as well, in which case the help text below should mention it — confirm against the parser.
    /// Single ID, ID range or `last` to display the most recently submitted job
    #[arg(value_parser = parse_last_all_range)]
    pub selector: IdSelector,

    // clap joins consecutive doc-comment lines into one paragraph, so the first sentence needs
    // its own terminating period to stay readable in the rendered help.
    /// Select only tasks with given state(s).
    /// You can use multiple states separated by a comma.
    #[arg(long, value_delimiter(','), value_enum)]
    pub filter: Vec<Status>,
}

#[derive(clap::ValueEnum, Clone, Debug)]
pub enum CompletedJobStatus {
Finished,
Expand Down Expand Up @@ -95,28 +106,6 @@ pub struct JobCatOpts {
pub stream: OutputStream,
}

/// Asks the server for job info using `IdSelector::LastN(1)` and returns the ID of the last
/// job in the response, or `None` when the response contains no jobs.
///
/// # Errors
/// Propagates any error produced by the RPC call.
pub async fn get_last_job_id(connection: &mut ClientConnection) -> crate::Result<Option<JobId>> {
    let request = JobInfoRequest {
        selector: IdSelector::LastN(1),
    };
    let response = rpc_call!(
        connection,
        FromClientMessage::JobInfo(request),
        ToClientMessage::JobInfoResponse(r) => r
    )
    .await?;

    let last_id = response.jobs.last().map(|job| job.id);
    Ok(last_id)
}

/// Asks the server for job info using `IdSelector::All` and returns the IDs of all jobs in
/// the response, in the order in which the response lists them.
///
/// On success the result is always `Some` (possibly wrapping an empty vector); the `Option`
/// wrapper is kept only so that the signature stays unchanged for existing callers.
///
/// # Errors
/// Propagates any error produced by the RPC call.
pub async fn get_job_ids(connection: &mut ClientConnection) -> crate::Result<Option<Vec<JobId>>> {
    let message = FromClientMessage::JobInfo(JobInfoRequest {
        selector: IdSelector::All,
    });
    let response = rpc_call!(connection, message, ToClientMessage::JobInfoResponse(r) => r).await?;

    // Collect directly instead of pushing one by one; `collect` can pre-size the vector.
    let ids: Vec<JobId> = response.jobs.into_iter().map(|job| job.id).collect();
    Ok(Some(ids))
}

pub async fn output_job_list(
gsettings: &GlobalSettings,
session: &mut ClientSession,
Expand Down
35 changes: 28 additions & 7 deletions crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::io::{BufRead, Read};
use std::path::{Path, PathBuf};
use std::str::FromStr;
Expand Down Expand Up @@ -246,14 +247,14 @@ pub struct SubmitJobConfOpts {
// Parameters for creating array jobs
/// Create a task array where a task will be created for each line of the given file.
/// The corresponding line will be passed to the task in environment variable `HQ_ENTRY`.
#[arg(long, conflicts_with("array"), value_hint = clap::ValueHint::FilePath)]
#[arg(long, value_hint = clap::ValueHint::FilePath)]
each_line: Option<PathBuf>,

/// Create a task array where a task will be created for each item of a JSON array stored in
/// the given file.
/// The corresponding item from the array will be passed as a JSON string to the task in
/// environment variable `HQ_ENTRY`.
#[arg(long, conflicts_with("array"), conflicts_with("each_line"), value_hint = clap::ValueHint::FilePath)]
#[arg(long, conflicts_with("each_line"), value_hint = clap::ValueHint::FilePath)]
from_json: Option<PathBuf>,

/// Create a task array where a task will be created for each number in the specified number range.
Expand Down Expand Up @@ -681,24 +682,44 @@ pub(crate) async fn send_submit_request(
)
.await?;
} else if progress {
wait_for_jobs_with_progress(session, vec![info]).await?;
wait_for_jobs_with_progress(session, &[info]).await?;
}
Ok(())
}

fn get_ids_and_entries(opts: &JobSubmitOpts) -> anyhow::Result<(IntArray, Option<Vec<BString>>)> {
let entries = if let Some(ref filename) = opts.conf.each_line {
let mut entries = if let Some(ref filename) = opts.conf.each_line {
Some(read_lines(filename)?)
} else if let Some(ref filename) = opts.conf.from_json {
Some(make_entries_from_json(filename)?)
} else {
None
};

let ids = if let Some(ref entries) = entries {
let ids = if let Some(array) = &opts.conf.array {
if let Some(es) = entries {
let id_set: BTreeSet<u32> = array
.iter()
.filter(|id| (*id as usize) < es.len())
.collect();
entries = Some(
es.into_iter()
.enumerate()
.filter_map(|(id, value)| {
if id_set.contains(&(id as u32)) {
Some(value)
} else {
None
}
})
.collect(),
);
IntArray::from_sorted_ids(id_set.into_iter())
} else {
array.clone()
}
} else if let Some(ref entries) = entries {
IntArray::from_range(0, entries.len() as JobTaskCount)
} else if let Some(ref array) = opts.conf.array {
array.clone()
} else {
IntArray::from_id(0)
};
Expand Down
27 changes: 16 additions & 11 deletions crates/hyperqueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::io::Write;
use std::time::{Duration, SystemTime};
use tokio::time::sleep;
Expand All @@ -17,7 +18,7 @@ use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDetailRequest, JobInfo, JobInfoRequest, TaskIdSelector,
TaskSelector, TaskStatusSelector, ToClientMessage, WaitForJobsRequest,
};
use crate::{rpc_call, JobId, JobTaskCount, Set};
use crate::{rpc_call, JobId, JobTaskCount};
use colored::Colorize;

pub async fn wait_for_jobs(
Expand Down Expand Up @@ -72,18 +73,23 @@ pub async fn wait_for_jobs(

pub async fn wait_for_jobs_with_progress(
session: &mut ClientSession,
mut jobs: Vec<JobInfo>,
jobs: &[JobInfo],
) -> anyhow::Result<()> {
jobs.retain(|info| !is_terminated(info));

if jobs.is_empty() {
if jobs.iter().all(is_terminated) {
log::warn!("There are no jobs to wait for");
} else {
let total_tasks: JobTaskCount = jobs.iter().map(|info| info.n_tasks).sum();
let mut remaining_job_ids: Set<JobId> = jobs.into_iter().map(|info| info.id).collect();
let total_tasks: JobTaskCount = jobs
.iter()
.filter(|info| !is_terminated(info))
.map(|info| info.n_tasks)
.sum();
let mut remaining_job_ids: BTreeSet<JobId> = jobs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason why Set was switched to BTreeSet and why the Vec was changed to a slice?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we need random removals and a sorted iterator for creating the IntArray.

.iter()
.filter(|info| !is_terminated(info))
.map(|info| info.id)
.collect();

let total_jobs = remaining_job_ids.len();

log::info!(
"Waiting for {} {} with {} {}",
total_jobs,
Expand All @@ -95,15 +101,14 @@ pub async fn wait_for_jobs_with_progress(
let mut counters = JobTaskCounters::default();

loop {
let ids_ref = &mut remaining_job_ids;
let response = rpc_call!(
session.connection(),
FromClientMessage::JobInfo(JobInfoRequest {
selector: IdSelector::Specific(IntArray::from_ids(ids_ref.iter().map(|&id| id.into()).collect())),
selector: IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.iter().map(|x| x.as_num()))),
}),
ToClientMessage::JobInfoResponse(r) => r
)
.await?;
.await?;

let mut current_counters = counters;
for job in &response.jobs {
Expand Down
11 changes: 11 additions & 0 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::client::output::common::{
};
use crate::client::output::json::format_datetime;
use crate::client::output::Verbosity;
use crate::common::arraydef::IntArray;
use crate::common::utils::str::{pluralize, select_plural, truncate_middle};
use crate::worker::start::WORKER_EXTRA_PROCESS_PID;
use anyhow::Error;
Expand Down Expand Up @@ -424,6 +425,16 @@ impl Output for CliOutput {
);
}

/// Prints the task IDs of the given jobs to standard output.
///
/// When exactly one job is given, only its ID array is printed. Otherwise every job is
/// printed on its own line in the form `<job id>: <task ids>`; an empty input prints nothing.
fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) {
    match job_task_ids.as_slice() {
        // A single job needs no prefix, there is nothing to disambiguate.
        [(_, task_ids)] => println!("{}", task_ids),
        pairs => {
            for (job_id, task_ids) in pairs {
                println!("{}: {}", job_id, task_ids);
            }
        }
    }
}

fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize) {
let job_count = jobs.len();
let rows: Vec<_> = jobs
Expand Down
10 changes: 10 additions & 0 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;

Expand All @@ -17,6 +18,7 @@ use crate::client::job::WorkerMap;
use crate::client::output::common::{group_jobs_by_status, resolve_task_paths, TaskToPathsMap};
use crate::client::output::outputs::{Output, OutputStream};
use crate::client::output::Verbosity;
use crate::common::arraydef::IntArray;
use crate::common::manager::info::{GetManagerInfo, ManagerType};
use crate::server::autoalloc::{Allocation, AllocationState, QueueId};
use crate::server::job::{JobTaskInfo, JobTaskState, StartedTaskData};
Expand Down Expand Up @@ -199,6 +201,14 @@ impl Output for JsonOutput {
self.print(format_tasks(tasks, map));
}

/// Prints the task IDs of the given jobs as a JSON object that maps each job ID to the list
/// of its task IDs.
fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) {
    let mut ids_by_job: HashMap<JobId, Vec<u32>> = HashMap::new();
    for (job_id, task_ids) in job_task_ids {
        // Expand the array into its individual IDs; a repeated job ID replaces the earlier entry.
        ids_by_job.insert(job_id, task_ids.iter().collect());
    }
    self.print(json!(ids_by_job));
}

fn print_summary(&self, filename: &Path, summary: Summary) {
let json = json!({
"filename": filename,
Expand Down
2 changes: 2 additions & 0 deletions crates/hyperqueue/src/client/output/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::path::Path;

use crate::client::output::common::TaskToPathsMap;
use crate::client::output::Verbosity;
use crate::common::arraydef::IntArray;
use crate::server::job::JobTaskInfo;
use crate::JobId;
use core::time::Duration;
Expand Down Expand Up @@ -77,6 +78,7 @@ pub trait Output {
server_uid: &str,
verbosity: Verbosity,
);
/// Prints task IDs for the given jobs; each item pairs a job ID with an array of task IDs.
fn print_task_ids(&self, jobs_task_id: Vec<(JobId, IntArray)>);

// Log
fn print_summary(&self, filename: &Path, summary: Summary);
Expand Down
3 changes: 3 additions & 0 deletions crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::client::output::common::{
};
use crate::client::output::outputs::{Output, OutputStream};
use crate::client::status::{job_status, Status};
use crate::common::arraydef::IntArray;
use crate::server::autoalloc::Allocation;
use crate::server::job::JobTaskInfo;
use crate::stream::reader::logfile::Summary;
Expand Down Expand Up @@ -130,6 +131,8 @@ impl Output for Quiet {
) {
}

// This output mode prints nothing for task IDs; the argument is ignored.
fn print_task_ids(&self, _job_task_ids: Vec<(JobId, IntArray)>) {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that here we could write one ID per line? This mode should really be renamed from quiet to something like bash-processable :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed it. If there are 2 or more jobs, it also prints the job IDs.


// Log
fn print_summary(&self, _filename: &Path, _summary: Summary) {}

Expand Down
Loading
Loading