Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[23.1] Improve invocation error reporting #16917

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ const workflow = computed(() => {
});

const workflowStep = computed(() => {
if ("workflow_step_id" in props.invocationMessage && workflow.value) {
if (
"workflow_step_id" in props.invocationMessage &&
props.invocationMessage.workflow_step_id !== undefined &&
props.invocationMessage.workflow_step_id !== null &&
workflow.value
) {
return workflow.value.steps[props.invocationMessage.workflow_step_id];
}
return undefined;
Expand Down Expand Up @@ -146,10 +151,14 @@ const infoString = computed(() => {
invocationMessage.workflow_step_id + 1
} is a conditional step and the result of the when expression is not a boolean type.`;
} else if (reason === "unexpected_failure") {
let atStep = "";
if (invocationMessage.workflow_step_id !== null && invocationMessage.workflow_step_id !== undefined) {
atStep = ` at step ${invocationMessage.workflow_step_id + 1}`;
}
if (invocationMessage.details) {
return `${failFragment} an unexpected failure occurred: '${invocationMessage.details}'`;
return `${failFragment} an unexpected failure occurred${atStep}: '${invocationMessage.details}'`;
}
return `${failFragment} an unexpected failure occurred.`;
return `${failFragment} an unexpected failure occurred${atStep}.`;
} else if (reason === "workflow_output_not_found") {
return `Defined workflow output '${invocationMessage.output_name}' was not found in step ${
invocationMessage.workflow_step_id + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export interface GenericInvocationFailureCollectionFailedEncodedDatabaseIdField
/**
* HistoryDatasetCollectionAssociation ID that relates to failure.
*/
hdca_id?: string;
hdca_id: string;
/**
* Workflow step id of step that caused failure.
*/
Expand All @@ -92,7 +92,7 @@ export interface GenericInvocationFailureCollectionFailedInt {
/**
* HistoryDatasetCollectionAssociation ID that relates to failure.
*/
hdca_id?: number;
hdca_id: number;
/**
* Workflow step id of step that caused failure.
*/
Expand Down Expand Up @@ -159,7 +159,7 @@ export interface GenericInvocationFailureJobFailedEncodedDatabaseIdField {
/**
* Job ID that relates to failure.
*/
job_id?: string;
job_id: string;
/**
* Workflow step id of step that caused failure.
*/
Expand All @@ -174,7 +174,7 @@ export interface GenericInvocationFailureJobFailedInt {
/**
* Job ID that relates to failure.
*/
job_id?: number;
job_id: number;
/**
* Workflow step id of step that caused failure.
*/
Expand Down Expand Up @@ -232,11 +232,19 @@ export interface GenericInvocationUnexpectedFailureEncodedDatabaseIdField {
* May contains details to help troubleshoot this problem.
*/
details?: string;
/**
* Workflow step id of step that failed.
*/
workflow_step_id?: number;
}
export interface GenericInvocationUnexpectedFailureInt {
reason: "unexpected_failure";
/**
* May contains details to help troubleshoot this problem.
*/
details?: string;
/**
* Workflow step id of step that failed.
*/
workflow_step_id?: number;
}
10 changes: 10 additions & 0 deletions lib/galaxy/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ class ToolInputsNotReadyException(MessageException):
error_code = error_codes_by_name["TOOL_INPUTS_NOT_READY"]


class ToolInputsNotOKException(MessageException):
def __init__(self, err_msg=None, type="info", *, src: str, id: int, **extra_error_info):
super().__init__(err_msg, type, **extra_error_info)
self.src = src
self.id = id

status_code = 400
error_code = error_codes_by_name["TOOL_INPUTS_NOT_OK"]


class RealUserRequiredException(MessageException):
status_code = 400
error_code = error_codes_by_name["REAL_USER_REQUIRED"]
Expand Down
5 changes: 5 additions & 0 deletions lib/galaxy/exceptions/error_codes.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
"code": 400016,
"message": "Only real users can make this request."
},
{
"name": "TOOL_INPUTS_NOT_OK",
"code": 400017,
"message": "Tool inputs not in required OK state."
},
{
"name": "USER_AUTHENTICATION_FAILED",
"code": 401001,
Expand Down
6 changes: 5 additions & 1 deletion lib/galaxy/schema/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ def get(self, key: Any, default: Any = None) -> Any:
# Fetch the order_index when serializing for the API,
# which makes much more sense when pointing to steps.
if key == "workflow_step_id":
return self._obj.workflow_step.order_index
if self._obj.workflow_step:
return self._obj.workflow_step.order_index
else:
return default
elif key == "dependent_workflow_step_id":
if self._obj.dependent_workflow_step_id:
return self._obj.dependent_workflow_step.order_index
Expand Down Expand Up @@ -132,6 +135,7 @@ class GenericInvocationFailureWhenNotBoolean(InvocationFailureMessageBase[Databa
class GenericInvocationUnexpectedFailure(InvocationMessageBase, Generic[DatabaseIdT]):
reason: Literal[FailureReason.unexpected_failure]
details: Optional[str] = Field(None, description="May contains details to help troubleshoot this problem.")
workflow_step_id: Optional[int] = Field(None, description="Workflow step id of step that failed.")


class GenericInvocationWarning(InvocationMessageBase, Generic[DatabaseIdT]):
Expand Down
33 changes: 20 additions & 13 deletions lib/galaxy/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
exceptions,
model,
)
from galaxy.exceptions import ToolInputsNotReadyException
from galaxy.exceptions import (
ToolInputsNotOKException,
ToolInputsNotReadyException,
)
from galaxy.job_execution import output_collect
from galaxy.metadata import get_metadata_compute_strategy
from galaxy.model.base import transaction
Expand Down Expand Up @@ -2858,7 +2861,7 @@ def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kw
copy_object = input_dataset
break
if copy_object is None:
raise Exception("Failed to find dataset output.")
raise exceptions.MessageException("Failed to find dataset output.")
out_data[key].copy_from(copy_object)

def parse_environment_variables(self, tool_source):
Expand Down Expand Up @@ -3200,8 +3203,10 @@ def check_dataset_state(state):

if self.require_dataset_ok:
if state != model.Dataset.states.OK:
raise ValueError(
f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'"
raise ToolInputsNotOKException(
f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'",
src="hda",
id=input_dataset.id,
)

for input_dataset in input_datasets.values():
Expand Down Expand Up @@ -3330,7 +3335,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
elif how == "by_index":
extracted_element = collection[int(incoming["which"]["index"])]
else:
raise Exception("Invalid tool parameters.")
raise exceptions.MessageException("Invalid tool parameters.")
extracted = extracted_element.element_object
extracted_o = extracted.copy(
copy_tags=extracted.tags, new_name=extracted_element.element_identifier, flush=False
Expand Down Expand Up @@ -3370,7 +3375,9 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
if element_identifier not in identifiers_map:
identifiers_map[element_identifier] = []
elif dupl_actions == "fail":
raise Exception(f"Duplicate collection element identifiers found for [{element_identifier}]")
raise exceptions.MessageException(
f"Duplicate collection element identifiers found for [{element_identifier}]"
)
identifiers_map[element_identifier].append(input_num)

for copy, input_list in enumerate(input_lists):
Expand Down Expand Up @@ -3562,12 +3569,12 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
except KeyError:
hdca_history_name = f"{hdca.hid}: {hdca.name}"
message = f"List of element identifiers does not match element identifiers in collection '{hdca_history_name}'"
raise Exception(message)
raise exceptions.MessageException(message)
else:
message = f"Number of lines must match number of list elements ({len(elements)}), but file has {data_lines} lines"
raise Exception(message)
raise exceptions.MessageException(message)
else:
raise Exception(f"Unknown sort_type '{sorttype}'")
raise exceptions.MessageException(f"Unknown sort_type '{sorttype}'")

if presort_elements is not None:
sorted_elements = [x[1] for x in sorted(presort_elements, key=lambda x: x[0])]
Expand Down Expand Up @@ -3599,7 +3606,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
def add_copied_value_to_new_elements(new_label, dce_object):
new_label = new_label.strip()
if new_label in new_elements:
raise Exception(
raise exceptions.MessageException(
f"New identifier [{new_label}] appears twice in resulting collection, these values must be unique."
)
if getattr(dce_object, "history_content_type", None) == "dataset":
Expand All @@ -3612,7 +3619,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
with open(new_labels_path) as fh:
new_labels = fh.readlines(1024 * 1000000)
if strict and len(hdca.collection.elements) != len(new_labels):
raise Exception("Relabel mapping file contains incorrect number of identifiers")
raise exceptions.MessageException("Relabel mapping file contains incorrect number of identifiers")
if how_type == "tabular":
# We have a tabular file, where the first column is an existing element identifier,
# and the second column is the new element identifier.
Expand All @@ -3624,7 +3631,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
default = None if strict else element_identifier
new_label = new_labels_dict.get(element_identifier, default)
if not new_label:
raise Exception(f"Failed to find new label for identifier [{element_identifier}]")
raise exceptions.MessageException(f"Failed to find new label for identifier [{element_identifier}]")
add_copied_value_to_new_elements(new_label, dce_object)
else:
# If new_labels_dataset_assoc is not a two-column tabular dataset we label with the current line of the dataset
Expand All @@ -3633,7 +3640,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
add_copied_value_to_new_elements(new_labels[i], dce_object)
for key in new_elements.keys():
if not re.match(r"^[\w\- \.,]+$", key):
raise Exception(f"Invalid new collection identifier [{key}]")
raise exceptions.MessageException(f"Invalid new collection identifier [{key}]")
self._add_datasets_to_history(history, new_elements.values())
output_collections.create_collection(
next(iter(self.outputs.values())), "output", elements=new_elements, propagate_hda_tags=False
Expand Down
20 changes: 12 additions & 8 deletions lib/galaxy/tools/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from boltons.iterutils import remap

from galaxy import model
from galaxy.exceptions import ToolInputsNotOKException
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.model.dataset_collections.structure import (
Expand Down Expand Up @@ -143,14 +144,17 @@ def execute_single_job(execution_slice, completed_job, skip=False):
if check_inputs_ready:
for params in execution_tracker.param_combinations:
# This will throw an exception if the tool is not ready.
check_inputs_ready(
tool,
trans,
params,
history,
execution_cache=execution_cache,
collection_info=collection_info,
)
try:
check_inputs_ready(
tool,
trans,
params,
history,
execution_cache=execution_cache,
collection_info=collection_info,
)
except ToolInputsNotOKException as e:
execution_tracker.record_error(e)

execution_tracker.ensure_implicit_collections_populated(history, mapping_params.param_template)
job_count = len(execution_tracker.param_combinations)
Expand Down
10 changes: 10 additions & 0 deletions lib/galaxy/workflow/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -2314,6 +2314,16 @@ def callback(input, prefixed_name, **kwargs):
if execution_tracker.execution_errors:
# TODO: formalize into InvocationFailure ?
message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}"
for error in execution_tracker.execution_errors:
# try first to raise a structured invocation error message
if isinstance(error, exceptions.ToolInputsNotOKException) and error.src == "hda":
raise FailWorkflowEvaluation(
why=InvocationFailureDatasetFailed(
reason=FailureReason.dataset_failed,
hda_id=error.id,
workflow_step_id=step.id,
)
)
raise exceptions.MessageException(message)

return complete
Expand Down
10 changes: 9 additions & 1 deletion lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,20 @@ def invoke(self) -> Dict[int, Any]:
except modules.DelayedWorkflowEvaluation as de:
step_delayed = delayed_steps = True
self.progress.mark_step_outputs_delayed(step, why=de.why)
except Exception:
except Exception as e:
log.exception(
"Failed to schedule %s, problem occurred on %s.",
self.workflow_invocation.workflow.log_str(),
step.log_str(),
)
if isinstance(e, MessageException):
# This is the highest level at which we can inject the step id
# to provide some more context to the exception.
raise modules.FailWorkflowEvaluation(
why=InvocationUnexpectedFailure(
reason=FailureReason.unexpected_failure, details=str(e), workflow_step_id=step.id
)
)
raise

if not step_delayed:
Expand Down
72 changes: 72 additions & 0 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4060,6 +4060,78 @@ def test_workflow_warning_workflow_output_not_found(self, history_id):
assert "workflow_step_id" in message
assert message["output_name"] == "does_not_exist"

@skip_without_tool("__APPLY_RULES__")
@skip_without_tool("job_properties")
def test_workflow_failed_input_not_ok(self, history_id):
summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
job_props:
tool_id: job_properties
state:
thebool: true
failbool: true
apply:
tool_id: __APPLY_RULES__
in:
input: job_props/list_output
state:
rules:
rules:
- type: add_column_metadata
value: identifier0
mapping:
- type: list_identifiers
columns: [0]
""",
history_id=history_id,
assert_ok=False,
wait=True,
)
invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
assert invocation_details["state"] == "failed"
assert len(invocation_details["messages"]) == 1
message = invocation_details["messages"][0]
assert message["reason"] == "dataset_failed"
assert message["workflow_step_id"] == 1

@skip_without_tool("__RELABEL_FROM_FILE__")
def test_workflow_failed_with_message_exception(self, history_id):
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
input_collection:
collection_type: list
type: collection
relabel_file:
type: data
steps:
relabel:
tool_id: __RELABEL_FROM_FILE__
in:
input: input_collection
how|labels: relabel_file
test_data:
input_collection:
collection_type: "list:list"
relabel_file:
value: 1.bed
type: File
""",
history_id=history_id,
assert_ok=False,
wait=True,
)
invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
assert invocation_details["state"] == "failed"
assert len(invocation_details["messages"]) == 1
message = invocation_details["messages"][0]
assert message["reason"] == "unexpected_failure"
assert message["workflow_step_id"] == 2
assert "Invalid new collection identifier" in message["details"]

@skip_without_tool("identifier_multiple")
def test_invocation_map_over(self, history_id):
summary = self._run_workflow(
Expand Down
Loading