Skip to content

Commit

Permalink
Add workflow extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
bebatut committed Jul 15, 2024
1 parent 2c5cde7 commit 2430426
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 11 deletions.
234 changes: 234 additions & 0 deletions bin/extract_galaxy_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
#!/usr/bin/env python

import argparse
from datetime import (
date,
datetime,
)
from typing import (
Dict,
List,
)
from pathlib import Path

import pandas as pd
import requests
import shared


class Workflow():
"""
Class for workflow
"""

def __init__(self, wf: dict, source: str, tools: dict):
if source == "json":
self.source=wf.source
self.id=wf.id
self.server=wf.server
self.link=wf.link
self.name=wf.name
self.creators=wf.creators
self.tags=wf.tags
self.update_time=wf.update_time
self.latest_version=wf.latest_version
self.versions=wf.versions
self.number_of_steps=wf.number_of_steps
self.tools=wf.tools
self.edam_operation=wf.edam_operation
else:
self.source=source
if self.source == "WorkflowHub":
self.id = wf["data"]["id"]
self.server = "https://workflowhub.eu"
self.link = f"https://workflowhub.eu{wf["data"]["links"]["self"]}"
self.name = wf["data"]["attributes"]["title"]
self.tags = wf["data"]["attributes"]["tags"]
self.update_time = wf["data"]["attributes"]["updated_at"]
self.latest_version = wf["data"]["attributes"]["latest_version"]
self.versions = len(wf["data"]["attributes"]["versions"])
self.number_of_steps = len(wf["data"]["attributes"]["internals"]["steps"])
elif self.source == "server":
self.id = wf["id"]
self.server = wf["server"]
self.link = f"{ wf["server"] }/published/workflow?id={ wf["id"] }"
self.name = wf["name"]
self.add_creators(wf)
self.number_of_steps = wf["number_of_steps"] if "number_of_steps" in wf else len(wf["steps"].keys())
self.tags = wf["tags"]
self.update_time = wf["update_time"]
self.latest_version = wf["version"]
self.versions = wf["version"]
else:
raise ValueError(f"Incorrect source ({ self.source }) for workflow")

self.add_creators(wf)
self.add_tools(wf)
self.edam_operation = shared.get_edam_operation_from_tools(self.tools, tools)

def add_creators(self, wf: dict) -> None:
"""
Get workflow creators
"""
self.creators = []
if self.source == "WorkflowHub":
creator = wf["data"]["attributes"]["creators"]
if len(creator)==0:
other = wf["data"]["attributes"]["other_creators"]
if other and len(other) > 0:
self.creators.extend(wf["data"]["attributes"]["other_creators"].split(","))
else:
self.creators.extend(f"{creator0["given_name"]} {creator0["family_name"]}")
else:
if "creator" in wf and wf["creator"] is not None:
for c in wf["creator"]:
self.creators.append(c["name"])

def add_tools(self, wf: list):
"""
Extract list of tool ids from workflow
"""
self.tools = set()
if self.source == "WorkflowHub":
for tool in wf["data"]["attributes"]["internals"]["steps"]:
self.tools.add(shared.shorten_tool_id(tool["description"]))
else:
for step in wf["steps"].values():
if "tool_id" in step and step["tool_id"] is not None:
self.tools.add(shared.shorten_tool_id(step["tool_id"]))
self.tools = list(self.tools)

def __getitem__(self, item):
return self.__dict__[item]


class Workflows:
    """
    Collection of Galaxy workflows, either freshly extracted from WorkflowHub
    and public Galaxy servers, or re-loaded from a previous JSON export.
    """

    def __init__(self, tool_fp: str = "", test: bool = False, wfs: list | None = None) -> None:
        """
        :param tool_fp: filepath to JSON with all extracted tools; when given,
            workflows are extracted from WorkflowHub and public servers
        :param test: reduce the extraction to a small test case
            (1 WorkflowHub workflow, 2 servers)
        :param wfs: previously exported workflows (list of dicts) to re-load;
            used only when tool_fp is empty
        """
        self.workflows: List[Workflow] = []
        self.tools: dict = {}
        self.test = test
        if tool_fp != "":
            self.tools = shared.read_suite_per_tool_id(tool_fp)
            self.add_workflows_from_workflowhub()
            self.add_workflows_from_public_servers()
        elif wfs:
            # NOTE: the original overwrote self.workflows with a single
            # Workflow and skipped single-workflow exports (len(wfs) > 1)
            for wf in wfs:
                self.workflows.append(Workflow(wf=wf, source="json"))

    def add_workflows_from_workflowhub(self) -> None:
        """
        Add workflows from WorkflowHub (Galaxy workflow type only)
        """
        header = {"Accept": "application/json"}
        wfhub_wfs = shared.get_request_json(
            "https://workflowhub.eu/workflows?filter[workflow_type]=galaxy",
            header,
        )
        print(f"Workflows from WorkflowHub: {len(wfhub_wfs['data'])}")
        data = wfhub_wfs["data"]
        if self.test:
            data = data[:1]
        for wf in data:
            # The listing only carries ids/links; fetch each workflow's details
            wfhub_wf = shared.get_request_json(
                f"https://workflowhub.eu{wf['links']['self']}",
                header,
            )
            self.workflows.append(Workflow(wf=wfhub_wf, source="WorkflowHub", tools=self.tools))

    def add_workflows_from_a_server(self, server: str) -> None:
        """
        Extract public workflows from a Galaxy server

        :param server: base URL of the Galaxy server
        """
        header = {"Accept": "application/json"}
        server_wfs = shared.get_request_json(
            f"{server}/api/workflows/",
            header,
        )
        count = 0
        for wf in server_wfs:
            # Keep only workflows that are publicly visible and importable
            if wf["published"] and wf["importable"] and not wf["deleted"] and not wf["hidden"]:
                count += 1
                server_wf = shared.get_request_json(
                    f"{server}/api/workflows/{wf['id']}",
                    header,
                )
                # Remember the origin server for link building
                server_wf["server"] = server
                self.workflows.append(Workflow(wf=server_wf, source="server", tools=self.tools))
        print(f"Workflows from {server}: {count}")

    def add_workflows_from_public_servers(self) -> None:
        """
        Extract workflows from all known public Galaxy servers
        """
        server_df = pd.read_csv(Path("data", "available_public_servers.csv"), sep="\t", index_col=0)
        server_urls = server_df["url"]
        if self.test:
            server_urls = server_urls[:2]
        for url in server_urls:
            print(url)
            self.add_workflows_from_a_server(url)



