From f8a0bfc3d44ff818a99b3f935c58360f485def97 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20K=C3=B6ster?=
Date: Thu, 11 Apr 2024 17:37:17 +0200
Subject: [PATCH] fix: cgroup confinement (#23)

fix #41

---------

Co-authored-by: meesters
Co-authored-by: Christian Meesters
---
 .../__init__.py                                  | 24 ++++++++++++++++++-
 tests/test_github_issue41/Snakefile              | 10 ++++++++
 .../expected_results/1.out                       |  0
 tests/tests.py                                   |  4 ++++
 4 files changed, 37 insertions(+), 1 deletion(-)
 create mode 100644 tests/test_github_issue41/Snakefile
 create mode 100644 tests/test_github_issue41/expected_results/1.out

diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py
index dd59cdc..fa4d362 100644
--- a/snakemake_executor_plugin_slurm_jobstep/__init__.py
+++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py
@@ -13,6 +13,7 @@
     JobExecutorInterface,
 )
 from snakemake_interface_executor_plugins.settings import ExecMode, CommonSettings
+from snakemake_interface_common.exceptions import WorkflowError
 
 
 # Required:
@@ -111,7 +112,15 @@ def run_job(self, job: JobExecutorInterface):
             # The -n1 is important to avoid that srun executes the given command
             # multiple times, depending on the relation between
             # cpus per task and the number of CPU cores.
-            call = f"srun -n1 --cpu-bind=q {self.format_job_exec(job)}"
+
+            # as of v22.11.0, the --cpus-per-task flag is needed to ensure that
+            # the job can utilize the c-group's resources.
+            # We set the limitation accordingly, assuming the submit executor
+            # has set the resources correctly.
+
+            call = "srun -n1 --cpu-bind=q "
+            call += f"--cpus-per-task {get_cpus_per_task(job)} "
+            call += f"{self.format_job_exec(job)}"
 
         self.logger.debug(f"This job is a group job: {job.is_group()}")
         self.logger.debug(f"The call for this job is: {call}")
@@ -144,3 +153,26 @@ def cores(self):
 
     def get_exec_mode(self) -> ExecMode:
         return ExecMode.REMOTE
+
+
+def get_cpus_per_task(job: JobExecutorInterface):
+    """Return the CPU count to pass to srun's --cpus-per-task for ``job``.
+
+    The job's ``cpus_per_task`` resource takes precedence over its thread
+    count whenever that resource is set to a truthy value.
+
+    Raises:
+        WorkflowError: if the ``cpus_per_task`` resource is not an integer.
+    """
+    cpus_per_task = job.threads
+    requested = job.resources.get("cpus_per_task")
+    if requested:
+        # validate the resource value itself, not the thread count it overrides
+        if not isinstance(requested, int):
+            raise WorkflowError(
+                f"cpus_per_task must be an integer, but is {requested}"
+            )
+        cpus_per_task = requested
+    # ensure that at least 1 cpu is requested
+    # because 0 is not allowed by slurm
+    return max(1, cpus_per_task)
diff --git a/tests/test_github_issue41/Snakefile b/tests/test_github_issue41/Snakefile
new file mode 100644
index 0000000..33bef95
--- /dev/null
+++ b/tests/test_github_issue41/Snakefile
@@ -0,0 +1,10 @@
+rule all:
+    input: "1.out"
+
+rule test1:
+    output: "1.out"
+    #threads: 2
+    resources:
+        cpus_per_task=1
+    shell: "touch $SLURM_CPUS_PER_TASK.out"
+
diff --git a/tests/test_github_issue41/expected_results/1.out b/tests/test_github_issue41/expected_results/1.out
new file mode 100644
index 0000000..e69de29
diff --git a/tests/tests.py b/tests/tests.py
index 935c1fd..9740753 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -12,3 +12,7 @@ def get_executor(self) -> str:
     def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
         # instatiate ExecutorSettings of this plugin as appropriate
         return None
+
+
+# def test_issue_41():
+#     run(dpath("test_github_issue41"))