diff --git a/client/src/components/WorkflowInvocationState/InvocationMessage.vue b/client/src/components/WorkflowInvocationState/InvocationMessage.vue index d31a4189b0bd..840ad7ed44f3 100644 --- a/client/src/components/WorkflowInvocationState/InvocationMessage.vue +++ b/client/src/components/WorkflowInvocationState/InvocationMessage.vue @@ -62,7 +62,12 @@ const workflow = computed(() => { }); const workflowStep = computed(() => { - if ("workflow_step_id" in props.invocationMessage && workflow.value) { + if ( + "workflow_step_id" in props.invocationMessage && + props.invocationMessage.workflow_step_id !== undefined && + props.invocationMessage.workflow_step_id !== null && + workflow.value + ) { return workflow.value.steps[props.invocationMessage.workflow_step_id]; } return undefined; @@ -146,10 +151,14 @@ const infoString = computed(() => { invocationMessage.workflow_step_id + 1 } is a conditional step and the result of the when expression is not a boolean type.`; } else if (reason === "unexpected_failure") { + let atStep = ""; + if (invocationMessage.workflow_step_id !== null && invocationMessage.workflow_step_id !== undefined) { + atStep = ` at step ${invocationMessage.workflow_step_id + 1}`; + } if (invocationMessage.details) { - return `${failFragment} an unexpected failure occurred: '${invocationMessage.details}'`; + return `${failFragment} an unexpected failure occurred${atStep}: '${invocationMessage.details}'`; } - return `${failFragment} an unexpected failure occurred.`; + return `${failFragment} an unexpected failure occurred${atStep}.`; } else if (reason === "workflow_output_not_found") { return `Defined workflow output '${invocationMessage.output_name}' was not found in step ${ invocationMessage.workflow_step_id + 1 diff --git a/client/src/components/WorkflowInvocationState/invocationMessageModel.ts b/client/src/components/WorkflowInvocationState/invocationMessageModel.ts index 7c86d43ad17c..29ab6a989847 100644 --- a/client/src/components/WorkflowInvocationState/invocationMessageModel.ts +++ b/client/src/components/WorkflowInvocationState/invocationMessageModel.ts @@ -77,7 +77,7 @@ export interface GenericInvocationFailureCollectionFailedEncodedDatabaseIdField /** * HistoryDatasetCollectionAssociation ID that relates to failure. */ - hdca_id?: string; + hdca_id: string; /** * Workflow step id of step that caused failure. */ @@ -92,7 +92,7 @@ export interface GenericInvocationFailureCollectionFailedInt { /** * HistoryDatasetCollectionAssociation ID that relates to failure. */ - hdca_id?: number; + hdca_id: number; /** * Workflow step id of step that caused failure. */ @@ -159,7 +159,7 @@ export interface GenericInvocationFailureJobFailedEncodedDatabaseIdField { /** * Job ID that relates to failure. */ - job_id?: string; + job_id: string; /** * Workflow step id of step that caused failure. */ @@ -174,7 +174,7 @@ export interface GenericInvocationFailureJobFailedInt { /** * Job ID that relates to failure. */ - job_id?: number; + job_id: number; /** * Workflow step id of step that caused failure. */ @@ -232,6 +232,10 @@ export interface GenericInvocationUnexpectedFailureEncodedDatabaseIdField { * May contains details to help troubleshoot this problem. */ details?: string; + /** + * Workflow step id of step that failed. + */ + workflow_step_id?: number; } export interface GenericInvocationUnexpectedFailureInt { reason: "unexpected_failure"; @@ -239,4 +243,8 @@ export interface GenericInvocationUnexpectedFailureInt { * May contains details to help troubleshoot this problem. */ details?: string; + /** + * Workflow step id of step that failed. + */ + workflow_step_id?: number; } diff --git a/lib/galaxy/exceptions/__init__.py b/lib/galaxy/exceptions/__init__.py index 2448fd817960..5806ed157c21 100644 --- a/lib/galaxy/exceptions/__init__.py +++ b/lib/galaxy/exceptions/__init__.py @@ -151,6 +151,16 @@ class ToolInputsNotReadyException(MessageException): error_code = error_codes_by_name["TOOL_INPUTS_NOT_READY"] +class ToolInputsNotOKException(MessageException): + def __init__(self, err_msg=None, type="info", *, src: str, id: int, **extra_error_info): + super().__init__(err_msg, type, **extra_error_info) + self.src = src + self.id = id + + status_code = 400 + error_code = error_codes_by_name["TOOL_INPUTS_NOT_OK"] + + class RealUserRequiredException(MessageException): status_code = 400 error_code = error_codes_by_name["REAL_USER_REQUIRED"] diff --git a/lib/galaxy/exceptions/error_codes.json b/lib/galaxy/exceptions/error_codes.json index 7d8f521b8218..99464306acb6 100644 --- a/lib/galaxy/exceptions/error_codes.json +++ b/lib/galaxy/exceptions/error_codes.json @@ -94,6 +94,11 @@ "code": 400016, "message": "Only real users can make this request." }, + { + "name": "TOOL_INPUTS_NOT_OK", + "code": 400017, + "message": "Tool inputs not in required OK state." + }, { "name": "USER_AUTHENTICATION_FAILED", "code": 401001, diff --git a/lib/galaxy/schema/invocation.py b/lib/galaxy/schema/invocation.py index 0d2617a25862..fad8f6deba8c 100644 --- a/lib/galaxy/schema/invocation.py +++ b/lib/galaxy/schema/invocation.py @@ -51,7 +51,10 @@ def get(self, key: Any, default: Any = None) -> Any: # Fetch the order_index when serializing for the API, # which makes much more sense when pointing to steps. if key == "workflow_step_id": - return self._obj.workflow_step.order_index + if self._obj.workflow_step: + return self._obj.workflow_step.order_index + else: + return default elif key == "dependent_workflow_step_id": if self._obj.dependent_workflow_step_id: return self._obj.dependent_workflow_step.order_index @@ -132,6 +135,7 @@ class GenericInvocationFailureWhenNotBoolean(InvocationFailureMessageBase[Databa class GenericInvocationUnexpectedFailure(InvocationMessageBase, Generic[DatabaseIdT]): reason: Literal[FailureReason.unexpected_failure] details: Optional[str] = Field(None, description="May contains details to help troubleshoot this problem.") + workflow_step_id: Optional[int] = Field(None, description="Workflow step id of step that failed.") class GenericInvocationWarning(InvocationMessageBase, Generic[DatabaseIdT]): diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 5a92fd960e4b..3d5838d96490 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -34,7 +34,10 @@ exceptions, model, ) -from galaxy.exceptions import ToolInputsNotReadyException +from galaxy.exceptions import ( + ToolInputsNotOKException, + ToolInputsNotReadyException, +) from galaxy.job_execution import output_collect from galaxy.metadata import get_metadata_compute_strategy from galaxy.model.base import transaction @@ -2858,7 +2861,7 @@ def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kw copy_object = input_dataset break if copy_object is None: - raise Exception("Failed to find dataset output.") + raise exceptions.MessageException("Failed to find dataset output.") out_data[key].copy_from(copy_object) def parse_environment_variables(self, tool_source): @@ -3200,8 +3203,10 @@ def check_dataset_state(state): if self.require_dataset_ok: if state != model.Dataset.states.OK: - raise ValueError( - f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'" + raise ToolInputsNotOKException( + f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'", + src="hda", + id=input_dataset.id, ) for input_dataset in input_datasets.values(): @@ -3330,7 +3335,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history elif how == "by_index": extracted_element = collection[int(incoming["which"]["index"])] else: - raise Exception("Invalid tool parameters.") + raise exceptions.MessageException("Invalid tool parameters.") extracted = extracted_element.element_object extracted_o = extracted.copy( copy_tags=extracted.tags, new_name=extracted_element.element_identifier, flush=False @@ -3370,7 +3375,9 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history if element_identifier not in identifiers_map: identifiers_map[element_identifier] = [] elif dupl_actions == "fail": - raise Exception(f"Duplicate collection element identifiers found for [{element_identifier}]") + raise exceptions.MessageException( + f"Duplicate collection element identifiers found for [{element_identifier}]" + ) identifiers_map[element_identifier].append(input_num) for copy, input_list in enumerate(input_lists): @@ -3562,12 +3569,12 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history except KeyError: hdca_history_name = f"{hdca.hid}: {hdca.name}" message = f"List of element identifiers does not match element identifiers in collection '{hdca_history_name}'" - raise Exception(message) + raise exceptions.MessageException(message) else: message = f"Number of lines must match number of list elements ({len(elements)}), but file has {data_lines} lines" - raise Exception(message) + raise exceptions.MessageException(message) else: - raise Exception(f"Unknown sort_type '{sorttype}'") + raise exceptions.MessageException(f"Unknown sort_type '{sorttype}'") if presort_elements is not None: sorted_elements = [x[1] for x in sorted(presort_elements, key=lambda x: x[0])] @@ -3599,7 +3606,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history def add_copied_value_to_new_elements(new_label, dce_object): new_label = new_label.strip() if new_label in new_elements: - raise Exception( + raise exceptions.MessageException( f"New identifier [{new_label}] appears twice in resulting collection, these values must be unique." ) if getattr(dce_object, "history_content_type", None) == "dataset": @@ -3612,7 +3619,7 @@ def add_copied_value_to_new_elements(new_label, dce_object): with open(new_labels_path) as fh: new_labels = fh.readlines(1024 * 1000000) if strict and len(hdca.collection.elements) != len(new_labels): - raise Exception("Relabel mapping file contains incorrect number of identifiers") + raise exceptions.MessageException("Relabel mapping file contains incorrect number of identifiers") if how_type == "tabular": # We have a tabular file, where the first column is an existing element identifier, # and the second column is the new element identifier. @@ -3624,7 +3631,7 @@ def add_copied_value_to_new_elements(new_label, dce_object): default = None if strict else element_identifier new_label = new_labels_dict.get(element_identifier, default) if not new_label: - raise Exception(f"Failed to find new label for identifier [{element_identifier}]") + raise exceptions.MessageException(f"Failed to find new label for identifier [{element_identifier}]") add_copied_value_to_new_elements(new_label, dce_object) else: # If new_labels_dataset_assoc is not a two-column tabular dataset we label with the current line of the dataset @@ -3633,7 +3640,7 @@ def add_copied_value_to_new_elements(new_label, dce_object): add_copied_value_to_new_elements(new_labels[i], dce_object) for key in new_elements.keys(): if not re.match(r"^[\w\- \.,]+$", key): - raise Exception(f"Invalid new collection identifier [{key}]") + raise exceptions.MessageException(f"Invalid new collection identifier [{key}]") self._add_datasets_to_history(history, new_elements.values()) output_collections.create_collection( next(iter(self.outputs.values())), "output", elements=new_elements, propagate_hda_tags=False diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index 412c36bcefdd..c935aff13ecc 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -19,6 +19,7 @@ from boltons.iterutils import remap from galaxy import model +from galaxy.exceptions import ToolInputsNotOKException from galaxy.model.base import transaction from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.model.dataset_collections.structure import ( @@ -143,14 +144,17 @@ def execute_single_job(execution_slice, completed_job, skip=False): if check_inputs_ready: for params in execution_tracker.param_combinations: # This will throw an exception if the tool is not ready. - check_inputs_ready( - tool, - trans, - params, - history, - execution_cache=execution_cache, - collection_info=collection_info, - ) + try: + check_inputs_ready( + tool, + trans, + params, + history, + execution_cache=execution_cache, + collection_info=collection_info, + ) + except ToolInputsNotOKException as e: + execution_tracker.record_error(e) execution_tracker.ensure_implicit_collections_populated(history, mapping_params.param_template) job_count = len(execution_tracker.param_combinations) diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index 36bdd374b172..805a8ba3f2f6 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -2314,6 +2314,16 @@ def callback(input, prefixed_name, **kwargs): if execution_tracker.execution_errors: # TODO: formalize into InvocationFailure ? message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}" + for error in execution_tracker.execution_errors: + # try first to raise a structured invocation error message + if isinstance(error, exceptions.ToolInputsNotOKException) and error.src == "hda": + raise FailWorkflowEvaluation( + why=InvocationFailureDatasetFailed( + reason=FailureReason.dataset_failed, + hda_id=error.id, + workflow_step_id=step.id, + ) + ) raise exceptions.MessageException(message) return complete diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 6ddc56427672..d69fe79b6aa2 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -240,12 +240,20 @@ def invoke(self) -> Dict[int, Any]: except modules.DelayedWorkflowEvaluation as de: step_delayed = delayed_steps = True self.progress.mark_step_outputs_delayed(step, why=de.why) - except Exception: + except Exception as e: log.exception( "Failed to schedule %s, problem occurred on %s.", self.workflow_invocation.workflow.log_str(), step.log_str(), ) + if isinstance(e, MessageException): + # This is the highest level at which we can inject the step id + # to provide some more context to the exception. + raise modules.FailWorkflowEvaluation( + why=InvocationUnexpectedFailure( + reason=FailureReason.unexpected_failure, details=str(e), workflow_step_id=step.id + ) + ) raise if not step_delayed: diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 393546d95ec6..5b7487404532 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -4060,6 +4060,78 @@ def test_workflow_warning_workflow_output_not_found(self, history_id): assert "workflow_step_id" in message assert message["output_name"] == "does_not_exist" + @skip_without_tool("__APPLY_RULES__") + @skip_without_tool("job_properties") + def test_workflow_failed_input_not_ok(self, history_id): + summary = self._run_workflow( + """ +class: GalaxyWorkflow +steps: + job_props: + tool_id: job_properties + state: + thebool: true + failbool: true + apply: + tool_id: __APPLY_RULES__ + in: + input: job_props/list_output + state: + rules: + rules: + - type: add_column_metadata + value: identifier0 + mapping: + - type: list_identifiers + columns: [0] + """, + history_id=history_id, + assert_ok=False, + wait=True, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + assert invocation_details["state"] == "failed" + assert len(invocation_details["messages"]) == 1 + message = invocation_details["messages"][0] + assert message["reason"] == "dataset_failed" + assert message["workflow_step_id"] == 1 + + @skip_without_tool("__RELABEL_FROM_FILE__") + def test_workflow_failed_with_message_exception(self, history_id): + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: + input_collection: + collection_type: list + type: collection + relabel_file: + type: data +steps: + relabel: + tool_id: __RELABEL_FROM_FILE__ + in: + input: input_collection + how|labels: relabel_file +test_data: + input_collection: + collection_type: "list:list" + relabel_file: + value: 1.bed + type: File + """, + history_id=history_id, + assert_ok=False, + wait=True, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + assert invocation_details["state"] == "failed" + assert len(invocation_details["messages"]) == 1 + message = invocation_details["messages"][0] + assert message["reason"] == "unexpected_failure" + assert message["workflow_step_id"] == 2 + assert "Invalid new collection identifier" in message["details"] + @skip_without_tool("identifier_multiple") def test_invocation_map_over(self, history_id): summary = self._run_workflow(