diff --git a/CHANGELOG.md b/CHANGELOG.md index 341210a03..5d319acea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# Dev + +## Breaking change +* The output format of the `job info` command with JSON output mode has been changed. Note that +the JSON output mode is still unstable. + +## New features +* Allow setting minimum duration for a task (`min_time` resource value) using the Python API. + # v0.17.0 ## Breaking change diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index df998a724..cacc8cae7 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -80,90 +80,63 @@ impl Output for JsonOutput { self.print(json!(statuses)) } fn print_job_detail(&self, jobs: Vec, _worker_map: WorkerMap, server_uid: &str) { - let job_details: Vec<_> = jobs.into_iter().map(|job| { - let task_paths = resolve_task_paths(&job, server_uid); - - let JobDetail { - info, - job_desc, - tasks, - tasks_not_found: _, - max_fails, - submission_date, - completion_date_or_now, - submit_dir, - } = job; - - let finished_at = if info.counters.is_terminated(info.n_tasks) { - Some(completion_date_or_now) - } else { - None - }; - - let mut json = json!({ - "info": format_job_info(info), - "max_fails": max_fails, - "started_at": format_datetime(submission_date), - "finished_at": finished_at.map(format_datetime), - "submit_dir": submit_dir - }); - - if let JobDescription::Array { - task_desc: - TaskDescription { - program: - ProgramDefinition { - args, - env, - stdout, - stderr, - cwd, - stdin: _, - }, - resources, - pin_mode, - time_limit, - priority, - task_dir, - crash_limit, - }, - .. - } = job_desc - { - json["program"] = json!({ - "args": args.into_iter().map(|args| args.to_string()).collect::>(), - "env": env.into_iter().map(|(key, value)| (key.to_string(), value.to_string())).collect::>(), - "cwd": cwd, - "stderr": format_stdio_def(&stderr), - "stdout": format_stdio_def(&stdout), - }); - json["resources"] = resources.variants.into_iter().map(|v| { - let ResourceRequest { n_nodes, resources, min_time } = v; - json!({ - "n_nodes": n_nodes, - "resources": resources.into_iter().map(|res| { - json!({ - "resource": res.resource, - "request": res.policy - }) - }).collect::>(), - "min_time": format_duration(min_time) - })} - ).collect(); - json["pin_mode"] = json!(match pin_mode { - PinMode::None => None, - PinMode::OpenMP => Some("openmp"), - PinMode::TaskSet => Some("taskset"), + let job_details: Vec<_> = jobs + .into_iter() + .map(|job| { + let task_paths = resolve_task_paths(&job, server_uid); + + let JobDetail { + info, + job_desc, + tasks, + tasks_not_found: _, + max_fails, + submission_date, + completion_date_or_now, + submit_dir, + } = job; + + let finished_at = if info.counters.is_terminated(info.n_tasks) { + Some(completion_date_or_now) + } else { + None + }; + + let mut json = json!({ + "info": format_job_info(info), + "max_fails": max_fails, + "started_at": format_datetime(submission_date), + "finished_at": finished_at.map(format_datetime), + "submit_dir": submit_dir }); - json["priority"] = json!(priority); - json["time_limit"] = json!(time_limit.map(format_duration)); - json["task_dir"] = json!(task_dir); - json["crash_limit"] = json!(crash_limit); - } - json["tasks"] = format_tasks(tasks, task_paths); - json - }).collect(); + let task_description = match job_desc { + JobDescription::Array { task_desc, .. } => { + let mut task = json!({}); + format_task_description(task_desc, &mut task); + json!({ + "array": task + }) + } + JobDescription::Graph { tasks } => { + let tasks: Vec = tasks + .into_iter() + .map(|task| { + let mut json = json!({}); + format_task_description(task.task_desc, &mut json); + json + }) + .collect(); + json!({ + "graph": tasks + }) + } + }; + json["task-desc"] = task_description; + json["tasks"] = format_tasks(tasks, task_paths); + json + }) + .collect(); self.print(Value::Array(job_details)); } @@ -216,12 +189,14 @@ impl Output for JsonOutput { fn print_task_info( &self, - _job: (JobId, JobDetail), - _tasks: Vec, + job: (JobId, JobDetail), + tasks: Vec, _worker_map: WorkerMap, - _server_uid: &str, + server_uid: &str, _verbosity: Verbosity, ) { + let map = resolve_task_paths(&job.1, server_uid); + self.print(format_tasks(tasks, map)); } fn print_summary(&self, filename: &Path, summary: Summary) { @@ -257,6 +232,64 @@ impl Output for JsonOutput { } } +fn format_task_description(task_desc: TaskDescription, json: &mut Value) { + let TaskDescription { + program: + ProgramDefinition { + args, + env, + stdout, + stderr, + cwd, + stdin: _, + }, + resources, + pin_mode, + time_limit, + priority, + task_dir, + crash_limit, + } = task_desc; + + json["program"] = json!({ + "args": args.into_iter().map(|args| args.to_string()).collect::>(), + "env": env.into_iter().map(|(key, value)| (key.to_string(), value.to_string())).collect::>(), + "cwd": cwd, + "stderr": format_stdio_def(&stderr), + "stdout": format_stdio_def(&stdout), + }); + json["resources"] = resources + .variants + .into_iter() + .map(|v| { + let ResourceRequest { + n_nodes, + resources, + min_time, + } = v; + json!({ + "n_nodes": n_nodes, + "resources": resources.into_iter().map(|res| { + json!({ + "resource": res.resource, + "request": res.policy + }) + }).collect::>(), + "min_time": format_duration(min_time) + }) + }) + .collect(); + json["pin_mode"] = json!(match pin_mode { + PinMode::None => None, + PinMode::OpenMP => Some("openmp"), + PinMode::TaskSet => Some("taskset"), + }); + json["priority"] = json!(priority); + json["time_limit"] = json!(time_limit.map(format_duration)); + json["task_dir"] = json!(task_dir); + json["crash_limit"] = json!(crash_limit); +} + fn fill_task_started_data(dict: &mut Value, data: StartedTaskData) { dict["started_at"] = format_datetime(data.start_date); if data.worker_ids.len() == 1 { diff --git a/crates/pyhq/python/hyperqueue/ffi/protocol.py b/crates/pyhq/python/hyperqueue/ffi/protocol.py index 3a4709f66..da3e324e5 100644 --- a/crates/pyhq/python/hyperqueue/ffi/protocol.py +++ b/crates/pyhq/python/hyperqueue/ffi/protocol.py @@ -1,4 +1,5 @@ import dataclasses +import datetime from typing import Dict, List, Optional, Sequence, Union from ..output import StdioDef @@ -7,6 +8,7 @@ class ResourceRequest: n_nodes: int = 0 resources: Dict[str, Union[int, float, str]] = dataclasses.field(default_factory=dict) + min_time: Optional[float] = None def __init__( self, @@ -14,15 +16,17 @@ def __init__( n_nodes=0, cpus: Union[int, float, str] = 1, resources: Optional[Dict[str, Union[int, float, str]]] = None, + min_time: Optional[datetime.timedelta] = None, ): self.n_nodes = n_nodes if resources is None: resources = {} resources["cpus"] = cpus self.resources = resources + self.min_time = min_time.total_seconds() if min_time is not None else None def __repr__(self): - return f"" + return f"" @dataclasses.dataclass() diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs index b7c50e804..55ccc2234 100644 --- a/crates/pyhq/src/client/job.rs +++ b/crates/pyhq/src/client/job.rs @@ -23,7 +23,7 @@ use tako::gateway::{ResourceRequestEntries, ResourceRequestEntry, ResourceReques use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef}; use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount}; -use crate::marshal::FromPy; +use crate::marshal::{FromPy, WrappedDuration}; use crate::utils::error::ToPyResult; use crate::{borrow_mut, run_future, ClientContextPtr, FromPyObject, PyJobId, PyTaskId}; @@ -38,6 +38,7 @@ enum AllocationValue { pub struct ResourceRequestDescription { n_nodes: NumOfNodes, resources: HashMap, + min_time: Option, } #[derive(Debug, FromPyObject)] @@ -182,7 +183,7 @@ fn build_task_desc(desc: TaskDescription, submit_dir: &Path) -> anyhow::Result>()?, - min_time: Default::default(), + min_time: rq.min_time.map(|v| v.into()).unwrap_or_default(), }) }) .collect::>()?, diff --git a/crates/pyhq/src/marshal.rs b/crates/pyhq/src/marshal.rs index 3ebc50cb4..29344ea50 100644 --- a/crates/pyhq/src/marshal.rs +++ b/crates/pyhq/src/marshal.rs @@ -59,3 +59,9 @@ impl<'a> FromPyObject<'a> for WrappedDuration { Ok(WrappedDuration(Duration::from_nanos(nanoseconds as u64))) } } + +impl From for Duration { + fn from(value: WrappedDuration) -> Self { + value.0 + } +} diff --git a/tests/output/test_json.py b/tests/output/test_json.py index a23ffb05a..80c5a8516 100644 --- a/tests/output/test_json.py +++ b/tests/output/test_json.py @@ -131,31 +131,35 @@ def test_print_job_list(hq_env: HqEnv): schema.validate(output) -JOB_DETAIL_SCHEMA = { +ARRAY_JOB_DETAIL_SCHEMA = { "info": { "id": int, "name": "echo", "task_count": 1, "task_stats": dict, }, - "resources": list, + "task-desc": { + "array": { + "resources": list, + "priority": 0, + "program": { + "args": ["echo", "tt"], + "env": {}, + "cwd": str, + "stdout": str, + "stderr": str, + }, + "pin_mode": None, + "crash_limit": int, + "task_dir": bool, + "time_limit": None, + } + }, "finished_at": None, "max_fails": None, - "pin_mode": None, - "priority": 0, - "program": { - "args": ["echo", "tt"], - "env": {}, - "cwd": str, - "stdout": str, - "stderr": str, - }, "started_at": str, - "time_limit": None, "submit_dir": str, "tasks": list, - "task_dir": bool, - "crash_limit": int, } @@ -164,7 +168,7 @@ def test_print_job_detail(hq_env: HqEnv): hq_env.command(["submit", "echo", "tt"]) output = parse_json_output(hq_env, ["--output-mode=json", "job", "info", "1"]) - schema = Schema([JOB_DETAIL_SCHEMA]) + schema = Schema([ARRAY_JOB_DETAIL_SCHEMA]) schema.validate(output) @@ -188,26 +192,24 @@ def test_print_job_detail_resources(hq_env: HqEnv): output = parse_json_output(hq_env, ["--output-mode=json", "job", "info", "1"]) schema = Schema( - [ - { - "resources": [ - { - "min_time": 0.0, - "n_nodes": 0, - "resources": [ - {"request": {"Compact": 2 * 10_000}, "resource": "res1"}, - {"request": {"ForceCompact": 8 * 10_000}, "resource": "res2"}, - {"request": "All", "resource": "res3"}, - {"request": {"Scatter": 4 * 10_000}, "resource": "res4"}, - {"request": {"Compact": 1 * 10_000}, "resource": "cpus"}, - ], - } - ] - } - ], + { + "resources": [ + { + "min_time": 0.0, + "n_nodes": 0, + "resources": [ + {"request": {"Compact": 2 * 10_000}, "resource": "res1"}, + {"request": {"ForceCompact": 8 * 10_000}, "resource": "res2"}, + {"request": "All", "resource": "res3"}, + {"request": {"Scatter": 4 * 10_000}, "resource": "res4"}, + {"request": {"Compact": 1 * 10_000}, "resource": "cpus"}, + ], + } + ] + }, ignore_extra_keys=True, ) - schema.validate(output) + schema.validate(output[0]["task-desc"]["array"]) def test_print_job_detail_multiple_jobs(hq_env: HqEnv): @@ -216,7 +218,7 @@ def test_print_job_detail_multiple_jobs(hq_env: HqEnv): hq_env.command(["submit", "echo", "tt"]) output = parse_json_output(hq_env, ["--output-mode=json", "job", "info", "1,2"]) - schema = Schema([JOB_DETAIL_SCHEMA]) + schema = Schema([ARRAY_JOB_DETAIL_SCHEMA]) schema.validate(output) diff --git a/tests/pyapi/test_job.py b/tests/pyapi/test_job.py index cb6b7ab8f..863fcea4e 100644 --- a/tests/pyapi/test_job.py +++ b/tests/pyapi/test_job.py @@ -1,3 +1,4 @@ +import datetime import random import time from pathlib import Path @@ -294,3 +295,18 @@ def test_job_forget_running(hq_env: HqEnv): with pytest.raises(Exception, match="Cannot forget job 1"): client.forget(submitted_job) + + +def test_resource_min_time(hq_env: HqEnv): + (job, client) = prepare_job_client(hq_env) + + job.program( + args=bash("echo Hello"), + resources=[ + ResourceRequest(min_time=datetime.timedelta(seconds=42)), + ], + ) + client.submit(job) + + data = hq_env.command(["--output-mode=json", "job", "info", "last"], as_json=True)[0] + assert data["task-desc"]["graph"][0]["resources"][0]["min_time"] == 42.0