diff --git a/doc/source/admin/galaxy_options.rst b/doc/source/admin/galaxy_options.rst
index a2a077224750..d7fe4cd5f41f 100644
--- a/doc/source/admin/galaxy_options.rst
+++ b/doc/source/admin/galaxy_options.rst
@@ -4719,6 +4719,33 @@
 :Type: float
 
 
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+``calculate_dataset_hash``
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+:Description:
+    In which cases Galaxy should calculate a hash for a new dataset.
+    Dataset hashes can be used by the Galaxy job cache/search to check
+    if job inputs match. Setting the 'enable_celery_tasks' option to
+    true is also required for dataset hash calculation. Possible
+    values are: 'always', 'upload' (the default), 'never'. If set to
+    'upload', the hash is calculated only for the outputs of upload
+    jobs.
+:Default: ``upload``
+:Type: str
+
+
+~~~~~~~~~~~~~~~~~
+``hash_function``
+~~~~~~~~~~~~~~~~~
+
+:Description:
+    Hash function to use if 'calculate_dataset_hash' is enabled.
+    Possible values are: 'md5', 'sha1', 'sha256', 'sha512'
+:Default: ``sha256``
+:Type: str
+
+
 ~~~~~~~~~~~~~~~~~~~~~
 ``metadata_strategy``
 ~~~~~~~~~~~~~~~~~~~~~
diff --git a/lib/galaxy/config/__init__.py b/lib/galaxy/config/__init__.py
index d253b3d0d9b6..1dc273bc7240 100644
--- a/lib/galaxy/config/__init__.py
+++ b/lib/galaxy/config/__init__.py
@@ -46,6 +46,7 @@
 from galaxy.util.custom_logging import LOGLV_TRACE
 from galaxy.util.dynamic import HasDynamicProperties
 from galaxy.util.facts import get_facts
+from galaxy.util.hash_util import HashFunctionNameEnum
 from galaxy.util.properties import (
     read_properties_from_file,
     running_from_source,
@@ -716,6 +717,7 @@ class GalaxyAppConfiguration(BaseAppConfiguration, CommonConfigurationMixin):
     galaxy_data_manager_data_path: str
     galaxy_infrastructure_url: str
     hours_between_check: int
+    hash_function: HashFunctionNameEnum
     integrated_tool_panel_config: str
     involucro_path: str
     len_file_path: str
@@ -897,6 +899,13 @@ def _process_config(self, kwargs: Dict[str, Any]) -> None:
         self.update_integrated_tool_panel = kwargs.get("update_integrated_tool_panel", True)
         self.galaxy_data_manager_data_path = self.galaxy_data_manager_data_path or self.tool_data_path
         self.tool_secret = kwargs.get("tool_secret", "")
+        if self.calculate_dataset_hash not in ("always", "upload", "never"):
+            raise ConfigurationError(
+                f"Unrecognized value for calculate_dataset_hash option: {self.calculate_dataset_hash}"
+            )
+        if self.hash_function not in HashFunctionNameEnum.__members__:
+            raise ConfigurationError(f"Unrecognized value for hash_function option: {self.hash_function}")
+        self.hash_function = HashFunctionNameEnum[self.hash_function]
         self.metadata_strategy = kwargs.get("metadata_strategy", "directory")
         self.use_remote_user = self.use_remote_user or self.single_user
         self.fetch_url_allowlist_ips = parse_allowlist_ips(listify(kwargs.get("fetch_url_allowlist")))
diff --git a/lib/galaxy/config/sample/galaxy.yml.sample b/lib/galaxy/config/sample/galaxy.yml.sample
index d2da6d1723ca..713b3788d26c 100644
--- a/lib/galaxy/config/sample/galaxy.yml.sample
+++ b/lib/galaxy/config/sample/galaxy.yml.sample
@@ -2545,6 +2545,18 @@ galaxy:
   # handler processes. Float values are allowed.
   #workflow_monitor_sleep: 1.0
 
+  # In which cases Galaxy should calculate a hash for a new dataset.
+  # Dataset hashes can be used by the Galaxy job cache/search to check
+  # if job inputs match. Setting the 'enable_celery_tasks' option to
+  # true is also required for dataset hash calculation. Possible values
+  # are: 'always', 'upload' (the default), 'never'. If set to 'upload',
+  # the hash is calculated only for the outputs of upload jobs.
+  #calculate_dataset_hash: upload
+
+  # Hash function to use if 'calculate_dataset_hash' is enabled.
+  # Possible values are: 'md5', 'sha1', 'sha256', 'sha512'
+  #hash_function: sha256
+
   # Determines how metadata will be set. Valid values are `directory`,
   # `extended`, `directory_celery` and `extended_celery`. In extended
   # mode jobs will decide if a tool run failed, the object stores
diff --git a/lib/galaxy/config/schemas/config_schema.yml b/lib/galaxy/config/schemas/config_schema.yml
index e659efd6d21c..d1e55b6a6eac 100644
--- a/lib/galaxy/config/schemas/config_schema.yml
+++ b/lib/galaxy/config/schemas/config_schema.yml
@@ -2788,6 +2788,7 @@ mapping:
       Avoiding making this a boolean because we may add options such as
       'in-single-form-view' or 'in-simplified-workflow-views'.
       https://github.com/galaxyproject/galaxy/pull/9809/files#r461889109
+
   allow_user_dataset_purge:
     type: bool
     default: true
@@ -3454,6 +3455,26 @@ mapping:
       decreased if extremely high job throughput is necessary, but
      doing so can increase CPU usage of handler processes. Float values
      are allowed.
+  calculate_dataset_hash:
+    type: str
+    default: upload
+    required: false
+    enum: ['always', 'upload', 'never']
+    desc: |
+      In which cases Galaxy should calculate a hash for a new dataset.
+      Dataset hashes can be used by the Galaxy job cache/search to check if job inputs match.
+      Setting the 'enable_celery_tasks' option to true is also required for dataset hash calculation.
+      Possible values are: 'always', 'upload' (the default), 'never'. If set to 'upload', the
+      hash is calculated only for the outputs of upload jobs.
+
+  hash_function:
+    type: str
+    default: sha256
+    required: false
+    desc: |
+      Hash function to use if 'calculate_dataset_hash' is enabled. Possible values
+      are: 'md5', 'sha1', 'sha256', 'sha512'
+
   metadata_strategy:
     type: str
     required: false
@@ -3547,6 +3568,7 @@
     default: always
     required: false
     reloadable: true
+    enum: ['always', 'onsuccess', 'never']
     desc: |
      Clean up various bits of jobs left on the filesystem after completion.
      These bits include the job working directory, external metadata temporary files,
diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index 0bf12219ce3f..22a909466b41 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -75,6 +75,7 @@
     ObjectStorePopulator,
     serialize_static_object_store_config,
 )
+from galaxy.schema.tasks import ComputeDatasetHashTaskRequest
 from galaxy.structured_app import MinimalManagerApp
 from galaxy.tool_util.deps import requirements
 from galaxy.tool_util.output_checker import (
@@ -2038,6 +2039,25 @@ def fail(message=job.info, exception=None):
                     dataset.full_delete()
                 collected_bytes = 0
 
+        # Calculate dataset hash
+        for dataset_assoc in output_dataset_associations:
+            dataset = dataset_assoc.dataset.dataset
+            if not dataset.purged and dataset.state != Dataset.states.DEFERRED and not dataset.hashes:
+                if self.app.config.calculate_dataset_hash == "always" or (
+                    self.app.config.calculate_dataset_hash == "upload" and job.tool_id in ("upload1", "__DATA_FETCH__")
+                ):
+                    # Calculate dataset hash via a celery task
+                    if self.app.config.enable_celery_tasks:
+                        from galaxy.celery.tasks import compute_dataset_hash
+
+                        extra_files_path = dataset.extra_files_path if dataset.extra_files_path_exists() else None
+                        request = ComputeDatasetHashTaskRequest(
+                            dataset_id=dataset.id,
+                            extra_files_path=extra_files_path,
+                            hash_function=self.app.config.hash_function,
+                        )
+                        compute_dataset_hash.delay(request=request)
+
         user = job.user
         if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
             user.adjust_total_disk_usage(collected_bytes, quota_source_info.label)
diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index f5f1085dd850..a5e03373dfb7 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -4290,7 +4290,7 @@ def get_file_name(self, sync_cache: bool = True) -> str:
             if not file_name and self.state not in (self.states.NEW, self.states.QUEUED):
                 # Queued datasets can be assigned an object store and have a filename, but they aren't guaranteed to.
                 # Anything after queued should have a file name.
-                log.warning(f"Failed to determine file name for dataset {self.id}")
+                log.warning(f"Failed to determine file name for dataset {self.id} in state {self.state}")
             return file_name
         else:
             filename = self.external_filename
diff --git a/lib/galaxy/model/deferred.py b/lib/galaxy/model/deferred.py
index 06eb995adc93..58f13f24a910 100644
--- a/lib/galaxy/model/deferred.py
+++ b/lib/galaxy/model/deferred.py
@@ -4,7 +4,6 @@
 import shutil
 from typing import (
     cast,
-    List,
     NamedTuple,
     Optional,
     Union,
@@ -26,7 +25,6 @@
     Dataset,
     DatasetCollection,
     DatasetCollectionElement,
-    DatasetHash,
     DatasetSource,
     DescribesHash,
     History,
@@ -142,7 +140,7 @@ def ensure_materialized(
                 sa_session.commit()
             object_store_populator.set_dataset_object_store_id(materialized_dataset)
             try:
-                path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset_hashes)
+                path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset)
                 object_store.update_from_file(materialized_dataset, file_name=path)
                 materialized_dataset.set_size()
             except Exception as e:
@@ -152,9 +150,9 @@ def ensure_materialized(
             assert transient_path_mapper
             transient_paths = transient_path_mapper.transient_paths_for(dataset)
             # TODO: optimize this by streaming right to this path...
-            # TODO: take into acount transform and ensure we are and are not modifying the file as appropriate.
+            # TODO: take into account transform and ensure we are and are not modifying the file as appropriate.
             try:
-                path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset_hashes)
+                path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset)
                 shutil.move(path, transient_paths.external_filename)
                 materialized_dataset.external_filename = transient_paths.external_filename
             except Exception as e:
@@ -178,9 +176,9 @@ def ensure_materialized(
             materialized_dataset_instance = cast(HistoryDatasetAssociation, dataset_instance)
         if exception_materializing is not None:
             materialized_dataset.state = Dataset.states.ERROR
-            materialized_dataset_instance.info = (
-                f"Failed to materialize deferred dataset with exception: {exception_materializing}"
-            )
+            error_msg = f"Failed to materialize deferred dataset with exception: {exception_materializing}"
+            materialized_dataset_instance.info = error_msg
+            log.error(error_msg)
         if attached:
             sa_session = self._sa_session
             if sa_session is None:
@@ -206,7 +204,7 @@ def ensure_materialized(
             materialized_dataset_instance.metadata_deferred = False
         return materialized_dataset_instance
 
-    def _stream_source(self, target_source: DatasetSource, datatype, dataset_hashes: List[DatasetHash]) -> str:
+    def _stream_source(self, target_source: DatasetSource, datatype, dataset: Dataset) -> str:
         source_uri = target_source.source_uri
         if source_uri is None:
             raise Exception("Cannot stream from dataset source without specified source_uri")
@@ -236,9 +234,11 @@ def _stream_source(self, target_source: DatasetSource, datatype, dataset_hashes:
         path = convert_result.converted_path
         if datatype_groom:
             datatype.groom_dataset_content(path)
+            # Grooming is not idempotent (e.g. paths in BAM headers), so invalidate hashes
+            dataset.hashes = []
 
-        if dataset_hashes:
-            for dataset_hash in dataset_hashes:
+        if dataset.hashes:
+            for dataset_hash in dataset.hashes:
                 _validate_hash(path, dataset_hash, "dataset contents")
         return path
 
diff --git a/lib/galaxy_test/api/test_datasets.py b/lib/galaxy_test/api/test_datasets.py
index 87ec35daf517..b74a230b213d 100644
--- a/lib/galaxy_test/api/test_datasets.py
+++ b/lib/galaxy_test/api/test_datasets.py
@@ -758,23 +758,13 @@ def test_composite_datatype_download(self, history_id):
 
     def test_compute_md5_on_primary_dataset(self, history_id):
         hda = self.dataset_populator.new_dataset(history_id, wait=True)
-        hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
-        assert "hashes" in hda_details, str(hda_details.keys())
-        hashes = hda_details["hashes"]
-        assert len(hashes) == 0
-
         self.dataset_populator.compute_hash(hda["id"])
         hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
         self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")
 
     def test_compute_sha1_on_composite_dataset(self, history_id):
         output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
-        hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
-        assert "hashes" in hda_details, str(hda_details.keys())
-        hashes = hda_details["hashes"]
-        assert len(hashes) == 0
-
-        self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
+        self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
         hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
         self.assert_hash_value(
             hda_details,
@@ -785,11 +775,6 @@
 
     def test_duplicated_hash_requests_on_primary(self, history_id):
         hda = self.dataset_populator.new_dataset(history_id, wait=True)
-        hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
-        assert "hashes" in hda_details, str(hda_details.keys())
-        hashes = hda_details["hashes"]
-        assert len(hashes) == 0
-
         self.dataset_populator.compute_hash(hda["id"])
         self.dataset_populator.compute_hash(hda["id"])
         hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
@@ -797,19 +782,12 @@
 
     def test_duplicated_hash_requests_on_extra_files(self, history_id):
         output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
-        hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
-        assert "hashes" in hda_details, str(hda_details.keys())
-        hashes = hda_details["hashes"]
-        assert len(hashes) == 0
-
         # 4 unique requests, but make them twice...
         for _ in range(2):
-            self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
-            self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-1", extra_files_path="Roadmaps")
-            self.dataset_populator.compute_hash(hda_details["id"], hash_function="MD5", extra_files_path="Roadmaps")
-            self.dataset_populator.compute_hash(
-                hda_details["id"], hash_function="SHA-256", extra_files_path="Sequences"
-            )
+            self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
+            self.dataset_populator.compute_hash(output["id"], hash_function="SHA-1", extra_files_path="Roadmaps")
+            self.dataset_populator.compute_hash(output["id"], hash_function="MD5", extra_files_path="Roadmaps")
+            self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Sequences")
 
         hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
         self.assert_hash_value(hda_details, "ce0c0ef1073317ff96c896c249b002dc", "MD5", extra_files_path="Roadmaps")
diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py
index 684689de2f6f..3e47c70c18f8 100644
--- a/lib/galaxy_test/base/populators.py
+++ b/lib/galaxy_test/base/populators.py
@@ -1398,6 +1398,13 @@ def validated():
 
         return wait_on(validated, "dataset validation")
 
+    def wait_for_dataset_hashes(self, history_id: str, dataset_id: str):
+        def dataset_hashes_present():
+            hda = self.get_history_dataset_details(history_id=history_id, dataset_id=dataset_id)
+            return hda["hashes"] or None
+
+        return wait_on(dataset_hashes_present, "dataset hash presence")
+
     def setup_history_for_export_testing(self, history_name):
         using_requirement("new_history")
         history_id = self.new_history(name=history_name)
diff --git a/test/integration/test_dataset_hashing.py b/test/integration/test_dataset_hashing.py
new file mode 100644
index 000000000000..a3a123b735f1
--- /dev/null
+++ b/test/integration/test_dataset_hashing.py
@@ -0,0 +1,49 @@
+from typing import Optional
+
+from galaxy_test.base.populators import DatasetPopulator
+from galaxy_test.driver import integration_util
+
+
+class TestDatasetHashingIntegration(integration_util.IntegrationTestCase):
+    dataset_populator: DatasetPopulator
+    calculate_dataset_hash: Optional[str] = None
+
+    def setUp(self) -> None:
+        super().setUp()
+        self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
+
+    @classmethod
+    def handle_galaxy_config_kwds(cls, config) -> None:
+        super().handle_galaxy_config_kwds(config)
+        if cls.calculate_dataset_hash is not None:
+            config["enable_celery_tasks"] = True
+            config["calculate_dataset_hash"] = cls.calculate_dataset_hash
+
+    def test_hashing(self, history_id: str) -> None:
+        hda = self.dataset_populator.new_dataset(history_id, wait=True)
+        if self.calculate_dataset_hash in [None, "always", "upload"]:
+            hashes = self.dataset_populator.wait_for_dataset_hashes(history_id=history_id, dataset_id=hda["id"])
+            assert hashes[0]["hash_value"] == "a17dcdfd36f47303a4824f1309d43ac14d7491ab3b8abb28782ac8e8d3b680ea"
+        else:
+            assert hda["hashes"] == [], hda
+        inputs = {"input1": {"src": "hda", "id": hda["id"]}}
+        run_response = self.dataset_populator.run_tool_raw("cat1", inputs=inputs, history_id=history_id)
+        self.dataset_populator.wait_for_tool_run(history_id=history_id, run_response=run_response)
+        cat_dataset = self.dataset_populator.get_history_dataset_details(history_id=history_id)
+        if self.calculate_dataset_hash == "always":
+            hashes = self.dataset_populator.wait_for_dataset_hashes(history_id=history_id, dataset_id=cat_dataset["id"])
+            assert hashes[0]["hash_value"] == "a17dcdfd36f47303a4824f1309d43ac14d7491ab3b8abb28782ac8e8d3b680ea"
+        else:
+            assert cat_dataset["hashes"] == [], cat_dataset
+
+
+class TestDatasetHashingAlwaysIntegration(TestDatasetHashingIntegration):
+    calculate_dataset_hash = "always"
+
+
+class TestDatasetHashingUploadIntegration(TestDatasetHashingIntegration):
+    calculate_dataset_hash = "upload"
+
+
+class TestDatasetHashingNeverIntegration(TestDatasetHashingIntegration):
+    calculate_dataset_hash = "never"
diff --git a/test/integration/test_materialize_dataset_instance_tasks.py b/test/integration/test_materialize_dataset_instance_tasks.py
index 817098994a5f..aef30d0a29c6 100644
--- a/test/integration/test_materialize_dataset_instance_tasks.py
+++ b/test/integration/test_materialize_dataset_instance_tasks.py
@@ -160,7 +160,7 @@ def test_upload_vs_materialize_simplest_upload(self, history_id: str):
         assert len(uploaded_details["sources"]) == 1
         content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output, assert_ok=True)
         assert content == "This is a line of text."
-        new_history_id = self._reupload_and_then_materialize(history_id, output)
+        new_history_id = self._reupload_and_then_materialize(output)
         content = self.dataset_populator.get_history_dataset_content(new_history_id, hid=2, assert_ok=False)
         assert content == "This is a line of text."
 
@@ -186,7 +186,7 @@ def test_upload_vs_materialize_to_posix_lines(self, history_id: str):
         assert len(transform) == 1
         content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output, assert_ok=True)
         assert content == "This is a line of text.\n"
-        new_history_id = self._reupload_and_then_materialize(history_id, output)
+        new_history_id = self._reupload_and_then_materialize(output)
         content = self.dataset_populator.get_history_dataset_content(new_history_id, hid=2, assert_ok=False)
         assert content == "This is a line of text.\n"
 
@@ -212,7 +212,7 @@ def test_upload_vs_materialize_space_to_tab(self, history_id: str):
         assert len(transform) == 1
         content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output, assert_ok=True)
         assert content == "This\tis\ta\tline\tof\ttext."
-        new_history_id = self._reupload_and_then_materialize(history_id, output)
+        new_history_id = self._reupload_and_then_materialize(output)
         content = self.dataset_populator.get_history_dataset_content(new_history_id, hid=2, assert_ok=False)
         assert content == "This\tis\ta\tline\tof\ttext."
 
@@ -239,7 +239,7 @@ def test_upload_vs_materialize_to_posix_and_space_to_tab(self, history_id: str):
         assert len(transform) == 2
         content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output, assert_ok=True)
         assert content == "This\tis\ta\tline\tof\ttext.\n"
-        new_history_id = self._reupload_and_then_materialize(history_id, output)
+        new_history_id = self._reupload_and_then_materialize(output)
         content = self.dataset_populator.get_history_dataset_content(new_history_id, hid=2, assert_ok=False)
         assert content == "This\tis\ta\tline\tof\ttext.\n"
 
@@ -262,20 +262,17 @@ def test_upload_vs_materialize_grooming(self, history_id: str):
         transform = source_0["transform"]
         assert isinstance(transform, list)
         assert len(transform) == 1
-        original_details = self.dataset_populator.get_history_dataset_details(
-            history_id, dataset=output, assert_ok=True
-        )
-        new_history_id = self._reupload_and_then_materialize(history_id, output)
+        new_history_id = self._reupload_and_then_materialize(output)
         new_details = self.dataset_populator.get_history_dataset_details(new_history_id, hid=2, assert_ok=False)
-        for key in original_details.keys():
+        for key in uploaded_details.keys():
             if key in ["metadata_bam_header", "metadata_bam_index"]:
                 # differs because command-line different, index path different, and such...
                 continue
             if key.startswith("metadata_"):
-                assert original_details[key] == new_details[key], f"Mismatched on key {key}"
-        assert original_details["file_ext"] == new_details["file_ext"]
+                assert uploaded_details[key] == new_details[key], f"Mismatched on key {key}"
+        assert uploaded_details["file_ext"] == new_details["file_ext"]
 
-    def _reupload_and_then_materialize(self, history_id, dataset):
+    def _reupload_and_then_materialize(self, dataset):
         new_history_id, uploaded_hdas = self.dataset_populator.reupload_contents(dataset)
         assert len(uploaded_hdas) == 1
         deferred_hda = uploaded_hdas[0]
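
Note for readers deploying the options introduced above: as the option descriptions state, dataset hashes are only computed when Celery tasks are enabled. A minimal galaxy.yml sketch combining the relevant settings might look as follows; 'always' is shown only as an example value, the shipped default is 'upload', and 'hash_function' accepts md5, sha1, sha256 or sha512 (default sha256). This snippet is illustrative and not part of the diff.

  galaxy:
    # Hash computation runs as a Celery task, so this must be enabled.
    enable_celery_tasks: true
    # 'always' hashes every new dataset; 'upload' (the default) hashes only
    # the outputs of upload jobs; 'never' disables hashing.
    calculate_dataset_hash: always
    # Hash function used when calculate_dataset_hash is enabled.
    hash_function: sha256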