Skip to content

Commit

Permalink
Merge pull request #18985 from jmchilton/workflow_request
Browse files Browse the repository at this point in the history
Allow recovering a normalized version of workflow request state from API
  • Loading branch information
mvdbeek authored Oct 28, 2024
2 parents 2050f48 + cc8f1b6 commit 1ebd078
Show file tree
Hide file tree
Showing 15 changed files with 597 additions and 90 deletions.
1 change: 1 addition & 0 deletions .github/workflows/framework_workflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ on:
env:
GALAXY_TEST_DBURI: 'postgresql://postgres:postgres@localhost:5432/galaxy?client_encoding=utf8'
GALAXY_TEST_RAISE_EXCEPTION_ON_HISTORYLESS_HDA: '1'
GALAXY_TEST_WORKFLOW_AFTER_RERUN: '1'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
Expand Down
173 changes: 155 additions & 18 deletions client/src/api/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2645,6 +2645,23 @@ export interface paths {
patch?: never;
trace?: never;
};
/** Path item for recovering the request of an existing invocation; only `get` is defined, every other HTTP method is typed as `never`. */
"/api/invocations/{invocation_id}/request": {
// No parameters are declared at the path-item level; the operation referenced by `get` declares its own.
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/** Get a description modeling an API request to invoke this workflow - this is recreated and will be more specific in some ways than the initial creation request. */
get: operations["invocation_as_request_api_invocations__invocation_id__request_get"];
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/api/invocations/{invocation_id}/step_jobs_summary": {
parameters: {
query?: never;
Expand Down Expand Up @@ -12391,8 +12408,9 @@ export interface components {
*/
batch: boolean | null;
/**
* Dataset Map
* @description TODO
* Legacy Dataset Map
* @deprecated
* @description An older alternative to specifying inputs using database IDs, do not use this and use inputs instead
* @default {}
*/
ds_map: {
Expand All @@ -12415,12 +12433,12 @@ export interface components {
history_id?: string | null;
/**
* Inputs
* @description TODO
* @description Specify values for formal inputs to the workflow
*/
inputs?: Record<string, never> | null;
/**
* Inputs By
* @description How inputs maps to inputs (datasets/collections) to workflows steps.
* @description How the 'inputs' field maps its inputs (datasets/collections/step parameters) to workflows steps.
*/
inputs_by?: string | null;
/**
Expand All @@ -12447,35 +12465,35 @@ export interface components {
*/
no_add_to_history: boolean | null;
/**
* Parameters
* @description The raw parameters for the workflow invocation.
* Legacy Step Parameters
* @description Parameters specified per-step for the workflow invocation, this is legacy and you should generally use inputs and only specify the formal parameters of a workflow instead.
* @default {}
*/
parameters: Record<string, never> | null;
/**
* Parameters Normalized
* @description Indicates if parameters are already normalized for workflow invocation.
* Legacy Step Parameters Normalized
* @description Indicates if legacy parameters are already normalized to be indexed by the order_index and are specified as a dictionary per step. Legacy-style parameters could previously be specified as one parameter per step or by tool ID.
* @default false
*/
parameters_normalized: boolean | null;
/**
* Preferred Intermediate Object Store ID
* @description The ID of the ? object store that should be used to store ? datasets in this history.
* @description The ID of the object store that should be used to store the intermediate datasets of this workflow - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.
*/
preferred_intermediate_object_store_id?: string | null;
/**
* Preferred Object Store ID
* @description The ID of the object store that should be used to store new datasets in this history.
* @description The ID of the object store that should be used to store all datasets (can instead specify object store IDs for intermediate and outputs datasets separately) - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.
*/
preferred_object_store_id?: string | null;
/**
* Preferred Outputs Object Store ID
* @description The ID of the object store that should be used to store ? datasets in this history.
* @description The ID of the object store that should be used to store the marked output datasets of this workflow - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.
*/
preferred_outputs_object_store_id?: string | null;
/**
* Replacement Parameters
* @description TODO
* @description Class of parameters mostly used for string replacement in PJAs. In best practice workflows, these should be replaced with input parameters
* @default {}
*/
replacement_params: Record<string, never> | null;
Expand All @@ -12487,7 +12505,7 @@ export interface components {
require_exact_tool_versions: boolean | null;
/**
* Resource Parameters
* @description TODO
* @description If a workflow_resource_params_file file is defined and the target workflow is configured to consume resource parameters, they can be specified with this parameter. See https://github.com/galaxyproject/galaxy/pull/4830 for more information.
* @default {}
*/
resource_params: Record<string, never> | null;
Expand All @@ -12496,11 +12514,6 @@ export interface components {
* @description Scheduler to use for workflow invocation.
*/
scheduler?: string | null;
/**
* Step Parameters
* @description TODO
*/
step_parameters?: Record<string, never> | null;
/**
* Use cached job
* @description Indicated whether to use a cached job for workflow invocation.
Expand Down Expand Up @@ -18194,6 +18207,86 @@ export interface components {
*/
workflow_id: string;
};
/**
* WorkflowInvocationRequestModel
* @description Model a workflow invocation request (InvokeWorkflowPayload) for an existing invocation.
*/
WorkflowInvocationRequestModel: {
/**
* History ID
* @description The encoded history id the workflow was run in.
*/
history_id: string;
/**
* Inputs
* @description Values for inputs
*/
inputs: Record<string, never>;
/**
* Inputs by
* @description How the 'inputs' field maps its inputs (datasets/collections/step parameters) to workflows steps.
*/
inputs_by: string;
/**
* Is instance
* @description This API yields a particular workflow instance, newer workflows belonging to the same stored workflow may have different state.
* @default true
* @constant
* @enum {boolean}
*/
instance: true;
/**
* Legacy Step Parameters
* @description Parameters specified per-step for the workflow invocation, this is legacy and you should generally use inputs and only specify the formal parameters of a workflow instead. If these are set, the workflow was not executed in a best-practice fashion and the resulting invocation request may not fully reflect the executed workflow state.
*/
parameters?: Record<string, never> | null;
/**
* Legacy Step Parameters Normalized
* @description Indicates if legacy parameters are already normalized to be indexed by the order_index and are specified as a dictionary per step. Legacy-style parameters could previously be specified as one parameter per step or by tool ID.
* @default true
* @constant
* @enum {boolean}
*/
parameters_normalized: true;
/**
* Preferred Intermediate Object Store ID
* @description The ID of the object store that should be used to store the intermediate datasets of this workflow - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.
*/
preferred_intermediate_object_store_id?: string | null;
/**
* Preferred Object Store ID
* @description The ID of the object store that should be used to store all datasets (can instead specify object store IDs for intermediate and outputs datasets separately) - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.
*/
preferred_object_store_id?: string | null;
/**
* Preferred Outputs Object Store ID
* @description The ID of the object store that should be used to store the marked output datasets of this workflow - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.
*/
preferred_outputs_object_store_id?: string | null;
/**
* Replacement Parameters
* @description Class of parameters mostly used for string replacement in PJAs. In best practice workflows, these should be replaced with input parameters.
* @default {}
*/
replacement_params: Record<string, never> | null;
/**
* Resource Parameters
* @description If a workflow_resource_params_file file is defined and the target workflow is configured to consume resource parameters, they can be specified with this parameter. See https://github.com/galaxyproject/galaxy/pull/4830 for more information.
* @default {}
*/
resource_params: Record<string, never> | null;
/**
* Use cached job
* @description Indicates whether to use a cached job for workflow invocation.
* @default false
*/
use_cached_job: boolean;
/**
* Workflow ID
* @description The encoded Workflow ID associated with the invocation.
*/
workflow_id: string;
};
/** WorkflowInvocationResponse */
WorkflowInvocationResponse:
| components["schemas"]["WorkflowInvocationElementView"]
Expand Down Expand Up @@ -27062,6 +27155,50 @@ export interface operations {
};
};
};
/**
 * Operation type referenced by the `get` entry of "/api/invocations/{invocation_id}/request".
 * Takes the encoded invocation ID as a required path parameter and an optional `run-as` header, and no request body.
 * A 200 response carries a WorkflowInvocationRequestModel; 4XX and 5XX responses carry a MessageExceptionModel.
 */
invocation_as_request_api_invocations__invocation_id__request_get: {
parameters: {
query?: never;
header?: {
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
"run-as"?: string | null;
};
path: {
/** @description The encoded database identifier of the Invocation. */
invocation_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["WorkflowInvocationRequestModel"];
};
};
/** @description Request Error */
"4XX": {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["MessageExceptionModel"];
};
};
/** @description Server Error */
"5XX": {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["MessageExceptionModel"];
};
};
};
};
invocation_step_jobs_summary_api_invocations__invocation_id__step_jobs_summary_get: {
parameters: {
query?: never;
Expand Down
75 changes: 71 additions & 4 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7749,6 +7749,16 @@ def get_internal_version(self, version):
raise galaxy.exceptions.RequestParameterInvalidException("Version does not exist")
return list(reversed(self.workflows))[version]

def get_internal_version_by_id(self, workflow_instance_id: int):
    """Fetch one ``Workflow`` revision of this stored workflow by its database id.

    :param workflow_instance_id: primary key handed to ``session.get(Workflow, ...)``.
    :returns: the ``Workflow`` row with that id.
    :raises galaxy.exceptions.ObjectNotFound: when no ``Workflow`` has that id.
    :raises galaxy.exceptions.RequestParameterInvalidException: when the
        workflow exists but its ``stored_workflow`` is not this object.
    """
    session = object_session(self)
    assert session
    candidate = session.get(Workflow, workflow_instance_id)
    if not candidate:
        raise galaxy.exceptions.ObjectNotFound()
    # Refuse ids that resolve to a revision owned by some other stored workflow.
    if candidate.stored_workflow != self:
        raise galaxy.exceptions.RequestParameterInvalidException()
    return candidate

def version_of(self, workflow):
for version, workflow_instance in enumerate(reversed(self.workflows)):
if workflow_instance.id == workflow.id:
Expand Down Expand Up @@ -8015,7 +8025,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime):
type: Mapped[Optional[str]] = mapped_column(String(64))
tool_id: Mapped[Optional[str]] = mapped_column(TEXT)
tool_version: Mapped[Optional[str]] = mapped_column(TEXT)
tool_inputs: Mapped[Optional[bytes]] = mapped_column(JSONType)
tool_inputs: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSONType)
tool_errors: Mapped[Optional[bytes]] = mapped_column(JSONType)
position: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
config: Mapped[Optional[bytes]] = mapped_column(JSONType)
Expand Down Expand Up @@ -8085,11 +8095,13 @@ def init_on_load(self):
def tool_uuid(self):
return self.dynamic_tool and self.dynamic_tool.uuid

@property
def is_input_type(self) -> bool:
    """Whether this step's ``type`` is set and is a key of ``STEP_TYPE_TO_INPUT_TYPE``."""
    step_type = self.type
    if not step_type:
        # Missing or empty type can never be an input type.
        return False
    return bool(step_type in self.STEP_TYPE_TO_INPUT_TYPE)

@property
def input_type(self):
    """Return the input type that ``STEP_TYPE_TO_INPUT_TYPE`` maps this step's ``type`` to.

    Only meaningful for input steps.

    :raises AssertionError: when ``is_input_type`` is false for this step.
    """
    # is_input_type already performs the "type is set and is a known input
    # type" check, so a single assertion on it is sufficient here.
    assert self.is_input_type, "step.input_type can only be called on input step types"
    return self.STEP_TYPE_TO_INPUT_TYPE[self.type]

@property
Expand Down Expand Up @@ -8310,6 +8322,17 @@ def log_str(self):
f"WorkflowStep[index={self.order_index},type={self.type},label={self.label},uuid={self.uuid},id={self.id}]"
)

@property
def effective_label(self) -> Optional[str]:
    """Best available label for this step.

    Returns the explicit ``label`` when it is not ``None``. Otherwise, for
    input-type steps with ``tool_inputs`` present, returns the ``"name"``
    entry of ``tool_inputs`` (which may itself be ``None``). In every other
    case returns ``None``.
    """
    explicit_label = self.label
    if explicit_label is not None:
        return explicit_label
    if not self.is_input_type:
        return None
    step_state = self.tool_inputs
    if step_state is None:
        return None
    return cast(Optional[str], step_state.get("name"))

def clear_module_extras(self):
# the module code adds random dynamic state to the step, this
# attempts to clear that.
Expand Down Expand Up @@ -9112,6 +9135,50 @@ def attach_step(request_to_content):
attach_step(request_to_content)
self.input_step_parameters.append(request_to_content)

def recover_inputs(self) -> Tuple[Dict[str, Any], str]:
    """Rebuild the ``inputs`` mapping of this invocation and how it is keyed.

    Walks ``input_datasets``, ``input_dataset_collections`` and
    ``input_step_parameters`` (in that order) and stores each association's
    value under a reference to its workflow step: the step's
    ``effective_label`` when it has one, otherwise ``str(order_index)``.
    Later entries overwrite earlier ones that resolve to the same key.

    :returns: ``(inputs, inputs_by)`` where ``inputs_by`` is ``"name"`` when
        every step was referenced by label and ``"name|step_index"`` when at
        least one step had to be referenced by its order index.
    :raises galaxy.exceptions.InconsistentDatabase: when an association has
        no ``workflow_step``.
    """
    inputs: Dict[str, Any] = {}
    used_order_index = False

    def reference_for(step: "WorkflowStep") -> str:
        # Prefer the label; fall back to the order index and remember that we
        # did so, because it changes the reported ``inputs_by`` mode.
        nonlocal used_order_index
        label = step.effective_label
        if label is not None:
            return label
        used_order_index = True
        return str(step.order_index)

    # (association list attribute, attribute holding the value) in the order
    # they are recovered; attributes are resolved lazily, one list at a time.
    sources = (
        ("input_datasets", "dataset"),
        ("input_dataset_collections", "dataset_collection"),
        ("input_step_parameters", "parameter_value"),
    )
    for assoc_attr, value_attr in sources:
        for assoc in getattr(self, assoc_attr):
            step = assoc.workflow_step
            if step is None:
                raise galaxy.exceptions.InconsistentDatabase(
                    "workflow input found without step definition, this should not happen"
                )
            value = getattr(assoc, value_attr)
            inputs[reference_for(step)] = value

    inputs_by = "name|step_index" if used_order_index else "name"
    return inputs, inputs_by

def add_message(self, message: "InvocationMessageUnion"):
self.messages.append(
WorkflowInvocationMessage( # type:ignore[abstract]
Expand Down
Loading

0 comments on commit 1ebd078

Please sign in to comment.