From 64d608da3a48ebbcd98893ba4ea83be2b277f945 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 15 Jun 2023 17:41:17 +0200 Subject: [PATCH 01/16] Delete non-terminal jobs and subworkflow invocation when cancelling an invocation --- lib/galaxy/managers/workflows.py | 20 ++++++++-- lib/galaxy_test/api/test_workflows.py | 57 +++++++++++++++++++++++++++ lib/galaxy_test/base/populators.py | 12 ++++++ 3 files changed, 85 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index 345320413547..c5b32f0b3ff0 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -421,7 +421,7 @@ def get_invocation_report(self, trans, invocation_id, **kwd): target_format=target_format, ) - def cancel_invocation(self, trans, decoded_invocation_id): + def cancel_invocation(self, trans, decoded_invocation_id: int): workflow_invocation = self.get_invocation(trans, decoded_invocation_id) cancelled = workflow_invocation.cancel() @@ -430,9 +430,21 @@ def cancel_invocation(self, trans, decoded_invocation_id): trans.sa_session.add(workflow_invocation) with transaction(trans.sa_session): trans.sa_session.commit() - else: - # TODO: More specific exception? - raise exceptions.MessageException("Cannot cancel an inactive workflow invocation.") + + for step in workflow_invocation.steps: + for job in step.jobs: + job.mark_deleted() + trans.sa_session.add(job) + if step.implicit_collection_jobs: + for icjja in step.implicit_collection_jobs.jobs: + icjja.job.mark_deleted() + trans.sa_session.add(icjja.job) + + with transaction(trans.sa_session): + trans.sa_session.commit() + + for invocation in workflow_invocation.subworkflow_invocations: + self.cancel_invocation(trans, invocation.subworkflow_invocation_id) return workflow_invocation diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 276b989460ca..dcc88c1bcc82 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -4040,6 +4040,63 @@ def test_cancel_workflow_invocation(self): message = invocation["messages"][0] assert message["reason"] == "user_request" + @skip_without_tool("collection_creates_dynamic_nested") + def test_cancel_workflow_invocation_deletes_jobs(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: + list_input: + type: collection + collection_type: list +steps: + first_step: + tool_id: cat_data_and_sleep + in: + input1: list_input + state: + sleep_time: 60 + subworkflow_step: + run: + class: GalaxyWorkflow + inputs: + list_input: + type: collection + collection_type: list + steps: + intermediate_step: + tool_id: identifier_multiple + in: + input1: list_input + subworkflow: + in: + list_input: first_step/out_file1 +test_data: + list_input: + collection_type: list + elements: + - identifier: 1 + content: A + - identifier: 2 + content: B +""", + history_id=history_id, + wait=False, + ) + # wait_for_invocation just waits until scheduling complete, not jobs or subworkflow invocations + self.workflow_populator.wait_for_invocation("null", summary.invocation_id, assert_ok=True) + invocation_before_cancellation = self.workflow_populator.get_invocation(summary.invocation_id) + assert invocation_before_cancellation["state"] == "scheduled" + subworkflow_invocation_id = invocation_before_cancellation["steps"][2]["subworkflow_invocation_id"] + self.workflow_populator.cancel_invocation(summary.invocation_id) + invocation_jobs = self.workflow_populator.get_invocation_jobs(summary.invocation_id) + for job in invocation_jobs: + assert job["state"] == "deleted" or job["state"] == "deleted_new" + subworkflow_invocation_jobs = self.workflow_populator.get_invocation_jobs(subworkflow_invocation_id) + for job in subworkflow_invocation_jobs: + assert job["state"] == "deleted" or job["state"] == "deleted_new" + def test_workflow_failed_output_not_found(self, history_id): summary = self._run_workflow( """ diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index 48631597179a..4010cff642de 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -1697,6 +1697,11 @@ def workflow_invocations(self, workflow_id: str) -> List[Dict[str, Any]]: api_asserts.assert_status_code_is(response, 200) return response.json() + def cancel_invocation(self, invocation_id: str): + response = self._delete(f"invocations/{invocation_id}") + api_asserts.assert_status_code_is(response, 200) + return response.json() + def history_invocations(self, history_id: str) -> List[Dict[str, Any]]: history_invocations_response = self._get("invocations", {"history_id": history_id}) api_asserts.assert_status_code_is(history_invocations_response, 200) @@ -2079,6 +2084,13 @@ def setup_workflow_run( return workflow_request, history_id, workflow_id + def get_invocation_jobs(self, invocation_id: str) -> List[Dict[str, Any]]: + jobs_response = self._get("jobs", data={"invocation_id": invocation_id}) + api_asserts.assert_status_code_is(jobs_response, 200) + jobs = jobs_response.json() + assert isinstance(jobs, list) + return jobs + def wait_for_invocation_and_jobs( self, history_id: str, workflow_id: str, invocation_id: str, assert_ok: bool = True ) -> None: From a5f81c4c2ba308b74e91eb00bb1cf9cc4e8282ef Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 16 Jun 2023 08:58:33 +0200 Subject: [PATCH 02/16] Display cancel button while invocation jobs not terminal --- .../WorkflowInvocationState/WorkflowInvocationSummary.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue b/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue index ca3fb191c70f..3b30faf87d73 100644 --- a/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue +++ b/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue @@ -15,7 +15,7 @@ -
+
From db3c875a8bd2698ab0976c13a28fc9ca05014322 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 21 Jun 2023 15:27:28 +0200 Subject: [PATCH 03/16] Replace for loop with sql update statement --- lib/galaxy/managers/workflows.py | 29 +++++++++++++++++++-------- lib/galaxy/model/__init__.py | 2 +- lib/galaxy_test/api/test_workflows.py | 4 ++-- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index c5b32f0b3ff0..3cac1c96f1ea 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -31,6 +31,7 @@ or_, select, true, + update, ) from sqlalchemy.orm import ( aliased, @@ -431,14 +432,26 @@ def cancel_invocation(self, trans, decoded_invocation_id: int): with transaction(trans.sa_session): trans.sa_session.commit() - for step in workflow_invocation.steps: - for job in step.jobs: - job.mark_deleted() - trans.sa_session.add(job) - if step.implicit_collection_jobs: - for icjja in step.implicit_collection_jobs.jobs: - icjja.job.mark_deleted() - trans.sa_session.add(icjja.job) + job_subq = ( + trans.sa_session.query(model.Job.id) + .join(model.WorkflowInvocationStep) + .filter(model.WorkflowInvocationStep.workflow_invocation_id == decoded_invocation_id) + .filter(~model.Job.table.c.state.in_(model.Job.terminal_states)) + .with_for_update().scalar_subquery() + ) + trans.sa_session.execute(update(model.Job.table).where(model.Job.id == job_subq).values({"state": model.Job.states.DELETING})) + + job_collection_subq = ( + trans.sa_session.query(model.Job.id) + .join(model.ImplicitCollectionJobsJobAssociation) + .join(model.ImplicitCollectionJobs) + .join(model.WorkflowInvocationStep, model.WorkflowInvocationStep.implicit_collection_jobs_id == model.ImplicitCollectionJobs.id) + .filter(model.WorkflowInvocationStep.workflow_invocation_id == decoded_invocation_id) + .filter(~model.Job.table.c.state.in_(model.Job.terminal_states)) + .with_for_update().subquery() + ) + + trans.sa_session.execute(update(model.Job.table).where(model.Job.table.c.id.in_(job_collection_subq)).values({"state": model.Job.states.DELETING})) with transaction(trans.sa_session): trans.sa_session.commit() diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index a0199e157afb..b16467a700f7 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1366,7 +1366,7 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable): states = JobState - terminal_states = [states.OK, states.ERROR, states.DELETED] + terminal_states = [states.OK, states.ERROR, states.DELETED, states.DELETING] #: job states where the job hasn't finished and the model may still change non_ready_states = [ states.NEW, diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index dcc88c1bcc82..4b8a418b0c75 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -4092,10 +4092,10 @@ def test_cancel_workflow_invocation_deletes_jobs(self): self.workflow_populator.cancel_invocation(summary.invocation_id) invocation_jobs = self.workflow_populator.get_invocation_jobs(summary.invocation_id) for job in invocation_jobs: - assert job["state"] == "deleted" or job["state"] == "deleted_new" + assert job["state"] == "deleted" or job["state"] == "deleting" subworkflow_invocation_jobs = self.workflow_populator.get_invocation_jobs(subworkflow_invocation_id) for job in subworkflow_invocation_jobs: - assert job["state"] == "deleted" or job["state"] == "deleted_new" + assert job["state"] == "deleted" or job["state"] == "deleting" def test_workflow_failed_output_not_found(self, history_id): summary = self._run_workflow( From d8bc1970301f7c5166117a4cf19d262e7a1a9030 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 21 Jun 2023 16:59:46 +0200 Subject: [PATCH 04/16] Make job terminal check in JobWrapper.change_state more robust By always fetching the job state from the database. --- lib/galaxy/jobs/__init__.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index c26afff39104..2e7be38894f2 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1529,13 +1529,14 @@ def mark_as_resubmitted(self, info=None): self.sa_session.commit() def change_state(self, state, info=False, flush=True, job=None): - job_supplied = job is not None - if not job_supplied: + if job is None: job = self.get_job() self.sa_session.refresh(job) - # Else: - # If this is a new job (e.g. initially queued) - we are in the same - # thread and no other threads are working on the job yet - so don't refresh. + else: + # job attributes may have been changed, so we can't refresh here, + # but we want to make sure that the terminal state check below works + # on the current job state value to minimize race conditions. + self.sa_session.expire(job, ["state"]) if job.state in model.Job.terminal_states: log.warning( From ee7160bb978e9fd072192026f5fffc35c511cc66 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 21 Jun 2023 20:23:07 +0200 Subject: [PATCH 05/16] More WIP stuff --- lib/galaxy/managers/workflows.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index 3cac1c96f1ea..fda3faf7ac6b 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -439,6 +439,7 @@ def cancel_invocation(self, trans, decoded_invocation_id: int): .filter(~model.Job.table.c.state.in_(model.Job.terminal_states)) .with_for_update().scalar_subquery() ) + log.debug("CANCEL STEP JOBS") trans.sa_session.execute(update(model.Job.table).where(model.Job.id == job_subq).values({"state": model.Job.states.DELETING})) job_collection_subq = ( @@ -451,7 +452,8 @@ def cancel_invocation(self, trans, decoded_invocation_id: int): .with_for_update().subquery() ) - trans.sa_session.execute(update(model.Job.table).where(model.Job.table.c.id.in_(job_collection_subq)).values({"state": model.Job.states.DELETING})) + log.debug("CANCEL STEP IMPLICIT JOBS") + trans.sa_session.execute(update(model.Job.table).where(model.Job.table.c.id.in_(job_collection_subq.element)).values({"state": model.Job.states.DELETING})) with transaction(trans.sa_session): trans.sa_session.commit() From 2df6048550c9a2082eedfac80c86af6f28ef35c8 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 10 Oct 2023 15:02:34 -0400 Subject: [PATCH 06/16] Make state change logic atomic --- lib/galaxy/jobs/__init__.py | 5 +++-- lib/galaxy/model/__init__.py | 23 ++++++++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 2e7be38894f2..e2d9cf909222 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1548,9 +1548,10 @@ def change_state(self, state, info=False, flush=True, job=None): return if info: job.info = info - job.set_state(state) + state_changed = job.set_state(state) self.sa_session.add(job) - job.update_output_states(self.app.application_stack.supports_skip_locked()) + if state_changed: + job.update_output_states(self.app.application_stack.supports_skip_locked()) if flush: with transaction(self.sa_session): self.sa_session.commit() diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index b16467a700f7..28229cae905c 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1636,10 +1636,27 @@ def all_entry_points_configured(self): def set_state(self, state): """ - Save state history + Save state history. Returns True if stat has changed, else False """ - self.state = state - self.state_history.append(JobStateHistory(self)) + if self.state == state: + # Nothing changed, no action needed + return False + session = object_session(self) + if session and self.id and not state in Job.terminal_states: + # generate statement that will not revert DELETING or DELETED back to anything non-terminal + rval = session.execute(update(Job.table).where( + Job.table.c.id == self.id, + ~Job.table.c.state.in_((Job.states.DELETING, Job.states.DELETED)) + ).values(state=state)) + if rval.rowcount == 1: + self.state_history.append(JobStateHistory(self)) + return True + else: + return False + else: + self.state = state + self.state_history.append(JobStateHistory(self)) + return True def get_param_values(self, app, ignore_errors=False): """ From 1a444e2d84fc18d5c8334e72d3f384a19d66cdca Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 10 Oct 2023 15:36:10 -0400 Subject: [PATCH 07/16] Make invocation cancellation two-step process API / users set invocation to cancelling, scheduler then deletes outputs. This avoids race conditions where the cancelled invocation still generates new jobs. --- lib/galaxy/managers/workflows.py | 31 +---------- lib/galaxy/model/__init__.py | 65 +++++++++++++++++++--- lib/galaxy/webapps/galaxy/api/workflows.py | 4 +- lib/galaxy/workflow/run.py | 2 + lib/galaxy/workflow/scheduling_manager.py | 6 ++ lib/galaxy_test/api/test_workflows.py | 10 +++- lib/galaxy_test/base/populators.py | 19 ++++++- 7 files changed, 92 insertions(+), 45 deletions(-) diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index fda3faf7ac6b..6527839cf160 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -31,7 +31,6 @@ or_, select, true, - update, ) from sqlalchemy.orm import ( aliased, @@ -422,45 +421,17 @@ def get_invocation_report(self, trans, invocation_id, **kwd): target_format=target_format, ) - def cancel_invocation(self, trans, decoded_invocation_id: int): + def request_invocation_cancellation(self, trans, decoded_invocation_id: int): workflow_invocation = self.get_invocation(trans, decoded_invocation_id) cancelled = workflow_invocation.cancel() if cancelled: workflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request")) trans.sa_session.add(workflow_invocation) - with transaction(trans.sa_session): - trans.sa_session.commit() - - job_subq = ( - trans.sa_session.query(model.Job.id) - .join(model.WorkflowInvocationStep) - .filter(model.WorkflowInvocationStep.workflow_invocation_id == decoded_invocation_id) - .filter(~model.Job.table.c.state.in_(model.Job.terminal_states)) - .with_for_update().scalar_subquery() - ) - log.debug("CANCEL STEP JOBS") - trans.sa_session.execute(update(model.Job.table).where(model.Job.id == job_subq).values({"state": model.Job.states.DELETING})) - - job_collection_subq = ( - trans.sa_session.query(model.Job.id) - .join(model.ImplicitCollectionJobsJobAssociation) - .join(model.ImplicitCollectionJobs) - .join(model.WorkflowInvocationStep, model.WorkflowInvocationStep.implicit_collection_jobs_id == model.ImplicitCollectionJobs.id) - .filter(model.WorkflowInvocationStep.workflow_invocation_id == decoded_invocation_id) - .filter(~model.Job.table.c.state.in_(model.Job.terminal_states)) - .with_for_update().subquery() - ) - - log.debug("CANCEL STEP IMPLICIT JOBS") - trans.sa_session.execute(update(model.Job.table).where(model.Job.table.c.id.in_(job_collection_subq.element)).values({"state": model.Job.states.DELETING})) with transaction(trans.sa_session): trans.sa_session.commit() - for invocation in workflow_invocation.subworkflow_invocations: - self.cancel_invocation(trans, invocation.subworkflow_invocation_id) - return workflow_invocation def get_invocation_step(self, trans, decoded_workflow_invocation_step_id): diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 28229cae905c..fd5eaec21a3d 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -135,6 +135,7 @@ from galaxy.model.orm.now import now from galaxy.model.orm.util import add_object_to_object_session from galaxy.objectstore import ObjectStore +from galaxy.schema.invocation import InvocationCancellationUserRequest from galaxy.schema.schema import ( DatasetCollectionPopulatedState, DatasetState, @@ -1642,12 +1643,13 @@ def set_state(self, state): # Nothing changed, no action needed return False session = object_session(self) - if session and self.id and not state in Job.terminal_states: + if session and self.id and state not in Job.terminal_states: # generate statement that will not revert DELETING or DELETED back to anything non-terminal - rval = session.execute(update(Job.table).where( - Job.table.c.id == self.id, - ~Job.table.c.state.in_((Job.states.DELETING, Job.states.DELETED)) - ).values(state=state)) + rval = session.execute( + update(Job.table) + .where(Job.table.c.id == self.id, ~Job.table.c.state.in_((Job.states.DELETING, Job.states.DELETED))) + .values(state=state) + ) if rval.rowcount == 1: self.state_history.append(JobStateHistory(self)) return True @@ -8170,6 +8172,7 @@ class states(str, Enum): READY = "ready" # Workflow ready for another iteration of scheduling. SCHEDULED = "scheduled" # Workflow has been scheduled. CANCELLED = "cancelled" + CANCELLING = "cancelling" # invocation scheduler will cancel job in next iteration FAILED = "failed" non_terminal_states = [states.NEW, states.READY] @@ -8215,11 +8218,54 @@ def active(self): return self.state in [states.NEW, states.READY] def cancel(self): - if not self.active: - return False - else: - self.state = WorkflowInvocation.states.CANCELLED + if self.state not in [WorkflowInvocation.states.CANCELLING, WorkflowInvocation.states.CANCELLED]: + # No use cancelling workflow again, for all others we may still want to be able to cancel + # remaining tool and workflow steps + self.state = WorkflowInvocation.states.CANCELLING return True + return False + + def cancel_invocation_steps(self): + sa_session = object_session(self) + job_subq = ( + sa_session.query(Job.id) + .join(WorkflowInvocationStep) + .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) + .filter(~Job.table.c.state.in_(Job.terminal_states)) + .with_for_update() + .scalar_subquery() + ) + sa_session.execute(update(Job.table).where(Job.id == job_subq).values({"state": Job.states.DELETING})) + + job_collection_subq = ( + sa_session.query(Job.id) + .join(ImplicitCollectionJobsJobAssociation) + .join(ImplicitCollectionJobs) + .join( + WorkflowInvocationStep, WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id + ) + .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) + .filter(~Job.table.c.state.in_(Job.terminal_states)) + .with_for_update() + .subquery() + ) + + sa_session.execute( + update(Job.table) + .where(Job.table.c.id.in_(job_collection_subq.element)) + .values({"state": Job.states.DELETING}) + ) + + for invocation in self.subworkflow_invocations: + subworkflow_invocation = invocation.subworkflow_invocation + cancelled = subworkflow_invocation.cancel() + if cancelled: + subworkflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request")) + sa_session.add(subworkflow_invocation) + sa_session.commit() + + def mark_cancelled(self): + self.state = WorkflowInvocation.states.CANCELLED def fail(self): self.state = WorkflowInvocation.states.FAILED @@ -8269,6 +8315,7 @@ def poll_active_workflow_ids(engine, scheduler=None, handler=None): or_( WorkflowInvocation.state == WorkflowInvocation.states.NEW, WorkflowInvocation.state == WorkflowInvocation.states.READY, + WorkflowInvocation.state == WorkflowInvocation.states.CANCELLING, ), ] if scheduler is not None: diff --git a/lib/galaxy/webapps/galaxy/api/workflows.py b/lib/galaxy/webapps/galaxy/api/workflows.py index c8f9864d172e..508b2e481887 100644 --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -907,7 +907,9 @@ def cancel_invocation(self, trans: ProvidesUserContext, invocation_id, **kwd): :raises: exceptions.MessageException, exceptions.ObjectNotFound """ decoded_workflow_invocation_id = self.decode_id(invocation_id) - workflow_invocation = self.workflow_manager.cancel_invocation(trans, decoded_workflow_invocation_id) + workflow_invocation = self.workflow_manager.request_invocation_cancellation( + trans, decoded_workflow_invocation_id + ) return self.__encode_invocation(workflow_invocation, **kwd) @expose_api diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 322a8800b616..59d19a5c079a 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -640,6 +640,8 @@ def subworkflow_invoker( when_values=None, ) -> WorkflowInvoker: subworkflow_invocation = self._subworkflow_invocation(step) + subworkflow_invocation.handler = self.workflow_invocation.handler + subworkflow_invocation.scheduler = self.workflow_invocation.scheduler workflow_run_config = workflow_request_to_run_config(subworkflow_invocation, use_cached_job) subworkflow_progress = self.subworkflow_progress( subworkflow_invocation, diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index fc44e59b699a..32fceaee7cf1 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -332,6 +332,12 @@ def __attempt_schedule(self, invocation_id, workflow_scheduler): workflow_invocation = session.get(model.WorkflowInvocation, invocation_id) try: + if workflow_invocation.state == workflow_invocation.states.CANCELLING: + workflow_invocation.cancel_invocation_steps() + workflow_invocation.state = workflow_invocation.states.CANCELLED + session.commit() + return False + if not workflow_invocation or not workflow_invocation.active: return False diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 4b8a418b0c75..7ed5c5ba1a27 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -4090,12 +4090,18 @@ def test_cancel_workflow_invocation_deletes_jobs(self): assert invocation_before_cancellation["state"] == "scheduled" subworkflow_invocation_id = invocation_before_cancellation["steps"][2]["subworkflow_invocation_id"] self.workflow_populator.cancel_invocation(summary.invocation_id) + self.workflow_populator.wait_for_invocation_and_jobs( + history_id=history_id, + workflow_id=summary.workflow_id, + invocation_id=summary.invocation_id, + assert_ok=False, + ) invocation_jobs = self.workflow_populator.get_invocation_jobs(summary.invocation_id) for job in invocation_jobs: - assert job["state"] == "deleted" or job["state"] == "deleting" + assert job["state"] == "deleted" subworkflow_invocation_jobs = self.workflow_populator.get_invocation_jobs(subworkflow_invocation_id) for job in subworkflow_invocation_jobs: - assert job["state"] == "deleted" or job["state"] == "deleting" + assert job["state"] == "deleted" def test_workflow_failed_output_not_found(self, history_id): summary = self._run_workflow( diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index 4010cff642de..86cdba03f8e3 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -700,7 +700,9 @@ def invocation_jobs(self, invocation_id: str) -> List[Dict[str, Any]]: def active_history_jobs(self, history_id: str) -> list: all_history_jobs = self.history_jobs(history_id) - active_jobs = [j for j in all_history_jobs if j["state"] in ["new", "upload", "waiting", "queued", "running"]] + active_jobs = [ + j for j in all_history_jobs if j["state"] in ["new", "upload", "waiting", "queued", "running", "deleting"] + ] return active_jobs def cancel_job(self, job_id: str) -> Response: @@ -2094,7 +2096,7 @@ def get_invocation_jobs(self, invocation_id: str) -> List[Dict[str, Any]]: def wait_for_invocation_and_jobs( self, history_id: str, workflow_id: str, invocation_id: str, assert_ok: bool = True ) -> None: - state = self.wait_for_invocation(workflow_id, invocation_id) + state = self.wait_for_invocation(workflow_id, invocation_id, assert_ok=assert_ok) if assert_ok: assert state == "scheduled", state time.sleep(0.5) @@ -3117,7 +3119,18 @@ def get_state(): return state if skip_states is None: - skip_states = ["running", "queued", "new", "ready", "stop", "stopped", "setting_metadata", "waiting"] + skip_states = [ + "running", + "queued", + "new", + "ready", + "stop", + "stopped", + "setting_metadata", + "waiting", + "cancelling", + "deleting", + ] if ok_states is None: ok_states = ["ok", "scheduled", "deferred"] # Remove ok_states from skip_states, so we can wait for a state to becoming running From 21564323980d8964cae4666eaa2362413fb80255 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 10 Oct 2023 22:09:55 -0400 Subject: [PATCH 08/16] Refactor ItemGrabber into JobGrabber and InvocationGrabber --- lib/galaxy/jobs/handler.py | 44 +++++++++++++++-------- lib/galaxy/workflow/scheduling_manager.py | 7 ++-- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 82a9f19382d4..62c2d1894d00 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -13,6 +13,8 @@ Dict, List, Tuple, + Type, + Union, ) from sqlalchemy.exc import OperationalError @@ -101,10 +103,11 @@ def shutdown(self): class ItemGrabber: + grab_model: Union[Type[model.Job], Type[model.WorkflowInvocation]] + def __init__( self, app, - grab_type="Job", handler_assignment_method=None, max_grab=None, self_handler_tags=None, @@ -112,8 +115,6 @@ def __init__( ): self.app = app self.sa_session = app.model.context - self.grab_this = getattr(model, grab_type) - self.grab_type = grab_type self.handler_assignment_method = handler_assignment_method self.self_handler_tags = self_handler_tags self.max_grab = max_grab @@ -123,27 +124,35 @@ def __init__( self._supports_returning = self.app.application_stack.supports_returning() def setup_query(self): + if self.grab_model is model.Job: + grab_condition = self.grab_model.table.c.state == self.grab_model.states.NEW + elif self.grab_model is model.WorkflowInvocation: + grab_condition = self.grab_model.table.c.state.in_( + (self.grab_model.states.NEW, self.grab_model.states.CANCELLING) + ) + else: + raise NotImplementedError(f"Grabbing {self.grab_model} not implemented") subq = ( - select(self.grab_this.id) + select(self.grab_model.id) .where( and_( - self.grab_this.table.c.handler.in_(self.self_handler_tags), - self.grab_this.table.c.state == self.grab_this.states.NEW, + self.grab_model.table.c.handler.in_(self.self_handler_tags), + grab_condition, ) ) - .order_by(self.grab_this.table.c.id) + .order_by(self.grab_model.table.c.id) ) if self.max_grab: subq = subq.limit(self.max_grab) if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: subq = subq.with_for_update(skip_locked=True) self._grab_query = ( - self.grab_this.table.update() - .where(self.grab_this.table.c.id.in_(subq)) + self.grab_model.table.update() + .where(self.grab_model.table.c.id.in_(subq)) .values(handler=self.app.config.server_name) ) if self._supports_returning: - self._grab_query = self._grab_query.returning(self.grab_this.table.c.id) + self._grab_query = self._grab_query.returning(self.grab_model.table.c.id) if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: self._grab_conn_opts["isolation_level"] = "SERIALIZABLE" log.info( @@ -183,7 +192,7 @@ def grab_unhandled_items(self): if self._supports_returning: rows = proxy.fetchall() if rows: - log.debug(f"Grabbed {self.grab_type}(s): {', '.join(str(row[0]) for row in rows)}") + log.debug(f"Grabbed {type(self.grab_model)}(s): {', '.join(str(row[0]) for row in rows)}") else: trans.rollback() except OperationalError as e: @@ -191,11 +200,19 @@ def grab_unhandled_items(self): # and should have attribute `code`. Other engines should just report the message and move on. if int(getattr(e.orig, "pgcode", -1)) != 40001: log.debug( - "Grabbing %s failed (serialization failures are ok): %s", self.grab_type, unicodify(e) + "Grabbing %s failed (serialization failures are ok): %s", self.grab_model, unicodify(e) ) trans.rollback() +class InvocationGrabber(ItemGrabber): + grab_model = model.WorkflowInvocation + + +class JobGrabber(ItemGrabber): + grab_model = model.Job + + class StopSignalException(Exception): """Exception raised when queue returns a stop signal.""" @@ -240,9 +257,8 @@ def __init__(self, app: MinimalManagerApp, dispatcher): self.app.job_config.handler_assignment_methods ) if handler_assignment_method: - self.job_grabber = ItemGrabber( + self.job_grabber = JobGrabber( app=app, - grab_type="Job", handler_assignment_method=handler_assignment_method, max_grab=self.app.job_config.handler_max_grab, self_handler_tags=self.app.job_config.self_handler_tags, diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index 32fceaee7cf1..b8851def3e7a 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -4,7 +4,7 @@ import galaxy.workflow.schedulers from galaxy import model from galaxy.exceptions import HandlerAssignmentError -from galaxy.jobs.handler import ItemGrabber +from galaxy.jobs.handler import InvocationGrabber from galaxy.model.base import transaction from galaxy.util import plugin_config from galaxy.util.custom_logging import get_logger @@ -285,13 +285,12 @@ def __init__(self, app, workflow_scheduling_manager): self.invocation_grabber = None self_handler_tags = set(self.app.job_config.self_handler_tags) self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id) - handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method( + handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method( self.workflow_scheduling_manager.handler_assignment_methods ) if handler_assignment_method: - self.invocation_grabber = ItemGrabber( + self.invocation_grabber = InvocationGrabber( app=app, - grab_type="WorkflowInvocation", handler_assignment_method=handler_assignment_method, max_grab=self.workflow_scheduling_manager.handler_max_grab, self_handler_tags=self_handler_tags, From 2980c1ff773757b6b59385ec950d8e786daab267 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 11 Oct 2023 18:05:17 -0400 Subject: [PATCH 09/16] Fix job state history --- lib/galaxy/model/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index fd5eaec21a3d..5c413a4af799 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1651,6 +1651,8 @@ def set_state(self, state): .values(state=state) ) if rval.rowcount == 1: + # Need to expire state since we just updated it, but ORM doesn't know about it. + session.expire(self, ["state"]) self.state_history.append(JobStateHistory(self)) return True else: From c1801363a7b5d9a6838edc8853e5c40f99df6cb3 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 12 Oct 2023 08:00:30 +0100 Subject: [PATCH 10/16] Fix invocation cancel test --- lib/galaxy_test/api/test_workflows.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 7ed5c5ba1a27..c41a4cdb7607 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -4034,7 +4034,12 @@ def test_cancel_workflow_invocation(self): invocation_url = self._api_url(f"workflows/{uploaded_workflow_id}/usage/{invocation_id}", use_key=True) delete_response = delete(invocation_url) self._assert_status_code_is(delete_response, 200) - + self.workflow_populator.wait_for_invocation_and_jobs( + history_id=history_id, + workflow_id=uploaded_workflow_id, + invocation_id=invocation_id, + assert_ok=False, + ) invocation = self._invocation_details(uploaded_workflow_id, invocation_id) assert invocation["state"] == "cancelled" message = invocation["messages"][0] From 6f1b60ae4ccce8e80b77121e9055b41141e491df Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 12 Oct 2023 11:59:58 +0200 Subject: [PATCH 11/16] Fix class name logging --- lib/galaxy/jobs/handler.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 62c2d1894d00..977293cb2117 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -131,7 +131,7 @@ def setup_query(self): (self.grab_model.states.NEW, self.grab_model.states.CANCELLING) ) else: - raise NotImplementedError(f"Grabbing {self.grab_model} not implemented") + raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented") subq = ( select(self.grab_model.id) .where( @@ -192,7 +192,9 @@ def grab_unhandled_items(self): if self._supports_returning: rows = proxy.fetchall() if rows: - log.debug(f"Grabbed {type(self.grab_model)}(s): {', '.join(str(row[0]) for row in rows)}") + log.debug( + f"Grabbed {self.grab_model.__name__}(s): {', '.join(str(row[0]) for row in rows)}" + ) else: trans.rollback() except OperationalError as e: @@ -200,7 +202,9 @@ def grab_unhandled_items(self): # and should have attribute `code`. Other engines should just report the message and move on. if int(getattr(e.orig, "pgcode", -1)) != 40001: log.debug( - "Grabbing %s failed (serialization failures are ok): %s", self.grab_model, unicodify(e) + "Grabbing %s failed (serialization failures are ok): %s", + self.grab_model.__name__, + unicodify(e), ) trans.rollback() From 6cde33f6e691c2e07e9374d704cf645e21d83e6a Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 13 Nov 2023 15:48:10 +0100 Subject: [PATCH 12/16] Apply Nicola's suggestions --- lib/galaxy/model/__init__.py | 4 ++-- lib/galaxy/workflow/scheduling_manager.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 5c413a4af799..c2e201eb3f52 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1635,9 +1635,9 @@ def all_entry_points_configured(self): all_configured = ep.configured and all_configured return all_configured - def set_state(self, state): + def set_state(self, state: JobState) -> bool: """ - Save state history. Returns True if stat has changed, else False + Save state history. Returns True if state has changed, else False. """ if self.state == state: # Nothing changed, no action needed diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index b8851def3e7a..31f4bca49ae1 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -333,7 +333,7 @@ def __attempt_schedule(self, invocation_id, workflow_scheduler): try: if workflow_invocation.state == workflow_invocation.states.CANCELLING: workflow_invocation.cancel_invocation_steps() - workflow_invocation.state = workflow_invocation.states.CANCELLED + workflow_invocation.mark_cancelled() session.commit() return False From db9d5b5f8bf3aba4b1d8007231185ca7643e728a Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 13 Nov 2023 15:59:53 +0100 Subject: [PATCH 13/16] Add new finished_states and move deleting to that --- lib/galaxy/model/__init__.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index c2e201eb3f52..ac2abf82df32 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1367,7 +1367,10 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable): states = JobState - terminal_states = [states.OK, states.ERROR, states.DELETED, states.DELETING] + # states that are not expected to change, except through admin action or re-scheduling + terminal_states = [states.OK, states.ERROR, states.DELETED] + # deleting state should not turn back into any of the non-ready states + finished_states = terminal_states + [states.DELETING] #: job states where the job hasn't finished and the model may still change non_ready_states = [ states.NEW, @@ -1392,13 +1395,7 @@ def running(self): @property def finished(self): - states = self.states - return self.state in [ - states.OK, - states.ERROR, - states.DELETING, - states.DELETED, - ] + return self.state in self.finished_states def io_dicts(self, exclude_implicit_outputs=False) -> IoDicts: inp_data: Dict[str, Optional[DatasetInstance]] = {da.name: da.dataset for da in self.input_datasets} @@ -1643,7 +1640,7 @@ def set_state(self, state: JobState) -> bool: # Nothing changed, no action needed return False session = object_session(self) - if session and self.id and state not in Job.terminal_states: + if session and self.id and state not in Job.finished_states: # generate statement that will not revert DELETING or DELETED back to anything non-terminal rval = session.execute( update(Job.table) @@ -8233,7 +8230,7 @@ def cancel_invocation_steps(self): sa_session.query(Job.id) .join(WorkflowInvocationStep) .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) - .filter(~Job.table.c.state.in_(Job.terminal_states)) + .filter(~Job.table.c.state.in_(Job.finished_states)) .with_for_update() .scalar_subquery() ) @@ -8247,7 +8244,7 @@ def cancel_invocation_steps(self): WorkflowInvocationStep, WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id ) .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) - .filter(~Job.table.c.state.in_(Job.terminal_states)) + .filter(~Job.table.c.state.in_(Job.finished_states)) .with_for_update() .subquery() ) From 194f5ad5df3e606a01addf7d8de249f1d51ef2eb Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 14 Nov 2023 09:56:13 +0100 Subject: [PATCH 14/16] Use sqlalchemy 2.0 query syntax --- lib/galaxy/jobs/handler.py | 14 ++++++-------- lib/galaxy/model/__init__.py | 11 ++++++----- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 977293cb2117..359fdeddfc89 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -125,22 +125,20 @@ def __init__( def setup_query(self): if self.grab_model is model.Job: - grab_condition = self.grab_model.table.c.state == self.grab_model.states.NEW + grab_condition = self.grab_model.state == self.grab_model.states.NEW elif self.grab_model is model.WorkflowInvocation: - grab_condition = self.grab_model.table.c.state.in_( - (self.grab_model.states.NEW, self.grab_model.states.CANCELLING) - ) + grab_condition = self.grab_model.state.in_((self.grab_model.states.NEW, self.grab_model.states.CANCELLING)) else: raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented") subq = ( select(self.grab_model.id) .where( and_( - self.grab_model.table.c.handler.in_(self.self_handler_tags), + self.grab_model.handler.in_(self.self_handler_tags), grab_condition, ) ) - .order_by(self.grab_model.table.c.id) + .order_by(self.grab_model.id) ) if self.max_grab: subq = subq.limit(self.max_grab) @@ -148,11 +146,11 @@ def setup_query(self): subq = subq.with_for_update(skip_locked=True) self._grab_query = ( self.grab_model.table.update() - .where(self.grab_model.table.c.id.in_(subq)) + .where(self.grab_model.id.in_(subq)) .values(handler=self.app.config.server_name) ) if self._supports_returning: - self._grab_query = self._grab_query.returning(self.grab_model.table.c.id) + self._grab_query = self._grab_query.returning(self.grab_model.id) if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: self._grab_conn_opts["isolation_level"] = "SERIALIZABLE" log.info( diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index ac2abf82df32..3c19fe0384a6 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1644,7 +1644,7 @@ def set_state(self, state: JobState) -> bool: # generate statement that will not revert DELETING or DELETED back to anything non-terminal rval = session.execute( update(Job.table) - .where(Job.table.c.id == self.id, ~Job.table.c.state.in_((Job.states.DELETING, Job.states.DELETED))) + .where(Job.id == self.id, ~Job.state.in_((Job.states.DELETING, Job.states.DELETED))) .values(state=state) ) if rval.rowcount == 1: @@ -8226,25 +8226,26 @@ def cancel(self): def cancel_invocation_steps(self): sa_session = object_session(self) + assert sa_session job_subq = ( - sa_session.query(Job.id) + select(Job.id) .join(WorkflowInvocationStep) .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) - .filter(~Job.table.c.state.in_(Job.finished_states)) + .filter(~Job.state.in_(Job.finished_states)) .with_for_update() .scalar_subquery() ) sa_session.execute(update(Job.table).where(Job.id == job_subq).values({"state": Job.states.DELETING})) job_collection_subq = ( - sa_session.query(Job.id) + select(Job.id) .join(ImplicitCollectionJobsJobAssociation) .join(ImplicitCollectionJobs) .join( WorkflowInvocationStep, WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id ) .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) - .filter(~Job.table.c.state.in_(Job.finished_states)) + .filter(~Job.state.in_(Job.finished_states)) .with_for_update() .subquery() ) From 571c511dcd66a1ac317dd350df170da9a0efdc94 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 14 Nov 2023 10:01:18 +0100 Subject: [PATCH 15/16] Use JobGrabber.get_grabbable_handler_assignment_method static method for consistency --- lib/galaxy/jobs/handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 359fdeddfc89..967c98a8c093 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -255,7 +255,7 @@ def __init__(self, app: MinimalManagerApp, dispatcher): name = "JobHandlerQueue.monitor_thread" self._init_monitor_thread(name, target=self.__monitor, config=app.config) self.job_grabber = None - handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method( + handler_assignment_method = JobGrabber.get_grabbable_handler_assignment_method( self.app.job_config.handler_assignment_methods ) if handler_assignment_method: From 2a11420bc6548ba62bd1c7f1fc8cde9a8616be34 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 14 Nov 2023 10:58:29 +0100 Subject: [PATCH 16/16] Prevent cancelled invocation resetting back to new or ready --- lib/galaxy/model/__init__.py | 17 +++++++++++++++++ lib/galaxy/workflow/run.py | 4 ++-- lib/galaxy/workflow/scheduling_manager.py | 2 +- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 3c19fe0384a6..c437cb1ee65a 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -8216,6 +8216,23 @@ def active(self): states = WorkflowInvocation.states return self.state in [states.NEW, states.READY] + def set_state(self, state: "WorkflowInvocation.states"): + session = object_session(self) + priority_states = (WorkflowInvocation.states.CANCELLING, WorkflowInvocation.states.CANCELLED) + if session and self.id and state not in priority_states: + # generate statement that will not revert CANCELLING or CANCELLED back to anything non-terminal + session.execute( + update(WorkflowInvocation.table) + .where( + WorkflowInvocation.id == self.id, + or_(~WorkflowInvocation.state.in_(priority_states), WorkflowInvocation.state.is_(None)), + ) + .values(state=state) + ) + else: + # Not bound to a session, or setting cancelling/cancelled + self.state = state + def cancel(self): if self.state not in [WorkflowInvocation.states.CANCELLING, WorkflowInvocation.states.CANCELLED]: # No use cancelling workflow again, for all others we may still want to be able to cancel diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 59d19a5c079a..776b4e525803 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -193,7 +193,7 @@ def invoke(self) -> Dict[int, Any]: log.debug( f"Workflow invocation [{workflow_invocation.id}] exceeded maximum number of seconds allowed for scheduling [{maximum_duration}], failing." ) - workflow_invocation.state = model.WorkflowInvocation.states.FAILED + workflow_invocation.set_state(model.WorkflowInvocation.states.FAILED) # All jobs ran successfully, so we can save now self.trans.sa_session.add(workflow_invocation) @@ -264,7 +264,7 @@ def invoke(self) -> Dict[int, Any]: state = model.WorkflowInvocation.states.READY else: state = model.WorkflowInvocation.states.SCHEDULED - workflow_invocation.state = state + workflow_invocation.set_state(state) # All jobs ran successfully, so we can save now self.trans.sa_session.add(workflow_invocation) diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index 31f4bca49ae1..3868e24c13a9 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -155,7 +155,7 @@ def shutdown(self): raise exception def queue(self, workflow_invocation, request_params, flush=True): - workflow_invocation.state = model.WorkflowInvocation.states.NEW + workflow_invocation.set_state(model.WorkflowInvocation.states.NEW) workflow_invocation.scheduler = request_params.get("scheduler", None) or self.default_scheduler_id sa_session = self.app.model.context sa_session.add(workflow_invocation)