Skip to content

Commit

Permalink
Code review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Dec 12, 2023
1 parent 81f768f commit 29cf24c
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 41 deletions.
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ fn get_ids_and_entries(opts: &JobSubmitOpts) -> anyhow::Result<(IntArray, Option
})
.collect(),
);
IntArray::from_ids(id_set.iter().copied())
IntArray::from_sorted_ids(id_set.into_iter())
} else {
array.clone()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub async fn wait_for_jobs_with_progress(
let response = rpc_call!(
session.connection(),
FromClientMessage::JobInfo(JobInfoRequest {
selector: IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().map(|x| x.as_num()))),
selector: IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.iter().map(|x| x.as_num()))),
}),
ToClientMessage::JobInfoResponse(r) => r
)
Expand Down
8 changes: 6 additions & 2 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,12 @@ impl Output for CliOutput {
}

fn print_task_ids(&self, job_task_ids: Vec<(JobId, IntArray)>) {
for (_, array) in &job_task_ids {
println!("{}", array);
if job_task_ids.len() == 1 {
println!("{}", job_task_ids[0].1);
} else {
for (job_id, array) in &job_task_ids {
println!("{}: {}", job_id, array);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub async fn output_job_task_ids(
.map(|(job_id, detail)| {
Ok((
*job_id,
IntArray::from_ids(
IntArray::from_sorted_ids(
detail
.as_ref()
.ok_or_else(|| HqError::GenericError("Job Id not found".to_string()))?
Expand Down
31 changes: 12 additions & 19 deletions crates/hyperqueue/src/common/arraydef.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ impl IntArray {
IntArray { ranges }
}

// ids has to be sorted!
pub fn from_ids(ids: impl Iterator<Item = u32>) -> IntArray {
pub fn from_sorted_ids(ids: impl Iterator<Item = u32>) -> IntArray {
let mut ranges: Vec<IntRange> = Vec::new();
let mut last_id = None;
for id in ids {
debug_assert!(last_id.map(|last_id| last_id < id).unwrap_or(true));
if last_id.map(|last_id| last_id + 1 == id).unwrap_or(false) {
ranges.last_mut().unwrap().count += 1;
} else {
ranges.push(IntRange::new(id, 1, 1));
}
last_id = Some(id)
last_id = Some(id);
}
IntArray { ranges }
}
pub fn from_id(id: u32) -> IntArray {
Self::from_ids([id].iter().copied())
Self::from_sorted_ids([id].into_iter())
}

pub fn from_range(start: u32, count: u32) -> Self {
Expand Down Expand Up @@ -92,26 +92,19 @@ impl FromStr for IntArray {

impl fmt::Display for IntArray {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut str = String::new();
for x in &self.ranges {
for (idx, x) in self.ranges.iter().enumerate() {
if idx > 0 {
write!(f, ",")?;
}
if x.count == 1 {
str.push_str(&format!("{}, ", x.start));
write!(f, "{}", x.start)?;
} else if x.step == 1 {
str.push_str(&format!("{}-{}, ", x.start, x.start + x.count - 1));
write!(f, "{}-{}", x.start, x.start + x.count - 1)?;
} else {
str.push_str(&format!(
"{}-{}:{}, ",
x.start,
x.start + x.count - 1,
x.step
));
write!(f, "{}-{}:{}", x.start, x.start + x.count - 1, x.step)?;
}
}
if str.len() >= 2 {
write!(f, "{}", &str[0..str.len() - 2])
} else {
Ok(())
}
Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/common/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ pub enum JobCommand {
Wait(JobWaitOpts),
/// Interactively observe the execution of a job
Progress(JobProgressOpts),
/// Print tasks Ids for given job
/// Print task Ids for given job
TaskIds(JobTaskIdsOpts),
}

Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub async fn handle_resubmit(

ids.sort_unstable();
JobDescription::Array {
ids: IntArray::from_ids(ids.iter().copied()),
ids: IntArray::from_sorted_ids(ids.into_iter()),
entries: entries.clone(),
task_desc: task_desc.clone(),
}
Expand Down
6 changes: 3 additions & 3 deletions crates/pyhq/src/client/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ pub fn wait_for_jobs_impl(

loop {
let selector =
IdSelector::Specific(IntArray::from_ids(remaining_job_ids.iter().copied()));
IdSelector::Specific(IntArray::from_sorted_ids(remaining_job_ids.into_iter()));

Check failure on line 237 in crates/pyhq/src/client/job.rs

View workflow job for this annotation

GitHub Actions / Test

use of moved value: `remaining_job_ids`

response = hyperqueue::rpc_call!(
response = rpc_call!(
ctx.session.connection(),
FromClientMessage::JobInfo(JobInfoRequest {
selector,
Expand Down Expand Up @@ -310,7 +310,7 @@ pub fn get_failed_tasks_impl(
) -> PyResult<FailedTaskMap> {
run_future(async move {
let message = FromClientMessage::JobDetail(JobDetailRequest {
job_id_selector: IdSelector::Specific(IntArray::from_ids(job_ids.into_iter())),
job_id_selector: IdSelector::Specific(IntArray::from_sorted_ids(job_ids.into_iter())),
task_selector: Some(TaskSelector {
id_selector: TaskIdSelector::All,
status_selector: TaskStatusSelector::Specific(vec![Status::Failed]),
Expand Down
2 changes: 1 addition & 1 deletion docs/jobs/arrays.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ and the other with `HQ_ENTRY` set to `{"batch_size": 8, "learning_rate": 0.001}`
### Combining with `--each-line`/`--from-json` with `--array`

Option `--each-line` or `--from-json` can be combined with option `--array`.
In such case, only a subset of lines/json will be submited.
In such case, only a subset of lines/json will be submitted.
If `--array` defines an ID that exceeds the number of lines in the file (or the number of elements in JSON), then the ID is silently removed.


Expand Down
2 changes: 1 addition & 1 deletion docs/jobs/failure.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ However, in case of [task arrays](arrays.md), different tasks may end in differe
recompute only tasks with a specific status (e.g. failed tasks).

By following combination of commands you may recompute only failed tasks. Let us assume that we want to recompute
all failed jobs in job 5:
all failed tasks in job 5:

```commandline
$ hq submit --array=`hq job task-ids 5 --filter=failed` ./my-computation
Expand Down
6 changes: 4 additions & 2 deletions tests/test_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def test_each_line_with_array(hq_env: HqEnv):
[
"submit",
"--each-line=input",
"--array", "2-4, 6",
"--array",
"2-4, 6",
"--",
"bash",
"-c",
Expand Down Expand Up @@ -154,7 +155,8 @@ def test_json_with_array(hq_env: HqEnv):
[
"submit",
"--from-json=input",
"--array", "2-3, 5, 6, 7, 1000",
"--array",
"2-3, 5, 6, 7, 1000",
"--",
"bash",
"-c",
Expand Down
23 changes: 16 additions & 7 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,10 +702,10 @@ def test_job_resubmit_with_status(hq_env: HqEnv):
wait_for_job_state(hq_env, 1, "FAILED")

table = hq_env.command(["job", "resubmit", "1", "--filter=failed"], as_table=True)
table.check_row_value("Tasks", "4; Ids: 4-6, 8")
table.check_row_value("Tasks", "4; Ids: 4-6,8")

table = hq_env.command(["job", "resubmit", "1", "--filter=finished"], as_table=True)
table.check_row_value("Tasks", "3; Ids: 3, 7, 9")
table.check_row_value("Tasks", "3; Ids: 3,7,9")


def test_job_resubmit_all(hq_env: HqEnv):
Expand All @@ -715,7 +715,7 @@ def test_job_resubmit_all(hq_env: HqEnv):
wait_for_job_state(hq_env, 1, "FINISHED")

table = hq_env.command(["job", "resubmit", "1"], as_table=True)
table.check_row_value("Tasks", "3; Ids: 2, 7, 9")
table.check_row_value("Tasks", "3; Ids: 2,7,9")


def test_job_resubmit_empty(hq_env: HqEnv):
Expand Down Expand Up @@ -1477,21 +1477,30 @@ def check_child_process_exited(hq_env: HqEnv, stop_fn: Callable[[subprocess.Pope
wait_for_pid_exit(parent)
wait_for_pid_exit(child)


def test_job_task_ids(hq_env: HqEnv):
hq_env.start_server()
hq_env.command(["submit", "--array=2,7,9,20-30", "--", "python", "-c", "import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']"])
hq_env.command(
[
"submit",
"--array=2,7,9,20-30",
"--",
"python",
"-c",
"import os; assert os.environ['HQ_TASK_ID'] not in ['25', '26', '27', '28']",
]
)
hq_env.start_workers(1, cpus=1)
wait_for_job_state(hq_env, 1, "FAILED")

result = hq_env.command(["job", "task-ids", "1"])
assert result == "2, 7, 9, 20-30\n"
assert result == "2,7,9,20-30\n"

result = hq_env.command(["job", "task-ids", "1", "--filter", "finished"])
assert result == "2, 7, 9, 20-24, 29-30\n"
assert result == "2,7,9,20-24,29-30\n"

result = hq_env.command(["job", "task-ids", "1", "--filter", "failed"])
assert result == "25-28\n"

result = hq_env.command(["job", "task-ids", "1", "--filter", "canceled"])
assert result == "\n"

2 changes: 1 addition & 1 deletion tests/test_jobfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def test_job_file_array(hq_env: HqEnv, tmp_path):
""")
hq_env.command(["job", "submit-file", "job.toml"])
r = hq_env.command(["job", "info", "1"], as_table=True)
r.check_row_value("Tasks", "7; Ids: 2, 10-14, 120")
r.check_row_value("Tasks", "7; Ids: 2,10-14,120")


def test_job_file_fail_mixing_array_and_tasks(hq_env: HqEnv, tmp_path):
Expand Down

0 comments on commit 29cf24c

Please sign in to comment.