From 724fcfb6ed705ce41b4250878652029c077ff4e2 Mon Sep 17 00:00:00 2001 From: Jonathan Karlsen Date: Wed, 24 Apr 2024 09:55:55 +0200 Subject: [PATCH] Implement OpenPBS queue options `QSUB_CMD`, `QSTAT_CMD`, `QDEL_CMD` --- src/ert/scheduler/__init__.py | 3 +++ src/ert/scheduler/openpbs_driver.py | 16 ++++++++++++---- tests/unit_tests/scheduler/test_scheduler.py | 10 +++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/ert/scheduler/__init__.py b/src/ert/scheduler/__init__.py index 5658b3c1a1b..c5c4875b8a5 100644 --- a/src/ert/scheduler/__init__.py +++ b/src/ert/scheduler/__init__.py @@ -22,6 +22,9 @@ def create_driver(config: QueueConfig) -> Driver: for key, value in config.queue_options.get(QueueSystem.TORQUE, []) } return OpenPBSDriver( + qsub_cmd=queue_config.get("QSUB_CMD"), + qstat_cmd=queue_config.get("QSTAT_CMD"), + qdel_cmd=queue_config.get("QDEL_CMD"), queue_name=queue_config.get("QUEUE"), keep_qsub_output=queue_config.get("KEEP_QSUB_OUTPUT", "0"), memory_per_job=queue_config.get("MEMORY_PER_JOB"), diff --git a/src/ert/scheduler/openpbs_driver.py b/src/ert/scheduler/openpbs_driver.py index 8b73f46535a..2c358a8c554 100644 --- a/src/ert/scheduler/openpbs_driver.py +++ b/src/ert/scheduler/openpbs_driver.py @@ -3,6 +3,7 @@ import asyncio import logging import shlex +import shutil from pathlib import Path from typing import ( Dict, @@ -114,6 +115,9 @@ def __init__( num_cpus_per_node: Optional[int] = None, cluster_label: Optional[str] = None, job_prefix: Optional[str] = None, + qsub_cmd: Optional[str] = None, + qstat_cmd: Optional[str] = None, + qdel_cmd: Optional[str] = None, ) -> None: super().__init__() @@ -128,6 +132,10 @@ def __init__( self._sleep_time_between_cmd_retries = 2 self._poll_period = _POLL_PERIOD + self._qsub_cmd = Path(qsub_cmd or shutil.which("qsub") or "qsub") + self._qstat_cmd = Path(qstat_cmd or shutil.which("qstat") or "qstat") + self._qdel_cmd = Path(qdel_cmd or shutil.which("qdel") or "qdel") + self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {} self._iens2jobid: MutableMapping[int, str] = {} self._non_finished_job_ids: Set[str] = set() @@ -181,7 +189,7 @@ async def submit( ) name_prefix = self._job_prefix or "" qsub_with_args: List[str] = [ - "qsub", + str(self._qsub_cmd), "-rn", # Don't restart on failure f"-N{name_prefix}{name}", # Set name of job *arg_queue_name, @@ -224,7 +232,7 @@ async def kill(self, iens: int) -> None: logger.debug(f"Killing realization {iens} with PBS-id {job_id}") process_success, process_message = await self._execute_with_retry( - ["qdel", str(job_id)], + [str(self._qdel_cmd), str(job_id)], retry_codes=(QDEL_REQUEST_INVALID,), accept_codes=(QDEL_JOB_HAS_FINISHED,), retries=self._num_pbs_cmd_retries, @@ -242,7 +250,7 @@ async def poll(self) -> None: if self._non_finished_job_ids: process = await asyncio.create_subprocess_exec( - "qstat", + str(self._qstat_cmd), "-Ex", "-w", # wide format *self._non_finished_job_ids, @@ -270,7 +278,7 @@ async def poll(self) -> None: if self._finished_job_ids: process = await asyncio.create_subprocess_exec( - "qstat", + str(self._qstat_cmd), "-Efx", "-Fjson", *self._finished_job_ids, diff --git a/tests/unit_tests/scheduler/test_scheduler.py b/tests/unit_tests/scheduler/test_scheduler.py index 4eb65f28a6d..517f5768bac 100644 --- a/tests/unit_tests/scheduler/test_scheduler.py +++ b/tests/unit_tests/scheduler/test_scheduler.py @@ -667,7 +667,9 @@ def test_scheduler_create_openpbs_driver(): num_cpus_per_node = 1 cluster_label = "bar_cluster_label" job_prefix = "foo_job_prefix" - + qsub_cmd = "bar_qsub_cmd" + qdel_cmd = "foo_qdel_cmd" + qstat_cmd = "bar_qstat_cmd" queue_config_dict = { "QUEUE_SYSTEM": "TORQUE", "QUEUE_OPTION": [ @@ -678,6 +680,9 @@ def test_scheduler_create_openpbs_driver(): ("TORQUE", "NUM_CPUS_PER_NODE", num_cpus_per_node), ("TORQUE", "CLUSTER_LABEL", cluster_label), ("TORQUE", "JOB_PREFIX", job_prefix), + ("TORQUE", "QSUB_CMD", qsub_cmd), + ("TORQUE", "QSTAT_CMD", qstat_cmd), + ("TORQUE", "QDEL_CMD", qdel_cmd), ], } queue_config = QueueConfig.from_dict(queue_config_dict) @@ -689,3 +694,6 @@ def test_scheduler_create_openpbs_driver(): assert driver._num_cpus_per_node == num_cpus_per_node assert driver._cluster_label == cluster_label assert driver._job_prefix == job_prefix + assert str(driver._qsub_cmd) == qsub_cmd + assert str(driver._qstat_cmd) == qstat_cmd + assert str(driver._qdel_cmd) == qdel_cmd