#!/usr/bin/env python

import argparse
from pathlib import Path

import pandas as pd

import shared


class Workflow:
    """
    Metadata for one Galaxy workflow.

    A workflow can be built from three sources:
    - "WorkflowHub": the JSON payload of the WorkflowHub API
    - "server": the JSON payload of a public Galaxy server API
    - "json": a dictionary previously exported by this script
    """

    def __init__(self, wf: dict, source: str, tools: dict = None) -> None:
        """
        :param wf: raw workflow payload (see class docstring for shapes)
        :param source: "WorkflowHub", "server" or "json"
        :param tools: dictionary with information about all tools, keyed by
            tool suite id (required for "WorkflowHub" and "server" sources)
        :raises ValueError: if source is not one of the supported values
        """
        if source == "json":
            # Re-hydrate from a dict this script exported earlier.
            # Bug fix: the original used attribute access (wf.source, wf.id,
            # ...) which fails on plain dicts loaded from JSON, and then fell
            # through to add_creators/add_tools, clobbering the loaded values.
            self.source = wf["source"]
            self.id = wf["id"]
            self.server = wf["server"]
            self.link = wf["link"]
            self.name = wf["name"]
            self.creators = wf["creators"]
            self.tags = wf["tags"]
            self.update_time = wf["update_time"]
            self.latest_version = wf["latest_version"]
            self.versions = wf["versions"]
            self.number_of_steps = wf["number_of_steps"]
            self.tools = wf["tools"]
            self.edam_operation = wf["edam_operation"]
            return  # everything is already computed in a JSON export

        self.source = source
        # NOTE: nested double quotes inside f-strings require Python >= 3.12;
        # single quotes are used below for broader compatibility.
        if self.source == "WorkflowHub":
            self.id = wf["data"]["id"]
            self.server = "https://workflowhub.eu"
            self.link = f"https://workflowhub.eu{wf['data']['links']['self']}"
            self.name = wf["data"]["attributes"]["title"]
            self.tags = wf["data"]["attributes"]["tags"]
            self.update_time = wf["data"]["attributes"]["updated_at"]
            self.latest_version = wf["data"]["attributes"]["latest_version"]
            self.versions = len(wf["data"]["attributes"]["versions"])
            self.number_of_steps = len(wf["data"]["attributes"]["internals"]["steps"])
        elif self.source == "server":
            self.id = wf["id"]
            self.server = wf["server"]
            self.link = f"{wf['server']}/published/workflow?id={wf['id']}"
            self.name = wf["name"]
            # Prefer a precomputed step count; fall back to the steps dict.
            self.number_of_steps = wf.get("number_of_steps", len(wf["steps"]))
            self.tags = wf["tags"]
            self.update_time = wf["update_time"]
            self.latest_version = wf["version"]
            self.versions = wf["version"]
        else:
            raise ValueError(f"Incorrect source ({self.source}) for workflow")

        # add_creators was also called inside the "server" branch in the
        # original, running it twice; once here is enough for both sources.
        self.add_creators(wf)
        self.add_tools(wf)
        self.edam_operation = shared.get_edam_operation_from_tools(self.tools, tools)

    def add_creators(self, wf: dict) -> None:
        """
        Extract the workflow creator names into self.creators.
        """
        self.creators = []
        if self.source == "WorkflowHub":
            creators = wf["data"]["attributes"]["creators"]
            if len(creators) == 0:
                # Fall back to the free-text "other_creators" field.
                other = wf["data"]["attributes"]["other_creators"]
                if other:
                    self.creators.extend(other.split(","))
            else:
                # Bug fix: the original referenced an undefined `creator0` and
                # extend()ed a single f-string (adding it char by char);
                # append one "<given> <family>" name per creator instead.
                for creator in creators:
                    self.creators.append(f"{creator['given_name']} {creator['family_name']}")
        else:
            if wf.get("creator") is not None:
                for c in wf["creator"]:
                    self.creators.append(c["name"])

    def add_tools(self, wf: dict) -> None:
        """
        Extract the list of shortened tool ids used by the workflow steps
        into self.tools (deduplicated via a set, stored as a list).
        """
        tool_ids = set()
        if self.source == "WorkflowHub":
            for tool in wf["data"]["attributes"]["internals"]["steps"]:
                # WorkflowHub stores the tool id in the step description.
                tool_ids.add(shared.shorten_tool_id(tool["description"]))
        else:
            for step in wf["steps"].values():
                if step.get("tool_id") is not None:
                    tool_ids.add(shared.shorten_tool_id(step["tool_id"]))
        self.tools = list(tool_ids)

    def __getitem__(self, item):
        # Allow dict-style access (e.g. for generic export helpers).
        return self.__dict__[item]


