diff --git a/lib/galaxy/app.py b/lib/galaxy/app.py index fcbd1a1b7cb0..285e2eed8a1c 100644 --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -666,6 +666,7 @@ def __init__(self, configure_logging=True, use_converters=True, use_display_appl InstalledRepositoryManager, InstalledRepositoryManager(self) ) self.dynamic_tool_manager = self._register_singleton(DynamicToolManager) + self.trs_proxy = self._register_singleton(TrsProxy, TrsProxy(self.config)) self._configure_datatypes_registry( use_converters=use_converters, use_display_applications=use_display_applications, @@ -843,7 +844,6 @@ def __init__(self, **kwargs) -> None: # Must be initialized after job_config. self.workflow_scheduling_manager = scheduling_manager.WorkflowSchedulingManager(self) - self.trs_proxy = self._register_singleton(TrsProxy, TrsProxy(self.config)) # We need InteractiveToolManager before the job handler starts self.interactivetool_manager = InteractiveToolManager(self) # Start the job manager diff --git a/lib/galaxy/managers/landing.py b/lib/galaxy/managers/landing.py index 9facbac703c6..71b2fa4e7407 100644 --- a/lib/galaxy/managers/landing.py +++ b/lib/galaxy/managers/landing.py @@ -15,10 +15,7 @@ ObjectNotFound, RequestParameterMissingException, ) -from galaxy.managers.workflows import ( - WorkflowContentsManager, - WorkflowCreateOptions, -) +from galaxy.managers.workflows import WorkflowContentsManager from galaxy.model import ( ToolLandingRequest as ToolLandingRequestModel, WorkflowLandingRequest as WorkflowLandingRequestModel, @@ -106,21 +103,9 @@ def _ensure_workflow(self, trans: ProvidesUserContext, request: WorkflowLandingR if request.workflow_source_type == "trs_url" and isinstance(trans.app, StructuredApp): # trans is always structured app except for unit test assert request.workflow_source - trs_id, trs_version = request.workflow_source.rsplit("/", 1) - _, trs_id, trs_version = trans.app.trs_proxy.get_trs_id_and_version_from_trs_url(request.workflow_source) - workflow = self.workflow_contents_manager.get_workflow_by_trs_id_and_version( - self.sa_session, trs_id=trs_id, trs_version=trs_version, user_id=trans.user and trans.user.id + workflow = self.workflow_contents_manager.get_or_create_workflow_from_trs( + trans, trs_url=request.workflow_source ) - if not workflow: - data = trans.app.trs_proxy.get_version_from_trs_url(request.workflow_source) - as_dict = yaml.safe_load(data) - raw_workflow_description = self.workflow_contents_manager.normalize_workflow_format(trans, as_dict) - created_workflow = self.workflow_contents_manager.build_workflow_from_raw_description( - trans, - raw_workflow_description, - WorkflowCreateOptions(), - ) - workflow = created_workflow.workflow request.workflow_id = workflow.id def get_tool_landing_request(self, trans: ProvidesUserContext, uuid: UUID4) -> ToolLandingRequest: diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index d69e8afcab70..22a7ddd6081b 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -139,6 +139,7 @@ attach_ordered_steps, has_cycles, ) +from galaxy.workflow.trs_proxy import TrsProxy log = logging.getLogger(__name__) @@ -598,8 +599,10 @@ def add_serializers(self): class WorkflowContentsManager(UsesAnnotations): - def __init__(self, app: MinimalManagerApp): + + def __init__(self, app: MinimalManagerApp, trs_proxy: TrsProxy): self.app = app + self.trs_proxy = trs_proxy self._resource_mapper_function = get_resource_mapper_function(app) def ensure_raw_description(self, dict_or_raw_description): @@ -2016,9 +2019,57 @@ def get_all_tools(self, workflow): tools.extend(self.get_all_tools(step.subworkflow)) return tools + def get_or_create_workflow_from_trs( + self, + trans: ProvidesUserContext, + trs_url: Optional[str], + trs_id: Optional[str] = None, + trs_version: Optional[str] = None, + trs_server: Optional[str] = None, + ): + user_id = trans.user and trans.user.id + assert user_id, "Cannot create workflow for anonymous user" + if not trs_url: + assert trs_server and trs_id and trs_version, "trs_url or trs_server, trs_version and trs_id must be passed" + server = self.trs_proxy.get_server(trs_server) + trs_url = self.trs_proxy.get_trs_url(server._trs_url, trs_id, trs_version) + else: + _, trs_id, trs_version = self.trs_proxy.get_trs_id_and_version_from_trs_url(trs_url=trs_url) + assert trs_id and trs_version and trs_server + + workflow = self.get_workflow_by_trs_id_and_version(trs_id=trs_id, trs_version=trs_version, user_id=user_id) + if not workflow: + workflow = self.create_workflow_from_trs_url(trans, trs_url) + return workflow + + def create_workflow_from_trs_url( + self, + trans: ProvidesUserContext, + trs_url: str, + ) -> Workflow: + server, trs_tool_id, trs_version_id = self.trs_proxy.get_trs_id_and_version_from_trs_url(trs_url=trs_url) + data = self.trs_proxy.get_version_from_trs_url(trs_url) + as_dict = yaml.safe_load(data) + raw_workflow_description = self.normalize_workflow_format(trans, as_dict) + created_workflow = self.build_workflow_from_raw_description( + trans, + raw_workflow_description, + WorkflowCreateOptions(), + ) + workflow = created_workflow.workflow + workflow.source_metadata = { + "trs_tool_id": trs_tool_id, + "trs_version_id": trs_version_id, + "trs_url": trs_url, + "trs_server": server._trs_url, + } + return workflow + def get_workflow_by_trs_id_and_version( - self, sa_session: galaxy_scoped_session, trs_id: str, trs_version: str, user_id: Optional[int] = None + self, trs_id: str, trs_version: str, user_id: Optional[int] = None ) -> Optional[model.Workflow]: + sa_session = self.app.model.session + def to_json(column, keys: List[str]): assert sa_session.bind if sa_session.bind.dialect.name == "postgresql": diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index e97cf0438c2a..ffa93ecc7225 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -7817,7 +7817,7 @@ class Workflow(Base, Dictifiable, RepresentById): reports_config: Mapped[Optional[bytes]] = mapped_column(JSONType) creator_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType) license: Mapped[Optional[str]] = mapped_column(TEXT) - source_metadata: Mapped[Optional[bytes]] = mapped_column(JSONType) + source_metadata: Mapped[Optional[Dict[str, str]]] = mapped_column(JSONType) uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType) steps = relationship( diff --git a/lib/galaxy/webapps/galaxy/api/workflows.py b/lib/galaxy/webapps/galaxy/api/workflows.py index 6f94550e2134..0fea434c34eb 100644 --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -245,29 +245,14 @@ def create(self, trans: GalaxyWebTransaction, payload=None, **kwd): payload["workflow"] = workflow_src return self.__api_import_new_workflow(trans, payload, **kwd) elif archive_source == "trs_tool": - server = None - trs_tool_id = None - trs_version_id = None - import_source = None - if "trs_url" in payload: - parts = self.app.trs_proxy.match_url( - payload["trs_url"], trans.app.config.fetch_url_allowlist_ips - ) - if parts: - server = self.app.trs_proxy.server_from_url(parts["trs_base_url"]) - trs_tool_id = parts["tool_id"] - trs_version_id = parts["version_id"] - payload["trs_tool_id"] = trs_tool_id - payload["trs_version_id"] = trs_version_id - else: - raise exceptions.RequestParameterInvalidException(f"Invalid TRS URL {payload['trs_url']}.") - else: - trs_server = payload.get("trs_server") - server = self.app.trs_proxy.get_server(trs_server) - trs_tool_id = payload.get("trs_tool_id") - trs_version_id = payload.get("trs_version_id") - - archive_data = server.get_version_descriptor(trs_tool_id, trs_version_id) + workflow = self.workflow_contents_manager.get_or_create_workflow_from_trs( + trans, + trs_url=payload.get("trs_url"), + trs_id=payload.get("trs_tool_id"), + trs_version=payload.get("trs_version_id"), + trs_server=payload.get("trs_server"), + ) + return self.__api_import_response(workflow) else: try: archive_data = stream_url_to_str( @@ -602,13 +587,14 @@ def __api_import_from_archive(self, trans: GalaxyWebTransaction, archive_data, s workflow, missing_tool_tups = self._workflow_from_dict( trans, raw_workflow_description, workflow_create_options, source=source ) - workflow_id = workflow.id workflow = workflow.latest_workflow + return self.__api_import_response(workflow) + def __api_import_response(self, workflow: model.Workflow): response = { "message": f"Workflow '{workflow.name}' imported successfully.", "status": "success", - "id": trans.security.encode_id(workflow_id), + "id": self.app.security.encode_id(workflow.id), } if workflow.has_errors: response["message"] = "Imported, but some steps in this workflow have validation errors." diff --git a/lib/galaxy/workflow/trs_proxy.py b/lib/galaxy/workflow/trs_proxy.py index 550238ffa229..388c26b2f26c 100644 --- a/lib/galaxy/workflow/trs_proxy.py +++ b/lib/galaxy/workflow/trs_proxy.py @@ -81,6 +81,9 @@ def get_server(self, trs_server): def server_from_url(self, trs_url): return TrsServer(trs_url) + def get_trs_url(self, trs_base_url: str, tool_id: str, version_id: str): + return urllib.parse.urljoin(trs_base_url, f"{tool_id}/{version_id}") + def get_trs_id_and_version_from_trs_url(self, trs_url): parts = self.match_url(trs_url, self.fetch_url_allowlist_ips) if parts: