Skip to content

Commit

Permalink
Implement OpenPBS queue options QSUB_CMD, QSTAT_CMD, QDEL_CMD
Browse files: browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Apr 24, 2024
1 parent f3f0b85 commit 724fcfb
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/ert/scheduler/__init__.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -22,6 +22,9 @@ def create_driver(config: QueueConfig) -> Driver:
for key, value in config.queue_options.get(QueueSystem.TORQUE, [])
}
return OpenPBSDriver(
qsub_cmd=queue_config.get("QSUB_CMD"),
qstat_cmd=queue_config.get("QSTAT_CMD"),
qdel_cmd=queue_config.get("QDEL_CMD"),
queue_name=queue_config.get("QUEUE"),
keep_qsub_output=queue_config.get("KEEP_QSUB_OUTPUT", "0"),
memory_per_job=queue_config.get("MEMORY_PER_JOB"),
Expand Down
16 changes: 12 additions & 4 deletions src/ert/scheduler/openpbs_driver.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import logging
import shlex
import shutil
from pathlib import Path
from typing import (
Dict,
Expand Down Expand Up @@ -114,6 +115,9 @@ def __init__(
num_cpus_per_node: Optional[int] = None,
cluster_label: Optional[str] = None,
job_prefix: Optional[str] = None,
qsub_cmd: Optional[str] = None,
qstat_cmd: Optional[str] = None,
qdel_cmd: Optional[str] = None,
) -> None:
super().__init__()

Expand All @@ -128,6 +132,10 @@ def __init__(
self._sleep_time_between_cmd_retries = 2
self._poll_period = _POLL_PERIOD

self._qsub_cmd = Path(qsub_cmd or shutil.which("qsub") or "qsub")
self._qstat_cmd = Path(qstat_cmd or shutil.which("qstat") or "qstat")
self._qdel_cmd = Path(qdel_cmd or shutil.which("qdel") or "qdel")

self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
self._iens2jobid: MutableMapping[int, str] = {}
self._non_finished_job_ids: Set[str] = set()
Expand Down Expand Up @@ -181,7 +189,7 @@ async def submit(
)
name_prefix = self._job_prefix or ""
qsub_with_args: List[str] = [
"qsub",
str(self._qsub_cmd),
"-rn", # Don't restart on failure
f"-N{name_prefix}{name}", # Set name of job
*arg_queue_name,
Expand Down Expand Up @@ -224,7 +232,7 @@ async def kill(self, iens: int) -> None:
logger.debug(f"Killing realization {iens} with PBS-id {job_id}")

process_success, process_message = await self._execute_with_retry(
["qdel", str(job_id)],
[str(self._qdel_cmd), str(job_id)],
retry_codes=(QDEL_REQUEST_INVALID,),
accept_codes=(QDEL_JOB_HAS_FINISHED,),
retries=self._num_pbs_cmd_retries,
Expand All @@ -242,7 +250,7 @@ async def poll(self) -> None:

if self._non_finished_job_ids:
process = await asyncio.create_subprocess_exec(
"qstat",
str(self._qstat_cmd),
"-Ex",
"-w", # wide format
*self._non_finished_job_ids,
Expand Down Expand Up @@ -270,7 +278,7 @@ async def poll(self) -> None:

if self._finished_job_ids:
process = await asyncio.create_subprocess_exec(
"qstat",
str(self._qstat_cmd),
"-Efx",
"-Fjson",
*self._finished_job_ids,
Expand Down
10 changes: 9 additions & 1 deletion tests/unit_tests/scheduler/test_scheduler.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -667,7 +667,9 @@ def test_scheduler_create_openpbs_driver():
num_cpus_per_node = 1
cluster_label = "bar_cluster_label"
job_prefix = "foo_job_prefix"

qsub_cmd = "bar_qsub_cmd"
qdel_cmd = "foo_qdel_cmd"
qstat_cmd = "bar_qstat_cmd"
queue_config_dict = {
"QUEUE_SYSTEM": "TORQUE",
"QUEUE_OPTION": [
Expand All @@ -678,6 +680,9 @@ def test_scheduler_create_openpbs_driver():
("TORQUE", "NUM_CPUS_PER_NODE", num_cpus_per_node),
("TORQUE", "CLUSTER_LABEL", cluster_label),
("TORQUE", "JOB_PREFIX", job_prefix),
("TORQUE", "QSUB_CMD", qsub_cmd),
("TORQUE", "QSTAT_CMD", qstat_cmd),
("TORQUE", "QDEL_CMD", qdel_cmd),
],
}
queue_config = QueueConfig.from_dict(queue_config_dict)
Expand All @@ -689,3 +694,6 @@ def test_scheduler_create_openpbs_driver():
assert driver._num_cpus_per_node == num_cpus_per_node
assert driver._cluster_label == cluster_label
assert driver._job_prefix == job_prefix
assert str(driver._qsub_cmd) == qsub_cmd
assert str(driver._qstat_cmd) == qstat_cmd
assert str(driver._qdel_cmd) == qdel_cmd

0 comments on commit 724fcfb

Please sign in to comment.