class Workflows:
    """
    Collection of Workflow objects, built either by crawling WorkflowHub and
    public Galaxy servers (when tool_fp is given) or from a JSON export.
    """

    def __init__(self, tool_fp: str = "", test: bool = False, wfs: list = None) -> None:
        """
        :param tool_fp: filepath to JSON with all extracted tools; when set,
            workflows are crawled from WorkflowHub and public servers
        :param test: run a small test case only (few workflows/servers)
        :param wfs: list of workflow dicts from a previous JSON export
        """
        self.workflows = []
        self.tools = {}
        self.test = test
        if tool_fp != "":
            self.tools = shared.read_suite_per_tool_id(tool_fp)
            self.add_workflows_from_workflowhub()
            self.add_workflows_from_public_servers()
        elif wfs:
            # Bug fixes vs original: mutable default argument replaced by
            # None; `len(wfs) > 1` skipped single-workflow lists; and
            # `self.workflows = Workflow(...)` overwrote the list instead of
            # appending each workflow.
            for wf in wfs:
                self.workflows.append(Workflow(wf=wf, source="json"))

    def add_workflows_from_workflowhub(self) -> None:
        """
        Add Galaxy workflows from WorkflowHub (only the first one in test mode).
        """
        header = {"Accept": "application/json"}
        wfhub_wfs = shared.get_request_json(
            "https://workflowhub.eu/workflows?filter[workflow_type]=galaxy",
            header,
        )
        data = wfhub_wfs["data"]
        print(f"Workflows from WorkflowHub: {len(data)}")
        if self.test:
            data = data[:1]
        for wf in data:
            wfhub_wf = shared.get_request_json(
                f"https://workflowhub.eu{wf['links']['self']}",
                header,
            )
            self.workflows.append(Workflow(wf=wfhub_wf, source="WorkflowHub", tools=self.tools))

    def add_workflows_from_a_server(self, server: str) -> None:
        """
        Extract public, importable, non-deleted, non-hidden workflows from a
        Galaxy server.

        :param server: base URL of the Galaxy server
        """
        header = {"Accept": "application/json"}
        server_wfs = shared.get_request_json(
            f"{server}/api/workflows/",
            header,
        )
        count = 0
        for wf in server_wfs:
            if wf["published"] and wf["importable"] and not wf["deleted"] and not wf["hidden"]:
                count += 1
                server_wf = shared.get_request_json(
                    f"{server}/api/workflows/{wf['id']}",
                    header,
                )
                server_wf["server"] = server
                self.workflows.append(Workflow(wf=server_wf, source="server", tools=self.tools))
        print(f"Workflows from {server}: {count}")

    def add_workflows_from_public_servers(self) -> None:
        """
        Extract workflows from the public servers listed in
        data/available_public_servers.csv (first two servers in test mode).
        """
        server_df = pd.read_csv(Path("data", "available_public_servers.csv"), sep="\t", index_col=0)
        server_urls = server_df["url"]
        if self.test:
            server_urls = server_urls[:2]
        for url in server_urls:
            print(url)
            self.add_workflows_from_a_server(url)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Extract Galaxy Workflows from WorkflowHub and public servers"
    )
    subparser = parser.add_subparsers(dest="command")

    # Extract workflows
    extract = subparser.add_parser("extract", help="Extract all workflows")
    extract.add_argument(
        "--all", "-o", required=True, help="Filepath to JSON with all extracted workflows"
    )
    extract.add_argument(
        "--tools",
        "-t",
        required=True,
        help="Filepath to JSON with all extracted tools, generated by extractools command",
    )
    extract.add_argument(
        "--test",
        action="store_true",
        default=False,
        required=False,
        help="Run a small test case only on one topic",
    )

    # Filter workflows
    filterwf = subparser.add_parser("filter", help="Filter workflows based on their tags")
    filterwf.add_argument(
        "--all",
        "-a",
        required=True,
        help="Filepath to JSON with all extracted workflows, generated by extract command",
    )
    filterwf.add_argument(
        "--filtered",
        "-f",
        required=True,
        help="Filepath to TSV with filtered tutorials",
    )
    filterwf.add_argument(
        "--tags",
        "-c",
        help="Path to a file with tags to keep in the extraction (one per line)",
    )

    args = parser.parse_args()

    if args.command == "extract":
        wfs = Workflows(tool_fp=args.tools, test=args.test)
        shared.export_to_json(wfs.workflows, args.all)
    elif args.command == "filter":
        wfs = Workflows(wfs=shared.load_json(args.all))
        # get categories and tags to keep
        tags = shared.read_file(args.tags)
        # TODO: filtering by tags and TSV export are not implemented yet
        # filtered_workflows = filter_workflows(wfs.workflows, tags)
        # export_workflows_to_tsv(filtered_workflows, args.filtered)
def shorten_tool_id(tool: str) -> str:
    """
    Shorten a Galaxy tool id.

    ToolShed ids look like
    "toolshed.g2.bx.psu.edu/repos/<owner>/<suite>/<tool>/<version>";
    return the <tool> component (second-to-last path element). Ids without
    "toolshed" (built-in tools) are returned unchanged.

    :param tool: full or short Galaxy tool id
    """
    if "toolshed" in tool:
        return tool.split("/")[-2]
    return tool


def get_edam_operation_from_tools(selected_tools: list, all_tools: dict) -> list:
    """
    Get the deduplicated list of EDAM operations covered by the selected tools.

    Tools missing from all_tools are reported on stdout and skipped.

    :param selected_tools: list of tool suite ids
    :param all_tools: dictionary with information about all tools, mapping
        tool id to a record with an "EDAM operation" list
    """
    # Bug fix: the original annotated the return type as typing.List, which
    # raises NameError at definition time unless typing.List is imported in
    # this module (no such import is visible); the builtin list is used.
    edam_operation = set()
    for tool_id in selected_tools:
        if tool_id in all_tools:
            edam_operation.update(set(all_tools[tool_id]["EDAM operation"]))
        else:
            print(f"{tool_id} not found in all tools")
    return list(edam_operation)