Skip to content

Commit

Permalink
Don't create runtime_context.kill_switch by default
Browse files Browse the repository at this point in the history
So that the runtime_context object can still be pickled.

Other cleanups
  • Loading branch information
mr-c committed Apr 17, 2024
1 parent 27e224e commit 68c4ebd
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
4 changes: 2 additions & 2 deletions cwltool/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
self.select_resources: Optional[select_resources_callable] = None
self.eval_timeout: float = 60
self.postScatterEval: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]] = None
self.on_error: Union[Literal["stop"], Literal["continue"]] = "stop"
self.on_error: Union[Literal["stop"], Literal["continue"], Literal["kill"]] = "stop"
self.strict_memory_limit: bool = False
self.strict_cpu_limit: bool = False
self.cidfile_dir: Optional[str] = None
Expand All @@ -200,7 +200,7 @@ def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None
self.validate_only: bool = False
self.validate_stdout: Optional[Union[IO[bytes], TextIO, IO[str]]] = None
self.kill_switch = threading.Event()
self.kill_switch: Optional[threading.Event] = None
super().__init__(kwargs)
if self.tmp_outdir_prefix == "":
self.tmp_outdir_prefix = self.tmpdir_prefix
Expand Down
8 changes: 5 additions & 3 deletions cwltool/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ class GraphTargetMissingException(WorkflowException):
class WorkflowKillSwitch(Exception):
"""When processStatus != "success" and on-error=kill, raise this exception."""

def __init__(self, job_id, rcode):
def __init__(self, job_id: str, rcode: int) -> None:
"""Record the job identifier and the error code."""
self.job_id = job_id
self.rcode = rcode

Check warning on line 23 in cwltool/errors.py

View check run for this annotation

Codecov / codecov/patch

cwltool/errors.py#L22-L23

Added lines #L22 - L23 were not covered by tests

def __str__(self):
return f'[job {self.job_id}] activated kill switch with return code {self.rcode}'
def __str__(self) -> str:
"""Represent this exception as a string."""
return f"[job {self.job_id}] activated kill switch with return code {self.rcode}"

Check warning on line 27 in cwltool/errors.py

View check run for this annotation

Codecov / codecov/patch

cwltool/errors.py#L27

Added line #L27 was not covered by tests
3 changes: 2 additions & 1 deletion cwltool/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ def run_jobs(
self.wait_for_next_completion(runtime_context)
self.run_job(None, runtime_context)
finally:
runtime_context.workflow_eval_lock.release()
if (lock := runtime_context.workflow_eval_lock) is not None:
lock.release()
self.taskqueue.drain()
self.taskqueue.join()

Expand Down
13 changes: 10 additions & 3 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ def _execute(
runtime: List[str],
env: MutableMapping[str, str],
runtimeContext: RuntimeContext,
monitor_function: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
monitor_function: Optional[
Callable[["subprocess.Popen[str]", threading.Event], None]
] = None,
) -> None:
"""Execute the tool, either directly or via script.
Expand Down Expand Up @@ -333,6 +335,10 @@ def stderr_stdout_log_path(
builder: Optional[Builder] = getattr(self, "builder", None)
if builder is not None:
job_script_contents = builder.build_job_script(commands)
if runtimeContext.kill_switch is None:
runtimeContext.kill_switch = kill_switch = threading.Event()
else:
kill_switch = runtimeContext.kill_switch
rcode = _job_popen(
commands,
stdin_path=stdin_path,
Expand All @@ -341,7 +347,7 @@ def stderr_stdout_log_path(
env=env,
cwd=self.outdir,
make_job_dir=lambda: runtimeContext.create_outdir(),
kill_switch=runtimeContext.kill_switch,
kill_switch=kill_switch,
job_script_contents=job_script_contents,
timelimit=self.timelimit,
name=self.name,
Expand Down Expand Up @@ -547,7 +553,8 @@ def monitor_kill_switch() -> None:
nonlocal ks_tm
if kill_switch.is_set():
_logger.error("[job %s] terminating by kill switch", self.name)

Check warning on line 555 in cwltool/job.py

View check run for this annotation

Codecov / codecov/patch

cwltool/job.py#L555

Added line #L555 was not covered by tests
if sproc.stdin: sproc.stdin.close()
if sproc.stdin:
sproc.stdin.close()
sproc.terminate()

Check warning on line 558 in cwltool/job.py

View check run for this annotation

Codecov / codecov/patch

cwltool/job.py#L557-L558

Added lines #L557 - L558 were not covered by tests
else:
ks_tm = Timer(interval=1, function=monitor_kill_switch)
Expand Down
2 changes: 1 addition & 1 deletion cwltool/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import threading
from typing import Callable, Optional

from .loghandler import _logger
from .errors import WorkflowKillSwitch
from .loghandler import _logger


class TaskQueue:
Expand Down

0 comments on commit 68c4ebd

Please sign in to comment.