Skip to content

Commit

Permalink
Merge pull request #19077 from davelopez/allow_deferred_as_uri
Browse files Browse the repository at this point in the history
Allow deferred datasets to behave as URIs
  • Loading branch information
bgruening authored Nov 4, 2024
2 parents 3ff3843 + 2bdcec0 commit a5aaa1c
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 1 deletion.
8 changes: 8 additions & 0 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2737,6 +2737,7 @@ class FakeDatasetAssociation:
def __init__(self, dataset: Optional["Dataset"] = None) -> None:
    """Set up the association around ``dataset``.

    :param dataset: the dataset to expose through this object, or ``None``
        when no dataset is attached.
    """
    # A freshly built instance never reports deferred data.
    self.has_deferred_data = False
    # Metadata starts out as an empty mapping.
    self.metadata: Dict = {}
    self.dataset = dataset

def get_file_name(self, sync_cache: bool = True) -> str:
assert self.dataset
Expand Down Expand Up @@ -4685,6 +4686,13 @@ def ext(self):
def has_deferred_data(self):
return self.dataset.state == Dataset.states.DEFERRED

@property
def deferred_source_uri(self):
    """Return the source URI of this dataset while it is deferred.

    Gives ``None`` when the dataset is not deferred or when it has no
    recorded sources.
    """
    if not (self.has_deferred_data and self.sources):
        return None
    # The first recorded source is assumed to be the deferred one.
    primary_source = self.sources[0]
    return primary_source.source_uri

