Skip to content

Commit

Permalink
Scheduler: Refactor interface to make it more generic (#6043)
Browse files Browse the repository at this point in the history
The original `Scheduler` interface made the assumption that all
interfaces would interact with the scheduler through a command line
interface that would be invoked through a bash shell. However, this is
not always the case. Prime example is the new FirecREST service, being
developed by CSCS, that will allow to interact with the scheduler
through a REST API. Due to the assumptions of the `Scheduler` interface,
it was difficult to implement it for this use case.

The `Scheduler` interface is made more generic, by removing the
following (abstract) methods:

 * `_get_joblist_command`
 * `_parse_joblist_output`
 * `_get_submit_command`
 * `_parse_submit_output`
 * `submit_from_script`
 * `kill`
 * `_get_kill_command`
 * `_parse_kill_output`

They are replaced by three abstract methods:

 * `submit_job`
 * `get_jobs`
 * `kill_job`

The new interface no longer makes an assumption about how a plugin
implements these methods. The first one should simply submit the job,
given the location of the submission script on the remote computer. The
second should return the status of the list of active jobs. And the
final should kill a job and return the result.

Unfortunately, this change is backwards incompatible and will break
existing scheduler plugins. To simplify the migration pathway, a
subclass `BashCliScheduler` is added. This implements the new `Scheduler`
interface while maintaining the old interface. This means that this new
class is a drop-in replacement of the old `Scheduler` class for existing
plugins. The plugins that ship with `aiida-core` are all updated to
subclass from `BashCliScheduler`. Any existing plugins that subclassed
from these plugins will therefore not be affected whatsoever by these
changes.
  • Loading branch information
sphuber authored Jul 11, 2024
1 parent cba6e7c commit 954cbdd
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 116 deletions.
10 changes: 5 additions & 5 deletions src/aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str |
:param calculation: the instance of CalcJobNode to submit.
:param transport: an already opened transport to use to submit the calculation.
:return: the job id as returned by the scheduler `submit_from_script` call
:return: the job id as returned by the scheduler `submit_job` call
"""
job_id = calculation.get_job_id()

Expand All @@ -414,7 +414,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str |

submit_script_filename = calculation.get_option('submit_script_filename')
workdir = calculation.get_remote_workdir()
result = scheduler.submit_from_script(workdir, submit_script_filename)
result = scheduler.submit_job(workdir, submit_script_filename)

if isinstance(result, str):
calculation.set_job_id(result)
Expand Down Expand Up @@ -572,7 +572,7 @@ def kill_calculation(calculation: CalcJobNode, transport: Transport) -> None:
scheduler.set_transport(transport)

# Call the proper kill method for the job ID of this calculation
result = scheduler.kill(job_id)
result = scheduler.kill_job(job_id)

if result is not True:
# Failed to kill because the job might have already been completed
Expand All @@ -581,10 +581,10 @@ def kill_calculation(calculation: CalcJobNode, transport: Transport) -> None:

# If the job is returned it is still running and the kill really failed, so we raise
if job is not None and job.job_state != JobState.DONE:
raise exceptions.RemoteOperationError(f'scheduler.kill({job_id}) was unsuccessful')
raise exceptions.RemoteOperationError(f'scheduler.kill_job({job_id}) was unsuccessful')
else:
EXEC_LOGGER.warning(
'scheduler.kill() failed but job<{%s}> no longer seems to be running regardless', job_id
'scheduler.kill_job() failed but job<{%s}> no longer seems to be running regardless', job_id
)


Expand Down
2 changes: 1 addition & 1 deletion src/aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
result = await self._launch_task(task_submit_job, node, transport_queue)

if isinstance(result, ExitCode):
# The scheduler plugin returned an exit code from ``Scheduler.submit_from_script`` indicating the
# The scheduler plugin returned an exit code from ``Scheduler.submit_job`` indicating the
# job submission failed due to a non-transient problem and the job should be terminated.
return self.create_state(ProcessState.RUNNING, self.process.terminate, result)

Expand Down
2 changes: 2 additions & 0 deletions src/aiida/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# fmt: off

from .datastructures import *
from .plugins import *
from .scheduler import *

__all__ = (
'BashCliScheduler',
'JobInfo',
'JobResource',
'JobState',
Expand Down
3 changes: 3 additions & 0 deletions src/aiida/schedulers/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
from .bash import BashCliScheduler

__all__ = ('BashCliScheduler',)
123 changes: 123 additions & 0 deletions src/aiida/schedulers/plugins/bash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Job scheduler that is interacted with through a CLI in bash."""

from __future__ import annotations

import abc

from aiida.common.escaping import escape_for_bash
from aiida.engine.processes.exit_code import ExitCode
from aiida.schedulers.datastructures import JobInfo
from aiida.schedulers.scheduler import Scheduler, SchedulerError

__all__ = ('BashCliScheduler',)


class BashCliScheduler(Scheduler, metaclass=abc.ABCMeta):
"""Job scheduler that is interacted with through a CLI in bash."""

def submit_job(self, working_directory: str, filename: str) -> str | ExitCode:
"""Submit a job.
:param working_directory: The absolute filepath to the working directory where the job is to be exectued.
:param filename: The filename of the submission script relative to the working directory.
"""
self.transport.chdir(working_directory)
result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(filename)))
return self._parse_submit_output(*result)

def get_jobs(
self,
jobs: list[str] | None = None,
user: str | None = None,
as_dict: bool = False,
) -> list[JobInfo] | dict[str, JobInfo]:
"""Return the list of currently active jobs.
:param jobs: A list of jobs to check; only these are checked.
:param user: A string with a user: only jobs of this user are checked.
:param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is
returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects.
:returns: List of active jobs.
"""
with self.transport:
retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user))

