Calculate hash for new non-deferred datasets when finishing a job
This is configurable with two new options:
- `calculate_dataset_hash`: in which cases Galaxy should calculate
  a hash for a new dataset. Possible values: 'always', 'upload'
  (the default), 'never'.
- `hash_function`: which hash function to use. Possible values: 'md5',
  'sha1', 'sha256', 'sha512' (default: 'sha256')

Hashes are calculated via a Celery task, so they are currently
calculated only if the 'enable_celery_tasks' option is set to true.
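
A minimal sketch (not part of this commit) of the configuration this enables, in the
form used by the integration test added below; the values are examples and
'hash_function' is optional:

    config["enable_celery_tasks"] = True         # hashes are computed in a Celery task
    config["calculate_dataset_hash"] = "always"  # or "upload" (the default) / "never"
    config["hash_function"] = "sha256"           # one of: "md5", "sha1", "sha256", "sha512"
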
nsoranzo committed Nov 21, 2024
1 parent aceaa11 commit d39db9b
Showing 11 changed files with 172 additions and 51 deletions.
27 changes: 27 additions & 0 deletions doc/source/admin/galaxy_options.rst
@@ -4719,6 +4719,33 @@
:Type: float


~~~~~~~~~~~~~~~~~~~~~~~~~~
``calculate_dataset_hash``
~~~~~~~~~~~~~~~~~~~~~~~~~~

:Description:
In which cases Galaxy should calculate a hash for a new dataset.
Dataset hashes can be used by the Galaxy job cache/search to check
if job inputs match. Setting the 'enable_celery_tasks' option to
true is also required for dataset hash calculation. Possible
values are: 'always', 'upload' (the default), 'never'. If set to
'upload', the hash is calculated only for the outputs of upload
jobs.
:Default: ``upload``
:Type: str


~~~~~~~~~~~~~~~~~
``hash_function``
~~~~~~~~~~~~~~~~~

:Description:
Hash function to use if 'calculate_dataset_hash' is enabled.
Possible values are: 'md5', 'sha1', 'sha256', 'sha512'
:Default: ``sha256``
:Type: str


~~~~~~~~~~~~~~~~~~~~~
``metadata_strategy``
~~~~~~~~~~~~~~~~~~~~~
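For reference, a short sketch (not part of this commit) of computing one of the
digests named under 'hash_function' above with Python's hashlib; Galaxy's own
helpers live in galaxy.util.hash_util:

    import hashlib

    def file_digest(path: str, hash_function: str = "sha256", chunk_size: int = 2**20) -> str:
        # Stream the file so large datasets need not fit in memory.
        digest = hashlib.new(hash_function)  # accepts "md5", "sha1", "sha256", "sha512"
        with open(path, "rb") as fh:
            while chunk := fh.read(chunk_size):
                digest.update(chunk)
        return digest.hexdigest()
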
9 changes: 9 additions & 0 deletions lib/galaxy/config/__init__.py
@@ -46,6 +46,7 @@
from galaxy.util.custom_logging import LOGLV_TRACE
from galaxy.util.dynamic import HasDynamicProperties
from galaxy.util.facts import get_facts
from galaxy.util.hash_util import HashFunctionNameEnum
from galaxy.util.properties import (
read_properties_from_file,
running_from_source,
@@ -716,6 +717,7 @@ class GalaxyAppConfiguration(BaseAppConfiguration, CommonConfigurationMixin):
galaxy_data_manager_data_path: str
galaxy_infrastructure_url: str
hours_between_check: int
hash_function: HashFunctionNameEnum
integrated_tool_panel_config: str
involucro_path: str
len_file_path: str
@@ -897,6 +899,13 @@ def _process_config(self, kwargs: Dict[str, Any]) -> None:
self.update_integrated_tool_panel = kwargs.get("update_integrated_tool_panel", True)
self.galaxy_data_manager_data_path = self.galaxy_data_manager_data_path or self.tool_data_path
self.tool_secret = kwargs.get("tool_secret", "")
if self.calculate_dataset_hash not in ("always", "upload", "never"):
raise ConfigurationError(
f"Unrecognized value for calculate_dataset_hash option: {self.calculate_dataset_hash}"
)
if self.hash_function not in HashFunctionNameEnum.__members__:
raise ConfigurationError(f"Unrecognized value for hash_function option: {self.hash_function}")
self.hash_function = HashFunctionNameEnum[self.hash_function]
self.metadata_strategy = kwargs.get("metadata_strategy", "directory")
self.use_remote_user = self.use_remote_user or self.single_user
self.fetch_url_allowlist_ips = parse_allowlist_ips(listify(kwargs.get("fetch_url_allowlist")))
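The string-to-enum conversion in _process_config above, sketched with a stand-in
enum (the real HashFunctionNameEnum lives in galaxy.util.hash_util; member names
are assumed from the documented 'hash_function' values):

    from enum import Enum

    class HashName(str, Enum):  # stand-in for galaxy.util.hash_util.HashFunctionNameEnum
        md5 = "md5"
        sha1 = "sha1"
        sha256 = "sha256"
        sha512 = "sha512"

    configured = "sha256"
    if configured not in HashName.__members__:  # __members__ is keyed by member *names*
        raise ValueError(f"Unrecognized value for hash_function option: {configured}")
    hash_function = HashName[configured]  # item access converts the configured string to the member
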
12 changes: 12 additions & 0 deletions lib/galaxy/config/sample/galaxy.yml.sample
@@ -2545,6 +2545,18 @@ galaxy:
# handler processes. Float values are allowed.
#workflow_monitor_sleep: 1.0

# In which cases Galaxy should calculate a hash for a new dataset.
# Dataset hashes can be used by the Galaxy job cache/search to check
# if job inputs match. Setting the 'enable_celery_tasks' option to
# true is also required for dataset hash calculation. Possible values
# are: 'always', 'upload' (the default), 'never'. If set to 'upload',
# the hash is calculated only for the outputs of upload jobs.
#calculate_dataset_hash: upload

# Hash function to use if 'calculate_dataset_hash' is enabled.
# Possible values are: 'md5', 'sha1', 'sha256', 'sha512'
#hash_function: sha256

# Determines how metadata will be set. Valid values are `directory`,
# `extended`, `directory_celery` and `extended_celery`. In extended
# mode jobs will decide if a tool run failed, the object stores
22 changes: 22 additions & 0 deletions lib/galaxy/config/schemas/config_schema.yml
@@ -2788,6 +2788,7 @@ mapping:
Avoiding making this a boolean because we may add options such as 'in-single-form-view'
or 'in-simplified-workflow-views'. https://github.com/galaxyproject/galaxy/pull/9809/files#r461889109
allow_user_dataset_purge:
type: bool
default: true
@@ -3454,6 +3455,26 @@
decreased if extremely high job throughput is necessary, but doing so can increase CPU
usage of handler processes. Float values are allowed.
calculate_dataset_hash:
type: str
default: upload
required: false
enum: ['always', 'upload', 'never']
desc: |
In which cases Galaxy should calculate a hash for a new dataset.
Dataset hashes can be used by the Galaxy job cache/search to check if job inputs match.
Setting the 'enable_celery_tasks' option to true is also required for dataset hash calculation.
Possible values are: 'always', 'upload' (the default), 'never'. If set to 'upload', the
hash is calculated only for the outputs of upload jobs.
hash_function:
type: str
default: sha256
required: false
desc: |
Hash function to use if 'calculate_dataset_hash' is enabled. Possible values
are: 'md5', 'sha1', 'sha256', 'sha512'
metadata_strategy:
type: str
required: false
@@ -3547,6 +3568,7 @@
default: always
required: false
reloadable: true
enum: ['always', 'onsuccess', 'never']
desc: |
Clean up various bits of jobs left on the filesystem after completion. These
bits include the job working directory, external metadata temporary files,
20 changes: 20 additions & 0 deletions lib/galaxy/jobs/__init__.py
@@ -75,6 +75,7 @@
ObjectStorePopulator,
serialize_static_object_store_config,
)
from galaxy.schema.tasks import ComputeDatasetHashTaskRequest
from galaxy.structured_app import MinimalManagerApp
from galaxy.tool_util.deps import requirements
from galaxy.tool_util.output_checker import (
@@ -2038,6 +2039,25 @@ def fail(message=job.info, exception=None):
dataset.full_delete()
collected_bytes = 0

# Calculate dataset hash
for dataset_assoc in output_dataset_associations:
dataset = dataset_assoc.dataset.dataset
if not dataset.purged and dataset.state != Dataset.states.DEFERRED and not dataset.hashes:
if self.app.config.calculate_dataset_hash == "always" or (
self.app.config.calculate_dataset_hash == "upload" and job.tool_id in ("upload1", "__DATA_FETCH__")
):
# Calculate dataset hash via a celery task
if self.app.config.enable_celery_tasks:
from galaxy.celery.tasks import compute_dataset_hash

extra_files_path = dataset.extra_files_path if dataset.extra_files_path_exists() else None
request = ComputeDatasetHashTaskRequest(
dataset_id=dataset.id,
extra_files_path=extra_files_path,
hash_function=self.app.config.hash_function,
)
compute_dataset_hash.delay(request=request)

user = job.user
if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
user.adjust_total_disk_usage(collected_bytes, quota_source_info.label)
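The new gating logic in the loop above, restated as a small predicate (a sketch,
not Galaxy code; the upload tool ids come from the diff, and Dataset.states.DEFERRED
is assumed to serialize to "deferred"):

    UPLOAD_TOOL_IDS = ("upload1", "__DATA_FETCH__")

    def should_compute_hash(config, dataset, tool_id: str) -> bool:
        # Mirror of the checks above: skip purged, deferred and already-hashed datasets.
        if dataset.purged or dataset.state == "deferred" or dataset.hashes:
            return False
        if not config.enable_celery_tasks:  # the hash is computed by a Celery task
            return False
        if config.calculate_dataset_hash == "always":
            return True
        return config.calculate_dataset_hash == "upload" and tool_id in UPLOAD_TOOL_IDS
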
2 changes: 1 addition & 1 deletion lib/galaxy/model/__init__.py
@@ -4290,7 +4290,7 @@ def get_file_name(self, sync_cache: bool = True) -> str:
if not file_name and self.state not in (self.states.NEW, self.states.QUEUED):
# Queued datasets can be assigned an object store and have a filename, but they aren't guaranteed to.
# Anything after queued should have a file name.
log.warning(f"Failed to determine file name for dataset {self.id}")
log.warning(f"Failed to determine file name for dataset {self.id} in state {self.state}")
return file_name
else:
filename = self.external_filename
22 changes: 11 additions & 11 deletions lib/galaxy/model/deferred.py
@@ -4,7 +4,6 @@
import shutil
from typing import (
cast,
List,
NamedTuple,
Optional,
Union,
@@ -26,7 +25,6 @@
Dataset,
DatasetCollection,
DatasetCollectionElement,
DatasetHash,
DatasetSource,
DescribesHash,
History,
@@ -142,7 +140,7 @@ def ensure_materialized(
sa_session.commit()
object_store_populator.set_dataset_object_store_id(materialized_dataset)
try:
path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset_hashes)
path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset)
object_store.update_from_file(materialized_dataset, file_name=path)
materialized_dataset.set_size()
except Exception as e:
@@ -152,9 +150,9 @@
assert transient_path_mapper
transient_paths = transient_path_mapper.transient_paths_for(dataset)
# TODO: optimize this by streaming right to this path...
# TODO: take into acount transform and ensure we are and are not modifying the file as appropriate.
# TODO: take into account transform and ensure we are and are not modifying the file as appropriate.
try:
path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset_hashes)
path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset)
shutil.move(path, transient_paths.external_filename)
materialized_dataset.external_filename = transient_paths.external_filename
except Exception as e:
@@ -178,9 +176,9 @@
materialized_dataset_instance = cast(HistoryDatasetAssociation, dataset_instance)
if exception_materializing is not None:
materialized_dataset.state = Dataset.states.ERROR
materialized_dataset_instance.info = (
f"Failed to materialize deferred dataset with exception: {exception_materializing}"
)
error_msg = f"Failed to materialize deferred dataset with exception: {exception_materializing}"
materialized_dataset_instance.info = error_msg
log.error(error_msg)
if attached:
sa_session = self._sa_session
if sa_session is None:
Expand All @@ -206,7 +204,7 @@ def ensure_materialized(
materialized_dataset_instance.metadata_deferred = False
return materialized_dataset_instance

def _stream_source(self, target_source: DatasetSource, datatype, dataset_hashes: List[DatasetHash]) -> str:
def _stream_source(self, target_source: DatasetSource, datatype, dataset: Dataset) -> str:
source_uri = target_source.source_uri
if source_uri is None:
raise Exception("Cannot stream from dataset source without specified source_uri")
@@ -236,9 +234,11 @@ def _stream_source(self, target_source: DatasetSource, datatype, dataset_hashes:
path = convert_result.converted_path
if datatype_groom:
datatype.groom_dataset_content(path)
# Grooming is not idempotent (e.g. paths in BAM headers), so invalidate hashes
dataset.hashes = []

if dataset_hashes:
for dataset_hash in dataset_hashes:
if dataset.hashes:
for dataset_hash in dataset.hashes:
_validate_hash(path, dataset_hash, "dataset contents")

return path
32 changes: 5 additions & 27 deletions lib/galaxy_test/api/test_datasets.py
@@ -758,23 +758,13 @@ def test_composite_datatype_download(self, history_id):

def test_compute_md5_on_primary_dataset(self, history_id):
hda = self.dataset_populator.new_dataset(history_id, wait=True)
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
assert "hashes" in hda_details, str(hda_details.keys())
hashes = hda_details["hashes"]
assert len(hashes) == 0

self.dataset_populator.compute_hash(hda["id"])
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")

def test_compute_sha1_on_composite_dataset(self, history_id):
output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
assert "hashes" in hda_details, str(hda_details.keys())
hashes = hda_details["hashes"]
assert len(hashes) == 0

self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
self.assert_hash_value(
hda_details,
@@ -785,31 +775,19 @@ def test_duplicated_hash_requests_on_primary(self, history_id):

def test_duplicated_hash_requests_on_primary(self, history_id):
hda = self.dataset_populator.new_dataset(history_id, wait=True)
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
assert "hashes" in hda_details, str(hda_details.keys())
hashes = hda_details["hashes"]
assert len(hashes) == 0

self.dataset_populator.compute_hash(hda["id"])
self.dataset_populator.compute_hash(hda["id"])
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")

def test_duplicated_hash_requests_on_extra_files(self, history_id):
output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
assert "hashes" in hda_details, str(hda_details.keys())
hashes = hda_details["hashes"]
assert len(hashes) == 0

# 4 unique requests, but make them twice...
for _ in range(2):
self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-1", extra_files_path="Roadmaps")
self.dataset_populator.compute_hash(hda_details["id"], hash_function="MD5", extra_files_path="Roadmaps")
self.dataset_populator.compute_hash(
hda_details["id"], hash_function="SHA-256", extra_files_path="Sequences"
)
self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
self.dataset_populator.compute_hash(output["id"], hash_function="SHA-1", extra_files_path="Roadmaps")
self.dataset_populator.compute_hash(output["id"], hash_function="MD5", extra_files_path="Roadmaps")
self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Sequences")

hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
self.assert_hash_value(hda_details, "ce0c0ef1073317ff96c896c249b002dc", "MD5", extra_files_path="Roadmaps")
7 changes: 7 additions & 0 deletions lib/galaxy_test/base/populators.py
@@ -1398,6 +1398,13 @@ def validated():

return wait_on(validated, "dataset validation")

def wait_for_dataset_hashes(self, history_id: str, dataset_id: str):
def dataset_hashes_present():
hda = self.get_history_dataset_details(history_id=history_id, dataset_id=dataset_id)
return hda["hashes"] or None

return wait_on(dataset_hashes_present, "dataset hash presence")

def setup_history_for_export_testing(self, history_name):
using_requirement("new_history")
history_id = self.new_history(name=history_name)
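Typical use of the wait_for_dataset_hashes helper added above (a sketch, assuming a
configured DatasetPopulator; field names follow the integration test below):

    hda = dataset_populator.new_dataset(history_id, wait=True)
    # Block until the Celery task has recorded at least one hash for the uploaded dataset.
    hashes = dataset_populator.wait_for_dataset_hashes(history_id=history_id, dataset_id=hda["id"])
    assert hashes and hashes[0]["hash_value"]
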
49 changes: 49 additions & 0 deletions test/integration/test_dataset_hashing.py
@@ -0,0 +1,49 @@
from typing import Optional

from galaxy_test.base.populators import DatasetPopulator
from galaxy_test.driver import integration_util


class TestDatasetHashingIntegration(integration_util.IntegrationTestCase):
dataset_populator: DatasetPopulator
calculate_dataset_hash: Optional[str] = None

def setUp(self) -> None:
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config) -> None:
super().handle_galaxy_config_kwds(config)
if cls.calculate_dataset_hash is not None:
config["enable_celery_tasks"] = True
config["calculate_dataset_hash"] = cls.calculate_dataset_hash

def test_hashing(self, history_id: str) -> None:
hda = self.dataset_populator.new_dataset(history_id, wait=True)
if self.calculate_dataset_hash in [None, "always", "upload"]:
hashes = self.dataset_populator.wait_for_dataset_hashes(history_id=history_id, dataset_id=hda["id"])
assert hashes[0]["hash_value"] == "a17dcdfd36f47303a4824f1309d43ac14d7491ab3b8abb28782ac8e8d3b680ea"
else:
assert hda["hashes"] == [], hda
inputs = {"input1": {"src": "hda", "id": hda["id"]}}
run_response = self.dataset_populator.run_tool_raw("cat1", inputs=inputs, history_id=history_id)
self.dataset_populator.wait_for_tool_run(history_id=history_id, run_response=run_response)
cat_dataset = self.dataset_populator.get_history_dataset_details(history_id=history_id)
if self.calculate_dataset_hash == "always":
hashes = self.dataset_populator.wait_for_dataset_hashes(history_id=history_id, dataset_id=cat_dataset["id"])
assert hashes[0]["hash_value"] == "a17dcdfd36f47303a4824f1309d43ac14d7491ab3b8abb28782ac8e8d3b680ea"
else:
assert cat_dataset["hashes"] == [], cat_dataset


class TestDatasetHashingAlwaysIntegration(TestDatasetHashingIntegration):
calculate_dataset_hash = "always"


class TestDatasetHashingUploadIntegration(TestDatasetHashingIntegration):
calculate_dataset_hash = "upload"


class TestDatasetHashingNeverIntegration(TestDatasetHashingIntegration):
calculate_dataset_hash = "never"