Skip to content

Commit

Permalink
Fix resource allocation for CUDA
Browse files Browse the repository at this point in the history
Co-authored-by: Nazanin Donyapour <[email protected]>
  • Loading branch information
mr-c and ndonyapour committed Aug 29, 2023
1 parent 509ffb9 commit e855fed
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
33 changes: 26 additions & 7 deletions cwltool/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from .command_line_tool import CallbackJob, ExpressionJob
from .context import RuntimeContext, getdefault
from .cuda import cuda_version_and_device_count
from .cwlprov.provenance_profile import ProvenanceProfile
from .errors import WorkflowException
from .job import JobBase
Expand Down Expand Up @@ -269,16 +270,22 @@ def __init__(self) -> None:

self.max_ram = int(psutil.virtual_memory().available / 2**20)
self.max_cores = float(psutil.cpu_count())
self.max_cuda = cuda_version_and_device_count()[1]
self.allocated_ram = float(0)
self.allocated_cores = float(0)
self.allocated_cuda: int = 0

def select_resources(
self, request: Dict[str, Union[int, float]], runtime_context: RuntimeContext
) -> Dict[str, Union[int, float]]: # pylint: disable=unused-argument
"""Naïve check for available cpu cores and memory."""
result: Dict[str, Union[int, float]] = {}
maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
for rsc in ("cores", "ram"):
resources_types = {"cores", "ram"}
if "cudaDeviceCountMin" in request or "cudaDeviceCountMax" in request:
maxrsc["cudaDeviceCount"] = self.max_cuda
resources_types.add("cudaDeviceCount")

Check warning on line 287 in cwltool/executors.py

View check run for this annotation

Codecov / codecov/patch

cwltool/executors.py#L286-L287

Added lines #L286 - L287 were not covered by tests
for rsc in resources_types:
rsc_min = request[rsc + "Min"]
if rsc_min > maxrsc[rsc]:
raise WorkflowException(
Expand All @@ -293,9 +300,6 @@ def select_resources(
result["tmpdirSize"] = math.ceil(request["tmpdirMin"])
result["outdirSize"] = math.ceil(request["outdirMin"])

if "cudaDeviceCount" in request:
result["cudaDeviceCount"] = request["cudaDeviceCount"]

return result

def _runner(
Expand Down Expand Up @@ -326,6 +330,10 @@ def _runner(
self.allocated_ram -= ram
cores = job.builder.resources["cores"]
self.allocated_cores -= cores
cudaDevices: int = cast(
int, job.builder.resources.get("cudaDeviceCount", 0)
)
self.allocated_cuda -= cudaDevices
runtime_context.workflow_eval_lock.notify_all()

def run_job(
Expand All @@ -349,34 +357,43 @@ def run_job(
if isinstance(job, JobBase):
ram = job.builder.resources["ram"]
cores = job.builder.resources["cores"]
if ram > self.max_ram or cores > self.max_cores:
cudaDevices = cast(int, job.builder.resources.get("cudaDeviceCount", 0))
if ram > self.max_ram or cores > self.max_cores or cudaDevices > self.max_cuda:
_logger.error(
'Job "%s" cannot be run, requests more resources (%s) '
"than available on this host (max ram %d, max cores %d",
"than available on this host (already allocated ram is %d, "
"allocated cores is %d, allocated CUDA is %d, "
"max ram %d, max cores %d, max CUDA %d).",
job.name,
job.builder.resources,
self.allocated_ram,
self.allocated_cores,
self.allocated_cuda,
self.max_ram,
self.max_cores,
self.max_cuda,
)
self.pending_jobs.remove(job)
return

if (
self.allocated_ram + ram > self.max_ram
or self.allocated_cores + cores > self.max_cores
or self.allocated_cuda + cudaDevices > self.max_cuda
):
_logger.debug(
'Job "%s" cannot run yet, resources (%s) are not '
"available (already allocated ram is %d, allocated cores is %d, "
"max ram %d, max cores %d",
"allocated CUDA devices is %d, "
"max ram %d, max cores %d, max CUDA %d).",
job.name,
job.builder.resources,
self.allocated_ram,
self.allocated_cores,
self.allocated_cuda,
self.max_ram,
self.max_cores,
self.max_cuda,
)
n += 1
continue
Expand All @@ -386,6 +403,8 @@ def run_job(
# Reserve this job's resources before it is queued; _runner subtracts the
# same amounts once the job finishes.
self.allocated_ram += ram
cores = job.builder.resources["cores"]
self.allocated_cores += cores
# The key must be "cudaDeviceCount", the same key used by the admission
# check above and by the release in _runner. Reading a different key
# always yields 0, so allocated_cuda would never grow here yet still be
# decremented on release, and the CUDA device limit would not be enforced.
cuda = cast(int, job.builder.resources.get("cudaDeviceCount", 0))
self.allocated_cuda += cuda
self.taskqueue.add(
functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
runtime_context.workflow_eval_lock,
Expand Down
3 changes: 2 additions & 1 deletion cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,8 @@ def evalResources(
):
if rsc is None:
continue
mn = mx = None # type: Optional[Union[int, float]]
mn: Optional[Union[int, float]] = None
mx: Optional[Union[int, float]] = None
if rsc.get(a + "Min"):
with SourceLine(rsc, f"{a}Min", WorkflowException, runtimeContext.debug):
mn = cast(
Expand Down

0 comments on commit e855fed

Please sign in to comment.