Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new choice to --on-error #1974

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
572e865
Adding inputs for new "kill" mode of the on-error parameter
AlexTate Feb 3, 2024
d7b7712
Adding kill switch trigger. If a job's processStatus != "success" and…
AlexTate Feb 3, 2024
788a89d
Actively running jobs respond to the kill switch by checking the swit…
AlexTate Feb 3, 2024
de73938
When the WorkflowKillSwitch exception reaches the TaskQueue, try to p…
AlexTate Feb 3, 2024
a9a2ab6
Propagation of the WorkflowKillSwitch exception stops once it reaches…
AlexTate Feb 4, 2024
0a00582
Don't create runtime_context.kill_switch by default
mr-c Apr 17, 2024
beaa3f4
This commit brings kill switch initialization and monitoring to the T…
AlexTate Jul 31, 2024
b52333d
JobBase._execute() previously skipped some important post-job actions…
AlexTate Jul 31, 2024
a1ab910
Adding a unit test. Two outcomes are measured:
AlexTate Aug 7, 2024
5924cfe
Formatting corrections provided by `make cleanup` which are relevant …
AlexTate Aug 7, 2024
1bee55f
cleanups
mr-c Aug 7, 2024
7eb2c0d
DO NOT MERGE, let all tests run
mr-c Aug 7, 2024
03cde29
Removing irrelevant debug line from test body
AlexTate Oct 22, 2024
2b9d76a
Removing CPU resource constraints for the test. My intention is to ha…
AlexTate Oct 22, 2024
36f12cf
Using a different approach to remove the "one job per core" resource …
AlexTate Oct 23, 2024
791fe7b
Adding suggested changes to the --on-error help string per @mr-c. For…
AlexTate Oct 23, 2024
93948fb
Missed running `make mypy` before pushing last commit. Opting to disa…
AlexTate Oct 23, 2024
d938bf8
don't cancel jobs; typecheck the test just in case
mr-c Oct 24, 2024
5d92d2d
test_on_error_kill: allow for some longer runtime due to busy systems
mr-c Oct 24, 2024
fe53fb8
Fixing ReceiveScatterOutput to allow outputs to be collected from suc…
AlexTate Nov 12, 2024
b639dfa
Simplifying value checks for RuntimeContext.on_error in parallel_step…
AlexTate Nov 12, 2024
8e0b6f2
Adding __repr__ to WorkflowStep, WorkflowJob, and WorkflowJobStep cla…
AlexTate Nov 12, 2024
32b22d8
It is critical for WorkflowJob.processStatus to update in WorkflowJob…
AlexTate Nov 12, 2024
6485cff
Simplifying the approach in on-error_kill.cwl. It now activates the k…
AlexTate Nov 12, 2024
beb6af7
Reformatting and type hint updates
AlexTate Nov 12, 2024
50f2793
Formatting mistake fix
AlexTate Nov 12, 2024
20fd703
Adding instance ID to repr strings, improving readability of Workflow…
AlexTate Nov 12, 2024
e8f9284
Changing TaskQueue constructor to accept the kill_switch threading.Ev…
AlexTate Nov 12, 2024
f17f678
codecov: don't fail CI if there is an error
mr-c Nov 13, 2024
e139fae
Merge branch 'main' into on-error-abort
mr-c Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cwltool/argparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,11 @@ def arg_parser() -> argparse.ArgumentParser:
parser.add_argument(
"--on-error",
help="Desired workflow behavior when a step fails. One of 'stop' (do "
"not submit any more steps) or 'continue' (may submit other steps that "
"are not downstream from the error). Default is 'stop'.",
"not submit any more steps), 'continue' (may submit other steps that "
"are not downstream from the error), or 'kill' (same as 'stop', but also "
"terminates running jobs in the active step(s)). Default is 'stop'.",
default="stop",
choices=("stop", "continue"),
choices=("stop", "continue", "kill"),
)

