diff --git a/RELEASE.md b/RELEASE.md index d84495d5e6..94fa345843 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -2,6 +2,7 @@ ## Major features and improvements ## Bug fixes and other changes +* Added validation to ensure dataset versions consistency across catalog. ## Breaking changes to the API ## Documentation changes ## Community contributions diff --git a/kedro/io/core.py b/kedro/io/core.py index 01e85963b9..c83e77c7a6 100644 --- a/kedro/io/core.py +++ b/kedro/io/core.py @@ -71,16 +71,16 @@ class DatasetError(Exception): class DatasetNotFoundError(DatasetError): - """``DatasetNotFoundError`` raised by ``DataCatalog`` class in case of - trying to use a non-existing dataset. + """``DatasetNotFoundError`` raised by ```DataCatalog`` and ``KedroDataCatalog`` + classes in case of trying to use a non-existing dataset. """ pass class DatasetAlreadyExistsError(DatasetError): - """``DatasetAlreadyExistsError`` raised by ``DataCatalog`` class in case - of trying to add a dataset which already exists in the ``DataCatalog``. + """``DatasetAlreadyExistsError`` raised by ```DataCatalog`` and ``KedroDataCatalog`` + classes in case of trying to add a dataset which already exists in the ``DataCatalog``. """ pass @@ -94,6 +94,15 @@ class VersionNotFoundError(DatasetError): pass +class VersionAlreadyExistsError(DatasetError): + """``VersionAlreadyExistsError`` raised by ``DataCatalog`` and ``KedroDataCatalog`` + classes when attempting to add a dataset to a catalog with a save version + that conflicts with the save version already set for the catalog. + """ + + pass + + _DI = TypeVar("_DI") _DO = TypeVar("_DO") @@ -955,3 +964,57 @@ def confirm(self, name: str) -> None: def shallow_copy(self, extra_dataset_patterns: Patterns | None = None) -> _C: """Returns a shallow copy of the current object.""" ... + + +def _validate_versions( + datasets: dict[str, AbstractDataset] | None, + load_versions: dict[str, str], + save_version: str | None, +) -> tuple[dict[str, str], str | None]: + """Validates and synchronises dataset versions for loading and saving. + + Ensures consistency of dataset versions across a catalog, particularly + for versioned datasets. It updates load versions and validates that all + save versions are consistent. + + Args: + datasets: A dictionary mapping dataset names to their instances. + if None, no validation occurs. + load_versions: A mapping between dataset names and versions + to load. + save_version: Version string to be used for ``save`` operations + by all datasets with versioning enabled. + + Returns: + Updated ``load_versions`` with load versions specified in the ``datasets`` + and resolved ``save_version``. + + Raises: + VersionAlreadyExistsError: If a dataset's save version conflicts with + the catalog's save version. + """ + if not datasets: + return load_versions, save_version + + cur_load_versions = load_versions.copy() + cur_save_version = save_version + + for ds_name, ds in datasets.items(): + # TODO: Move to kedro/io/kedro_data_catalog.py when removing DataCatalog + # TODO: Make it a protected static method for KedroDataCatalog + # TODO: Replace with isinstance(ds, CachedDataset) - current implementation avoids circular import + cur_ds = ds._dataset if ds.__class__.__name__ == "CachedDataset" else ds # type: ignore[attr-defined] + + if isinstance(cur_ds, AbstractVersionedDataset) and cur_ds._version: + if cur_ds._version.load: + cur_load_versions[ds_name] = cur_ds._version.load + if cur_ds._version.save: + cur_save_version = cur_save_version or cur_ds._version.save + if cur_save_version != cur_ds._version.save: + raise VersionAlreadyExistsError( + f"Cannot add a dataset `{ds_name}` with `{cur_ds._version.save}` save version. " + f"Save version set for the catalog is `{cur_save_version}`" + f"All datasets in the catalog must have the same save version." + ) + + return cur_load_versions, cur_save_version diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py index 6f9a678272..42863c735f 100644 --- a/kedro/io/data_catalog.py +++ b/kedro/io/data_catalog.py @@ -25,6 +25,7 @@ DatasetError, DatasetNotFoundError, Version, + _validate_versions, generate_timestamp, ) from kedro.io.memory_dataset import MemoryDataset @@ -160,20 +161,20 @@ def __init__( # noqa: PLR0913 >>> catalog = DataCatalog(datasets={'cars': cars}) """ self._config_resolver = config_resolver or CatalogConfigResolver() - # Kept to avoid breaking changes if not config_resolver: self._config_resolver._dataset_patterns = dataset_patterns or {} self._config_resolver._default_pattern = default_pattern or {} + self._load_versions, self._save_version = _validate_versions( + datasets, load_versions or {}, save_version + ) + self._datasets: dict[str, AbstractDataset] = {} self.datasets: _FrozenDatasets | None = None self.add_all(datasets or {}) - self._load_versions = load_versions or {} - self._save_version = save_version - self._use_rich_markup = _has_rich_handler() if feed_dict: @@ -506,6 +507,9 @@ def add( raise DatasetAlreadyExistsError( f"Dataset '{dataset_name}' has already been registered" ) + self._load_versions, self._save_version = _validate_versions( + {dataset_name: dataset}, self._load_versions, self._save_version + ) self._datasets[dataset_name] = dataset self.datasets = _FrozenDatasets(self.datasets, {dataset_name: dataset}) diff --git a/kedro/io/kedro_data_catalog.py b/kedro/io/kedro_data_catalog.py index 8bbf573d7e..9555cf1f69 100644 --- a/kedro/io/kedro_data_catalog.py +++ b/kedro/io/kedro_data_catalog.py @@ -25,6 +25,7 @@ DatasetError, DatasetNotFoundError, Version, + _validate_versions, generate_timestamp, ) from kedro.io.memory_dataset import MemoryDataset @@ -98,8 +99,9 @@ def __init__( self._config_resolver = config_resolver or CatalogConfigResolver() self._datasets = datasets or {} self._lazy_datasets: dict[str, _LazyDataset] = {} - self._load_versions = load_versions or {} - self._save_version = save_version + self._load_versions, self._save_version = _validate_versions( + datasets, load_versions or {}, save_version + ) self._use_rich_markup = _has_rich_handler() @@ -218,6 +220,9 @@ def __setitem__(self, key: str, value: Any) -> None: if key in self._datasets: self._logger.warning("Replacing dataset '%s'", key) if isinstance(value, AbstractDataset): + self._load_versions, self._save_version = _validate_versions( + {key: value}, self._load_versions, self._save_version + ) self._datasets[key] = value elif isinstance(value, _LazyDataset): self._lazy_datasets[key] = value diff --git a/tests/io/conftest.py b/tests/io/conftest.py index 9abce4c83e..ce466469dd 100644 --- a/tests/io/conftest.py +++ b/tests/io/conftest.py @@ -3,6 +3,8 @@ import pytest from kedro_datasets.pandas import CSVDataset +from kedro.io import CachedDataset, Version + @pytest.fixture def dummy_numpy_array(): @@ -34,6 +36,26 @@ def dataset(filepath): return CSVDataset(filepath=filepath, save_args={"index": False}) +@pytest.fixture +def dataset_versioned(filepath): + return CSVDataset( + filepath=filepath, + save_args={"index": False}, + version=Version(load="test_load_version.csv", save="test_save_version.csv"), + ) + + +@pytest.fixture +def cached_dataset_versioned(filepath): + return CachedDataset( + dataset=CSVDataset( + filepath=filepath, + save_args={"index": False}, + version=Version(load="test_load_version.csv", save="test_save_version.csv"), + ) + ) + + @pytest.fixture def correct_config(filepath): return { diff --git a/tests/io/test_data_catalog.py b/tests/io/test_data_catalog.py index 386c0812db..180919e309 100644 --- a/tests/io/test_data_catalog.py +++ b/tests/io/test_data_catalog.py @@ -23,6 +23,7 @@ _DEFAULT_PACKAGES, VERSION_FORMAT, Version, + VersionAlreadyExistsError, generate_timestamp, parse_dataset_definition, ) @@ -753,6 +754,82 @@ def test_no_versions_with_cloud_protocol(self, monkeypatch): with pytest.raises(DatasetError, match=pattern): versioned_dataset.load() + def test_redefine_save_version_via_catalog(self, correct_config, dataset_versioned): + """Test redefining save version when it is already set""" + # Version is set automatically for the catalog + catalog = DataCatalog.from_config(**correct_config) + with pytest.raises(VersionAlreadyExistsError): + catalog.add("ds_versioned", dataset_versioned) + + # Version is set manually for the catalog + correct_config["catalog"]["boats"]["versioned"] = True + catalog = DataCatalog.from_config(**correct_config) + with pytest.raises(VersionAlreadyExistsError): + catalog.add("ds_versioned", dataset_versioned) + + def test_set_load_and_save_versions(self, correct_config, dataset_versioned): + """Test setting load and save versions for catalog based on dataset's versions provided""" + catalog = DataCatalog(datasets={"ds_versioned": dataset_versioned}) + + assert catalog._load_versions["ds_versioned"] == dataset_versioned._version.load + assert catalog._save_version == dataset_versioned._version.save + + def test_set_same_versions(self, correct_config, dataset_versioned): + """Test setting the same load and save versions for catalog based on dataset's versions provided""" + catalog = DataCatalog(datasets={"ds_versioned": dataset_versioned}) + catalog.add("ds_same_versions", dataset_versioned) + + assert catalog._load_versions["ds_versioned"] == dataset_versioned._version.load + assert catalog._save_version == dataset_versioned._version.save + + def test_redefine_load_version(self, correct_config, dataset_versioned): + """Test redefining save version when it is already set""" + catalog = DataCatalog(datasets={"ds_versioned": dataset_versioned}) + dataset_versioned._version = Version( + load="another_load_version.csv", + save="test_save_version.csv", + ) + catalog.add("ds_same_versions", dataset_versioned) + + assert ( + catalog._load_versions["ds_same_versions"] + == dataset_versioned._version.load + ) + assert catalog._load_versions["ds_versioned"] == "test_load_version.csv" + assert catalog._save_version == dataset_versioned._version.save + + def test_redefine_save_version(self, correct_config, dataset_versioned): + """Test redefining save version when it is already set""" + catalog = DataCatalog(datasets={"ds_versioned": dataset_versioned}) + dataset_versioned._version = Version( + load="another_load_version.csv", + save="another_save_version.csv", + ) + with pytest.raises(VersionAlreadyExistsError): + catalog.add("ds_same_versions", dataset_versioned) + + def test_redefine_save_version_with_cached_dataset( + self, correct_config, cached_dataset_versioned + ): + """Test redefining load and save version with CachedDataset""" + catalog = DataCatalog.from_config(**correct_config) + + # Redefining save version fails + with pytest.raises(VersionAlreadyExistsError): + catalog.add("cached_dataset_versioned", cached_dataset_versioned) + + # Redefining load version passes + cached_dataset_versioned._dataset._version = Version( + load="test_load_version.csv", save=None + ) + catalog.add("cached_dataset_versioned", cached_dataset_versioned) + + assert ( + catalog._load_versions["cached_dataset_versioned"] + == "test_load_version.csv" + ) + assert catalog._save_version + class TestDataCatalogDatasetFactories: def test_match_added_to_datasets_on_get(self, config_with_dataset_factories): diff --git a/tests/io/test_kedro_data_catalog.py b/tests/io/test_kedro_data_catalog.py index 367580ef80..efd5a8a68e 100644 --- a/tests/io/test_kedro_data_catalog.py +++ b/tests/io/test_kedro_data_catalog.py @@ -20,6 +20,8 @@ from kedro.io.core import ( _DEFAULT_PACKAGES, VERSION_FORMAT, + Version, + VersionAlreadyExistsError, generate_timestamp, parse_dataset_definition, ) @@ -667,3 +669,87 @@ def test_load_version_on_unversioned_dataset( with pytest.raises(DatasetError): catalog.load("boats", version="first") + + def test_redefine_save_version_via_catalog( + self, correct_config, dataset_versioned + ): + """Test redefining save version when it is already set""" + # Version is set automatically for the catalog + catalog = KedroDataCatalog.from_config(**correct_config) + with pytest.raises(VersionAlreadyExistsError): + catalog["ds_versioned"] = dataset_versioned + + # Version is set manually for the catalog + correct_config["catalog"]["boats"]["versioned"] = True + catalog = KedroDataCatalog.from_config(**correct_config) + with pytest.raises(VersionAlreadyExistsError): + catalog["ds_versioned"] = dataset_versioned + + def test_set_load_and_save_versions(self, correct_config, dataset_versioned): + """Test setting load and save versions for catalog based on dataset's versions provided""" + catalog = KedroDataCatalog(datasets={"ds_versioned": dataset_versioned}) + + assert ( + catalog._load_versions["ds_versioned"] + == dataset_versioned._version.load + ) + assert catalog._save_version == dataset_versioned._version.save + + def test_set_same_versions(self, correct_config, dataset_versioned): + """Test setting the same load and save versions for catalog based on dataset's versions provided""" + catalog = KedroDataCatalog(datasets={"ds_versioned": dataset_versioned}) + catalog["ds_same_versions"] = dataset_versioned + + assert ( + catalog._load_versions["ds_versioned"] + == dataset_versioned._version.load + ) + assert catalog._save_version == dataset_versioned._version.save + + def test_redefine_load_version(self, correct_config, dataset_versioned): + """Test redefining save version when it is already set""" + catalog = KedroDataCatalog(datasets={"ds_versioned": dataset_versioned}) + dataset_versioned._version = Version( + load="another_load_version.csv", + save="test_save_version.csv", + ) + catalog["ds_same_versions"] = dataset_versioned + + assert ( + catalog._load_versions["ds_same_versions"] + == dataset_versioned._version.load + ) + assert catalog._load_versions["ds_versioned"] == "test_load_version.csv" + assert catalog._save_version == dataset_versioned._version.save + + def test_redefine_save_version(self, correct_config, dataset_versioned): + """Test redefining save version when it is already set""" + catalog = KedroDataCatalog(datasets={"ds_versioned": dataset_versioned}) + dataset_versioned._version = Version( + load="another_load_version.csv", + save="another_save_version.csv", + ) + with pytest.raises(VersionAlreadyExistsError): + catalog["ds_same_versions"] = dataset_versioned + + def test_redefine_save_version_with_cached_dataset( + self, correct_config, cached_dataset_versioned + ): + """Test redefining load and save version with CachedDataset""" + catalog = KedroDataCatalog.from_config(**correct_config) + + # Redefining save version fails + with pytest.raises(VersionAlreadyExistsError): + catalog["cached_dataset_versioned"] = cached_dataset_versioned + + # Redefining load version passes + cached_dataset_versioned._dataset._version = Version( + load="test_load_version.csv", save=None + ) + catalog["cached_dataset_versioned"] = cached_dataset_versioned + + assert ( + catalog._load_versions["cached_dataset_versioned"] + == "test_load_version.csv" + ) + assert catalog._save_version