From 515d157b41bbbf9d40898c7b9cab5486d99c66d2 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Mon, 26 Aug 2024 20:09:54 -0600 Subject: [PATCH] Internal refactor to separate reading and writing concerns (#231) * split xarray.py into backend.py and accessor.py * move the kerchunk serialization code out into a new writers submodule * separate out the zarr reading code as a separate reader * actually include new accessor.py file * actually include new kerchunk writers file * actually include new zarr writer file * update test to import from the new location of zarr code * refactor to create a kerchunk 'reader' * split test_xarray.py into two files * split up the kerchunk tests into tests of writing and reading kerchunk * absolute imports in top-level init * kerchunk.py -> types.kerchunk.py * fix some mypy issues * release notes * update module paths in API docs * separate zarr writer tests out * forgot file i moved the zarr tests to * move left behind test * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- docs/api.rst | 6 +- docs/releases.rst | 5 +- virtualizarr/__init__.py | 6 +- virtualizarr/accessor.py | 166 +++++++++ virtualizarr/{xarray.py => backend.py} | 339 ++---------------- virtualizarr/manifests/array.py | 7 +- virtualizarr/{ => readers}/kerchunk.py | 305 +++++++--------- virtualizarr/readers/zarr.py | 131 +++++++ virtualizarr/tests/test_backend.py | 255 +++++++++++++ virtualizarr/tests/test_kerchunk.py | 238 +----------- virtualizarr/tests/test_readers/__init__.py | 0 .../tests/test_readers/test_kerchunk.py | 63 ++++ virtualizarr/tests/test_writers/__init__.py | 0 .../tests/test_writers/test_kerchunk.py | 118 ++++++ virtualizarr/tests/test_writers/test_zarr.py | 82 +++++ virtualizarr/tests/test_xarray.py | 187 ---------- virtualizarr/tests/test_zarr.py | 80 +---- virtualizarr/types/__init__.py | 3 + virtualizarr/{types.py => types/general.py} | 0 virtualizarr/types/kerchunk.py | 12 + virtualizarr/writers/__init__.py | 0 virtualizarr/writers/kerchunk.py | 124 +++++++ virtualizarr/writers/zarr.py | 115 ++++++ virtualizarr/zarr.py | 178 --------- 24 files changed, 1247 insertions(+), 1173 deletions(-) create mode 100644 virtualizarr/accessor.py rename virtualizarr/{xarray.py => backend.py} (50%) rename virtualizarr/{ => readers}/kerchunk.py (51%) create mode 100644 virtualizarr/readers/zarr.py create mode 100644 virtualizarr/tests/test_backend.py create mode 100644 virtualizarr/tests/test_readers/__init__.py create mode 100644 virtualizarr/tests/test_readers/test_kerchunk.py create mode 100644 virtualizarr/tests/test_writers/__init__.py create mode 100644 virtualizarr/tests/test_writers/test_kerchunk.py create mode 100644 virtualizarr/tests/test_writers/test_zarr.py create mode 100644 virtualizarr/types/__init__.py rename virtualizarr/{types.py => types/general.py} (100%) create mode 100644 virtualizarr/types/kerchunk.py create mode 100644 virtualizarr/writers/__init__.py create mode 100644 virtualizarr/writers/kerchunk.py create mode 100644 virtualizarr/writers/zarr.py diff --git a/docs/api.rst b/docs/api.rst index 3dc1d146..81d08a77 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -21,7 +21,7 @@ Manifests Reading ======= -.. currentmodule:: virtualizarr.xarray +.. currentmodule:: virtualizarr.backend .. autosummary:: :nosignatures: :toctree: generated/ @@ -32,7 +32,7 @@ Reading Serialization ============= -.. currentmodule:: virtualizarr.xarray +.. currentmodule:: virtualizarr.accessor .. autosummary:: :nosignatures: :toctree: generated/ @@ -44,7 +44,7 @@ Serialization Rewriting ============= -.. currentmodule:: virtualizarr.xarray +.. currentmodule:: virtualizarr.accessor .. autosummary:: :nosignatures: :toctree: generated/ diff --git a/docs/releases.rst b/docs/releases.rst index 3fff4211..5ae3bff4 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -34,7 +34,7 @@ Bug fixes - Exclude empty chunks during `ChunkDict` construction. (:pull:`198`) By `Gustavo Hidalgo `_. - Fixed regression in `fill_value` handling for datetime dtypes making virtual - Zarr stores unreadable (:pr:`206`) + Zarr stores unreadable (:pull:`206`) By `Timothy Hodson `_ Documentation @@ -43,6 +43,9 @@ Documentation Internal Changes ~~~~~~~~~~~~~~~~ +- Refactored internal structure significantly to split up everything to do with reading references from that to do with writing references. + (:issue:`229`) (:pull:`231`) By `Tom Nicholas `_. + .. _v1.0.0: v1.0.0 (9th July 2024) diff --git a/virtualizarr/__init__.py b/virtualizarr/__init__.py index 11bdae6e..bd70f834 100644 --- a/virtualizarr/__init__.py +++ b/virtualizarr/__init__.py @@ -1,6 +1,6 @@ -from .manifests import ChunkManifest, ManifestArray # type: ignore # noqa -from .xarray import VirtualiZarrDatasetAccessor # type: ignore # noqa -from .xarray import open_virtual_dataset # noqa: F401 +from virtualizarr.manifests import ChunkManifest, ManifestArray # type: ignore # noqa +from virtualizarr.accessor import VirtualiZarrDatasetAccessor # type: ignore # noqa +from virtualizarr.backend import open_virtual_dataset # noqa: F401 from importlib.metadata import version as _version diff --git a/virtualizarr/accessor.py b/virtualizarr/accessor.py new file mode 100644 index 00000000..0a97237e --- /dev/null +++ b/virtualizarr/accessor.py @@ -0,0 +1,166 @@ +from pathlib import Path +from typing import ( + Callable, + Literal, + overload, +) + +import ujson # type: ignore +from xarray import Dataset, register_dataset_accessor + +from virtualizarr.manifests import ManifestArray +from virtualizarr.types.kerchunk import KerchunkStoreRefs +from virtualizarr.writers.kerchunk import dataset_to_kerchunk_refs +from virtualizarr.writers.zarr import dataset_to_zarr + + +@register_dataset_accessor("virtualize") +class VirtualiZarrDatasetAccessor: + """ + Xarray accessor for writing out virtual datasets to disk. + + Methods on this object are called via `ds.virtualize.{method}`. + """ + + def __init__(self, ds: Dataset): + self.ds: Dataset = ds + + def to_zarr(self, storepath: str) -> None: + """ + Serialize all virtualized arrays in this xarray dataset as a Zarr store. + + Currently requires all variables to be backed by ManifestArray objects. + + Not very useful until some implementation of a Zarr reader can actually read these manifest.json files. + See https://github.com/zarr-developers/zarr-specs/issues/287 + + Parameters + ---------- + storepath : str + """ + dataset_to_zarr(self.ds, storepath) + + @overload + def to_kerchunk( + self, filepath: None, format: Literal["dict"] + ) -> KerchunkStoreRefs: ... + + @overload + def to_kerchunk(self, filepath: str | Path, format: Literal["json"]) -> None: ... + + @overload + def to_kerchunk( + self, + filepath: str | Path, + format: Literal["parquet"], + record_size: int = 100_000, + categorical_threshold: int = 10, + ) -> None: ... + + def to_kerchunk( + self, + filepath: str | Path | None = None, + format: Literal["dict", "json", "parquet"] = "dict", + record_size: int = 100_000, + categorical_threshold: int = 10, + ) -> KerchunkStoreRefs | None: + """ + Serialize all virtualized arrays in this xarray dataset into the kerchunk references format. + + Parameters + ---------- + filepath : str, default: None + File path to write kerchunk references into. Not required if format is 'dict'. + format : 'dict', 'json', or 'parquet' + Format to serialize the kerchunk references as. + If 'json' or 'parquet' then the 'filepath' argument is required. + record_size (parquet only): int + Number of references to store in each reference file (default 100,000). Bigger values + mean fewer read requests but larger memory footprint. + categorical_threshold (parquet only) : int + Encode urls as pandas.Categorical to reduce memory footprint if the ratio + of the number of unique urls to total number of refs for each variable + is greater than or equal to this number. (default 10) + + References + ---------- + https://fsspec.github.io/kerchunk/spec.html + """ + refs = dataset_to_kerchunk_refs(self.ds) + + if format == "dict": + return refs + elif format == "json": + if filepath is None: + raise ValueError("Filepath must be provided when format is 'json'") + + with open(filepath, "w") as json_file: + ujson.dump(refs, json_file) + + return None + elif format == "parquet": + from kerchunk.df import refs_to_dataframe + + if isinstance(filepath, Path): + url = str(filepath) + elif isinstance(filepath, str): + url = filepath + + # refs_to_dataframe is responsible for writing to parquet. + # at no point does it create a full in-memory dataframe. + refs_to_dataframe( + refs, + url=url, + record_size=record_size, + categorical_threshold=categorical_threshold, + ) + return None + else: + raise ValueError(f"Unrecognized output format: {format}") + + def rename_paths( + self, + new: str | Callable[[str], str], + ) -> Dataset: + """ + Rename paths to chunks in every ManifestArray in this dataset. + + Accepts either a string, in which case this new path will be used for all chunks, or + a function which accepts the old path and returns the new path. + + Parameters + ---------- + new + New path to use for all chunks, either as a string, or as a function which accepts and returns strings. + + Returns + ------- + Dataset + + Examples + -------- + Rename paths to reflect moving the referenced files from local storage to an S3 bucket. + + >>> def local_to_s3_url(old_local_path: str) -> str: + ... from pathlib import Path + ... + ... new_s3_bucket_url = "http://s3.amazonaws.com/my_bucket/" + ... + ... filename = Path(old_local_path).name + ... return str(new_s3_bucket_url / filename) + + >>> ds.virtualize.rename_paths(local_to_s3_url) + + See Also + -------- + ManifestArray.rename_paths + ChunkManifest.rename_paths + """ + + new_ds = self.ds.copy() + for var_name in new_ds.variables: + data = new_ds[var_name].data + if isinstance(data, ManifestArray): + new_ds[var_name].data = data.rename_paths(new=new) + + return new_ds diff --git a/virtualizarr/xarray.py b/virtualizarr/backend.py similarity index 50% rename from virtualizarr/xarray.py rename to virtualizarr/backend.py index 0fb33815..87c2aa2a 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/backend.py @@ -1,39 +1,47 @@ import os import warnings from collections.abc import Iterable, Mapping, MutableMapping +from enum import Enum, auto from io import BufferedIOBase -from pathlib import Path from typing import ( Any, - Callable, Hashable, - Literal, Optional, cast, - overload, ) -import ujson # type: ignore import xarray as xr -from xarray import register_dataset_accessor from xarray.backends import AbstractDataStore, BackendArray from xarray.coding.times import CFDatetimeCoder from xarray.core.indexes import Index, PandasIndex from xarray.core.variable import IndexVariable -import virtualizarr.kerchunk as kerchunk -from virtualizarr.kerchunk import FileType, KerchunkStoreRefs -from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.manifests import ManifestArray from virtualizarr.utils import _fsspec_openfile_from_filepath -from virtualizarr.zarr import ( - attrs_from_zarr_group_json, - dataset_to_zarr, - metadata_from_zarr_json, -) XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore +class AutoName(Enum): + # Recommended by official Python docs for auto naming: + # https://docs.python.org/3/library/enum.html#using-automatic-values + def _generate_next_value_(name, start, count, last_values): + return name + + +class FileType(AutoName): + netcdf3 = auto() + netcdf4 = auto() # NOTE: netCDF4 is a subset of hdf5 + hdf4 = auto() + hdf5 = auto() + grib = auto() + tiff = auto() + fits = auto() + zarr = auto() + dmrpp = auto() + zarr_v3 = auto() + + class ManifestBackendArray(ManifestArray, BackendArray): """Using this prevents xarray from wrapping the KerchunkArray in ExplicitIndexingAdapter etc.""" @@ -134,6 +142,8 @@ def open_virtual_dataset( if filetype == FileType.zarr_v3: # TODO is there a neat way of auto-detecting this? + from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store + return open_virtual_dataset_from_v3_store( storepath=filepath, drop_variables=drop_variables, indexes=indexes ) @@ -153,12 +163,19 @@ def open_virtual_dataset( vds.drop_vars(drop_variables) return vds else: + # we currently read every other filetype using kerchunks various file format backends + from virtualizarr.readers.kerchunk import ( + fully_decode_arr_refs, + read_kerchunk_references_from_file, + virtual_vars_from_kerchunk_refs, + ) + if reader_options is None: reader_options = {} # this is the only place we actually always need to use kerchunk directly # TODO avoid even reading byte ranges for variables that will be dropped later anyway? - vds_refs = kerchunk.read_kerchunk_references_from_file( + vds_refs = read_kerchunk_references_from_file( filepath=filepath, filetype=filetype, reader_options=reader_options, @@ -168,7 +185,7 @@ def open_virtual_dataset( drop_variables=drop_variables + loadable_variables, virtual_array_class=virtual_array_class, ) - ds_attrs = kerchunk.fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {}) + ds_attrs = fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {}) coord_names = ds_attrs.pop("coordinates", []) if indexes is None or len(loadable_variables) > 0: @@ -235,144 +252,6 @@ def open_virtual_dataset( return vds -def open_virtual_dataset_from_v3_store( - storepath: str, - drop_variables: list[str], - indexes: Mapping[str, Index] | None, -) -> xr.Dataset: - """ - Read a Zarr v3 store and return an xarray Dataset containing virtualized arrays. - """ - _storepath = Path(storepath) - - ds_attrs = attrs_from_zarr_group_json(_storepath / "zarr.json") - coord_names = ds_attrs.pop("coordinates", []) - - # TODO recursive glob to create a datatree - # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it - # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166 - all_paths = _storepath.glob("*/") - directory_paths = [p for p in all_paths if not p.is_file()] - - vars = {} - for array_dir in directory_paths: - var_name = array_dir.name - if var_name in drop_variables: - break - - zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json") - manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json")) - - marr = ManifestArray(chunkmanifest=manifest, zarray=zarray) - var = xr.Variable(data=marr, dims=dim_names, attrs=attrs) - vars[var_name] = var - - if indexes is None: - raise NotImplementedError() - elif indexes != {}: - # TODO allow manual specification of index objects - raise NotImplementedError() - else: - indexes = dict(**indexes) # for type hinting: to allow mutation - - data_vars, coords = separate_coords(vars, indexes, coord_names) - - ds = xr.Dataset( - data_vars, - coords=coords, - # indexes={}, # TODO should be added in a later version of xarray - attrs=ds_attrs, - ) - - return ds - - -def virtual_vars_from_kerchunk_refs( - refs: KerchunkStoreRefs, - drop_variables: list[str] | None = None, - virtual_array_class=ManifestArray, -) -> dict[str, xr.Variable]: - """ - Translate a store-level kerchunk reference dict into aaset of xarray Variables containing virtualized arrays. - - Parameters - ---------- - drop_variables: list[str], default is None - Variables in the file to drop before returning. - virtual_array_class - Virtual array class to use to represent the references to the chunks in each on-disk array. - Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. - """ - - var_names = kerchunk.find_var_names(refs) - if drop_variables is None: - drop_variables = [] - var_names_to_keep = [ - var_name for var_name in var_names if var_name not in drop_variables - ] - - vars = { - var_name: variable_from_kerchunk_refs(refs, var_name, virtual_array_class) - for var_name in var_names_to_keep - } - return vars - - -def dataset_from_kerchunk_refs( - refs: KerchunkStoreRefs, - drop_variables: list[str] = [], - virtual_array_class: type = ManifestArray, - indexes: MutableMapping[str, Index] | None = None, -) -> xr.Dataset: - """ - Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays. - - drop_variables: list[str], default is None - Variables in the file to drop before returning. - virtual_array_class - Virtual array class to use to represent the references to the chunks in each on-disk array. - Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. - """ - - vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class) - ds_attrs = kerchunk.fully_decode_arr_refs(refs["refs"]).get(".zattrs", {}) - coord_names = ds_attrs.pop("coordinates", []) - - if indexes is None: - indexes = {} - data_vars, coords = separate_coords(vars, indexes, coord_names) - - vds = xr.Dataset( - data_vars, - coords=coords, - # indexes={}, # TODO should be added in a later version of xarray - attrs=ds_attrs, - ) - - return vds - - -def variable_from_kerchunk_refs( - refs: KerchunkStoreRefs, var_name: str, virtual_array_class -) -> xr.Variable: - """Create a single xarray Variable by reading specific keys of a kerchunk references dict.""" - - arr_refs = kerchunk.extract_array_refs(refs, var_name) - chunk_dict, zarray, zattrs = kerchunk.parse_array_refs(arr_refs) - # we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs - dims = zattrs.pop("_ARRAY_DIMENSIONS") - if chunk_dict: - manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict) - varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) - else: - # This means we encountered a scalar variable of dimension 0, - # very likely that it actually has no numeric value and its only purpose - # is to communicate dataset attributes. - varr = zarray.fill_value - - return xr.Variable(data=varr, dims=dims, attrs=zattrs) - - def separate_coords( vars: Mapping[str, xr.Variable], indexes: MutableMapping[str, Index], @@ -415,155 +294,3 @@ def separate_coords( coords = xr.Coordinates(coord_vars, indexes=indexes) return data_vars, coords - - -@register_dataset_accessor("virtualize") -class VirtualiZarrDatasetAccessor: - """ - Xarray accessor for writing out virtual datasets to disk. - - Methods on this object are called via `ds.virtualize.{method}`. - """ - - def __init__(self, ds: xr.Dataset): - self.ds: xr.Dataset = ds - - def to_zarr(self, storepath: str) -> None: - """ - Serialize all virtualized arrays in this xarray dataset as a Zarr store. - - Currently requires all variables to be backed by ManifestArray objects. - - Not very useful until some implementation of a Zarr reader can actually read these manifest.json files. - See https://github.com/zarr-developers/zarr-specs/issues/287 - - Parameters - ---------- - storepath : str - """ - dataset_to_zarr(self.ds, storepath) - - @overload - def to_kerchunk( - self, filepath: None, format: Literal["dict"] - ) -> KerchunkStoreRefs: ... - - @overload - def to_kerchunk(self, filepath: str | Path, format: Literal["json"]) -> None: ... - - @overload - def to_kerchunk( - self, - filepath: str | Path, - format: Literal["parquet"], - record_size: int = 100_000, - categorical_threshold: int = 10, - ) -> None: ... - - def to_kerchunk( - self, - filepath: str | Path | None = None, - format: Literal["dict", "json", "parquet"] = "dict", - record_size: int = 100_000, - categorical_threshold: int = 10, - ) -> KerchunkStoreRefs | None: - """ - Serialize all virtualized arrays in this xarray dataset into the kerchunk references format. - - Parameters - ---------- - filepath : str, default: None - File path to write kerchunk references into. Not required if format is 'dict'. - format : 'dict', 'json', or 'parquet' - Format to serialize the kerchunk references as. - If 'json' or 'parquet' then the 'filepath' argument is required. - record_size (parquet only): int - Number of references to store in each reference file (default 100,000). Bigger values - mean fewer read requests but larger memory footprint. - categorical_threshold (parquet only) : int - Encode urls as pandas.Categorical to reduce memory footprint if the ratio - of the number of unique urls to total number of refs for each variable - is greater than or equal to this number. (default 10) - - References - ---------- - https://fsspec.github.io/kerchunk/spec.html - """ - refs = kerchunk.dataset_to_kerchunk_refs(self.ds) - - if format == "dict": - return refs - elif format == "json": - if filepath is None: - raise ValueError("Filepath must be provided when format is 'json'") - - with open(filepath, "w") as json_file: - ujson.dump(refs, json_file) - - return None - elif format == "parquet": - from kerchunk.df import refs_to_dataframe - - if isinstance(filepath, Path): - url = str(filepath) - elif isinstance(filepath, str): - url = filepath - - # refs_to_dataframe is responsible for writing to parquet. - # at no point does it create a full in-memory dataframe. - refs_to_dataframe( - refs, - url=url, - record_size=record_size, - categorical_threshold=categorical_threshold, - ) - return None - else: - raise ValueError(f"Unrecognized output format: {format}") - - def rename_paths( - self, - new: str | Callable[[str], str], - ) -> xr.Dataset: - """ - Rename paths to chunks in every ManifestArray in this dataset. - - Accepts either a string, in which case this new path will be used for all chunks, or - a function which accepts the old path and returns the new path. - - Parameters - ---------- - new - New path to use for all chunks, either as a string, or as a function which accepts and returns strings. - - Returns - ------- - Dataset - - Examples - -------- - Rename paths to reflect moving the referenced files from local storage to an S3 bucket. - - >>> def local_to_s3_url(old_local_path: str) -> str: - ... from pathlib import Path - ... - ... new_s3_bucket_url = "http://s3.amazonaws.com/my_bucket/" - ... - ... filename = Path(old_local_path).name - ... return str(new_s3_bucket_url / filename) - - >>> ds.virtualize.rename_paths(local_to_s3_url) - - See Also - -------- - ManifestArray.rename_paths - ChunkManifest.rename_paths - """ - - new_ds = self.ds.copy() - for var_name in new_ds.variables: - data = new_ds[var_name].data - if isinstance(data, ManifestArray): - new_ds[var_name].data = data.rename_paths(new=new) - - return new_ds diff --git a/virtualizarr/manifests/array.py b/virtualizarr/manifests/array.py index 0ec9c844..5ac0aef0 100644 --- a/virtualizarr/manifests/array.py +++ b/virtualizarr/manifests/array.py @@ -3,7 +3,7 @@ import numpy as np -from ..kerchunk import KerchunkArrRefs +from ..types.kerchunk import KerchunkArrRefs from ..zarr import ZArray from .array_api import MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS, _isnan from .manifest import ChunkManifest @@ -61,7 +61,10 @@ def __init__( @classmethod def _from_kerchunk_refs(cls, arr_refs: KerchunkArrRefs) -> "ManifestArray": - from virtualizarr.kerchunk import fully_decode_arr_refs, parse_array_refs + from virtualizarr.readers.kerchunk import ( + fully_decode_arr_refs, + parse_array_refs, + ) decoded_arr_refs = fully_decode_arr_refs(arr_refs) diff --git a/virtualizarr/kerchunk.py b/virtualizarr/readers/kerchunk.py similarity index 51% rename from virtualizarr/kerchunk.py rename to virtualizarr/readers/kerchunk.py index a73f2cda..4686ce94 100644 --- a/virtualizarr/kerchunk.py +++ b/virtualizarr/readers/kerchunk.py @@ -1,61 +1,57 @@ -import base64 -import json import warnings -from enum import Enum, auto from pathlib import Path -from typing import Any, NewType, Optional, cast +from typing import Any, MutableMapping, Optional, cast -import numpy as np import ujson # type: ignore -import xarray as xr -from xarray.coding.times import CFDatetimeCoder - -from virtualizarr.manifests.manifest import join +from xarray import Dataset +from xarray.core.indexes import Index +from xarray.core.variable import Variable + +from virtualizarr.backend import FileType, separate_coords +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.types.kerchunk import ( + KerchunkArrRefs, + KerchunkStoreRefs, +) from virtualizarr.utils import _fsspec_openfile_from_filepath from virtualizarr.zarr import ZArray, ZAttrs -# Distinguishing these via type hints makes it a lot easier to mentally keep track of what the opaque kerchunk "reference dicts" actually mean -# (idea from https://kobzol.github.io/rust/python/2023/05/20/writing-python-like-its-rust.html) -# TODO I would prefer to be more specific about these types -KerchunkStoreRefs = NewType( - "KerchunkStoreRefs", dict -) # top-level dict with keys for 'version', 'refs' -KerchunkArrRefs = NewType( - "KerchunkArrRefs", - dict, -) # lower-level dict containing just the information for one zarr array - - -class AutoName(Enum): - # Recommended by official Python docs for auto naming: - # https://docs.python.org/3/library/enum.html#using-automatic-values - def _generate_next_value_(name, start, count, last_values): - return name - - -class FileType(AutoName): - netcdf3 = auto() - netcdf4 = auto() # NOTE: netCDF4 is a subset of hdf5 - hdf4 = auto() - hdf5 = auto() - grib = auto() - tiff = auto() - fits = auto() - zarr = auto() - dmrpp = auto() - zarr_v3 = auto() - - -class NumpyEncoder(json.JSONEncoder): - # TODO I don't understand how kerchunk gets around this problem of encoding numpy types (in the zattrs) whilst only using ujson - def default(self, obj): - if isinstance(obj, np.ndarray): - return obj.tolist() # Convert NumPy array to Python list - elif isinstance(obj, np.generic): - return obj.item() # Convert NumPy scalar to Python scalar - elif isinstance(obj, np.dtype): - return str(obj) - return json.JSONEncoder.default(self, obj) + +# TODO shouldn't this live in backend.py? Because it's not just useful for the kerchunk-specific readers... +def _automatically_determine_filetype( + *, + filepath: str, + reader_options: Optional[dict[str, Any]] = {}, +) -> FileType: + if Path(filepath).suffix == ".zarr": + # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one... + raise NotImplementedError() + + # Read magic bytes from local or remote file + fpath = _fsspec_openfile_from_filepath( + filepath=filepath, reader_options=reader_options + ) + magic_bytes = fpath.read(8) + fpath.close() + + if magic_bytes.startswith(b"CDF"): + filetype = FileType.netcdf3 + elif magic_bytes.startswith(b"\x0e\x03\x13\x01"): + raise NotImplementedError("HDF4 formatted files not supported") + elif magic_bytes.startswith(b"\x89HDF"): + filetype = FileType.hdf5 + elif magic_bytes.startswith(b"GRIB"): + filetype = FileType.grib + elif magic_bytes.startswith(b"II*"): + filetype = FileType.tiff + elif magic_bytes.startswith(b"SIMPLE"): + filetype = FileType.fits + else: + raise NotImplementedError( + f"Unrecognised file based on header bytes: {magic_bytes}" + ) + + return filetype def read_kerchunk_references_from_file( @@ -127,40 +123,90 @@ def read_kerchunk_references_from_file( return refs -def _automatically_determine_filetype( - *, - filepath: str, - reader_options: Optional[dict[str, Any]] = {}, -) -> FileType: - if Path(filepath).suffix == ".zarr": - # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one... - raise NotImplementedError() +def virtual_vars_from_kerchunk_refs( + refs: KerchunkStoreRefs, + drop_variables: list[str] | None = None, + virtual_array_class=ManifestArray, +) -> dict[str, Variable]: + """ + Translate a store-level kerchunk reference dict into aaset of xarray Variables containing virtualized arrays. - # Read magic bytes from local or remote file - fpath = _fsspec_openfile_from_filepath( - filepath=filepath, reader_options=reader_options + Parameters + ---------- + drop_variables: list[str], default is None + Variables in the file to drop before returning. + virtual_array_class + Virtual array class to use to represent the references to the chunks in each on-disk array. + Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. + """ + + var_names = find_var_names(refs) + if drop_variables is None: + drop_variables = [] + var_names_to_keep = [ + var_name for var_name in var_names if var_name not in drop_variables + ] + + vars = { + var_name: variable_from_kerchunk_refs(refs, var_name, virtual_array_class) + for var_name in var_names_to_keep + } + return vars + + +def dataset_from_kerchunk_refs( + refs: KerchunkStoreRefs, + drop_variables: list[str] = [], + virtual_array_class: type = ManifestArray, + indexes: MutableMapping[str, Index] | None = None, +) -> Dataset: + """ + Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays. + + drop_variables: list[str], default is None + Variables in the file to drop before returning. + virtual_array_class + Virtual array class to use to represent the references to the chunks in each on-disk array. + Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. + """ + + vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class) + ds_attrs = fully_decode_arr_refs(refs["refs"]).get(".zattrs", {}) + coord_names = ds_attrs.pop("coordinates", []) + + if indexes is None: + indexes = {} + data_vars, coords = separate_coords(vars, indexes, coord_names) + + vds = Dataset( + data_vars, + coords=coords, + # indexes={}, # TODO should be added in a later version of xarray + attrs=ds_attrs, ) - magic_bytes = fpath.read(8) - fpath.close() - if magic_bytes.startswith(b"CDF"): - filetype = FileType.netcdf3 - elif magic_bytes.startswith(b"\x0e\x03\x13\x01"): - raise NotImplementedError("HDF4 formatted files not supported") - elif magic_bytes.startswith(b"\x89HDF"): - filetype = FileType.hdf5 - elif magic_bytes.startswith(b"GRIB"): - filetype = FileType.grib - elif magic_bytes.startswith(b"II*"): - filetype = FileType.tiff - elif magic_bytes.startswith(b"SIMPLE"): - filetype = FileType.fits + return vds + + +def variable_from_kerchunk_refs( + refs: KerchunkStoreRefs, var_name: str, virtual_array_class +) -> Variable: + """Create a single xarray Variable by reading specific keys of a kerchunk references dict.""" + + arr_refs = extract_array_refs(refs, var_name) + chunk_dict, zarray, zattrs = parse_array_refs(arr_refs) + # we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs + dims = zattrs.pop("_ARRAY_DIMENSIONS") + if chunk_dict: + manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict) + varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) else: - raise NotImplementedError( - f"Unrecognised file based on header bytes: {magic_bytes}" - ) + # This means we encountered a scalar variable of dimension 0, + # very likely that it actually has no numeric value and its only purpose + # is to communicate dataset attributes. + varr = zarray.fill_value - return filetype + return Variable(data=varr, dims=dims, attrs=zattrs) def find_var_names(ds_reference_dict: KerchunkStoreRefs) -> list[str]: @@ -216,102 +262,3 @@ def fully_decode_arr_refs(d: dict) -> KerchunkArrRefs: sanitized[k] = ujson.loads(v) return cast(KerchunkArrRefs, sanitized) - - -def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs: - """ - Create a dictionary containing kerchunk-style store references from a single xarray.Dataset (which wraps ManifestArray objects). - """ - - all_arr_refs = {} - for var_name, var in ds.variables.items(): - arr_refs = variable_to_kerchunk_arr_refs(var, str(var_name)) - - prepended_with_var_name = { - f"{var_name}/{key}": val for key, val in arr_refs.items() - } - - all_arr_refs.update(prepended_with_var_name) - - zattrs = ds.attrs - if ds.coords: - coord_names = [str(x) for x in ds.coords] - # this weird concatenated string instead of a list of strings is inconsistent with how other features in the kerchunk references format are stored - # see https://github.com/zarr-developers/VirtualiZarr/issues/105#issuecomment-2187266739 - zattrs["coordinates"] = " ".join(coord_names) - - ds_refs = { - "version": 1, - "refs": { - ".zgroup": '{"zarr_format":2}', - ".zattrs": ujson.dumps(zattrs), - **all_arr_refs, - }, - } - - return cast(KerchunkStoreRefs, ds_refs) - - -def variable_to_kerchunk_arr_refs(var: xr.Variable, var_name: str) -> KerchunkArrRefs: - """ - Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps either a ManifestArray or a numpy array). - - Partially encodes the inner dicts to json to match kerchunk behaviour (see https://github.com/fsspec/kerchunk/issues/415). - """ - from virtualizarr.manifests import ManifestArray - - if isinstance(var.data, ManifestArray): - marr = var.data - - arr_refs: dict[str, str | list[str | int]] = { - str(chunk_key): [entry["path"], entry["offset"], entry["length"]] - for chunk_key, entry in marr.manifest.dict().items() - } - - zarray = marr.zarray.replace(zarr_format=2) - - else: - try: - np_arr = var.to_numpy() - except AttributeError as e: - raise TypeError( - f"Can only serialize wrapped arrays of type ManifestArray or numpy.ndarray, but got type {type(var.data)}" - ) from e - - if var.encoding: - if "scale_factor" in var.encoding: - raise NotImplementedError( - f"Cannot serialize loaded variable {var_name}, as it is encoded with a scale_factor" - ) - if "offset" in var.encoding: - raise NotImplementedError( - f"Cannot serialize loaded variable {var_name}, as it is encoded with an offset" - ) - if "calendar" in var.encoding: - np_arr = CFDatetimeCoder().encode(var.copy(), name=var_name).values - - # This encoding is what kerchunk does when it "inlines" data, see https://github.com/fsspec/kerchunk/blob/a0c4f3b828d37f6d07995925b324595af68c4a19/kerchunk/hdf.py#L472 - byte_data = np_arr.tobytes() - # TODO do I really need to encode then decode like this? - inlined_data = (b"base64:" + base64.b64encode(byte_data)).decode("utf-8") - - # TODO can this be generalized to save individual chunks of a dask array? - # TODO will this fail for a scalar? - arr_refs = {join(0 for _ in np_arr.shape): inlined_data} - - zarray = ZArray( - chunks=np_arr.shape, - shape=np_arr.shape, - dtype=np_arr.dtype, - order="C", - fill_value=None, - ) - - zarray_dict = zarray.to_kerchunk_json() - arr_refs[".zarray"] = zarray_dict - - zattrs = {**var.attrs, **var.encoding} - zattrs["_ARRAY_DIMENSIONS"] = list(var.dims) - arr_refs[".zattrs"] = json.dumps(zattrs, separators=(",", ":"), cls=NumpyEncoder) - - return cast(KerchunkArrRefs, arr_refs) diff --git a/virtualizarr/readers/zarr.py b/virtualizarr/readers/zarr.py new file mode 100644 index 00000000..b841d5c3 --- /dev/null +++ b/virtualizarr/readers/zarr.py @@ -0,0 +1,131 @@ +import json +from pathlib import Path +from typing import Mapping + +import numcodecs +import numpy as np +from xarray import Dataset +from xarray.core.indexes import Index +from xarray.core.variable import Variable + +from virtualizarr.backend import separate_coords +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.zarr import ZArray + + +def open_virtual_dataset_from_v3_store( + storepath: str, + drop_variables: list[str], + indexes: Mapping[str, Index] | None, +) -> Dataset: + """ + Read a Zarr v3 store and return an xarray Dataset containing virtualized arrays. + """ + _storepath = Path(storepath) + + ds_attrs = attrs_from_zarr_group_json(_storepath / "zarr.json") + coord_names = ds_attrs.pop("coordinates", []) + + # TODO recursive glob to create a datatree + # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it + # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166 + all_paths = _storepath.glob("*/") + directory_paths = [p for p in all_paths if not p.is_file()] + + vars = {} + for array_dir in directory_paths: + var_name = array_dir.name + if var_name in drop_variables: + break + + zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json") + manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json")) + + marr = ManifestArray(chunkmanifest=manifest, zarray=zarray) + var = Variable(data=marr, dims=dim_names, attrs=attrs) + vars[var_name] = var + + if indexes is None: + raise NotImplementedError() + elif indexes != {}: + # TODO allow manual specification of index objects + raise NotImplementedError() + else: + indexes = dict(**indexes) # for type hinting: to allow mutation + + data_vars, coords = separate_coords(vars, indexes, coord_names) + + ds = Dataset( + data_vars, + coords=coords, + # indexes={}, # TODO should be added in a later version of xarray + attrs=ds_attrs, + ) + + return ds + + +def attrs_from_zarr_group_json(filepath: Path) -> dict: + with open(filepath) as metadata_file: + attrs = json.load(metadata_file) + return attrs["attributes"] + + +def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]: + with open(filepath) as metadata_file: + metadata = json.load(metadata_file) + + if { + "name": "chunk-manifest-json", + "configuration": { + "manifest": "./manifest.json", + }, + } not in metadata.get("storage_transformers", []): + raise ValueError( + "Can only read byte ranges from Zarr v3 stores which implement the manifest storage transformer ZEP." + ) + + attrs = metadata.pop("attributes") + dim_names = metadata.pop("dimension_names") + + chunk_shape = tuple(metadata["chunk_grid"]["configuration"]["chunk_shape"]) + shape = tuple(metadata["shape"]) + zarr_format = metadata["zarr_format"] + + if metadata["fill_value"] is None: + raise ValueError( + "fill_value must be specified https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value" + ) + else: + fill_value = metadata["fill_value"] + + all_codecs = [ + codec + for codec in metadata["codecs"] + if codec["name"] not in ("transpose", "bytes") + ] + compressor, *filters = [ + _configurable_to_num_codec_config(_filter) for _filter in all_codecs + ] + zarray = ZArray( + chunks=chunk_shape, + compressor=compressor, + dtype=np.dtype(metadata["data_type"]), + fill_value=fill_value, + filters=filters or None, + order="C", + shape=shape, + zarr_format=zarr_format, + ) + + return zarray, dim_names, attrs + + +def _configurable_to_num_codec_config(configurable: dict) -> dict: + """ + Convert a zarr v3 configurable into a numcodecs codec. + """ + configurable_copy = configurable.copy() + codec_id = configurable_copy.pop("name") + configuration = configurable_copy.pop("configuration") + return numcodecs.get_codec({"id": codec_id, **configuration}).get_config() diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py new file mode 100644 index 00000000..3b0c0315 --- /dev/null +++ b/virtualizarr/tests/test_backend.py @@ -0,0 +1,255 @@ +from collections.abc import Mapping +from unittest.mock import patch + +import numpy as np +import pytest +import xarray as xr +import xarray.testing as xrt +from xarray import open_dataset +from xarray.core.indexes import Index + +from virtualizarr import open_virtual_dataset +from virtualizarr.backend import FileType +from virtualizarr.manifests import ManifestArray +from virtualizarr.readers.kerchunk import _automatically_determine_filetype +from virtualizarr.tests import has_astropy, has_tifffile, network, requires_s3fs + + +def test_automatically_determine_filetype_netcdf3_netcdf4(): + # test the NetCDF3 vs NetCDF4 automatic file type selection + + ds = xr.Dataset({"a": (["x"], [0, 1])}) + netcdf3_file_path = "/tmp/netcdf3.nc" + netcdf4_file_path = "/tmp/netcdf4.nc" + + # write two version of NetCDF + ds.to_netcdf(netcdf3_file_path, engine="scipy", format="NETCDF3_CLASSIC") + ds.to_netcdf(netcdf4_file_path, engine="h5netcdf") + + assert FileType("netcdf3") == _automatically_determine_filetype( + filepath=netcdf3_file_path + ) + assert FileType("hdf5") == _automatically_determine_filetype( + filepath=netcdf4_file_path + ) + + +@pytest.mark.parametrize( + "filetype,headerbytes", + [ + ("netcdf3", b"CDF"), + ("hdf5", b"\x89HDF"), + ("grib", b"GRIB"), + ("tiff", b"II*"), + ("fits", b"SIMPLE"), + ], +) +def test_valid_filetype_bytes(tmp_path, filetype, headerbytes): + filepath = tmp_path / "file.abc" + with open(filepath, "wb") as f: + f.write(headerbytes) + assert FileType(filetype) == _automatically_determine_filetype(filepath=filepath) + + +def test_notimplemented_filetype(tmp_path): + for headerbytes in [b"JUNK", b"\x0e\x03\x13\x01"]: + filepath = tmp_path / "file.abc" + with open(filepath, "wb") as f: + f.write(headerbytes) + with pytest.raises(NotImplementedError): + _automatically_determine_filetype(filepath=filepath) + + +def test_FileType(): + # tests if FileType converts user supplied strings to correct filetype + assert "netcdf3" == FileType("netcdf3").name + assert "netcdf4" == FileType("netcdf4").name + assert "hdf4" == FileType("hdf4").name + assert "hdf5" == FileType("hdf5").name + assert "grib" == FileType("grib").name + assert "tiff" == FileType("tiff").name + assert "fits" == FileType("fits").name + assert "zarr" == FileType("zarr").name + with pytest.raises(ValueError): + FileType(None) + + +class TestOpenVirtualDatasetIndexes: + def test_no_indexes(self, netcdf4_file): + vds = open_virtual_dataset(netcdf4_file, indexes={}) + assert vds.indexes == {} + + def test_create_default_indexes(self, netcdf4_file): + with pytest.warns(UserWarning, match="will create in-memory pandas indexes"): + vds = open_virtual_dataset(netcdf4_file, indexes=None) + ds = open_dataset(netcdf4_file, decode_times=False) + + # TODO use xr.testing.assert_identical(vds.indexes, ds.indexes) instead once class supported by assertion comparison, see https://github.com/pydata/xarray/issues/5812 + assert index_mappings_equal(vds.xindexes, ds.xindexes) + + +def index_mappings_equal(indexes1: Mapping[str, Index], indexes2: Mapping[str, Index]): + # Check if the mappings have the same keys + if set(indexes1.keys()) != set(indexes2.keys()): + return False + + # Check if the values for each key are identical + for key in indexes1.keys(): + index1 = indexes1[key] + index2 = indexes2[key] + + if not index1.equals(index2): + return False + + return True + + +class TestOpenVirtualDatasetAttrs: + def test_drop_array_dimensions(self, netcdf4_file): + # regression test for GH issue #150 + vds = open_virtual_dataset(netcdf4_file, indexes={}) + assert "_ARRAY_DIMENSIONS" not in vds["air"].attrs + + def test_coordinate_variable_attrs_preserved(self, netcdf4_file): + # regression test for GH issue #155 + vds = open_virtual_dataset(netcdf4_file, indexes={}) + assert vds["lat"].attrs == { + "standard_name": "latitude", + "long_name": "Latitude", + "units": "degrees_north", + "axis": "Y", + } + + +@network +@requires_s3fs +class TestReadFromS3: + @pytest.mark.parametrize( + "filetype", ["netcdf4", None], ids=["netcdf4 filetype", "None filetype"] + ) + @pytest.mark.parametrize( + "indexes", [None, {}], ids=["None index", "empty dict index"] + ) + def test_anon_read_s3(self, filetype, indexes): + """Parameterized tests for empty vs supplied indexes and filetypes.""" + # TODO: Switch away from this s3 url after minIO is implemented. + fpath = "s3://carbonplan-share/virtualizarr/local.nc" + vds = open_virtual_dataset( + fpath, + filetype=filetype, + indexes=indexes, + reader_options={"storage_options": {"anon": True}}, + ) + + assert vds.dims == {"time": 2920, "lat": 25, "lon": 53} + for var in vds.variables: + assert isinstance(vds[var].data, ManifestArray), var + + +@network +class TestReadFromURL: + @pytest.mark.parametrize( + "filetype, url", + [ + ( + "grib", + "https://github.com/pydata/xarray-data/raw/master/era5-2mt-2019-03-uk.grib", + ), + ( + "netcdf3", + "https://github.com/pydata/xarray-data/raw/master/air_temperature.nc", + ), + ( + "netcdf4", + "https://github.com/pydata/xarray-data/raw/master/ROMS_example.nc", + ), + ( + "hdf4", + "https://github.com/corteva/rioxarray/raw/master/test/test_data/input/MOD09GA.A2008296.h14v17.006.2015181011753.hdf", + ), + # https://github.com/zarr-developers/VirtualiZarr/issues/159 + # ("hdf5", "https://github.com/fsspec/kerchunk/raw/main/kerchunk/tests/NEONDSTowerTemperatureData.hdf5"), + pytest.param( + "tiff", + "https://github.com/fsspec/kerchunk/raw/main/kerchunk/tests/lcmap_tiny_cog_2020.tif", + marks=pytest.mark.skipif( + not has_tifffile, reason="package tifffile is not available" + ), + ), + pytest.param( + "fits", + "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits", + marks=pytest.mark.skipif( + not has_astropy, reason="package astropy is not available" + ), + ), + ( + "jpg", + "https://github.com/rasterio/rasterio/raw/main/tests/data/389225main_sw_1965_1024.jpg", + ), + ], + ) + def test_read_from_url(self, filetype, url): + if filetype in ["grib", "jpg", "hdf4"]: + with pytest.raises(NotImplementedError): + vds = open_virtual_dataset(url, reader_options={}, indexes={}) + else: + vds = open_virtual_dataset(url, indexes={}) + assert isinstance(vds, xr.Dataset) + + +class TestLoadVirtualDataset: + def test_loadable_variables(self, netcdf4_file): + vars_to_load = ["air", "time"] + vds = open_virtual_dataset( + netcdf4_file, loadable_variables=vars_to_load, indexes={} + ) + + for name in vds.variables: + if name in vars_to_load: + assert isinstance(vds[name].data, np.ndarray), name + else: + assert isinstance(vds[name].data, ManifestArray), name + + full_ds = xr.open_dataset(netcdf4_file, decode_times=False) + + for name in full_ds.variables: + if name in vars_to_load: + xrt.assert_identical(vds.variables[name], full_ds.variables[name]) + + def test_explicit_filetype(self, netcdf4_file): + with pytest.raises(ValueError): + open_virtual_dataset(netcdf4_file, filetype="unknown") + + with pytest.raises(NotImplementedError): + open_virtual_dataset(netcdf4_file, filetype="grib") + + @patch("virtualizarr.readers.kerchunk.read_kerchunk_references_from_file") + def test_open_virtual_dataset_passes_expected_args( + self, mock_read_kerchunk, netcdf4_file + ): + reader_options = {"option1": "value1", "option2": "value2"} + open_virtual_dataset(netcdf4_file, indexes={}, reader_options=reader_options) + args = { + "filepath": netcdf4_file, + "filetype": None, + "reader_options": reader_options, + } + mock_read_kerchunk.assert_called_once_with(**args) + + def test_open_dataset_with_empty(self, hdf5_empty, tmpdir): + vds = open_virtual_dataset(hdf5_empty) + assert vds.empty.dims == () + assert vds.empty.attrs == {"empty": "true"} + + def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir): + vds = open_virtual_dataset(hdf5_scalar) + assert vds.scalar.dims == () + assert vds.scalar.attrs == {"scalar": "true"} + + +def test_cftime_variables_must_be_in_loadable_variables(tmpdir): + ds = xr.Dataset(data_vars={"time": ["2024-06-21"]}) + ds.to_netcdf(f"{tmpdir}/scalar.nc") + with pytest.raises(ValueError, match="'time' not in"): + open_virtual_dataset(f"{tmpdir}/scalar.nc", cftime_variables=["time"]) diff --git a/virtualizarr/tests/test_kerchunk.py b/virtualizarr/tests/test_kerchunk.py index 379c43ad..2442ec8d 100644 --- a/virtualizarr/tests/test_kerchunk.py +++ b/virtualizarr/tests/test_kerchunk.py @@ -1,185 +1,12 @@ import numpy as np -import pandas as pd -import pytest -import ujson # type: ignore import xarray as xr import xarray.testing as xrt -from virtualizarr.kerchunk import ( - FileType, - _automatically_determine_filetype, +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers.kerchunk import ( + dataset_from_kerchunk_refs, find_var_names, ) -from virtualizarr.manifests import ChunkManifest, ManifestArray -from virtualizarr.xarray import dataset_from_kerchunk_refs - - -def gen_ds_refs( - zgroup: str = '{"zarr_format":2}', - zarray: str = '{"chunks":[2,3],"compressor":null,"dtype":" Dataset: + arr = ManifestArray( + chunkmanifest=ChunkManifest( + entries={"0.0": dict(path="test.nc", offset=6144, length=48)} + ), + zarray=dict( + shape=(2, 3), + dtype=np.dtype(" bool: + """ + Several metadata attributes in ZarrV3 use a dictionary with keys "name" : str and "configuration" : dict + """ + return "name" in value and "configuration" in value + + +def test_zarr_v3_metadata_conformance(tmpdir, vds_with_manifest_arrays: Dataset): + """ + Checks that the output metadata of an array variable conforms to this spec + for the required attributes: + https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#metadata + """ + dataset_to_zarr(vds_with_manifest_arrays, tmpdir / "store.zarr") + # read the a variable's metadata + with open(tmpdir / "store.zarr/a/zarr.json", mode="r") as f: + metadata = json.loads(f.read()) + assert metadata["zarr_format"] == 3 + assert metadata["node_type"] == "array" + assert isinstance(metadata["shape"], list) and all( + isinstance(dim, int) for dim in metadata["shape"] + ) + assert isinstance(metadata["data_type"], str) or isconfigurable( + metadata["data_type"] + ) + assert isconfigurable(metadata["chunk_grid"]) + assert isconfigurable(metadata["chunk_key_encoding"]) + assert isinstance(metadata["fill_value"], (bool, int, float, str, list)) + assert ( + isinstance(metadata["codecs"], list) + and len(metadata["codecs"]) > 1 + and all(isconfigurable(codec) for codec in metadata["codecs"]) + ) + + +def test_zarr_v3_roundtrip(tmpdir, vds_with_manifest_arrays: Dataset): + vds_with_manifest_arrays.virtualize.to_zarr(tmpdir / "store.zarr") + roundtrip = open_virtual_dataset( + tmpdir / "store.zarr", filetype=FileType.zarr_v3, indexes={} + ) + + xrt.assert_identical(roundtrip, vds_with_manifest_arrays) + + +def test_metadata_roundtrip(tmpdir, vds_with_manifest_arrays: Dataset): + dataset_to_zarr(vds_with_manifest_arrays, tmpdir / "store.zarr") + zarray, _, _ = metadata_from_zarr_json(tmpdir / "store.zarr/a/zarr.json") + assert zarray == vds_with_manifest_arrays.a.data.zarray diff --git a/virtualizarr/tests/test_xarray.py b/virtualizarr/tests/test_xarray.py index 9133eb54..9db6e3a2 100644 --- a/virtualizarr/tests/test_xarray.py +++ b/virtualizarr/tests/test_xarray.py @@ -1,15 +1,9 @@ -from collections.abc import Mapping -from unittest.mock import patch - import numpy as np import pytest import xarray as xr -import xarray.testing as xrt -from xarray.core.indexes import Index from virtualizarr import open_virtual_dataset from virtualizarr.manifests import ChunkManifest, ManifestArray -from virtualizarr.tests import has_astropy, has_tifffile, network, requires_s3fs from virtualizarr.zarr import ZArray @@ -228,53 +222,6 @@ def test_concat_dim_coords_along_existing_dim(self): assert result.data.zarray.zarr_format == zarray.zarr_format -class TestOpenVirtualDatasetAttrs: - def test_drop_array_dimensions(self, netcdf4_file): - # regression test for GH issue #150 - vds = open_virtual_dataset(netcdf4_file, indexes={}) - assert "_ARRAY_DIMENSIONS" not in vds["air"].attrs - - def test_coordinate_variable_attrs_preserved(self, netcdf4_file): - # regression test for GH issue #155 - vds = open_virtual_dataset(netcdf4_file, indexes={}) - assert vds["lat"].attrs == { - "standard_name": "latitude", - "long_name": "Latitude", - "units": "degrees_north", - "axis": "Y", - } - - -class TestOpenVirtualDatasetIndexes: - def test_no_indexes(self, netcdf4_file): - vds = open_virtual_dataset(netcdf4_file, indexes={}) - assert vds.indexes == {} - - def test_create_default_indexes(self, netcdf4_file): - with pytest.warns(UserWarning, match="will create in-memory pandas indexes"): - vds = open_virtual_dataset(netcdf4_file, indexes=None) - ds = xr.open_dataset(netcdf4_file, decode_times=False) - - # TODO use xr.testing.assert_identical(vds.indexes, ds.indexes) instead once class supported by assertion comparison, see https://github.com/pydata/xarray/issues/5812 - assert index_mappings_equal(vds.xindexes, ds.xindexes) - - -def index_mappings_equal(indexes1: Mapping[str, Index], indexes2: Mapping[str, Index]): - # Check if the mappings have the same keys - if set(indexes1.keys()) != set(indexes2.keys()): - return False - - # Check if the values for each key are identical - for key in indexes1.keys(): - index1 = indexes1[key] - index2 = indexes2[key] - - if not index1.equals(index2): - return False - - return True - - class TestCombineUsingIndexes: def test_combine_by_coords(self, netcdf4_files): filepath1, filepath2 = netcdf4_files @@ -308,133 +255,6 @@ def test_combine_by_coords_keeping_manifestarrays(self, netcdf4_files): assert isinstance(combined_vds["lon"].data, ManifestArray) -@network -@requires_s3fs -class TestReadFromS3: - @pytest.mark.parametrize( - "filetype", ["netcdf4", None], ids=["netcdf4 filetype", "None filetype"] - ) - @pytest.mark.parametrize( - "indexes", [None, {}], ids=["None index", "empty dict index"] - ) - def test_anon_read_s3(self, filetype, indexes): - """Parameterized tests for empty vs supplied indexes and filetypes.""" - # TODO: Switch away from this s3 url after minIO is implemented. - fpath = "s3://carbonplan-share/virtualizarr/local.nc" - vds = open_virtual_dataset( - fpath, - filetype=filetype, - indexes=indexes, - reader_options={"storage_options": {"anon": True}}, - ) - - assert vds.dims == {"time": 2920, "lat": 25, "lon": 53} - for var in vds.variables: - assert isinstance(vds[var].data, ManifestArray), var - - -@network -class TestReadFromURL: - @pytest.mark.parametrize( - "filetype, url", - [ - ( - "grib", - "https://github.com/pydata/xarray-data/raw/master/era5-2mt-2019-03-uk.grib", - ), - ( - "netcdf3", - "https://github.com/pydata/xarray-data/raw/master/air_temperature.nc", - ), - ( - "netcdf4", - "https://github.com/pydata/xarray-data/raw/master/ROMS_example.nc", - ), - ( - "hdf4", - "https://github.com/corteva/rioxarray/raw/master/test/test_data/input/MOD09GA.A2008296.h14v17.006.2015181011753.hdf", - ), - # https://github.com/zarr-developers/VirtualiZarr/issues/159 - # ("hdf5", "https://github.com/fsspec/kerchunk/raw/main/kerchunk/tests/NEONDSTowerTemperatureData.hdf5"), - pytest.param( - "tiff", - "https://github.com/fsspec/kerchunk/raw/main/kerchunk/tests/lcmap_tiny_cog_2020.tif", - marks=pytest.mark.skipif( - not has_tifffile, reason="package tifffile is not available" - ), - ), - pytest.param( - "fits", - "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits", - marks=pytest.mark.skipif( - not has_astropy, reason="package astropy is not available" - ), - ), - ( - "jpg", - "https://github.com/rasterio/rasterio/raw/main/tests/data/389225main_sw_1965_1024.jpg", - ), - ], - ) - def test_read_from_url(self, filetype, url): - if filetype in ["grib", "jpg", "hdf4"]: - with pytest.raises(NotImplementedError): - vds = open_virtual_dataset(url, reader_options={}, indexes={}) - else: - vds = open_virtual_dataset(url, indexes={}) - assert isinstance(vds, xr.Dataset) - - -class TestLoadVirtualDataset: - def test_loadable_variables(self, netcdf4_file): - vars_to_load = ["air", "time"] - vds = open_virtual_dataset( - netcdf4_file, loadable_variables=vars_to_load, indexes={} - ) - - for name in vds.variables: - if name in vars_to_load: - assert isinstance(vds[name].data, np.ndarray), name - else: - assert isinstance(vds[name].data, ManifestArray), name - - full_ds = xr.open_dataset(netcdf4_file, decode_times=False) - - for name in full_ds.variables: - if name in vars_to_load: - xrt.assert_identical(vds.variables[name], full_ds.variables[name]) - - def test_explicit_filetype(self, netcdf4_file): - with pytest.raises(ValueError): - open_virtual_dataset(netcdf4_file, filetype="unknown") - - with pytest.raises(NotImplementedError): - open_virtual_dataset(netcdf4_file, filetype="grib") - - @patch("virtualizarr.kerchunk.read_kerchunk_references_from_file") - def test_open_virtual_dataset_passes_expected_args( - self, mock_read_kerchunk, netcdf4_file - ): - reader_options = {"option1": "value1", "option2": "value2"} - open_virtual_dataset(netcdf4_file, indexes={}, reader_options=reader_options) - args = { - "filepath": netcdf4_file, - "filetype": None, - "reader_options": reader_options, - } - mock_read_kerchunk.assert_called_once_with(**args) - - def test_open_dataset_with_empty(self, hdf5_empty, tmpdir): - vds = open_virtual_dataset(hdf5_empty) - assert vds.empty.dims == () - assert vds.empty.attrs == {"empty": "true"} - - def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir): - vds = open_virtual_dataset(hdf5_scalar) - assert vds.scalar.dims == () - assert vds.scalar.attrs == {"scalar": "true"} - - class TestRenamePaths: def test_rename_to_str(self, netcdf4_file): vds = open_virtual_dataset(netcdf4_file, indexes={}) @@ -477,10 +297,3 @@ def test_mixture_of_manifestarrays_and_numpy_arrays(self, netcdf4_file): == "s3://bucket/air.nc" ) assert isinstance(renamed_vds["lat"].data, np.ndarray) - - -def test_cftime_variables_must_be_in_loadable_variables(tmpdir): - ds = xr.Dataset(data_vars={"time": ["2024-06-21"]}) - ds.to_netcdf(f"{tmpdir}/scalar.nc") - with pytest.raises(ValueError, match="'time' not in"): - open_virtual_dataset(f"{tmpdir}/scalar.nc", cftime_variables=["time"]) diff --git a/virtualizarr/tests/test_zarr.py b/virtualizarr/tests/test_zarr.py index 3433030f..95dbf55f 100644 --- a/virtualizarr/tests/test_zarr.py +++ b/virtualizarr/tests/test_zarr.py @@ -1,84 +1,6 @@ -import json - import numpy as np -import pytest -import xarray as xr -import xarray.testing as xrt - -from virtualizarr import ManifestArray, open_virtual_dataset -from virtualizarr.kerchunk import FileType -from virtualizarr.manifests.manifest import ChunkManifest -from virtualizarr.zarr import ZArray, dataset_to_zarr, metadata_from_zarr_json - - -@pytest.fixture -def vds_with_manifest_arrays() -> xr.Dataset: - arr = ManifestArray( - chunkmanifest=ChunkManifest( - entries={"0.0": dict(path="test.nc", offset=6144, length=48)} - ), - zarray=dict( - shape=(2, 3), - dtype=np.dtype(" bool: - """ - Several metadata attributes in ZarrV3 use a dictionary with keys "name" : str and "configuration" : dict - """ - return "name" in value and "configuration" in value - - -def test_zarr_v3_roundtrip(tmpdir, vds_with_manifest_arrays: xr.Dataset): - vds_with_manifest_arrays.virtualize.to_zarr(tmpdir / "store.zarr") - roundtrip = open_virtual_dataset( - tmpdir / "store.zarr", filetype=FileType.zarr_v3, indexes={} - ) - - xrt.assert_identical(roundtrip, vds_with_manifest_arrays) - - -def test_metadata_roundtrip(tmpdir, vds_with_manifest_arrays: xr.Dataset): - dataset_to_zarr(vds_with_manifest_arrays, tmpdir / "store.zarr") - zarray, _, _ = metadata_from_zarr_json(tmpdir / "store.zarr/a/zarr.json") - assert zarray == vds_with_manifest_arrays.a.data.zarray - - -def test_zarr_v3_metadata_conformance(tmpdir, vds_with_manifest_arrays: xr.Dataset): - """ - Checks that the output metadata of an array variable conforms to this spec - for the required attributes: - https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#metadata - """ - dataset_to_zarr(vds_with_manifest_arrays, tmpdir / "store.zarr") - # read the a variable's metadata - with open(tmpdir / "store.zarr/a/zarr.json", mode="r") as f: - metadata = json.loads(f.read()) - assert metadata["zarr_format"] == 3 - assert metadata["node_type"] == "array" - assert isinstance(metadata["shape"], list) and all( - isinstance(dim, int) for dim in metadata["shape"] - ) - assert isinstance(metadata["data_type"], str) or isconfigurable( - metadata["data_type"] - ) - assert isconfigurable(metadata["chunk_grid"]) - assert isconfigurable(metadata["chunk_key_encoding"]) - assert isinstance(metadata["fill_value"], (bool, int, float, str, list)) - assert ( - isinstance(metadata["codecs"], list) - and len(metadata["codecs"]) > 1 - and all(isconfigurable(codec) for codec in metadata["codecs"]) - ) +from virtualizarr.zarr import ZArray def test_replace_partial(): diff --git a/virtualizarr/types/__init__.py b/virtualizarr/types/__init__.py new file mode 100644 index 00000000..34cd4bde --- /dev/null +++ b/virtualizarr/types/__init__.py @@ -0,0 +1,3 @@ +from virtualizarr.types.general import ChunkKey # type: ignore[F401] + +__all__ = ["ChunkKey"] diff --git a/virtualizarr/types.py b/virtualizarr/types/general.py similarity index 100% rename from virtualizarr/types.py rename to virtualizarr/types/general.py diff --git a/virtualizarr/types/kerchunk.py b/virtualizarr/types/kerchunk.py new file mode 100644 index 00000000..e8dada20 --- /dev/null +++ b/virtualizarr/types/kerchunk.py @@ -0,0 +1,12 @@ +from typing import NewType + +# Distinguishing these via type hints makes it a lot easier to mentally keep track of what the opaque kerchunk "reference dicts" actually mean +# (idea from https://kobzol.github.io/rust/python/2023/05/20/writing-python-like-its-rust.html) +# TODO I would prefer to be more specific about these types +KerchunkStoreRefs = NewType( + "KerchunkStoreRefs", dict +) # top-level dict with keys for 'version', 'refs' +KerchunkArrRefs = NewType( + "KerchunkArrRefs", + dict, +) # lower-level dict containing just the information for one zarr array diff --git a/virtualizarr/writers/__init__.py b/virtualizarr/writers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/virtualizarr/writers/kerchunk.py b/virtualizarr/writers/kerchunk.py new file mode 100644 index 00000000..6b4b55f8 --- /dev/null +++ b/virtualizarr/writers/kerchunk.py @@ -0,0 +1,124 @@ +import base64 +import json +from typing import cast + +import numpy as np +import ujson # type: ignore +from xarray import Dataset +from xarray.coding.times import CFDatetimeCoder +from xarray.core.variable import Variable + +from virtualizarr.manifests.manifest import join +from virtualizarr.types.kerchunk import KerchunkArrRefs, KerchunkStoreRefs +from virtualizarr.zarr import ZArray + + +class NumpyEncoder(json.JSONEncoder): + # TODO I don't understand how kerchunk gets around this problem of encoding numpy types (in the zattrs) whilst only using ujson + def default(self, obj): + if isinstance(obj, np.ndarray): + return obj.tolist() # Convert NumPy array to Python list + elif isinstance(obj, np.generic): + return obj.item() # Convert NumPy scalar to Python scalar + elif isinstance(obj, np.dtype): + return str(obj) + return json.JSONEncoder.default(self, obj) + + +def dataset_to_kerchunk_refs(ds: Dataset) -> KerchunkStoreRefs: + """ + Create a dictionary containing kerchunk-style store references from a single xarray.Dataset (which wraps ManifestArray objects). + """ + + all_arr_refs = {} + for var_name, var in ds.variables.items(): + arr_refs = variable_to_kerchunk_arr_refs(var, str(var_name)) + + prepended_with_var_name = { + f"{var_name}/{key}": val for key, val in arr_refs.items() + } + + all_arr_refs.update(prepended_with_var_name) + + zattrs = ds.attrs + if ds.coords: + coord_names = [str(x) for x in ds.coords] + # this weird concatenated string instead of a list of strings is inconsistent with how other features in the kerchunk references format are stored + # see https://github.com/zarr-developers/VirtualiZarr/issues/105#issuecomment-2187266739 + zattrs["coordinates"] = " ".join(coord_names) + + ds_refs = { + "version": 1, + "refs": { + ".zgroup": '{"zarr_format":2}', + ".zattrs": ujson.dumps(zattrs), + **all_arr_refs, + }, + } + + return cast(KerchunkStoreRefs, ds_refs) + + +def variable_to_kerchunk_arr_refs(var: Variable, var_name: str) -> KerchunkArrRefs: + """ + Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps either a ManifestArray or a numpy array). + + Partially encodes the inner dicts to json to match kerchunk behaviour (see https://github.com/fsspec/kerchunk/issues/415). + """ + from virtualizarr.manifests import ManifestArray + + if isinstance(var.data, ManifestArray): + marr = var.data + + arr_refs: dict[str, str | list[str | int]] = { + str(chunk_key): [entry["path"], entry["offset"], entry["length"]] + for chunk_key, entry in marr.manifest.dict().items() + } + + zarray = marr.zarray.replace(zarr_format=2) + + else: + try: + np_arr = var.to_numpy() + except AttributeError as e: + raise TypeError( + f"Can only serialize wrapped arrays of type ManifestArray or numpy.ndarray, but got type {type(var.data)}" + ) from e + + if var.encoding: + if "scale_factor" in var.encoding: + raise NotImplementedError( + f"Cannot serialize loaded variable {var_name}, as it is encoded with a scale_factor" + ) + if "offset" in var.encoding: + raise NotImplementedError( + f"Cannot serialize loaded variable {var_name}, as it is encoded with an offset" + ) + if "calendar" in var.encoding: + np_arr = CFDatetimeCoder().encode(var.copy(), name=var_name).values + + # This encoding is what kerchunk does when it "inlines" data, see https://github.com/fsspec/kerchunk/blob/a0c4f3b828d37f6d07995925b324595af68c4a19/kerchunk/hdf.py#L472 + byte_data = np_arr.tobytes() + # TODO do I really need to encode then decode like this? + inlined_data = (b"base64:" + base64.b64encode(byte_data)).decode("utf-8") + + # TODO can this be generalized to save individual chunks of a dask array? + # TODO will this fail for a scalar? + arr_refs = {join(0 for _ in np_arr.shape): inlined_data} + + zarray = ZArray( + chunks=np_arr.shape, + shape=np_arr.shape, + dtype=np_arr.dtype, + order="C", + fill_value=None, + ) + + zarray_dict = zarray.to_kerchunk_json() + arr_refs[".zarray"] = zarray_dict + + zattrs = {**var.attrs, **var.encoding} + zattrs["_ARRAY_DIMENSIONS"] = list(var.dims) + arr_refs[".zattrs"] = json.dumps(zattrs, separators=(",", ":"), cls=NumpyEncoder) + + return cast(KerchunkArrRefs, arr_refs) diff --git a/virtualizarr/writers/zarr.py b/virtualizarr/writers/zarr.py new file mode 100644 index 00000000..b3dc8f1a --- /dev/null +++ b/virtualizarr/writers/zarr.py @@ -0,0 +1,115 @@ +from pathlib import Path + +import numpy as np +from xarray import Dataset +from xarray.core.variable import Variable + +from virtualizarr.vendor.zarr.utils import json_dumps +from virtualizarr.zarr import ZArray + + +def dataset_to_zarr(ds: Dataset, storepath: str) -> None: + """ + Write an xarray dataset whose variables wrap ManifestArrays to a v3 Zarr store, writing chunk references into manifest.json files. + + Currently requires all variables to be backed by ManifestArray objects. + + Not very useful until some implementation of a Zarr reader can actually read these manifest.json files. + See https://github.com/zarr-developers/zarr-specs/issues/287 + + Parameters + ---------- + ds: xr.Dataset + storepath: str + """ + + from virtualizarr.manifests import ManifestArray + + _storepath = Path(storepath) + Path.mkdir(_storepath, exist_ok=False) + + # should techically loop over groups in a tree but a dataset corresponds to only one group + group_metadata = {"zarr_format": 3, "node_type": "group", "attributes": ds.attrs} + with open(_storepath / "zarr.json", "wb") as group_metadata_file: + group_metadata_file.write(json_dumps(group_metadata)) + + for name, var in ds.variables.items(): + array_dir = _storepath / str(name) + marr = var.data + + # TODO move this check outside the writing loop so we don't write an incomplete store on failure? + # TODO at some point this should be generalized to also write in-memory arrays as normal zarr chunks, see GH isse #62. + if not isinstance(marr, ManifestArray): + raise TypeError( + "Only xarray objects wrapping ManifestArrays can be written to zarr using this method, " + f"but variable {name} wraps an array of type {type(marr)}" + ) + + Path.mkdir(array_dir, exist_ok=False) + + # write the chunk references into a manifest.json file + # and the array metadata into a zarr.json file + to_zarr_json(var, array_dir) + + +def to_zarr_json(var: Variable, array_dir: Path) -> None: + """ + Write out both the zarr.json and manifest.json file into the given zarr array directory. + + Follows the Zarr v3 manifest storage transformer ZEP (see https://github.com/zarr-developers/zarr-specs/issues/287). + + Parameters + ---------- + var : xr.Variable + Must be wrapping a ManifestArray + dirpath : str + Zarr store array directory into which to write files. + """ + + marr = var.data + + marr.manifest.to_zarr_json(array_dir / "manifest.json") + + metadata = zarr_v3_array_metadata( + marr.zarray, [str(x) for x in var.dims], var.attrs + ) + with open(array_dir / "zarr.json", "wb") as metadata_file: + metadata_file.write(json_dumps(metadata)) + + +def zarr_v3_array_metadata(zarray: ZArray, dim_names: list[str], attrs: dict) -> dict: + """Construct a v3-compliant metadata dict from v2 zarray + information stored on the xarray variable.""" + # TODO it would be nice if we could use the zarr-python metadata.ArrayMetadata classes to do this conversion for us + + metadata = zarray.dict() + + # adjust to match v3 spec + metadata["zarr_format"] = 3 + metadata["node_type"] = "array" + metadata["data_type"] = str(np.dtype(metadata.pop("dtype"))) + metadata["chunk_grid"] = { + "name": "regular", + "configuration": {"chunk_shape": metadata.pop("chunks")}, + } + metadata["chunk_key_encoding"] = { + "name": "default", + "configuration": {"separator": "/"}, + } + metadata["codecs"] = zarray._v3_codec_pipeline() + metadata.pop("filters") + metadata.pop("compressor") + metadata.pop("order") + + # indicate that we're using the manifest storage transformer ZEP + metadata["storage_transformers"] = [ + { + "name": "chunk-manifest-json", + "configuration": {"manifest": "./manifest.json"}, + } + ] + + # add information from xarray object + metadata["dimension_names"] = dim_names + metadata["attributes"] = attrs + + return metadata diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index 824892cc..f62b1269 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -1,14 +1,9 @@ import dataclasses -import json -from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, NewType, cast import numcodecs import numpy as np import ujson # type: ignore -import xarray as xr - -from virtualizarr.vendor.zarr.utils import json_dumps if TYPE_CHECKING: pass @@ -213,179 +208,6 @@ def ceildiv(a: int, b: int) -> int: return -(a // -b) -def dataset_to_zarr(ds: xr.Dataset, storepath: str) -> None: - """ - Write an xarray dataset whose variables wrap ManifestArrays to a v3 Zarr store, writing chunk references into manifest.json files. - - Currently requires all variables to be backed by ManifestArray objects. - - Not very useful until some implementation of a Zarr reader can actually read these manifest.json files. - See https://github.com/zarr-developers/zarr-specs/issues/287 - - Parameters - ---------- - ds: xr.Dataset - storepath: str - """ - - from virtualizarr.manifests import ManifestArray - - _storepath = Path(storepath) - Path.mkdir(_storepath, exist_ok=False) - - # should techically loop over groups in a tree but a dataset corresponds to only one group - group_metadata = {"zarr_format": 3, "node_type": "group", "attributes": ds.attrs} - with open(_storepath / "zarr.json", "wb") as group_metadata_file: - group_metadata_file.write(json_dumps(group_metadata)) - - for name, var in ds.variables.items(): - array_dir = _storepath / str(name) - marr = var.data - - # TODO move this check outside the writing loop so we don't write an incomplete store on failure? - # TODO at some point this should be generalized to also write in-memory arrays as normal zarr chunks, see GH isse #62. - if not isinstance(marr, ManifestArray): - raise TypeError( - "Only xarray objects wrapping ManifestArrays can be written to zarr using this method, " - f"but variable {name} wraps an array of type {type(marr)}" - ) - - Path.mkdir(array_dir, exist_ok=False) - - # write the chunk references into a manifest.json file - # and the array metadata into a zarr.json file - to_zarr_json(var, array_dir) - - -def to_zarr_json(var: xr.Variable, array_dir: Path) -> None: - """ - Write out both the zarr.json and manifest.json file into the given zarr array directory. - - Follows the Zarr v3 manifest storage transformer ZEP (see https://github.com/zarr-developers/zarr-specs/issues/287). - - Parameters - ---------- - var : xr.Variable - Must be wrapping a ManifestArray - dirpath : str - Zarr store array directory into which to write files. - """ - - marr = var.data - - marr.manifest.to_zarr_json(array_dir / "manifest.json") - - metadata = zarr_v3_array_metadata( - marr.zarray, [str(x) for x in var.dims], var.attrs - ) - with open(array_dir / "zarr.json", "wb") as metadata_file: - metadata_file.write(json_dumps(metadata)) - - -def zarr_v3_array_metadata(zarray: ZArray, dim_names: list[str], attrs: dict) -> dict: - """Construct a v3-compliant metadata dict from v2 zarray + information stored on the xarray variable.""" - # TODO it would be nice if we could use the zarr-python metadata.ArrayMetadata classes to do this conversion for us - - metadata = zarray.dict() - - # adjust to match v3 spec - metadata["zarr_format"] = 3 - metadata["node_type"] = "array" - metadata["data_type"] = str(np.dtype(metadata.pop("dtype"))) - metadata["chunk_grid"] = { - "name": "regular", - "configuration": {"chunk_shape": metadata.pop("chunks")}, - } - metadata["chunk_key_encoding"] = { - "name": "default", - "configuration": {"separator": "/"}, - } - metadata["codecs"] = zarray._v3_codec_pipeline() - metadata.pop("filters") - metadata.pop("compressor") - metadata.pop("order") - - # indicate that we're using the manifest storage transformer ZEP - metadata["storage_transformers"] = [ - { - "name": "chunk-manifest-json", - "configuration": {"manifest": "./manifest.json"}, - } - ] - - # add information from xarray object - metadata["dimension_names"] = dim_names - metadata["attributes"] = attrs - - return metadata - - -def attrs_from_zarr_group_json(filepath: Path) -> dict: - with open(filepath) as metadata_file: - attrs = json.load(metadata_file) - return attrs["attributes"] - - -def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]: - with open(filepath) as metadata_file: - metadata = json.load(metadata_file) - - if { - "name": "chunk-manifest-json", - "configuration": { - "manifest": "./manifest.json", - }, - } not in metadata.get("storage_transformers", []): - raise ValueError( - "Can only read byte ranges from Zarr v3 stores which implement the manifest storage transformer ZEP." - ) - - attrs = metadata.pop("attributes") - dim_names = metadata.pop("dimension_names") - - chunk_shape = tuple(metadata["chunk_grid"]["configuration"]["chunk_shape"]) - shape = tuple(metadata["shape"]) - zarr_format = metadata["zarr_format"] - - if metadata["fill_value"] is None: - raise ValueError( - "fill_value must be specified https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value" - ) - else: - fill_value = metadata["fill_value"] - - all_codecs = [ - codec - for codec in metadata["codecs"] - if codec["name"] not in ("transpose", "bytes") - ] - compressor, *filters = [ - _configurable_to_num_codec_config(_filter) for _filter in all_codecs - ] - zarray = ZArray( - chunks=chunk_shape, - compressor=compressor, - dtype=np.dtype(metadata["data_type"]), - fill_value=fill_value, - filters=filters or None, - order="C", - shape=shape, - zarr_format=zarr_format, - ) - - return zarray, dim_names, attrs - - -def _configurable_to_num_codec_config(configurable: dict) -> dict: - """ - Convert a zarr v3 configurable into a numcodecs codec. - """ - configurable_copy = configurable.copy() - codec_id = configurable_copy.pop("name") - configuration = configurable_copy.pop("configuration") - return numcodecs.get_codec({"id": codec_id, **configuration}).get_config() - - def _num_codec_config_to_configurable(num_codec: dict) -> dict: """ Convert a numcodecs codec into a zarr v3 configurable.