if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Extract Galaxy Workflows from WorkflowHub and public servers"
    )
    # Two sub-commands: "extract" (pull workflows from WorkflowHub and public
    # Galaxy servers) and "filter" (filter a previous extraction by tags)
    subparser = parser.add_subparsers(dest="command")

    # Extract Workflows
    extract = subparser.add_parser("extract", help="Extract all workflows")
    extract.add_argument(
        "--all", "-o", required=True, help="Filepath to JSON with all extracted workflows"
    )
    extract.add_argument(
        "--tools",
        "-t",
        required=True,
        help="Filepath to JSON with all extracted tools, generated by extractools command",
    )
    extract.add_argument(
        "--test",
        action="store_true",
        default=False,
        required=False,
        help="Run a small test case only on one topic",
    )

    # Filter workflows
    filterwf = subparser.add_parser("filter", help="Filter workflows based on their tags")
    filterwf.add_argument(
        "--all",
        "-a",
        required=True,
        help="Filepath to JSON with all extracted workflows, generated by extract command",
    )
    filterwf.add_argument(
        "--filtered",
        "-f",
        required=True,
        help="Filepath to TSV with filtered tutorials",
    )
    filterwf.add_argument(
        "--tags",
        "-c",
        help="Path to a file with tags to keep in the extraction (one per line)",
    )

    args = parser.parse_args()

    if args.command == "extract":
        # Full extraction, then dump the Workflow objects to JSON
        wfs = Workflows(tool_fp=args.tools, test=args.test)
        shared.export_to_json(wfs.workflows, args.all)

    elif args.command == "filter":
        # Re-load a previous extraction from JSON
        wfs = Workflows(wfs=shared.load_json(args.all))
        # get categories and training to exclude
        tags = shared.read_file(args.tags)
        # NOTE(review): the filter command is incomplete — it reads the tag
        # file but never filters or writes --filtered; filter_tutorials and
        # export_tutorials_to_tsv are not defined in this file. TODO: implement.
        # filter training lists
        #filtered_tutorials = filter_tutorials(all_tutorials, tags)
        #export_tutorials_to_tsv(filtered_tutorials, args.filtered_tutorials)
13 changes: 2 additions & 11 deletions bin/extract_gtn_tutorials.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ def get_short_tool_ids(tuto: dict) -> None:
tuto["short_tools"] = set()
if "tools" in tuto:
for tool in tuto["tools"]:
if "toolshed" in tool:
tuto["short_tools"].add(tool.split("/")[-2])
else:
tuto["short_tools"].add(tool)
tuto["short_tools"].add(shared.shorten_tool_id(tool))
tuto["short_tools"] = list(tuto["short_tools"])


Expand All @@ -55,13 +52,7 @@ def get_edam_operations(tuto: dict, tools: dict) -> None:
"""
tuto["edam_operation"] = []
if "short_tools" in tuto:
edam_operation = set()
for t in tuto["short_tools"]:
if t in tools:
edam_operation.update(set(tools[t]["EDAM operation"]))
else:
print(f"{t} not found in all tools")
tuto["edam_operation"] = list(edam_operation)
tuto["edam_operation"] = shared.get_edam_operation_from_tools(tuto["short_tools"], tools)


def get_feedback(tuto: dict, feedback: dict) -> None:
Expand Down
26 changes: 26 additions & 0 deletions bin/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,29 @@ def get_request_json(url: str, headers: dict) -> dict:

def format_date(date: str) -> str:
return datetime.fromisoformat(date).strftime("%Y-%m-%d")


def shorten_tool_id(tool: str) -> str:
    """
    Return the short form of a tool id.

    Ids containing "toolshed" are slash-separated paths; the short form is
    their second-to-last component. Any other id is returned unchanged.
    """
    if "toolshed" not in tool:
        return tool
    return tool.split("/")[-2]


def get_edam_operation_from_tools(selected_tools: list, all_tools: dict) -> List:
    """
    Get list of EDAM operations of the tools

    Tools missing from all_tools are reported on stdout and skipped.

    :param selected_tools: list of tool suite ids
    :param all_tools: dictionary with information about all tools
    """
    operations = set()
    for tool_id in selected_tools:
        if tool_id not in all_tools:
            # Best-effort: report the unknown tool and keep going
            print(f"{tool_id} not found in all tools")
            continue
        operations.update(all_tools[tool_id]["EDAM operation"])
    return list(operations)

0 comments on commit 2430426

Please sign in to comment.