joblist = self._parse_joblist_output(retval, stdout, stderr)
if as_dict:
jobdict = {job.job_id: job for job in joblist}
if None in jobdict:
raise SchedulerError('Found at least one job without jobid')
return jobdict

return joblist

def kill_job(self, jobid: str) -> bool:
"""Kill a remote job and parse the return value of the scheduler to check if the command succeeded.
..note::
On some schedulers, even if the command is accepted, it may take some seconds for the job to actually
disappear from the queue.
:param jobid: the job ID to be killed
:returns: True if everything seems ok, False otherwise.
"""
retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid))
return self._parse_kill_output(retval, stdout, stderr)

@abc.abstractmethod
def _get_submit_command(self, submit_script: str) -> str:
"""Return the string to execute to submit a given script.
.. warning:: the `submit_script` should already have been bash-escaped
:param submit_script: the path of the submit script relative to the working directory.
:return: the string to execute to submit a given script.
"""

@abc.abstractmethod
def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode:
"""Parse the output of the submit command returned by calling the `_get_submit_command` command.
:return: a string with the job ID or an exit code if the submission failed because the submission script is
invalid and the job should be terminated.
"""

@abc.abstractmethod
def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str:
"""Return the command to get the most complete description possible of currently active jobs.
.. note::
Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done
according to the value returned by `self.get_feature('can_query_by_user')`
:param jobs: either None to get a list of all jobs in the machine, or a list of jobs.
:param user: either None, or a string with the username (to show only jobs of the specific user).
"""

@abc.abstractmethod
def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]:
"""Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method.
:return: list of `JobInfo` objects, one of each job each with at least its default params implemented.
"""

@abc.abstractmethod
def _get_kill_command(self, jobid: str) -> str:
"""Return the command to kill the job with specified jobid."""

@abc.abstractmethod
def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool:
"""Parse the output of the kill command.
:return: True if everything seems ok, False otherwise.
"""
4 changes: 3 additions & 1 deletion src/aiida/schedulers/plugins/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from aiida.schedulers import SchedulerError
from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource

from .bash import BashCliScheduler

## From the ps man page on Mac OS X 10.12
# state The state is given by a sequence of characters, for example,
# ``RWNA''. The first character indicates the run state of the
Expand Down Expand Up @@ -74,7 +76,7 @@ def accepts_default_memory_per_machine(cls):
return False


class DirectScheduler(aiida.schedulers.Scheduler):
class DirectScheduler(BashCliScheduler):
"""Support for the direct execution bypassing schedulers."""

_logger = aiida.schedulers.Scheduler._logger.getChild('direct')
Expand Down
7 changes: 5 additions & 2 deletions src/aiida/schedulers/plugins/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from aiida.schedulers import SchedulerError, SchedulerParsingError
from aiida.schedulers.datastructures import JobInfo, JobResource, JobState

from .bash import BashCliScheduler

# This maps LSF status codes to our own state list
#
# List of states from
Expand Down Expand Up @@ -167,9 +169,10 @@ def accepts_default_mpiprocs_per_machine(cls):
return False


class LsfScheduler(aiida.schedulers.Scheduler):
class LsfScheduler(BashCliScheduler):
"""Support for the IBM LSF scheduler
'https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html'
https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html
"""

_logger = aiida.schedulers.Scheduler._logger.getChild('lsf')
Expand Down
7 changes: 5 additions & 2 deletions src/aiida/schedulers/plugins/pbsbaseclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import logging

from aiida.common.escaping import escape_for_bash
from aiida.schedulers import Scheduler, SchedulerError, SchedulerParsingError
from aiida.schedulers import SchedulerError, SchedulerParsingError
from aiida.schedulers.datastructures import JobInfo, JobState, MachineInfo, NodeNumberJobResource

from .bash import BashCliScheduler

_LOGGER = logging.getLogger(__name__)

# This maps PbsPro status letters to our own status list
Expand Down Expand Up @@ -95,8 +97,9 @@ def validate_resources(cls, **kwargs):
return resources


class PbsBaseClass(Scheduler):
class PbsBaseClass(BashCliScheduler):
"""Base class with support for the PBSPro scheduler
(http://www.pbsworks.com/) and for PBS and Torque
(http://www.adaptivecomputing.com/products/open-source/torque/).
Expand Down
4 changes: 3 additions & 1 deletion src/aiida/schedulers/plugins/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from aiida.schedulers import SchedulerError, SchedulerParsingError
from aiida.schedulers.datastructures import JobInfo, JobState, ParEnvJobResource

from .bash import BashCliScheduler

# 'http://www.loni.ucla.edu/twiki/bin/view/Infrastructure/GridComputing?skin=plain':
# Jobs Status:
# 'qw' - Queued and waiting,
Expand Down Expand Up @@ -88,7 +90,7 @@ class SgeJobResource(ParEnvJobResource):
pass


class SgeScheduler(aiida.schedulers.Scheduler):
class SgeScheduler(BashCliScheduler):
"""Support for the Sun Grid Engine scheduler and its variants/forks (Son of Grid Engine, Oracle Grid Engine, ...)"""

_logger = aiida.schedulers.Scheduler._logger.getChild('sge')
Expand Down
4 changes: 3 additions & 1 deletion src/aiida/schedulers/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource

from .bash import BashCliScheduler

# This maps SLURM state codes to our own status list

## List of states from the man page of squeue
Expand Down Expand Up @@ -141,7 +143,7 @@ def validate_resources(cls, **kwargs):
return resources


class SlurmScheduler(Scheduler):
class SlurmScheduler(BashCliScheduler):
"""Support for the SLURM scheduler (http://slurm.schedmd.com/)."""

_logger = Scheduler._logger.getChild('slurm')
Expand Down
Loading

0 comments on commit 954cbdd

Please sign in to comment.