checkgroup = parser.add_mutually_exclusive_group()
Expand Down
3 changes: 2 additions & 1 deletion cwltool/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def __init__(self, kwargs: Optional[dict[str, Any]] = None) -> None:
self.select_resources: Optional[select_resources_callable] = None
self.eval_timeout: float = 60
self.postScatterEval: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]] = None
self.on_error: Union[Literal["stop"], Literal["continue"]] = "stop"
self.on_error: Union[Literal["stop"], Literal["continue"], Literal["kill"]] = "stop"
self.strict_memory_limit: bool = False
self.strict_cpu_limit: bool = False
self.cidfile_dir: Optional[str] = None
Expand All @@ -189,6 +189,7 @@ def __init__(self, kwargs: Optional[dict[str, Any]] = None) -> None:
self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None
self.validate_only: bool = False
self.validate_stdout: Optional["SupportsWrite[str]"] = None
self.kill_switch: Optional[threading.Event] = None
super().__init__(kwargs)
if self.tmp_outdir_prefix == "":
self.tmp_outdir_prefix = self.tmpdir_prefix
Expand Down
13 changes: 13 additions & 0 deletions cwltool/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,16 @@

class ArgumentException(Exception):
"""Mismatched command line arguments provided."""


class WorkflowKillSwitch(Exception):
"""When processStatus != "success" and on-error=kill, raise this exception."""

def __init__(self, job_id: str, rcode: int) -> None:
"""Record the job identifier and the error code."""
self.job_id = job_id
self.rcode = rcode

def __str__(self) -> str:
"""Represent this exception as a string."""
return f"[job {self.job_id}] activated kill switch with return code {self.rcode}"

Check warning on line 33 in cwltool/errors.py

View check run for this annotation

Codecov / codecov/patch

cwltool/errors.py#L33

Added line #L33 was not covered by tests
25 changes: 21 additions & 4 deletions cwltool/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .context import RuntimeContext, getdefault
from .cuda import cuda_version_and_device_count
from .cwlprov.provenance_profile import ProvenanceProfile
from .errors import WorkflowException
from .errors import WorkflowException, WorkflowKillSwitch
from .job import JobBase
from .loghandler import _logger
from .mutation import MutationManager
Expand Down Expand Up @@ -102,6 +102,7 @@
runtime_context.mutation_manager = MutationManager()
runtime_context.toplevel = True
runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())
runtime_context.kill_switch = threading.Event()

job_reqs: Optional[list[CWLObjectType]] = None
if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
Expand Down Expand Up @@ -251,6 +252,11 @@
WorkflowException,
): # pylint: disable=try-except-raise
raise
except WorkflowKillSwitch as err:
_logger.error(

Check warning on line 256 in cwltool/executors.py

View check run for this annotation

Codecov / codecov/patch

cwltool/executors.py#L256

Added line #L256 was not covered by tests
f"Workflow kill switch activated by [job {err.job_id}] "
f"because on-error={runtime_context.on_error}"
)
except Exception as err:
logger.exception("Got workflow error")
raise WorkflowException(str(err)) from err
Expand Down Expand Up @@ -323,6 +329,11 @@
except WorkflowException as err:
_logger.exception(f"Got workflow error: {err}")
self.exceptions.append(err)
except WorkflowKillSwitch as err:
_logger.error(
f"Workflow kill switch activated by [job {err.job_id}] "
f"because on-error={runtime_context.on_error}"
)
except Exception as err: # pylint: disable=broad-except
_logger.exception(f"Got workflow error: {err}")
self.exceptions.append(WorkflowException(str(err)))
Expand Down Expand Up @@ -429,7 +440,13 @@
logger: logging.Logger,
runtime_context: RuntimeContext,
) -> None:
self.taskqueue: TaskQueue = TaskQueue(threading.Lock(), int(math.ceil(self.max_cores)))
if runtime_context.kill_switch is None:
runtime_context.kill_switch = threading.Event()

Check warning on line 444 in cwltool/executors.py

View check run for this annotation

Codecov / codecov/patch

cwltool/executors.py#L444

Added line #L444 was not covered by tests

self.taskqueue: TaskQueue = TaskQueue(
threading.Lock(), int(math.ceil(self.max_cores)), runtime_context.kill_switch
)

try:
jobiter = process.job(job_order_object, self.output_callback, runtime_context)

Expand Down Expand Up @@ -457,9 +474,9 @@
while self.taskqueue.in_flight > 0:
self.wait_for_next_completion(runtime_context)
self.run_job(None, runtime_context)

runtime_context.workflow_eval_lock.release()
finally:
if (lock := runtime_context.workflow_eval_lock) is not None:
lock.release()
self.taskqueue.drain()
self.taskqueue.join()

Expand Down
Loading