diff --git a/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue b/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue index ca3fb191c70f..3b30faf87d73 100644 --- a/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue +++ b/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue @@ -15,7 +15,7 @@ -
+
diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index c26afff39104..e2d9cf909222 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1529,13 +1529,14 @@ def mark_as_resubmitted(self, info=None): self.sa_session.commit() def change_state(self, state, info=False, flush=True, job=None): - job_supplied = job is not None - if not job_supplied: + if job is None: job = self.get_job() self.sa_session.refresh(job) - # Else: - # If this is a new job (e.g. initially queued) - we are in the same - # thread and no other threads are working on the job yet - so don't refresh. + else: + # job attributes may have been changed, so we can't refresh here, + # but we want to make sure that the terminal state check below works + # on the current job state value to minimize race conditions. + self.sa_session.expire(job, ["state"]) if job.state in model.Job.terminal_states: log.warning( @@ -1547,9 +1548,10 @@ def change_state(self, state, info=False, flush=True, job=None): return if info: job.info = info - job.set_state(state) + state_changed = job.set_state(state) self.sa_session.add(job) - job.update_output_states(self.app.application_stack.supports_skip_locked()) + if state_changed: + job.update_output_states(self.app.application_stack.supports_skip_locked()) if flush: with transaction(self.sa_session): self.sa_session.commit() diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 82a9f19382d4..967c98a8c093 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -13,6 +13,8 @@ Dict, List, Tuple, + Type, + Union, ) from sqlalchemy.exc import OperationalError @@ -101,10 +103,11 @@ def shutdown(self): class ItemGrabber: + grab_model: Union[Type[model.Job], Type[model.WorkflowInvocation]] + def __init__( self, app, - grab_type="Job", handler_assignment_method=None, max_grab=None, self_handler_tags=None, @@ -112,8 +115,6 @@ def __init__( ): self.app = app self.sa_session = app.model.context - self.grab_this = getattr(model, grab_type) - self.grab_type = grab_type self.handler_assignment_method = handler_assignment_method self.self_handler_tags = self_handler_tags self.max_grab = max_grab @@ -123,27 +124,33 @@ def __init__( self._supports_returning = self.app.application_stack.supports_returning() def setup_query(self): + if self.grab_model is model.Job: + grab_condition = self.grab_model.state == self.grab_model.states.NEW + elif self.grab_model is model.WorkflowInvocation: + grab_condition = self.grab_model.state.in_((self.grab_model.states.NEW, self.grab_model.states.CANCELLING)) + else: + raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented") subq = ( - select(self.grab_this.id) + select(self.grab_model.id) .where( and_( - self.grab_this.table.c.handler.in_(self.self_handler_tags), - self.grab_this.table.c.state == self.grab_this.states.NEW, + self.grab_model.handler.in_(self.self_handler_tags), + grab_condition, ) ) - .order_by(self.grab_this.table.c.id) + .order_by(self.grab_model.id) ) if self.max_grab: subq = subq.limit(self.max_grab) if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: subq = subq.with_for_update(skip_locked=True) self._grab_query = ( - self.grab_this.table.update() - .where(self.grab_this.table.c.id.in_(subq)) + self.grab_model.table.update() + .where(self.grab_model.id.in_(subq)) .values(handler=self.app.config.server_name) ) if self._supports_returning: - self._grab_query = self._grab_query.returning(self.grab_this.table.c.id) + self._grab_query = self._grab_query.returning(self.grab_model.id) if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: self._grab_conn_opts["isolation_level"] = "SERIALIZABLE" log.info( @@ -183,7 +190,9 @@ def grab_unhandled_items(self): if self._supports_returning: rows = proxy.fetchall() if rows: - log.debug(f"Grabbed {self.grab_type}(s): {', '.join(str(row[0]) for row in rows)}") + log.debug( + f"Grabbed {self.grab_model.__name__}(s): {', '.join(str(row[0]) for row in rows)}" + ) else: trans.rollback() except OperationalError as e: @@ -191,11 +200,21 @@ def grab_unhandled_items(self): # and should have attribute `code`. Other engines should just report the message and move on. if int(getattr(e.orig, "pgcode", -1)) != 40001: log.debug( - "Grabbing %s failed (serialization failures are ok): %s", self.grab_type, unicodify(e) + "Grabbing %s failed (serialization failures are ok): %s", + self.grab_model.__name__, + unicodify(e), ) trans.rollback() +class InvocationGrabber(ItemGrabber): + grab_model = model.WorkflowInvocation + + +class JobGrabber(ItemGrabber): + grab_model = model.Job + + class StopSignalException(Exception): """Exception raised when queue returns a stop signal.""" @@ -236,13 +255,12 @@ def __init__(self, app: MinimalManagerApp, dispatcher): name = "JobHandlerQueue.monitor_thread" self._init_monitor_thread(name, target=self.__monitor, config=app.config) self.job_grabber = None - handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method( + handler_assignment_method = JobGrabber.get_grabbable_handler_assignment_method( self.app.job_config.handler_assignment_methods ) if handler_assignment_method: - self.job_grabber = ItemGrabber( + self.job_grabber = JobGrabber( app=app, - grab_type="Job", handler_assignment_method=handler_assignment_method, max_grab=self.app.job_config.handler_max_grab, self_handler_tags=self.app.job_config.self_handler_tags, diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index 345320413547..6527839cf160 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -421,18 +421,16 @@ def get_invocation_report(self, trans, invocation_id, **kwd): target_format=target_format, ) - def cancel_invocation(self, trans, decoded_invocation_id): + def request_invocation_cancellation(self, trans, decoded_invocation_id: int): workflow_invocation = self.get_invocation(trans, decoded_invocation_id) cancelled = workflow_invocation.cancel() if cancelled: workflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request")) trans.sa_session.add(workflow_invocation) - with transaction(trans.sa_session): - trans.sa_session.commit() - else: - # TODO: More specific exception? - raise exceptions.MessageException("Cannot cancel an inactive workflow invocation.") + + with transaction(trans.sa_session): + trans.sa_session.commit() return workflow_invocation diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index a0199e157afb..c437cb1ee65a 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -135,6 +135,7 @@ from galaxy.model.orm.now import now from galaxy.model.orm.util import add_object_to_object_session from galaxy.objectstore import ObjectStore +from galaxy.schema.invocation import InvocationCancellationUserRequest from galaxy.schema.schema import ( DatasetCollectionPopulatedState, DatasetState, @@ -1366,7 +1367,10 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable): states = JobState + # states that are not expected to change, except through admin action or re-scheduling terminal_states = [states.OK, states.ERROR, states.DELETED] + # deleting state should not turn back into any of the non-ready states + finished_states = terminal_states + [states.DELETING] #: job states where the job hasn't finished and the model may still change non_ready_states = [ states.NEW, @@ -1391,13 +1395,7 @@ def running(self): @property def finished(self): - states = self.states - return self.state in [ - states.OK, - states.ERROR, - states.DELETING, - states.DELETED, - ] + return self.state in self.finished_states def io_dicts(self, exclude_implicit_outputs=False) -> IoDicts: inp_data: Dict[str, Optional[DatasetInstance]] = {da.name: da.dataset for da in self.input_datasets} @@ -1634,12 +1632,32 @@ def all_entry_points_configured(self): all_configured = ep.configured and all_configured return all_configured - def set_state(self, state): + def set_state(self, state: JobState) -> bool: """ - Save state history + Save state history. Returns True if state has changed, else False. """ - self.state = state - self.state_history.append(JobStateHistory(self)) + if self.state == state: + # Nothing changed, no action needed + return False + session = object_session(self) + if session and self.id and state not in Job.finished_states: + # generate statement that will not revert DELETING or DELETED back to anything non-terminal + rval = session.execute( + update(Job.table) + .where(Job.id == self.id, ~Job.state.in_((Job.states.DELETING, Job.states.DELETED))) + .values(state=state) + ) + if rval.rowcount == 1: + # Need to expire state since we just updated it, but ORM doesn't know about it. + session.expire(self, ["state"]) + self.state_history.append(JobStateHistory(self)) + return True + else: + return False + else: + self.state = state + self.state_history.append(JobStateHistory(self)) + return True def get_param_values(self, app, ignore_errors=False): """ @@ -8153,6 +8171,7 @@ class states(str, Enum): READY = "ready" # Workflow ready for another iteration of scheduling. SCHEDULED = "scheduled" # Workflow has been scheduled. CANCELLED = "cancelled" + CANCELLING = "cancelling" # invocation scheduler will cancel job in next iteration FAILED = "failed" non_terminal_states = [states.NEW, states.READY] @@ -8197,12 +8216,73 @@ def active(self): states = WorkflowInvocation.states return self.state in [states.NEW, states.READY] - def cancel(self): - if not self.active: - return False + def set_state(self, state: "WorkflowInvocation.states"): + session = object_session(self) + priority_states = (WorkflowInvocation.states.CANCELLING, WorkflowInvocation.states.CANCELLED) + if session and self.id and state not in priority_states: + # generate statement that will not revert CANCELLING or CANCELLED back to anything non-terminal + session.execute( + update(WorkflowInvocation.table) + .where( + WorkflowInvocation.id == self.id, + or_(~WorkflowInvocation.state.in_(priority_states), WorkflowInvocation.state.is_(None)), + ) + .values(state=state) + ) else: - self.state = WorkflowInvocation.states.CANCELLED + # Not bound to a session, or setting cancelling/cancelled + self.state = state + + def cancel(self): + if self.state not in [WorkflowInvocation.states.CANCELLING, WorkflowInvocation.states.CANCELLED]: + # No use cancelling workflow again, for all others we may still want to be able to cancel + # remaining tool and workflow steps + self.state = WorkflowInvocation.states.CANCELLING return True + return False + + def cancel_invocation_steps(self): + sa_session = object_session(self) + assert sa_session + job_subq = ( + select(Job.id) + .join(WorkflowInvocationStep) + .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) + .filter(~Job.state.in_(Job.finished_states)) + .with_for_update() + .scalar_subquery() + ) + sa_session.execute(update(Job.table).where(Job.id == job_subq).values({"state": Job.states.DELETING})) + + job_collection_subq = ( + select(Job.id) + .join(ImplicitCollectionJobsJobAssociation) + .join(ImplicitCollectionJobs) + .join( + WorkflowInvocationStep, WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id + ) + .filter(WorkflowInvocationStep.workflow_invocation_id == self.id) + .filter(~Job.state.in_(Job.finished_states)) + .with_for_update() + .subquery() + ) + + sa_session.execute( + update(Job.table) + .where(Job.table.c.id.in_(job_collection_subq.element)) + .values({"state": Job.states.DELETING}) + ) + + for invocation in self.subworkflow_invocations: + subworkflow_invocation = invocation.subworkflow_invocation + cancelled = subworkflow_invocation.cancel() + if cancelled: + subworkflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request")) + sa_session.add(subworkflow_invocation) + sa_session.commit() + + def mark_cancelled(self): + self.state = WorkflowInvocation.states.CANCELLED def fail(self): self.state = WorkflowInvocation.states.FAILED @@ -8252,6 +8332,7 @@ def poll_active_workflow_ids(engine, scheduler=None, handler=None): or_( WorkflowInvocation.state == WorkflowInvocation.states.NEW, WorkflowInvocation.state == WorkflowInvocation.states.READY, + WorkflowInvocation.state == WorkflowInvocation.states.CANCELLING, ), ] if scheduler is not None: diff --git a/lib/galaxy/webapps/galaxy/api/workflows.py b/lib/galaxy/webapps/galaxy/api/workflows.py index c8f9864d172e..508b2e481887 100644 --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -907,7 +907,9 @@ def cancel_invocation(self, trans: ProvidesUserContext, invocation_id, **kwd): :raises: exceptions.MessageException, exceptions.ObjectNotFound """ decoded_workflow_invocation_id = self.decode_id(invocation_id) - workflow_invocation = self.workflow_manager.cancel_invocation(trans, decoded_workflow_invocation_id) + workflow_invocation = self.workflow_manager.request_invocation_cancellation( + trans, decoded_workflow_invocation_id + ) return self.__encode_invocation(workflow_invocation, **kwd) @expose_api diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 322a8800b616..776b4e525803 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -193,7 +193,7 @@ def invoke(self) -> Dict[int, Any]: log.debug( f"Workflow invocation [{workflow_invocation.id}] exceeded maximum number of seconds allowed for scheduling [{maximum_duration}], failing." ) - workflow_invocation.state = model.WorkflowInvocation.states.FAILED + workflow_invocation.set_state(model.WorkflowInvocation.states.FAILED) # All jobs ran successfully, so we can save now self.trans.sa_session.add(workflow_invocation) @@ -264,7 +264,7 @@ def invoke(self) -> Dict[int, Any]: state = model.WorkflowInvocation.states.READY else: state = model.WorkflowInvocation.states.SCHEDULED - workflow_invocation.state = state + workflow_invocation.set_state(state) # All jobs ran successfully, so we can save now self.trans.sa_session.add(workflow_invocation) @@ -640,6 +640,8 @@ def subworkflow_invoker( when_values=None, ) -> WorkflowInvoker: subworkflow_invocation = self._subworkflow_invocation(step) + subworkflow_invocation.handler = self.workflow_invocation.handler + subworkflow_invocation.scheduler = self.workflow_invocation.scheduler workflow_run_config = workflow_request_to_run_config(subworkflow_invocation, use_cached_job) subworkflow_progress = self.subworkflow_progress( subworkflow_invocation, diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index fc44e59b699a..3868e24c13a9 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -4,7 +4,7 @@ import galaxy.workflow.schedulers from galaxy import model from galaxy.exceptions import HandlerAssignmentError -from galaxy.jobs.handler import ItemGrabber +from galaxy.jobs.handler import InvocationGrabber from galaxy.model.base import transaction from galaxy.util import plugin_config from galaxy.util.custom_logging import get_logger @@ -155,7 +155,7 @@ def shutdown(self): raise exception def queue(self, workflow_invocation, request_params, flush=True): - workflow_invocation.state = model.WorkflowInvocation.states.NEW + workflow_invocation.set_state(model.WorkflowInvocation.states.NEW) workflow_invocation.scheduler = request_params.get("scheduler", None) or self.default_scheduler_id sa_session = self.app.model.context sa_session.add(workflow_invocation) @@ -285,13 +285,12 @@ def __init__(self, app, workflow_scheduling_manager): self.invocation_grabber = None self_handler_tags = set(self.app.job_config.self_handler_tags) self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id) - handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method( + handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method( self.workflow_scheduling_manager.handler_assignment_methods ) if handler_assignment_method: - self.invocation_grabber = ItemGrabber( + self.invocation_grabber = InvocationGrabber( app=app, - grab_type="WorkflowInvocation", handler_assignment_method=handler_assignment_method, max_grab=self.workflow_scheduling_manager.handler_max_grab, self_handler_tags=self_handler_tags, @@ -332,6 +331,12 @@ def __attempt_schedule(self, invocation_id, workflow_scheduler): workflow_invocation = session.get(model.WorkflowInvocation, invocation_id) try: + if workflow_invocation.state == workflow_invocation.states.CANCELLING: + workflow_invocation.cancel_invocation_steps() + workflow_invocation.mark_cancelled() + session.commit() + return False + if not workflow_invocation or not workflow_invocation.active: return False diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 276b989460ca..c41a4cdb7607 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -4034,12 +4034,80 @@ def test_cancel_workflow_invocation(self): invocation_url = self._api_url(f"workflows/{uploaded_workflow_id}/usage/{invocation_id}", use_key=True) delete_response = delete(invocation_url) self._assert_status_code_is(delete_response, 200) - + self.workflow_populator.wait_for_invocation_and_jobs( + history_id=history_id, + workflow_id=uploaded_workflow_id, + invocation_id=invocation_id, + assert_ok=False, + ) invocation = self._invocation_details(uploaded_workflow_id, invocation_id) assert invocation["state"] == "cancelled" message = invocation["messages"][0] assert message["reason"] == "user_request" + @skip_without_tool("collection_creates_dynamic_nested") + def test_cancel_workflow_invocation_deletes_jobs(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: + list_input: + type: collection + collection_type: list +steps: + first_step: + tool_id: cat_data_and_sleep + in: + input1: list_input + state: + sleep_time: 60 + subworkflow_step: + run: + class: GalaxyWorkflow + inputs: + list_input: + type: collection + collection_type: list + steps: + intermediate_step: + tool_id: identifier_multiple + in: + input1: list_input + subworkflow: + in: + list_input: first_step/out_file1 +test_data: + list_input: + collection_type: list + elements: + - identifier: 1 + content: A + - identifier: 2 + content: B +""", + history_id=history_id, + wait=False, + ) + # wait_for_invocation just waits until scheduling complete, not jobs or subworkflow invocations + self.workflow_populator.wait_for_invocation("null", summary.invocation_id, assert_ok=True) + invocation_before_cancellation = self.workflow_populator.get_invocation(summary.invocation_id) + assert invocation_before_cancellation["state"] == "scheduled" + subworkflow_invocation_id = invocation_before_cancellation["steps"][2]["subworkflow_invocation_id"] + self.workflow_populator.cancel_invocation(summary.invocation_id) + self.workflow_populator.wait_for_invocation_and_jobs( + history_id=history_id, + workflow_id=summary.workflow_id, + invocation_id=summary.invocation_id, + assert_ok=False, + ) + invocation_jobs = self.workflow_populator.get_invocation_jobs(summary.invocation_id) + for job in invocation_jobs: + assert job["state"] == "deleted" + subworkflow_invocation_jobs = self.workflow_populator.get_invocation_jobs(subworkflow_invocation_id) + for job in subworkflow_invocation_jobs: + assert job["state"] == "deleted" + def test_workflow_failed_output_not_found(self, history_id): summary = self._run_workflow( """ diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index 48631597179a..86cdba03f8e3 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -700,7 +700,9 @@ def invocation_jobs(self, invocation_id: str) -> List[Dict[str, Any]]: def active_history_jobs(self, history_id: str) -> list: all_history_jobs = self.history_jobs(history_id) - active_jobs = [j for j in all_history_jobs if j["state"] in ["new", "upload", "waiting", "queued", "running"]] + active_jobs = [ + j for j in all_history_jobs if j["state"] in ["new", "upload", "waiting", "queued", "running", "deleting"] + ] return active_jobs def cancel_job(self, job_id: str) -> Response: @@ -1697,6 +1699,11 @@ def workflow_invocations(self, workflow_id: str) -> List[Dict[str, Any]]: api_asserts.assert_status_code_is(response, 200) return response.json() + def cancel_invocation(self, invocation_id: str): + response = self._delete(f"invocations/{invocation_id}") + api_asserts.assert_status_code_is(response, 200) + return response.json() + def history_invocations(self, history_id: str) -> List[Dict[str, Any]]: history_invocations_response = self._get("invocations", {"history_id": history_id}) api_asserts.assert_status_code_is(history_invocations_response, 200) @@ -2079,10 +2086,17 @@ def setup_workflow_run( return workflow_request, history_id, workflow_id + def get_invocation_jobs(self, invocation_id: str) -> List[Dict[str, Any]]: + jobs_response = self._get("jobs", data={"invocation_id": invocation_id}) + api_asserts.assert_status_code_is(jobs_response, 200) + jobs = jobs_response.json() + assert isinstance(jobs, list) + return jobs + def wait_for_invocation_and_jobs( self, history_id: str, workflow_id: str, invocation_id: str, assert_ok: bool = True ) -> None: - state = self.wait_for_invocation(workflow_id, invocation_id) + state = self.wait_for_invocation(workflow_id, invocation_id, assert_ok=assert_ok) if assert_ok: assert state == "scheduled", state time.sleep(0.5) @@ -3105,7 +3119,18 @@ def get_state(): return state if skip_states is None: - skip_states = ["running", "queued", "new", "ready", "stop", "stopped", "setting_metadata", "waiting"] + skip_states = [ + "running", + "queued", + "new", + "ready", + "stop", + "stopped", + "setting_metadata", + "waiting", + "cancelling", + "deleting", + ] if ok_states is None: ok_states = ["ok", "scheduled", "deferred"] # Remove ok_states from skip_states, so we can wait for a state to becoming running