Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cudaDeviceCount flag to the request requirements #1895

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions cwltool/executors.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -26,6 +26,7 @@

from .command_line_tool import CallbackJob, ExpressionJob
from .context import RuntimeContext, getdefault
from .cuda import cuda_version_and_device_count
from .cwlprov.provenance_profile import ProvenanceProfile
from .errors import WorkflowException
from .job import JobBase
Expand Down Expand Up @@ -269,16 +270,22 @@ def __init__(self) -> None:

self.max_ram = int(psutil.virtual_memory().available / 2**20)
self.max_cores = float(psutil.cpu_count())
self.max_cuda = cuda_version_and_device_count()[1]
self.allocated_ram = float(0)
self.allocated_cores = float(0)
self.allocated_cuda: int = 0

def select_resources(
self, request: Dict[str, Union[int, float]], runtime_context: RuntimeContext
) -> Dict[str, Union[int, float]]: # pylint: disable=unused-argument
"""Naïve check for available cpu cores and memory."""
result: Dict[str, Union[int, float]] = {}
maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
for rsc in ("cores", "ram"):
resources_types = {"cores", "ram"}
if "cudaDeviceCountMin" in request or "cudaDeviceCountMax" in request:
maxrsc["cudaDeviceCount"] = self.max_cuda
resources_types.add("cudaDeviceCount")
for rsc in resources_types:
rsc_min = request[rsc + "Min"]
if rsc_min > maxrsc[rsc]:
raise WorkflowException(
Expand All @@ -293,9 +300,6 @@ def select_resources(
result["tmpdirSize"] = math.ceil(request["tmpdirMin"])
result["outdirSize"] = math.ceil(request["outdirMin"])

if "cudaDeviceCount" in request:
result["cudaDeviceCount"] = request["cudaDeviceCount"]

return result

def _runner(
Expand Down Expand Up @@ -326,6 +330,10 @@ def _runner(
self.allocated_ram -= ram
cores = job.builder.resources["cores"]
self.allocated_cores -= cores
cudaDevices: int = cast(
int, job.builder.resources.get("cudaDeviceCount", 0)
)
self.allocated_cuda -= cudaDevices
runtime_context.workflow_eval_lock.notify_all()

def run_job(
Expand All @@ -349,34 +357,43 @@ def run_job(
if isinstance(job, JobBase):
ram = job.builder.resources["ram"]
cores = job.builder.resources["cores"]
if ram > self.max_ram or cores > self.max_cores:
cudaDevices = cast(int, job.builder.resources.get("cudaDeviceCount", 0))
if ram > self.max_ram or cores > self.max_cores or cudaDevices > self.max_cuda:
_logger.error(
'Job "%s" cannot be run, requests more resources (%s) '
"than available on this host (max ram %d, max cores %d",
"than available on this host (already allocated ram is %d, "
"allocated cores is %d, allocated CUDA is %d, "
"max ram %d, max cores %d, max CUDA %d).",
job.name,
job.builder.resources,
self.allocated_ram,
self.allocated_cores,
self.allocated_cuda,
self.max_ram,
self.max_cores,
self.max_cuda,
)
self.pending_jobs.remove(job)
return

if (
self.allocated_ram + ram > self.max_ram
or self.allocated_cores + cores > self.max_cores
or self.allocated_cuda + cudaDevices > self.max_cuda
):
_logger.debug(
'Job "%s" cannot run yet, resources (%s) are not '
"available (already allocated ram is %d, allocated cores is %d, "
"max ram %d, max cores %d",
"allocated CUDA devices is %d, "
"max ram %d, max cores %d, max CUDA %d).",
job.name,
job.builder.resources,
self.allocated_ram,
self.allocated_cores,
self.allocated_cuda,
self.max_ram,
self.max_cores,
self.max_cuda,
)
n += 1
continue
Expand All @@ -386,6 +403,8 @@ def run_job(
self.allocated_ram += ram
cores = job.builder.resources["cores"]
self.allocated_cores += cores
# BUG FIX: the key must be "cudaDeviceCount" — the same key written into
# builder.resources by select_resources() and read back in _runner() and in
# the capacity checks above (see the `cudaDevices` reads of
# resources.get("cudaDeviceCount", 0)). With the wrong key "cudaDevices",
# .get(..., 0) always returned 0, so allocated_cuda was never incremented on
# job start yet was decremented on job completion, silently over-committing
# GPUs and driving the counter negative.
cuda = cast(int, job.builder.resources.get("cudaDeviceCount", 0))
self.allocated_cuda += cuda
self.taskqueue.add(
functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
runtime_context.workflow_eval_lock,
Expand Down
3 changes: 2 additions & 1 deletion cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,8 @@ def evalResources(
):
if rsc is None:
continue
mn = mx = None # type: Optional[Union[int, float]]
mn: Optional[Union[int, float]] = None
mx: Optional[Union[int, float]] = None
if rsc.get(a + "Min"):
with SourceLine(rsc, f"{a}Min", WorkflowException, runtimeContext.debug):
mn = cast(
Expand Down