@property
def state(self):
# self._state holds state that should only affect this particular dataset association, not the dataset state itself
Expand Down
41 changes: 41 additions & 0 deletions lib/galaxy/tool_util/xsd/galaxy.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -4316,6 +4316,47 @@ template if the parameter is ``false`` or not checked by the user. Used only
when the ``type`` attribute value is ``boolean``.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="allow_uri_if_protocol" type="xs:string">
<xs:annotation>
<xs:documentation xml:lang="en"><![CDATA[
When using `deferred` datasets, Galaxy will try to materialize them into files when
running the tool. In case we don't want to download them, the `allow_uri_if_protocol` attribute
can be used to avoid materialization and pass the URI as is to the tool.
This is useful when the tool can handle the URI directly. Example:
```xml
<param name="input" type="data" format="txt" allow_uri_if_protocol="http,https,s3" />
```
You can specify multiple prefixes separated by comma or use the wildcard '*' to always
treat deferred datasets as URIs. The source URI will be passed to the tool as is.
This attribute is only valid for `data` parameters.
### Handling the input in the tool
Since the input can be a regular file or a URI, the tool should be able to handle both cases.
The tool should check if the input ``is_deferred`` and if so, treat it as a URI, otherwise
it should treat it as a regular file. Please note that only deferred datasets with the specified
protocol will be passed as URIs, the rest will be materialized as files.
Here is an example command section that handles the above sample input:
```xml
<command>
## We should handle the case where the input must be treated as a URI with a specific protocol.
#if $input.is_deferred:
## Here, the input is a deferred dataset whose source URI has the protocol 'https', 'http' or 's3'.
echo '$input' > '$output' ## The output will be the source URI of the input.
#else:
## Here, the input is a regular dataset or a materialized dataset in case of a
## deferred dataset whose source URI has a protocol different than 'https', 'http' or 's3'.
cp '$input' '$output' ## The output will be a copy of the input content.
#end if
</command>
```
]]></xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="size" type="xs:string" gxdocs:deprecated="true">
<!-- TODO: can be integer or integerxinteger -->
<xs:annotation>
Expand Down
16 changes: 15 additions & 1 deletion lib/galaxy/tools/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ def _deferred_objects(
deferred_objects: Dict[str, DeferrableObjectsT] = {}
for key, value in input_datasets.items():
if value is not None and value.state == model.Dataset.states.DEFERRED:
deferred_objects[key] = value
if self._should_materialize_deferred_input(key, value):
deferred_objects[key] = value

def find_deferred_collections(input, value, context, prefixed_name=None, **kwargs):
if (
Expand All @@ -327,6 +328,19 @@ def find_deferred_collections(input, value, context, prefixed_name=None, **kwarg

return deferred_objects

def _should_materialize_deferred_input(self, input_name: str, input_value: DeferrableObjectsT) -> bool:
    """
    Decide whether a deferred input has to be materialized before the tool runs.

    Materialization is skipped (``False``) only when the input is a dataset instance bound to a
    data parameter whose ``allow_uri_if_protocol`` list contains ``*`` or a prefix of the
    dataset's deferred source URI. Every other case is materialized (``True``).
    """
    tool_param = self.tool.inputs.get(input_name)
    if not isinstance(tool_param, DataToolParameter):
        return True
    if not isinstance(input_value, model.DatasetInstance):
        return True
    # A missing source URI falls back to the empty string.
    uri = input_value.deferred_source_uri or ""
    passes_as_uri = any(
        allowed == "*" or uri.startswith(allowed) for allowed in tool_param.allow_uri_if_protocol
    )
    return not passes_as_uri

def __walk_inputs(self, inputs, input_values, func):
def do_walk(inputs, input_values):
"""
Expand Down
7 changes: 7 additions & 0 deletions lib/galaxy/tools/parameters/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,7 @@ def __init__(self, tool, input_source, trans=None):
self.tag = tag
self.is_dynamic = True
self._parse_options(input_source)
self._parse_allow_uri_if_protocol(input_source)
# Load conversions required for the dataset input
self.conversions = []
self.default_object = input_source.parse_default()
Expand All @@ -2089,6 +2090,12 @@ def __init__(self, tool, input_source, trans=None):
)
self.conversions.append((name, conv_extension, [conv_type]))

def _parse_allow_uri_if_protocol(self, input_source):
    """Parse the ``allow_uri_if_protocol`` attribute into ``self.allow_uri_if_protocol``.

    The attribute is a comma separated list of URI prefixes (or ``*``). In case of
    deferred datasets, if the source URI is prefixed with one of the values in this
    list, the dataset will behave as a URI and will not be materialized into a file path.

    Whitespace around each entry is stripped and empty entries are dropped. Without
    this, ``"http, https"`` would produce the never-matching prefix ``" https"``, and a
    trailing comma would produce ``""``, which every URI starts with and which would
    therefore act as an unintended wildcard.

    :param input_source: object exposing ``get(name, default)`` for the parameter's attributes.
    """
    raw_value = input_source.get("allow_uri_if_protocol", None) or ""
    # An absent or empty attribute yields an empty list.
    self.allow_uri_if_protocol = [entry.strip() for entry in raw_value.split(",") if entry.strip()]

def from_json(self, value, trans, other_values=None):
session = trans.sa_session

Expand Down
9 changes: 9 additions & 0 deletions lib/galaxy/tools/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ def serialize(self, invalid_chars: Sequence[str] = ("/",)) -> Dict[str, Any]:
def is_collection(self) -> bool:
return False

@property
def is_deferred(self) -> bool:
    """Tell whether the wrapped (unsanitized) dataset reports deferred data."""
    wrapped = self.unsanitized
    return wrapped.has_deferred_data

def is_of_type(self, *exts: str) -> bool:
datatypes = []
if not self.datatypes_registry:
Expand All @@ -471,6 +475,11 @@ def is_of_type(self, *exts: str) -> bool:
return self.dataset.datatype.matches_any(datatypes)

def __str__(self) -> str:
    """Render the wrapper as whatever ``_path_or_uri`` resolves to."""
    rendered = self._path_or_uri()
    return rendered

def _path_or_uri(self) -> str:
if self.is_deferred:
return self.unsanitized.deferred_source_uri or ""
if self.false_path is not None:
return self.false_path
else:
Expand Down
65 changes: 65 additions & 0 deletions lib/galaxy_test/api/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2467,6 +2467,71 @@ def test_metadata_validator_can_fail_on_deferred_input(self, history_id):
job_details = self.dataset_populator.get_job_details(job_id=job_id).json()
assert job_details["state"] == "failed"

@skip_without_tool("gx_allow_uri_if_protocol")
def test_allow_uri_if_protocol_on_deferred_input(self, history_id):
    """Run the tool on a deferred HDA and check that its output is the dataset's source URI."""
    source_uri = "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/simple_line.txt"
    deferred_dataset = self.dataset_populator.create_deferred_hda(history_id, source_uri, ext="txt")

    # The tool just returns the URI (or file path if it was materialized) as the output content
    tool_run = self.dataset_populator.run_tool(
        tool_id="gx_allow_uri_if_protocol",
        inputs={"input1": dataset_to_param(deferred_dataset)},
        history_id=history_id,
    )
    first_output = tool_run["outputs"][0]
    details = self.dataset_populator.get_history_dataset_details(
        history_id, dataset=first_output, wait=True, assert_ok=True
    )
    assert details["state"] == "ok"

    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
    assert content.strip() == source_uri.strip()

@skip_without_tool("gx_allow_uri_if_protocol")
def test_allow_uri_if_protocol_on_collection_with_deferred(self, history_id):
    """Map the tool over a list of deferred datasets and check each output holds a source URI.

    A ``list`` collection of deferred URL elements is fetched, the tool is run in batch
    mode over it, and every element of the resulting implicit collection must be ``ok``
    and contain one of the original source URIs.
    """
    source_uris = [
        "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/simple_line.txt",
        "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/simple_line_alternative.txt",
    ]
    elements = [
        {
            "src": "url",
            "url": source_uri,
            "deferred": True,
            "ext": "txt",
        }
        for source_uri in source_uris
    ]
    targets = [
        {
            "destination": {"type": "hdca"},
            "elements": elements,
            "collection_type": "list",
            "name": "deferred list",
        }
    ]
    payload = {
        "history_id": history_id,
        "targets": json.dumps(targets),
    }
    fetch_response = self.dataset_populator.fetch(payload, wait=True)
    input_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
    input_hdca_id = input_collection["id"]
    inputs = {
        "input1": {"batch": True, "values": [{"src": "hdca", "id": input_hdca_id}]},
    }
    run_response = self.dataset_populator.run_tool(
        tool_id="gx_allow_uri_if_protocol", inputs=inputs, history_id=history_id
    )

    # Separate names for the output collection keep it distinct from the input one.
    output_hdca_id = run_response["implicit_collections"][0]["id"]
    output_collection = self.dataset_populator.get_history_collection_details(history_id, id=output_hdca_id)
    output_elements = output_collection["elements"]
    assert len(output_elements) == 2
    for element in output_elements:
        # Named ``element_object`` so the builtin ``object`` is not shadowed.
        element_object = element["object"]
        assert isinstance(element_object, dict)
        assert element_object["state"] == "ok"
        output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=element_object)
        assert output_content.strip() in source_uris

@skip_without_tool("cat1")
def test_run_deferred_mapping(self, history_id: str):
elements = [
Expand Down
23 changes: 23 additions & 0 deletions test/functional/tools/parameters/gx_allow_uri_if_protocol.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<tool id="gx_allow_uri_if_protocol" name="gx_allow_uri_if_protocol" version="1.0.0">
<!-- Test tool: for every dataset in input1, appends either the rendered value of the input
(when it is still deferred) or the dataset content to the single output file. -->
<command><![CDATA[
#for $input in $input1:
## The input may have to be treated as a URI with a specific protocol instead of a file.
#if $input.is_deferred:
## Here, the input is a deferred dataset whose source URI has the protocol 'https'.
## We append the URI to the output file.
echo '$input' >> '$output'
#else:
## Here, the input is a regular dataset, or a deferred dataset that was materialized
## because its source URI has a protocol other than 'https'.
## We append the content of the dataset to the output file.
cat '$input' >> '$output'
#end if
#end for
]]></command>
<inputs>
<param name="input1" type="data" allow_uri_if_protocol="https" multiple="true"/>
</inputs>
<outputs>
<data name="output" format="txt"/>
</outputs>
</tool>
9 changes: 9 additions & 0 deletions test/unit/app/tools/test_parameter_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,3 +452,12 @@ def test_tool_collection(self):
)
assert param.type == "data_collection"
assert param.collection_types == ["list", "list:paired"]

def test_data_allow_uri_if_protocol(self):
    """A comma separated ``allow_uri_if_protocol`` on a data param parses into a list."""
    parsed = self._parameter_for(
        xml="""
<param name="deferred" type="data" allow_uri_if_protocol="https,s3">
</param>
"""
    )
    expected_prefixes = ["https", "s3"]
    assert parsed.allow_uri_if_protocol == expected_prefixes
1 change: 1 addition & 0 deletions test/unit/app/tools/test_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ def __init__(self):
self.extra_files_path = MOCK_DATASET_EXTRA_FILES_PATH
self.ext = MOCK_DATASET_EXT
self.tags = []
self.has_deferred_data = False

def get_file_name(self, sync_cache=True):
    """Return the fixed mock dataset path; ``sync_cache`` is accepted but not used."""
    mock_path = MOCK_DATASET_PATH
    return mock_path
Expand Down

0 comments on commit a5aaa1c

Please sign in to comment.