diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 769f59e5..0550236f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -46,10 +46,6 @@ jobs: conda env list conda list - - name: Type check - run: | - mypy virtualizarr - - name: Running Tests run: | python -m pytest ./virtualizarr --run-network-tests --cov=./ --cov-report=xml --verbose diff --git a/.github/workflows/min-deps.yml b/.github/workflows/min-deps.yml new file mode 100644 index 00000000..066e1ba3 --- /dev/null +++ b/.github/workflows/min-deps.yml @@ -0,0 +1,60 @@ +name: min-deps + +on: + push: + branches: [ "main" ] + paths-ignore: + - 'docs/**' + pull_request: + branches: [ "main" ] + paths-ignore: + - 'docs/**' + schedule: + - cron: "0 0 * * *" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + + test: + name: ${{ matrix.python-version }}-build + runs-on: ubuntu-latest + defaults: + run: + shell: bash -l {0} + strategy: + matrix: + python-version: ["3.12"] + steps: + - uses: actions/checkout@v4 + + - name: Setup micromamba + uses: mamba-org/setup-micromamba@v1 + with: + environment-file: ci/min-deps.yml + cache-environment: true + create-args: >- + python=${{matrix.python-version}} + + - name: Install virtualizarr + run: | + python -m pip install -e . --no-deps + - name: Conda list information + run: | + conda env list + conda list + + - name: Running Tests + run: | + python -m pytest ./virtualizarr --cov=./ --cov-report=xml --verbose + + - name: Upload code coverage to Codecov + uses: codecov/codecov-action@v3.1.4 + with: + file: ./coverage.xml + flags: unittests + env_vars: OS,PYTHON + name: codecov-umbrella + fail_ci_if_error: false diff --git a/.github/workflows/typing.yml b/.github/workflows/typing.yml new file mode 100644 index 00000000..0540801b --- /dev/null +++ b/.github/workflows/typing.yml @@ -0,0 +1,38 @@ +name: Typing + +on: + push: + branches: [ "main" ] + paths-ignore: + - 'docs/**' + pull_request: + branches: [ "main" ] + paths-ignore: + - 'docs/**' + schedule: + - cron: "0 0 * * *" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + mypy: + name: mypy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.12' + + - name: Install deps + run: | + # We need to test optional dep to add all the library stubs + pip install -e '.[test]' + + - name: Type check + run: | + mypy virtualizarr diff --git a/.github/workflows/upstream.yml b/.github/workflows/upstream.yml new file mode 100644 index 00000000..9140896b --- /dev/null +++ b/.github/workflows/upstream.yml @@ -0,0 +1,60 @@ +name: upstream + +on: + push: + branches: [ "main" ] + paths-ignore: + - 'docs/**' + pull_request: + branches: [ "main" ] + paths-ignore: + - 'docs/**' + schedule: + - cron: "0 0 * * *" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + + test: + name: ${{ matrix.python-version }}-build + runs-on: ubuntu-latest + defaults: + run: + shell: bash -l {0} + strategy: + matrix: + python-version: ["3.12"] + steps: + - uses: actions/checkout@v4 + + - name: Setup micromamba + uses: mamba-org/setup-micromamba@v1 + with: + environment-file: ci/upstream.yml + cache-environment: true + create-args: >- + python=${{matrix.python-version}} + + - name: Install virtualizarr + run: | + python -m pip install -e . 
--no-deps + - name: Conda list information + run: | + conda env list + conda list + + - name: Running Tests + run: | + python -m pytest ./virtualizarr --cov=./ --cov-report=xml --verbose + + - name: Upload code coverage to Codecov + uses: codecov/codecov-action@v3.1.4 + with: + file: ./coverage.xml + flags: unittests + env_vars: OS,PYTHON + name: codecov-umbrella + fail_ci_if_error: false diff --git a/.gitignore b/.gitignore index d360cfa4..d6720a7a 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,4 @@ cython_debug/ #.idea/ virtualizarr/_version.py docs/generated/ +examples/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 803b7a78..3bae6a6c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,7 +3,7 @@ ci: autoupdate_schedule: monthly repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.6.0 + rev: v5.0.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer @@ -11,7 +11,7 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: "v0.6.3" + rev: "v0.6.9" hooks: # Run the linter. - id: ruff diff --git a/ci/doc.yml b/ci/doc.yml index 7d7e9224..ccc3ded6 100644 --- a/ci/doc.yml +++ b/ci/doc.yml @@ -13,4 +13,3 @@ dependencies: - "sphinx_design" - "sphinx_togglebutton" - "sphinx-autodoc-typehints" - - -e "..[test]" diff --git a/ci/min-deps.yml b/ci/min-deps.yml new file mode 100644 index 00000000..7ca8c0b3 --- /dev/null +++ b/ci/min-deps.yml @@ -0,0 +1,26 @@ +name: virtualizarr-min-deps +channels: + - conda-forge + - nodefaults +dependencies: + - h5netcdf + - h5py + - hdf5 + - netcdf4 + - xarray>=2024.6.0 + - numpy>=2.0.0 + - numcodecs + - packaging + - ujson + - universal_pathlib + # Testing + - codecov + - pre-commit + - mypy + - ruff + - pandas-stubs + - pytest-mypy + - pytest-cov + - pytest + - pooch + - fsspec diff --git a/ci/upstream.yml b/ci/upstream.yml new file mode 100644 index 00000000..2c2680bc --- /dev/null +++ b/ci/upstream.yml @@ -0,0 +1,30 @@ +name: virtualizarr-min-deps +channels: + - conda-forge + - nodefaults +dependencies: + - h5netcdf + - h5py + - hdf5 + - netcdf4 + - numpy>=2.0.0 + - packaging + - ujson + - universal_pathlib + # Testing + - codecov + - pre-commit + - mypy + - ruff + - pandas-stubs + - pytest-mypy + - pytest-cov + - pytest + - pooch + - fsspec + - pip + - pip: + - icechunk # Installs zarr v3 as dependency + - git+https://github.com/pydata/xarray@zarr-v3 # zarr-v3 compatibility branch + - git+https://github.com/zarr-developers/numcodecs@zarr3-codecs # zarr-v3 compatibility branch + # - git+https://github.com/fsspec/kerchunk@main # kerchunk is currently incompatible with zarr-python v3 (https://github.com/fsspec/kerchunk/pull/516) diff --git a/conftest.py b/conftest.py index 32b3581f..810fd833 100644 --- a/conftest.py +++ b/conftest.py @@ -1,6 +1,8 @@ import h5py +import numpy as np import pytest import xarray as xr +from xarray.core.variable import Variable def pytest_addoption(parser): @@ -33,6 +35,20 @@ def netcdf4_file(tmpdir): return filepath +@pytest.fixture +def netcdf4_virtual_dataset(netcdf4_file): + from virtualizarr import open_virtual_dataset + + return open_virtual_dataset(netcdf4_file, indexes={}) + + +@pytest.fixture +def netcdf4_inlined_ref(netcdf4_file): + from kerchunk.hdf import SingleHdf5ToZarr + + return SingleHdf5ToZarr(netcdf4_file, inline_threshold=1000).translate() + + @pytest.fixture def hdf5_groups_file(tmpdir): # Set up example xarray dataset @@ -82,3 +98,16 @@ def hdf5_scalar(tmpdir): dataset = f.create_dataset("scalar", data=0.1, 
dtype="float32") dataset.attrs["scalar"] = "true" return filepath + + +@pytest.fixture +def simple_netcdf4(tmpdir): + filepath = f"{tmpdir}/simple.nc" + + arr = np.arange(12, dtype=np.dtype("int32")).reshape(3, 4) + var = Variable(data=arr, dims=["x", "y"]) + ds = xr.Dataset({"foo": var}) + + ds.to_netcdf(filepath) + + return filepath diff --git a/docs/api.rst b/docs/api.rst index 81d08a77..755713d0 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -39,6 +39,7 @@ Serialization VirtualiZarrDatasetAccessor.to_kerchunk VirtualiZarrDatasetAccessor.to_zarr + VirtualiZarrDatasetAccessor.to_icechunk Rewriting diff --git a/docs/releases.rst b/docs/releases.rst index ec057807..93a5fec9 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -8,6 +8,11 @@ v1.0.1 (unreleased) New Features ~~~~~~~~~~~~ + + +- Can open `kerchunk` reference files with ``open_virtual_dataset``. + (:pull:`251`, :pull:`186`) By `Raphael Hagen `_ & `Kristen Thyng `_. + - Adds defaults for `open_virtual_dataset_from_v3_store` in (:pull:`234`) By `Raphael Hagen `_. @@ -23,6 +28,12 @@ New Features - Load scalar variables by default. (:pull:`205`) By `Gustavo Hidalgo `_. +- Support empty files (:pull:`260`) + By `Justus Magin `_. + +- Can write virtual datasets to Icechunk stores using `vitualize.to_icechunk` (:pull:`256`) + By `Matt Iannucci `_. + Breaking changes ~~~~~~~~~~~~~~~~ @@ -30,7 +41,7 @@ Breaking changes By `Gustavo Hidalgo `_. - VirtualiZarr's `ZArray`, `ChunkEntry`, and `Codec` no longer subclass `pydantic.BaseModel` (:pull:`210`) -- `ZArray`'s `__init__` signature has changed to match `zarr.Array`'s (:pull:`xxx`) +- `ZArray`'s `__init__` signature has changed to match `zarr.Array`'s (:pull:`210`) Deprecations ~~~~~~~~~~~~ @@ -50,7 +61,7 @@ Bug fixes Documentation ~~~~~~~~~~~~~ -- Adds virtualizarr + coiled serverless example notebook (:pull`223`) +- Adds virtualizarr + coiled serverless example notebook (:pull:`223`) By `Raphael Hagen `_. @@ -59,6 +70,8 @@ Internal Changes - Refactored internal structure significantly to split up everything to do with reading references from that to do with writing references. (:issue:`229`) (:pull:`231`) By `Tom Nicholas `_. +- Refactored readers to consider every filetype as a separate reader, all standardized to present the same `open_virtual_dataset` interface internally. + (:pull:`261`) By `Tom Nicholas `_. .. _v1.0.0: diff --git a/docs/usage.md b/docs/usage.md index 40071b8a..30eab144 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -396,6 +396,23 @@ combined_ds = xr.open_dataset('combined.parq', engine="kerchunk") By default references are placed in separate parquet file when the total number of references exceeds `record_size`. If there are fewer than `categorical_threshold` unique urls referenced by a particular variable, url will be stored as a categorical variable. +### Writing to an Icechunk Store + +We can also write these references out as an [IcechunkStore](https://icechunk.io/). `Icechunk` is a Open-source, cloud-native transactional tensor storage engine that is compatible with zarr version 3. To export our virtual dataset to an `Icechunk` Store, we simply use the {py:meth}`ds.virtualize.to_icechunk ` accessor method. 
+ +```python +# create an icechunk store +from icechunk import IcechunkStore, StorageConfig, StoreConfig, VirtualRefConfig +storage = StorageConfig.filesystem(str('combined')) +store = IcechunkStore.create(storage=storage, mode="w", config=StoreConfig( + virtual_ref_config=VirtualRefConfig.s3_anonymous(region='us-east-1'), +)) + +combined_vds.virtualize.to_icechunk(store) +``` + +See the [Icechunk documentation](https://icechunk.io/icechunk-python/virtual/#creating-a-virtual-dataset-with-virtualizarr) for more details. + ### Writing as Zarr Alternatively, we can write these references out as an actual Zarr store, at least one that is compliant with the [proposed "Chunk Manifest" ZEP](https://github.com/zarr-developers/zarr-specs/issues/287). To do this we simply use the {py:meth}`ds.virtualize.to_zarr ` accessor method. @@ -421,6 +438,18 @@ Currently there are not yet any zarr v3 readers which understand the chunk manif This store can however be read by {py:func}`~virtualizarr.xarray.open_virtual_dataset`, by passing `filetype="zarr_v3"`. ``` +## Opening Kerchunk references as virtual datasets + +You can open existing Kerchunk `json` or `parquet` references as Virtualizarr virtual datasets. This may be useful for converting existing Kerchunk formatted references to storage formats like [Icechunk](https://icechunk.io/). + +```python + +vds = open_virtual_dataset('combined.json', format='kerchunk') +# or +vds = open_virtual_dataset('combined.parquet', format='kerchunk') + +``` + ## Rewriting existing manifests Sometimes it can be useful to rewrite the contents of an already-generated manifest or virtual dataset. diff --git a/pyproject.toml b/pyproject.toml index 62a13001..df6c37be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,14 +22,14 @@ requires-python = ">=3.10" dynamic = ["version"] dependencies = [ "xarray>=2024.06.0", - "kerchunk>=0.2.5", - "h5netcdf", - "pydantic", "numpy>=2.0.0", - "ujson", "packaging", "universal-pathlib", + "h5py", "hdf5plugin", + "numcodecs", + "imagecodecs-numcodecs", + "ujson", ] [project.optional-dependencies] @@ -37,9 +37,14 @@ test = [ "codecov", "fastparquet", "fsspec", + "h5netcdf", "h5py", + "hdf5plugin", + "kerchunk>=0.2.5", "mypy", "netcdf4", + "numcodecs", + "imagecodecs-numcodecs", "pandas-stubs", "pooch", "pre-commit", @@ -48,9 +53,6 @@ test = [ "pytest", "ruff", "s3fs", - "fastparquet", - "imagecodecs-numcodecs", - "h5py", "scipy", ] @@ -92,6 +94,10 @@ ignore_missing_imports = true module = "kerchunk.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "ujson.*" +ignore_missing_imports = true + [tool.ruff] # Same as Black. 
line-length = 88 diff --git a/virtualizarr/accessor.py b/virtualizarr/accessor.py index 0a97237e..336838f9 100644 --- a/virtualizarr/accessor.py +++ b/virtualizarr/accessor.py @@ -1,11 +1,11 @@ from pathlib import Path from typing import ( + TYPE_CHECKING, Callable, Literal, overload, ) -import ujson # type: ignore from xarray import Dataset, register_dataset_accessor from virtualizarr.manifests import ManifestArray @@ -13,6 +13,9 @@ from virtualizarr.writers.kerchunk import dataset_to_kerchunk_refs from virtualizarr.writers.zarr import dataset_to_zarr +if TYPE_CHECKING: + from icechunk import IcechunkStore # type: ignore[import-not-found] + @register_dataset_accessor("virtualize") class VirtualiZarrDatasetAccessor: @@ -40,6 +43,20 @@ def to_zarr(self, storepath: str) -> None: """ dataset_to_zarr(self.ds, storepath) + def to_icechunk(self, store: "IcechunkStore") -> None: + """ + Write an xarray dataset to an Icechunk store. + + Any variables backed by ManifestArray objects will be be written as virtual references, any other variables will be loaded into memory before their binary chunk data is written into the store. + + Parameters + ---------- + store: IcechunkStore + """ + from virtualizarr.writers.icechunk import dataset_to_icechunk + + dataset_to_icechunk(self.ds, store) + @overload def to_kerchunk( self, filepath: None, format: Literal["dict"] @@ -91,6 +108,8 @@ def to_kerchunk( if format == "dict": return refs elif format == "json": + import ujson + if filepath is None: raise ValueError("Filepath must be provided when format is 'json'") diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index 076fc559..731069df 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -1,24 +1,39 @@ -import os import warnings -from collections.abc import Iterable, Mapping, MutableMapping +from collections.abc import Iterable, Mapping from enum import Enum, auto -from io import BufferedIOBase +from pathlib import Path from typing import ( Any, - Hashable, Optional, - cast, ) -import xarray as xr -from xarray.backends import AbstractDataStore, BackendArray -from xarray.core.indexes import Index, PandasIndex -from xarray.core.variable import IndexVariable +from xarray import Dataset +from xarray.core.indexes import Index from virtualizarr.manifests import ManifestArray -from virtualizarr.utils import _fsspec_openfile_from_filepath - -XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore +from virtualizarr.readers import ( + DMRPPVirtualBackend, + FITSVirtualBackend, + HDFVirtualBackend, + KerchunkVirtualBackend, + NetCDF3VirtualBackend, + TIFFVirtualBackend, + ZarrV3VirtualBackend, +) +from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions + +# TODO add entrypoint to allow external libraries to add to this mapping +VIRTUAL_BACKENDS = { + "kerchunk": KerchunkVirtualBackend, + "zarr_v3": ZarrV3VirtualBackend, + "dmrpp": DMRPPVirtualBackend, + "hdf5": HDFVirtualBackend, + "netcdf4": HDFVirtualBackend, # note this is the same as for hdf5 + # all the below call one of the kerchunk backends internally (https://fsspec.github.io/kerchunk/reference.html#file-format-backends) + "netcdf3": NetCDF3VirtualBackend, + "tiff": TIFFVirtualBackend, + "fits": FITSVirtualBackend, +} class AutoName(Enum): @@ -39,12 +54,52 @@ class FileType(AutoName): zarr = auto() dmrpp = auto() zarr_v3 = auto() + kerchunk = auto() -class ManifestBackendArray(ManifestArray, BackendArray): - """Using this prevents xarray from wrapping the KerchunkArray in 
ExplicitIndexingAdapter etc.""" +def automatically_determine_filetype( + *, + filepath: str, + reader_options: Optional[dict[str, Any]] = {}, +) -> FileType: + """ + Attempt to automatically infer the correct reader for this filetype. + + Uses magic bytes and file / directory suffixes. + """ + + # TODO this should ideally handle every filetype that we have a reader for, not just kerchunk + + # TODO how do we handle kerchunk json / parquet here? + if Path(filepath).suffix == ".zarr": + # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one... + raise NotImplementedError() + + # Read magic bytes from local or remote file + fpath = _FsspecFSFromFilepath( + filepath=filepath, reader_options=reader_options + ).open_file() + magic_bytes = fpath.read(8) + fpath.close() + + if magic_bytes.startswith(b"CDF"): + filetype = FileType.netcdf3 + elif magic_bytes.startswith(b"\x0e\x03\x13\x01"): + raise NotImplementedError("HDF4 formatted files not supported") + elif magic_bytes.startswith(b"\x89HDF"): + filetype = FileType.hdf5 + elif magic_bytes.startswith(b"GRIB"): + filetype = FileType.grib + elif magic_bytes.startswith(b"II*"): + filetype = FileType.tiff + elif magic_bytes.startswith(b"SIMPLE"): + filetype = FileType.fits + else: + raise NotImplementedError( + f"Unrecognised file based on header bytes: {magic_bytes}" + ) - ... + return filetype def open_virtual_dataset( @@ -59,7 +114,7 @@ def open_virtual_dataset( indexes: Mapping[str, Index] | None = None, virtual_array_class=ManifestArray, reader_options: Optional[dict] = None, -) -> xr.Dataset: +) -> Dataset: """ Open a file or store as an xarray Dataset wrapping virtualized zarr arrays. @@ -73,7 +128,7 @@ def open_virtual_dataset( File path to open as a set of virtualized zarr arrays. filetype : FileType, default None Type of file to be opened. Used to determine which kerchunk file format backend to use. - Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3'}. + Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3', 'kerchunk'}. If not provided will attempt to automatically infer the correct filetype from header bytes. group : str, default is None Path to the HDF5/netCDF4 group in the given file to open. Given as a str, supported by filetypes “netcdf4” and “hdf5”. @@ -109,206 +164,38 @@ def open_virtual_dataset( stacklevel=2, ) - loadable_vars: dict[str, xr.Variable] - virtual_vars: dict[str, xr.Variable] - vars: dict[str, xr.Variable] - - if drop_variables is None: - drop_variables = [] - elif isinstance(drop_variables, str): - drop_variables = [drop_variables] - else: - drop_variables = list(drop_variables) - if loadable_variables is None: - loadable_variables = [] - elif isinstance(loadable_variables, str): - loadable_variables = [loadable_variables] - else: - loadable_variables = list(loadable_variables) - common = set(drop_variables).intersection(set(loadable_variables)) - if common: - raise ValueError(f"Cannot both load and drop variables {common}") + drop_variables, loadable_variables = check_for_collisions( + drop_variables, + loadable_variables, + ) if virtual_array_class is not ManifestArray: raise NotImplementedError() - # if filetype is user defined, convert to FileType + if reader_options is None: + reader_options = {} + if filetype is not None: + # if filetype is user defined, convert to FileType filetype = FileType(filetype) - - if filetype == FileType.zarr_v3: - # TODO is there a neat way of auto-detecting this? 
- from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store - - return open_virtual_dataset_from_v3_store( - storepath=filepath, drop_variables=drop_variables, indexes=indexes - ) - elif filetype == FileType.dmrpp: - from virtualizarr.readers.dmrpp import DMRParser - - if loadable_variables != [] or indexes is None: - raise NotImplementedError( - "Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files." - ) - - fpath = _fsspec_openfile_from_filepath( - filepath=filepath, reader_options=reader_options - ) - parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp")) - vds = parser.parse_dataset() - vds.drop_vars(drop_variables) - return vds else: - if reader_options is None: - reader_options = {} - - from virtualizarr.readers.kerchunk import _automatically_determine_filetype - - if filetype is None: - filetype = _automatically_determine_filetype( - filepath=filepath, reader_options=reader_options - ) - filetype = FileType(filetype) - if filetype == FileType.hdf5: - from virtualizarr.readers.hdf import ( - attrs_from_root_group, - virtual_vars_from_hdf, - ) - - virtual_vars = virtual_vars_from_hdf( - path=filepath, - group=group, - drop_variables=drop_variables + loadable_variables, - reader_options=reader_options, - ) - ds_attrs = attrs_from_root_group( - path=filepath, reader_options=reader_options, group=group - ) - coord_names = ds_attrs.pop("coordinates", []) - # we currently read every other filetype using kerchunks various file format backends - else: - from virtualizarr.readers.kerchunk import ( - fully_decode_arr_refs, - read_kerchunk_references_from_file, - virtual_vars_from_kerchunk_refs, - ) - - # this is the only place we actually always need to use kerchunk directly - # TODO avoid even reading byte ranges for variables that will be dropped later anyway? - vds_refs = read_kerchunk_references_from_file( - filepath=filepath, - filetype=filetype, - group=group, - reader_options=reader_options, - ) - virtual_vars = virtual_vars_from_kerchunk_refs( - vds_refs, - drop_variables=drop_variables + loadable_variables, - virtual_array_class=virtual_array_class, - ) - ds_attrs = fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {}) - coord_names = ds_attrs.pop("coordinates", []) - - if indexes is None or len(loadable_variables) > 0: - # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables... - # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references - # TODO really we probably want a dedicated xarray backend that iterates over all variables only once - fpath = _fsspec_openfile_from_filepath( - filepath=filepath, reader_options=reader_options - ) - - # fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any. - # We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through. - - ds = xr.open_dataset( - cast(XArrayOpenT, fpath), - drop_variables=drop_variables, - group=group, - decode_times=decode_times, - ) - - if indexes is None: - warnings.warn( - "Specifying `indexes=None` will create in-memory pandas indexes for each 1D coordinate, but concatenation of ManifestArrays backed by pandas indexes is not yet supported (see issue #18)." - "You almost certainly want to pass `indexes={}` to `open_virtual_dataset` instead." 
- ) - - # add default indexes by reading data from file - indexes = {name: index for name, index in ds.xindexes.items()} - elif indexes != {}: - # TODO allow manual specification of index objects - raise NotImplementedError() - else: - indexes = dict(**indexes) # for type hinting: to allow mutation - - loadable_vars = { - str(name): var - for name, var in ds.variables.items() - if name in loadable_variables - } - - # if we only read the indexes we can just close the file right away as nothing is lazy - if loadable_vars == {}: - ds.close() - else: - loadable_vars = {} - indexes = {} - - vars = {**virtual_vars, **loadable_vars} - - data_vars, coords = separate_coords(vars, indexes, coord_names) - - vds = xr.Dataset( - data_vars, - coords=coords, - # indexes={}, # TODO should be added in a later version of xarray - attrs=ds_attrs, + filetype = automatically_determine_filetype( + filepath=filepath, reader_options=reader_options ) - # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened - - return vds - - -def separate_coords( - vars: Mapping[str, xr.Variable], - indexes: MutableMapping[str, Index], - coord_names: Iterable[str] | None = None, -) -> tuple[dict[str, xr.Variable], xr.Coordinates]: - """ - Try to generate a set of coordinates that won't cause xarray to automatically build a pandas.Index for the 1D coordinates. - - Currently requires this function as a workaround unless xarray PR #8124 is merged. - - Will also preserve any loaded variables and indexes it is passed. - """ - - if coord_names is None: - coord_names = [] - - # split data and coordinate variables (promote dimension coordinates) - data_vars = {} - coord_vars: dict[ - str, tuple[Hashable, Any, dict[Any, Any], dict[Any, Any]] | xr.Variable - ] = {} - for name, var in vars.items(): - if name in coord_names or var.dims == (name,): - # use workaround to avoid creating IndexVariables described here https://github.com/pydata/xarray/pull/8107#discussion_r1311214263 - if len(var.dims) == 1: - dim1d, *_ = var.dims - coord_vars[name] = (dim1d, var.data, var.attrs, var.encoding) + backend_cls = VIRTUAL_BACKENDS.get(filetype.name.lower()) - if isinstance(var, IndexVariable): - # unless variable actually already is a loaded IndexVariable, - # in which case we need to keep it and add the corresponding indexes explicitly - coord_vars[str(name)] = var - # TODO this seems suspect - will it handle datetimes? 
- indexes[name] = PandasIndex(var, dim1d) - else: - coord_vars[name] = var - else: - data_vars[name] = var + if backend_cls is None: + raise NotImplementedError(f"Unsupported file type: {filetype.name}") - coords = xr.Coordinates(coord_vars, indexes=indexes) + vds = backend_cls.open_virtual_dataset( + filepath, + group=group, + drop_variables=drop_variables, + loadable_variables=loadable_variables, + decode_times=decode_times, + indexes=indexes, + reader_options=reader_options, + ) - return data_vars, coords + return vds diff --git a/virtualizarr/manifests/array.py b/virtualizarr/manifests/array.py index 5ac0aef0..179bcf1c 100644 --- a/virtualizarr/manifests/array.py +++ b/virtualizarr/manifests/array.py @@ -3,10 +3,13 @@ import numpy as np -from ..types.kerchunk import KerchunkArrRefs -from ..zarr import ZArray -from .array_api import MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS, _isnan -from .manifest import ChunkManifest +from virtualizarr.manifests.array_api import ( + MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS, + _isnan, +) +from virtualizarr.manifests.manifest import ChunkManifest +from virtualizarr.types.kerchunk import KerchunkArrRefs +from virtualizarr.zarr import ZArray class ManifestArray: @@ -61,7 +64,7 @@ def __init__( @classmethod def _from_kerchunk_refs(cls, arr_refs: KerchunkArrRefs) -> "ManifestArray": - from virtualizarr.readers.kerchunk import ( + from virtualizarr.translators.kerchunk import ( fully_decode_arr_refs, parse_array_refs, ) diff --git a/virtualizarr/manifests/array_api.py b/virtualizarr/manifests/array_api.py index 09606978..f5cf220b 100644 --- a/virtualizarr/manifests/array_api.py +++ b/virtualizarr/manifests/array_api.py @@ -1,8 +1,8 @@ -from typing import TYPE_CHECKING, Callable, Iterable +from typing import TYPE_CHECKING, Any, Callable, Iterable, cast import numpy as np -from virtualizarr.zarr import Codec, ceildiv +from virtualizarr.zarr import Codec, determine_chunk_grid_shape from .manifest import ChunkManifest @@ -217,9 +217,12 @@ def stack( new_shape.insert(axis, length_along_new_stacked_axis) # do stacking of entries in manifest - stacked_paths = np.stack( - [arr.manifest._paths for arr in arrays], - axis=axis, + stacked_paths = cast( # `np.stack` apparently is type hinted as if the output could have Any dtype + np.ndarray[Any, np.dtypes.StringDType], + np.stack( + [arr.manifest._paths for arr in arrays], + axis=axis, + ), ) stacked_offsets = np.stack( [arr.manifest._offsets for arr in arrays], @@ -290,16 +293,17 @@ def broadcast_to(x: "ManifestArray", /, shape: tuple[int, ...]) -> "ManifestArra ) # find new chunk grid shape by dividing new array shape by new chunk shape - new_chunk_grid_shape = tuple( - ceildiv(axis_length, chunk_length) - for axis_length, chunk_length in zip(new_shape, new_chunk_shape) - ) + new_chunk_grid_shape = determine_chunk_grid_shape(new_shape, new_chunk_shape) # do broadcasting of entries in manifest - broadcasted_paths = np.broadcast_to( - x.manifest._paths, - shape=new_chunk_grid_shape, + broadcasted_paths = cast( # `np.broadcast_to` apparently is type hinted as if the output could have Any dtype + np.ndarray[Any, np.dtypes.StringDType], + np.broadcast_to( + x.manifest._paths, + shape=new_chunk_grid_shape, + ), ) + broadcasted_offsets = np.broadcast_to( x.manifest._offsets, shape=new_chunk_grid_shape, diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py index 3aaebb41..1933844a 100644 --- a/virtualizarr/manifests/manifest.py +++ b/virtualizarr/manifests/manifest.py @@ -5,7 +5,6 @@ from typing import 
Any, Callable, Dict, NewType, Tuple, TypedDict, cast import numpy as np -from upath import UPath from virtualizarr.types import ChunkKey @@ -41,6 +40,8 @@ class ChunkEntry: def from_kerchunk( cls, path_and_byte_range_info: tuple[str] | tuple[str, int, int] ) -> "ChunkEntry": + from upath import UPath + if len(path_and_byte_range_info) == 1: path = path_and_byte_range_info[0] offset = 0 @@ -84,11 +85,11 @@ class ChunkManifest: so it's not possible to have a ChunkManifest object that does not represent a valid grid of chunks. """ - _paths: np.ndarray[Any, np.dtypes.StringDType] # type: ignore[name-defined] + _paths: np.ndarray[Any, np.dtypes.StringDType] _offsets: np.ndarray[Any, np.dtype[np.uint64]] _lengths: np.ndarray[Any, np.dtype[np.uint64]] - def __init__(self, entries: dict) -> None: + def __init__(self, entries: dict, shape: tuple[int, ...] | None = None) -> None: """ Create a ChunkManifest from a dictionary mapping zarr chunk keys to byte ranges. @@ -104,16 +105,20 @@ def __init__(self, entries: dict) -> None: "0.1.1": {"path": "s3://bucket/foo.nc", "offset": 400, "length": 100}, } """ + if shape is None and not entries: + raise ValueError("need a chunk grid shape if no chunks given") # TODO do some input validation here first? validate_chunk_keys(entries.keys()) - # TODO should we actually optionally pass chunk grid shape in, - # in case there are not enough chunks to give correct idea of full shape? - shape = get_chunk_grid_shape(entries.keys()) + if shape is None: + shape = get_chunk_grid_shape(entries.keys()) # Initializing to empty implies that entries with path='' are treated as missing chunks - paths = np.empty(shape=shape, dtype=np.dtypes.StringDType()) # type: ignore[attr-defined] + paths = cast( # `np.empty` apparently is type hinted as if the output could have Any dtype + np.ndarray[Any, np.dtypes.StringDType], + np.empty(shape=shape, dtype=np.dtypes.StringDType()), + ) offsets = np.empty(shape=shape, dtype=np.dtype("uint64")) lengths = np.empty(shape=shape, dtype=np.dtype("uint64")) @@ -141,7 +146,7 @@ def __init__(self, entries: dict) -> None: @classmethod def from_arrays( cls, - paths: np.ndarray[Any, np.dtype[np.dtypes.StringDType]], # type: ignore[name-defined] + paths: np.ndarray[Any, np.dtypes.StringDType], offsets: np.ndarray[Any, np.dtype[np.uint64]], lengths: np.ndarray[Any, np.dtype[np.uint64]], ) -> "ChunkManifest": @@ -306,7 +311,9 @@ def _from_kerchunk_chunk_dict( chunk_entries: dict[ChunkKey, ChunkDictEntry] = {} for k, v in kerchunk_chunk_dict.items(): if isinstance(v, (str, bytes)): - raise NotImplementedError("TODO: handle inlined data") + raise NotImplementedError( + "Reading inlined reference data is currently not supported. 
[ToDo]" + ) elif not isinstance(v, (tuple, list)): raise TypeError(f"Unexpected type {type(v)} for chunk value: {v}") chunk_entries[k] = ChunkEntry.from_kerchunk(v).dict() @@ -380,6 +387,9 @@ def get_ndim_from_key(key: str) -> int: def validate_chunk_keys(chunk_keys: Iterable[ChunkKey]): + if not chunk_keys: + return + # Check if all keys have the correct form for key in chunk_keys: if not re.match(_CHUNK_KEY, key): diff --git a/virtualizarr/readers/__init__.py b/virtualizarr/readers/__init__.py new file mode 100644 index 00000000..c776a9ae --- /dev/null +++ b/virtualizarr/readers/__init__.py @@ -0,0 +1,19 @@ +from virtualizarr.readers.dmrpp import DMRPPVirtualBackend +from virtualizarr.readers.fits import FITSVirtualBackend +from virtualizarr.readers.hdf import HDFVirtualBackend +from virtualizarr.readers.hdf5 import HDF5VirtualBackend +from virtualizarr.readers.kerchunk import KerchunkVirtualBackend +from virtualizarr.readers.netcdf3 import NetCDF3VirtualBackend +from virtualizarr.readers.tiff import TIFFVirtualBackend +from virtualizarr.readers.zarr_v3 import ZarrV3VirtualBackend + +__all__ = [ + "DMRPPVirtualBackend", + "FITSVirtualBackend", + "HDFVirtualBackend", + "HDF5VirtualBackend", + "KerchunkVirtualBackend", + "NetCDF3VirtualBackend", + "TIFFVirtualBackend", + "ZarrV3VirtualBackend", +] diff --git a/virtualizarr/readers/common.py b/virtualizarr/readers/common.py new file mode 100644 index 00000000..54aedfe2 --- /dev/null +++ b/virtualizarr/readers/common.py @@ -0,0 +1,195 @@ +import os +import warnings +from abc import ABC +from collections.abc import Iterable, Mapping, MutableMapping +from io import BufferedIOBase +from typing import ( + TYPE_CHECKING, + Any, + Hashable, + Optional, + cast, +) + +import xarray as xr +from xarray import Dataset +from xarray.backends import AbstractDataStore, BackendArray +from xarray.core.indexes import Index, PandasIndex +from xarray.core.variable import IndexVariable, Variable + +from virtualizarr.manifests import ManifestArray +from virtualizarr.utils import _FsspecFSFromFilepath + +XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore + +if TYPE_CHECKING: + try: + from xarray import DataTree # type: ignore[attr-defined] + except ImportError: + DataTree = Any + + +class ManifestBackendArray(ManifestArray, BackendArray): + """Using this prevents xarray from wrapping the KerchunkArray in ExplicitIndexingAdapter etc.""" + + ... + + +def open_loadable_vars_and_indexes( + filepath: str, + loadable_variables, + reader_options, + drop_variables, + indexes, + group, + decode_times, +) -> tuple[Mapping[str, Variable], Mapping[str, Index]]: + """ + Open selected variables and indexes using xarray. + + Relies on xr.open_dataset and its auto-detection of filetypes to find the correct installed backend. + """ + + # TODO get rid of this if? + if indexes is None or len(loadable_variables) > 0: + # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables... + # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references + # TODO really we probably want a dedicated xarray backend that iterates over all variables only once + fpath = _FsspecFSFromFilepath( + filepath=filepath, reader_options=reader_options + ).open_file() + + # fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any. + # We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through. 
+ + ds = xr.open_dataset( + cast(XArrayOpenT, fpath), + drop_variables=drop_variables, + group=group, + decode_times=decode_times, + ) + + if indexes is None: + warnings.warn( + "Specifying `indexes=None` will create in-memory pandas indexes for each 1D coordinate, but concatenation of ManifestArrays backed by pandas indexes is not yet supported (see issue #18)." + "You almost certainly want to pass `indexes={}` to `open_virtual_dataset` instead." + ) + + # add default indexes by reading data from file + indexes = {name: index for name, index in ds.xindexes.items()} + elif indexes != {}: + # TODO allow manual specification of index objects + raise NotImplementedError() + else: + indexes = dict(**indexes) # for type hinting: to allow mutation + + # TODO we should drop these earlier by using drop_variables + loadable_vars = { + str(name): var + for name, var in ds.variables.items() + if name in loadable_variables + } + + # if we only read the indexes we can just close the file right away as nothing is lazy + if loadable_vars == {}: + ds.close() + else: + loadable_vars = {} + indexes = {} + + return loadable_vars, indexes + + +def construct_virtual_dataset( + virtual_vars, + loadable_vars, + indexes, + coord_names, + attrs, +) -> Dataset: + """Construct a virtual Datset from consistuent parts.""" + + vars = {**virtual_vars, **loadable_vars} + + data_vars, coords = separate_coords(vars, indexes, coord_names) + + vds = xr.Dataset( + data_vars, + coords=coords, + # indexes={}, # TODO should be added in a later version of xarray + attrs=attrs, + ) + + # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened + + return vds + + +def separate_coords( + vars: Mapping[str, xr.Variable], + indexes: MutableMapping[str, Index], + coord_names: Iterable[str] | None = None, +) -> tuple[dict[str, xr.Variable], xr.Coordinates]: + """ + Try to generate a set of coordinates that won't cause xarray to automatically build a pandas.Index for the 1D coordinates. + + Currently requires this function as a workaround unless xarray PR #8124 is merged. + + Will also preserve any loaded variables and indexes it is passed. + """ + + if coord_names is None: + coord_names = [] + + # split data and coordinate variables (promote dimension coordinates) + data_vars = {} + coord_vars: dict[ + str, tuple[Hashable, Any, dict[Any, Any], dict[Any, Any]] | xr.Variable + ] = {} + for name, var in vars.items(): + if name in coord_names or var.dims == (name,): + # use workaround to avoid creating IndexVariables described here https://github.com/pydata/xarray/pull/8107#discussion_r1311214263 + if len(var.dims) == 1: + dim1d, *_ = var.dims + coord_vars[name] = (dim1d, var.data, var.attrs, var.encoding) + + if isinstance(var, IndexVariable): + # unless variable actually already is a loaded IndexVariable, + # in which case we need to keep it and add the corresponding indexes explicitly + coord_vars[str(name)] = var + # TODO this seems suspect - will it handle datetimes? 
+ indexes[name] = PandasIndex(var, dim1d) + else: + coord_vars[name] = var + else: + data_vars[name] = var + + coords = xr.Coordinates(coord_vars, indexes=indexes) + + return data_vars, coords + + +class VirtualBackend(ABC): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + raise NotImplementedError() + + @staticmethod + def open_virtual_datatree( + path: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> "DataTree": + raise NotImplementedError() diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index fa66205a..766b1c62 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -2,18 +2,55 @@ import warnings from collections import defaultdict from collections.abc import Mapping -from typing import Any, Optional +from typing import Any, Iterable, Optional from xml.etree import ElementTree as ET import numpy as np -import xarray as xr +from xarray import Coordinates, Dataset from xarray.core.indexes import Index +from xarray.core.variable import Variable from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers.common import VirtualBackend from virtualizarr.types import ChunkKey +from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions from virtualizarr.zarr import ZArray +class DMRPPVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + loadable_variables, drop_variables = check_for_collisions( + drop_variables=drop_variables, + loadable_variables=loadable_variables, + ) + + if loadable_variables != [] or decode_times or indexes is None: + raise NotImplementedError( + "Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files." + ) + + if group: + raise NotImplementedError() + + fpath = _FsspecFSFromFilepath( + filepath=filepath, reader_options=reader_options + ).open_file() + + parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp")) + vds = parser.parse_dataset() + + return vds.drop_vars(drop_variables) + + class DMRParser: """ Parser for the OPeNDAP DMR++ XML format. @@ -69,9 +106,7 @@ def __init__(self, dmr: str, data_filepath: Optional[str] = None): data_filepath if data_filepath is not None else self.root.attrib["name"] ) - def parse_dataset( - self, group=None, indexes: Mapping[str, Index] = {} - ) -> xr.Dataset: + def parse_dataset(self, group=None, indexes: Mapping[str, Index] = {}) -> Dataset: """ Parses the given file and creates a virtual xr.Dataset with ManifestArrays. @@ -128,7 +163,7 @@ def _parse_netcdf4_dataset( root: ET.Element, group: Optional[str] = None, indexes: Mapping[str, Index] = {}, - ) -> xr.Dataset: + ) -> Dataset: """ Parse the dataset from the netcdf4 based dmrpp with groups, starting at the given group. 
Set root to the given group. @@ -201,7 +236,7 @@ def _parse_hdf5_dataset( root: ET.Element, group: Optional[str] = None, indexes: Mapping[str, Index] = {}, - ) -> xr.Dataset: + ) -> Dataset: """ Parse the dataset from the HDF5 based dmrpp with groups, starting at the given group. Set root to the given group. @@ -331,7 +366,7 @@ def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: def _parse_dataset( self, root: ET.Element, indexes: Mapping[str, Index] = {} - ) -> xr.Dataset: + ) -> Dataset: """ Parse the dataset using the root element of the DMR file. @@ -353,8 +388,8 @@ def _parse_dataset( if len(coord_names) == 0 or len(coord_names) < len(dataset_dims): coord_names = set(dataset_dims.keys()) # Seperate and parse coords + data variables - coord_vars: dict[str, xr.Variable] = {} - data_vars: dict[str, xr.Variable] = {} + coord_vars: dict[str, Variable] = {} + data_vars: dict[str, Variable] = {} for var_tag in self._find_var_tags(root): variable = self._parse_variable(var_tag, dataset_dims) if var_tag.attrib["name"] in coord_names: @@ -365,9 +400,9 @@ def _parse_dataset( attrs: dict[str, str] = {} for attr_tag in self.root.iterfind("dap:Attribute", self._ns): attrs.update(self._parse_attribute(attr_tag)) - return xr.Dataset( + return Dataset( data_vars=data_vars, - coords=xr.Coordinates(coords=coord_vars, indexes=indexes), + coords=Coordinates(coords=coord_vars, indexes=indexes), attrs=attrs, ) @@ -484,7 +519,7 @@ def _parse_multi_dims( def _parse_variable( self, var_tag: ET.Element, dataset_dims: dict[str, int] - ) -> xr.Variable: + ) -> Variable: """ Parse a variable from a DMR tag. @@ -542,7 +577,7 @@ def _parse_variable( ) marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) encoding = {k: attrs.get(k) for k in self._encoding_keys if k in attrs} - return xr.Variable( + return Variable( dims=dim_shapes.keys(), data=marr, attrs=attrs, encoding=encoding ) diff --git a/virtualizarr/readers/fits.py b/virtualizarr/readers/fits.py new file mode 100644 index 00000000..618d81cd --- /dev/null +++ b/virtualizarr/readers/fits.py @@ -0,0 +1,59 @@ +from typing import Iterable, Mapping, Optional + +from xarray import Dataset +from xarray.core.indexes import Index + +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.translators.kerchunk import ( + extract_group, + virtual_vars_and_metadata_from_kerchunk_refs, +) +from virtualizarr.types.kerchunk import KerchunkStoreRefs + + +class FITSVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + from kerchunk.fits import process_file + + # handle inconsistency in kerchunk, see GH issue https://github.com/zarr-developers/VirtualiZarr/issues/160 + refs = KerchunkStoreRefs({"refs": process_file(filepath, **reader_options)}) + + refs = extract_group(refs, group) + + virtual_vars, attrs, coord_names = virtual_vars_and_metadata_from_kerchunk_refs( + refs, + loadable_variables, + drop_variables, + ) + + # TODO this wouldn't work until you had an xarray backend for FITS installed + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + 
drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) diff --git a/virtualizarr/readers/hdf.py b/virtualizarr/readers/hdf.py index 65b97eeb..30dd402f 100644 --- a/virtualizarr/readers/hdf.py +++ b/virtualizarr/readers/hdf.py @@ -1,261 +1,319 @@ import math -from typing import Dict, List, Optional, Union +from typing import Dict, Iterable, List, Mapping, Optional, Union import h5py # type: ignore import numpy as np -import xarray as xr +from xarray import Dataset, Variable +from xarray.core.indexes import Index from virtualizarr.manifests import ChunkEntry, ChunkManifest, ManifestArray +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) from virtualizarr.readers.hdf_filters import cfcodec_from_dataset, codecs_from_dataset from virtualizarr.types import ChunkKey -from virtualizarr.utils import _fsspec_openfile_from_filepath +from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions from virtualizarr.zarr import ZArray -def _dataset_chunk_manifest( - path: str, dataset: h5py.Dataset -) -> Optional[ChunkManifest]: - """ - Generate ChunkManifest for HDF5 dataset. - - Parameters - ---------- - path: str - The path the HDF5 container file - dset : h5py.Dataset - HDF5 dataset for which to create a ChunkManifest - - Returns - ------- - ChunkManifest - A Virtualizarr ChunkManifest - """ - dsid = dataset.id - if dataset.chunks is None: - if dsid.get_offset() is None: - return None - else: - key_list = [0] * (len(dataset.shape) or 1) - key = ".".join(map(str, key_list)) - chunk_entry = ChunkEntry( - path=path, offset=dsid.get_offset(), length=dsid.get_storage_size() - ) - chunk_key = ChunkKey(key) - chunk_entries = {chunk_key: chunk_entry.dict()} - chunk_manifest = ChunkManifest(entries=chunk_entries) - return chunk_manifest - else: - num_chunks = dsid.get_num_chunks() - if num_chunks == 0: - raise ValueError("The dataset is chunked but contains no chunks") - shape = tuple(math.ceil(a / b) for a, b in zip(dataset.shape, dataset.chunks)) - paths = np.empty(shape, dtype=np.dtypes.StringDType) # type: ignore - offsets = np.empty(shape, dtype=np.uint64) - lengths = np.empty(shape, dtype=np.uint64) - - def get_key(blob): - return tuple([a // b for a, b in zip(blob.chunk_offset, dataset.chunks)]) +class HDFVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + drop_variables, loadable_variables = check_for_collisions( + drop_variables, + loadable_variables, + ) - def add_chunk_info(blob): - key = get_key(blob) - paths[key] = path - offsets[key] = blob.byte_offset - lengths[key] = blob.size + virtual_vars = HDFVirtualBackend._virtual_vars_from_hdf( + path=filepath, + group=group, + drop_variables=drop_variables + loadable_variables, + reader_options=reader_options, + ) - has_chunk_iter = callable(getattr(dsid, "chunk_iter", None)) - if has_chunk_iter: - dsid.chunk_iter(add_chunk_info) - else: - for index in range(num_chunks): - add_chunk_info(dsid.get_chunk_info(index)) + loadable_vars, indexes = 
open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) - chunk_manifest = ChunkManifest.from_arrays( - paths=paths, offsets=offsets, lengths=lengths + attrs = HDFVirtualBackend._attrs_from_root_group( + path=filepath, reader_options=reader_options, group=group ) - return chunk_manifest + coord_names = attrs.pop("coordinates", []) -def _dataset_dims(dataset: h5py.Dataset) -> Union[List[str], List[None]]: - """ - Get a list of dimension scale names attached to input HDF5 dataset. + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) - This is required by the xarray package to work with Zarr arrays. Only - one dimension scale per dataset dimension is allowed. If dataset is - dimension scale, it will be considered as the dimension to itself. + @staticmethod + def _dataset_chunk_manifest( + path: str, dataset: h5py.Dataset + ) -> Optional[ChunkManifest]: + """ + Generate ChunkManifest for HDF5 dataset. - Parameters - ---------- - dataset : h5py.Dataset - HDF5 dataset. + Parameters + ---------- + path: str + The path the HDF5 container file + dset : h5py.Dataset + HDF5 dataset for which to create a ChunkManifest - Returns - ------- - list - List with HDF5 path names of dimension scales attached to input - dataset. - """ - dims = list() - rank = len(dataset.shape) - if rank: - for n in range(rank): - num_scales = len(dataset.dims[n]) - if num_scales == 1: - dims.append(dataset.dims[n][0].name[1:]) - elif h5py.h5ds.is_scale(dataset.id): - dims.append(dataset.name[1:]) - elif num_scales > 1: - raise ValueError( - f"{dataset.name}: {len(dataset.dims[n])} " - f"dimension scales attached to dimension #{n}" + Returns + ------- + ChunkManifest + A Virtualizarr ChunkManifest + """ + dsid = dataset.id + if dataset.chunks is None: + if dsid.get_offset() is None: + return None + else: + key_list = [0] * (len(dataset.shape) or 1) + key = ".".join(map(str, key_list)) + chunk_entry = ChunkEntry( + path=path, offset=dsid.get_offset(), length=dsid.get_storage_size() ) - elif num_scales == 0: - # Some HDF5 files do not have dimension scales. - # If this is the case, `num_scales` will be 0. - # In this case, we mimic netCDF4 and assign phony dimension names. - # See https://github.com/fsspec/kerchunk/issues/41 - dims.append(f"phony_dim_{n}") - return dims + chunk_key = ChunkKey(key) + chunk_entries = {chunk_key: chunk_entry.dict()} + chunk_manifest = ChunkManifest(entries=chunk_entries) + return chunk_manifest + else: + num_chunks = dsid.get_num_chunks() + if num_chunks == 0: + raise ValueError("The dataset is chunked but contains no chunks") + shape = tuple( + math.ceil(a / b) for a, b in zip(dataset.shape, dataset.chunks) + ) + paths = np.empty(shape, dtype=np.dtypes.StringDType) # type: ignore + offsets = np.empty(shape, dtype=np.uint64) + lengths = np.empty(shape, dtype=np.uint64) + def get_key(blob): + return tuple( + [a // b for a, b in zip(blob.chunk_offset, dataset.chunks)] + ) -def _extract_attrs(h5obj: Union[h5py.Dataset, h5py.Group]): - """ - Extract attributes from an HDF5 group or dataset. + def add_chunk_info(blob): + key = get_key(blob) + paths[key] = path + offsets[key] = blob.byte_offset + lengths[key] = blob.size - Parameters - ---------- - h5obj : h5py.Group or h5py.Dataset - An HDF5 group or dataset. 
- """ - _HIDDEN_ATTRS = { - "REFERENCE_LIST", - "CLASS", - "DIMENSION_LIST", - "NAME", - "_Netcdf4Dimid", - "_Netcdf4Coordinates", - "_nc3_strict", - "_NCProperties", - } - attrs = {} - for n, v in h5obj.attrs.items(): - if n in _HIDDEN_ATTRS: - continue - # Fix some attribute values to avoid JSON encoding exceptions... - if isinstance(v, bytes): - v = v.decode("utf-8") or " " - elif isinstance(v, (np.ndarray, np.number, np.bool_)): - if v.dtype.kind == "S": - v = v.astype(str) - if n == "_FillValue": - continue - elif v.size == 1: - v = v.flatten()[0] - if isinstance(v, (np.ndarray, np.number, np.bool_)): - v = v.tolist() + has_chunk_iter = callable(getattr(dsid, "chunk_iter", None)) + if has_chunk_iter: + dsid.chunk_iter(add_chunk_info) else: - v = v.tolist() - elif isinstance(v, h5py._hl.base.Empty): - v = "" - if v == "DIMENSION_SCALE": - continue + for index in range(num_chunks): + add_chunk_info(dsid.get_chunk_info(index)) - attrs[n] = v - return attrs + chunk_manifest = ChunkManifest.from_arrays( + paths=paths, offsets=offsets, lengths=lengths + ) + return chunk_manifest + @staticmethod + def _dataset_dims(dataset: h5py.Dataset) -> Union[List[str], List[None]]: + """ + Get a list of dimension scale names attached to input HDF5 dataset. -def _dataset_to_variable(path: str, dataset: h5py.Dataset) -> Optional[xr.Variable]: - # This chunk determination logic mirrors zarr-python's create - # https://github.com/zarr-developers/zarr-python/blob/main/zarr/creation.py#L62-L66 + This is required by the xarray package to work with Zarr arrays. Only + one dimension scale per dataset dimension is allowed. If dataset is + dimension scale, it will be considered as the dimension to itself. - chunks = dataset.chunks if dataset.chunks else dataset.shape - codecs = codecs_from_dataset(dataset) - cfcodec = cfcodec_from_dataset(dataset) - attrs = _extract_attrs(dataset) - if cfcodec: - codecs.insert(0, cfcodec["codec"]) - dtype = cfcodec["target_dtype"] - attrs.pop("scale_factor", None) - attrs.pop("add_offset", None) - fill_value = cfcodec["codec"].decode(dataset.fillvalue) - else: - dtype = dataset.dtype - fill_value = dataset.fillvalue - if isinstance(fill_value, np.ndarray): - fill_value = fill_value[0] - if np.isnan(fill_value): - fill_value = float("nan") - if isinstance(fill_value, np.generic): - fill_value = fill_value.item() - filters = [codec.get_config() for codec in codecs] - zarray = ZArray( - chunks=chunks, - compressor=None, - dtype=dtype, - fill_value=fill_value, - filters=filters, - order="C", - shape=dataset.shape, - zarr_format=2, - ) - dims = _dataset_dims(dataset) - manifest = _dataset_chunk_manifest(path, dataset) - if manifest: - marray = ManifestArray(zarray=zarray, chunkmanifest=manifest) - variable = xr.Variable(data=marray, dims=dims, attrs=attrs) - else: - variable = xr.Variable(data=np.empty(dataset.shape), dims=dims, attrs=attrs) - return variable + Parameters + ---------- + dataset : h5py.Dataset + HDF5 dataset. + Returns + ------- + list + List with HDF5 path names of dimension scales attached to input + dataset. 
+ """ + dims = list() + rank = len(dataset.shape) + if rank: + for n in range(rank): + num_scales = len(dataset.dims[n]) + if num_scales == 1: + dims.append(dataset.dims[n][0].name[1:]) + elif h5py.h5ds.is_scale(dataset.id): + dims.append(dataset.name[1:]) + elif num_scales > 1: + raise ValueError( + f"{dataset.name}: {len(dataset.dims[n])} " + f"dimension scales attached to dimension #{n}" + ) + elif num_scales == 0: + # Some HDF5 files do not have dimension scales. + # If this is the case, `num_scales` will be 0. + # In this case, we mimic netCDF4 and assign phony dimension names. + # See https://github.com/fsspec/kerchunk/issues/41 + dims.append(f"phony_dim_{n}") + return dims -def virtual_vars_from_hdf( - path: str, - group: Optional[str] = None, - drop_variables: Optional[List[str]] = None, - reader_options: Optional[dict] = { - "storage_options": {"key": "", "secret": "", "anon": True} - }, -) -> Dict[str, xr.Variable]: - if drop_variables is None: - drop_variables = [] - open_file = _fsspec_openfile_from_filepath( - filepath=path, reader_options=reader_options - ) - f = h5py.File(open_file, mode="r") - if group: - g = f[group] - if not isinstance(g, h5py.Group): - raise ValueError("The provided group is not an HDF group") - else: - g = f - variables = {} - for key in g.keys(): - if key not in drop_variables: - if isinstance(g[key], h5py.Dataset): - variable = _dataset_to_variable(path, g[key]) - if variable is not None: - variables[key] = variable - else: - raise NotImplementedError("Nested groups are not yet supported") + @staticmethod + def _extract_attrs(h5obj: Union[h5py.Dataset, h5py.Group]): + """ + Extract attributes from an HDF5 group or dataset. + + Parameters + ---------- + h5obj : h5py.Group or h5py.Dataset + An HDF5 group or dataset. + """ + _HIDDEN_ATTRS = { + "REFERENCE_LIST", + "CLASS", + "DIMENSION_LIST", + "NAME", + "_Netcdf4Dimid", + "_Netcdf4Coordinates", + "_nc3_strict", + "_NCProperties", + } + attrs = {} + for n, v in h5obj.attrs.items(): + if n in _HIDDEN_ATTRS: + continue + # Fix some attribute values to avoid JSON encoding exceptions... 
+ if isinstance(v, bytes): + v = v.decode("utf-8") or " " + elif isinstance(v, (np.ndarray, np.number, np.bool_)): + if v.dtype.kind == "S": + v = v.astype(str) + if n == "_FillValue": + continue + elif v.size == 1: + v = v.flatten()[0] + if isinstance(v, (np.ndarray, np.number, np.bool_)): + v = v.tolist() + else: + v = v.tolist() + elif isinstance(v, h5py._hl.base.Empty): + v = "" + if v == "DIMENSION_SCALE": + continue - return variables + attrs[n] = v + return attrs + @staticmethod + def _dataset_to_variable(path: str, dataset: h5py.Dataset) -> Optional[Variable]: + # This chunk determination logic mirrors zarr-python's create + # https://github.com/zarr-developers/zarr-python/blob/main/zarr/creation.py#L62-L66 -def attrs_from_root_group( - path: str, - group: Optional[str] = None, - reader_options: Optional[dict] = { - "storage_options": {"key": "", "secret": "", "anon": True} - }, -): - open_file = _fsspec_openfile_from_filepath( - filepath=path, reader_options=reader_options - ) - f = h5py.File(open_file, mode="r") - if group: - g = f[group] - if not isinstance(g, h5py.Group): - raise ValueError("The provided group is not an HDF group") - else: - g = f - attrs = _extract_attrs(g) - return attrs + chunks = dataset.chunks if dataset.chunks else dataset.shape + codecs = codecs_from_dataset(dataset) + cfcodec = cfcodec_from_dataset(dataset) + attrs = HDFVirtualBackend._extract_attrs(dataset) + if cfcodec: + codecs.insert(0, cfcodec["codec"]) + dtype = cfcodec["target_dtype"] + attrs.pop("scale_factor", None) + attrs.pop("add_offset", None) + fill_value = cfcodec["codec"].decode(dataset.fillvalue) + else: + dtype = dataset.dtype + fill_value = dataset.fillvalue + if isinstance(fill_value, np.ndarray): + fill_value = fill_value[0] + if np.isnan(fill_value): + fill_value = float("nan") + if isinstance(fill_value, np.generic): + fill_value = fill_value.item() + filters = [codec.get_config() for codec in codecs] + zarray = ZArray( + chunks=chunks, + compressor=None, + dtype=dtype, + fill_value=fill_value, + filters=filters, + order="C", + shape=dataset.shape, + zarr_format=2, + ) + dims = HDFVirtualBackend._dataset_dims(dataset) + manifest = HDFVirtualBackend._dataset_chunk_manifest(path, dataset) + if manifest: + marray = ManifestArray(zarray=zarray, chunkmanifest=manifest) + variable = Variable(data=marray, dims=dims, attrs=attrs) + else: + variable = Variable(data=np.empty(dataset.shape), dims=dims, attrs=attrs) + return variable + + @staticmethod + def _virtual_vars_from_hdf( + path: str, + group: Optional[str] = None, + drop_variables: Optional[List[str]] = None, + reader_options: Optional[dict] = { + "storage_options": {"key": "", "secret": "", "anon": True} + }, + ) -> Dict[str, Variable]: + if drop_variables is None: + drop_variables = [] + open_file = _FsspecFSFromFilepath( + filepath=path, reader_options=reader_options + ).open_file() + f = h5py.File(open_file, mode="r") + if group: + g = f[group] + if not isinstance(g, h5py.Group): + raise ValueError("The provided group is not an HDF group") + else: + g = f + variables = {} + for key in g.keys(): + if key not in drop_variables: + if isinstance(g[key], h5py.Dataset): + variable = HDFVirtualBackend._dataset_to_variable(path, g[key]) + if variable is not None: + variables[key] = variable + else: + raise NotImplementedError("Nested groups are not yet supported") + + return variables + + @staticmethod + def _attrs_from_root_group( + path: str, + group: Optional[str] = None, + reader_options: Optional[dict] = { + "storage_options": 
{"key": "", "secret": "", "anon": True} + }, + ): + open_file = _FsspecFSFromFilepath( + filepath=path, reader_options=reader_options + ).open_file() + f = h5py.File(open_file, mode="r") + if group: + g = f[group] + if not isinstance(g, h5py.Group): + raise ValueError("The provided group is not an HDF group") + else: + g = f + attrs = HDFVirtualBackend._extract_attrs(g) + return attrs diff --git a/virtualizarr/readers/hdf5.py b/virtualizarr/readers/hdf5.py new file mode 100644 index 00000000..c0d38e20 --- /dev/null +++ b/virtualizarr/readers/hdf5.py @@ -0,0 +1,64 @@ +from typing import Iterable, Mapping, Optional + +from xarray import Dataset +from xarray.core.indexes import Index + +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.translators.kerchunk import ( + extract_group, + virtual_vars_and_metadata_from_kerchunk_refs, +) +from virtualizarr.utils import check_for_collisions + + +class HDF5VirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + from kerchunk.hdf import SingleHdf5ToZarr + + drop_variables, loadable_variables = check_for_collisions( + drop_variables, + loadable_variables, + ) + + refs = SingleHdf5ToZarr( + filepath, inline_threshold=0, **reader_options + ).translate() + + refs = extract_group(refs, group) + + virtual_vars, attrs, coord_names = virtual_vars_and_metadata_from_kerchunk_refs( + refs, + loadable_variables, + drop_variables, + ) + + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) diff --git a/virtualizarr/readers/kerchunk.py b/virtualizarr/readers/kerchunk.py index 19a8c28d..35fa4932 100644 --- a/virtualizarr/readers/kerchunk.py +++ b/virtualizarr/readers/kerchunk.py @@ -1,309 +1,69 @@ -import warnings -from pathlib import Path -from typing import Any, MutableMapping, Optional, cast +from typing import Iterable, Mapping, Optional -import ujson # type: ignore +import ujson from xarray import Dataset from xarray.core.indexes import Index -from xarray.core.variable import Variable -from virtualizarr.backend import FileType, separate_coords -from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers.common import VirtualBackend +from virtualizarr.translators.kerchunk import dataset_from_kerchunk_refs from virtualizarr.types.kerchunk import ( - KerchunkArrRefs, KerchunkStoreRefs, ) -from virtualizarr.utils import _fsspec_openfile_from_filepath -from virtualizarr.zarr import ZArray, ZAttrs - - -# TODO shouldn't this live in backend.py? Because it's not just useful for the kerchunk-specific readers... -def _automatically_determine_filetype( - *, - filepath: str, - reader_options: Optional[dict[str, Any]] = {}, -) -> FileType: - if Path(filepath).suffix == ".zarr": - # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one... 
- raise NotImplementedError() - - # Read magic bytes from local or remote file - fpath = _fsspec_openfile_from_filepath( - filepath=filepath, reader_options=reader_options - ) - magic_bytes = fpath.read(8) - fpath.close() - - if magic_bytes.startswith(b"CDF"): - filetype = FileType.netcdf3 - elif magic_bytes.startswith(b"\x0e\x03\x13\x01"): - raise NotImplementedError("HDF4 formatted files not supported") - elif magic_bytes.startswith(b"\x89HDF"): - filetype = FileType.hdf5 - elif magic_bytes.startswith(b"GRIB"): - filetype = FileType.grib - elif magic_bytes.startswith(b"II*"): - filetype = FileType.tiff - elif magic_bytes.startswith(b"SIMPLE"): - filetype = FileType.fits - else: - raise NotImplementedError( - f"Unrecognised file based on header bytes: {magic_bytes}" - ) - - return filetype - - -def read_kerchunk_references_from_file( - filepath: str, - filetype: FileType | None, - group: str | None, - reader_options: Optional[dict[str, Any]] = None, -) -> KerchunkStoreRefs: - """ - Read a single legacy file and return kerchunk references to its contents. - - Parameters - ---------- - filepath : str, default: None - File path to open as a set of virtualized zarr arrays. - filetype : FileType, default: None - Type of file to be opened. Used to determine which kerchunk file format backend to use. - If not provided will attempt to automatically infer the correct filetype from the the filepath's extension. - group : str, default is None - Path to the HDF5/netCDF4 group in the given file to open. Given as a str, supported by filetypes “netcdf4” and “hdf5”. - Dict passed into Kerchunk file readers. Note: Each Kerchunk file reader has distinct arguments, - so ensure reader_options match selected Kerchunk reader arguments. - """ - - if reader_options is None: - reader_options = {} - - if filetype is None: - filetype = _automatically_determine_filetype( - filepath=filepath, reader_options=reader_options +from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions + + +class KerchunkVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + """Reads existing kerchunk references (in JSON or parquet) format.""" + + if group: + raise NotImplementedError() + + loadable_variables, drop_variables = check_for_collisions( + drop_variables=drop_variables, + loadable_variables=loadable_variables, ) - # if filetype is user defined, convert to FileType - filetype = FileType(filetype) + if loadable_variables or indexes or decode_times: + raise NotImplementedError() - if filetype.name.lower() == "netcdf3": - from kerchunk.netCDF3 import NetCDF3ToZarr + fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options) - refs = NetCDF3ToZarr(filepath, inline_threshold=0, **reader_options).translate() + # The kerchunk .parquet storage format isn't actually a parquet, but a directory that contains named parquets for each group/variable. 
+ if fs.filepath.endswith("ref.parquet"): + from fsspec.implementations.reference import LazyReferenceMapper - elif filetype.name.lower() == "hdf5" or filetype.name.lower() == "netcdf4": - from kerchunk.hdf import SingleHdf5ToZarr + lrm = LazyReferenceMapper(filepath, fs.fs) - refs = SingleHdf5ToZarr( - filepath, inline_threshold=0, **reader_options - ).translate() + # build reference dict from KV pairs in LazyReferenceMapper + # is there a better / more preformant way to extract this? + array_refs = {k: lrm[k] for k in lrm.keys()} - refs = extract_group(refs, group) - - elif filetype.name.lower() == "grib": - # TODO Grib files should be handled as a DataTree object - # see https://github.com/TomNicholas/VirtualiZarr/issues/11 - raise NotImplementedError(f"Unsupported file type: {filetype}") - elif filetype.name.lower() == "tiff": - from kerchunk.tiff import tiff_to_zarr - - reader_options.pop("storage_options", {}) - warnings.warn( - "storage_options have been dropped from reader_options as they are not supported by kerchunk.tiff.tiff_to_zarr", - UserWarning, - ) + full_reference = {"refs": array_refs} - # handle inconsistency in kerchunk, see GH issue https://github.com/zarr-developers/VirtualiZarr/issues/160 - refs = {"refs": tiff_to_zarr(filepath, **reader_options)} - elif filetype.name.lower() == "fits": - from kerchunk.fits import process_file + vds = dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference)) - # handle inconsistency in kerchunk, see GH issue https://github.com/zarr-developers/VirtualiZarr/issues/160 - refs = {"refs": process_file(filepath, **reader_options)} - else: - raise NotImplementedError(f"Unsupported file type: {filetype.name}") + # JSON has no magic bytes, but the Kerchunk version 1 spec starts with 'version': + # https://fsspec.github.io/kerchunk/spec.html + elif fs.read_bytes(9).startswith(b'{"version'): + with fs.open_file() as of: + refs = ujson.load(of) - # TODO validate the references that were read before returning? - return refs + vds = dataset_from_kerchunk_refs(KerchunkStoreRefs(refs)) - -def extract_group(vds_refs: KerchunkStoreRefs, group: str | None) -> KerchunkStoreRefs: - """Extract only the part of the kerchunk reference dict that is relevant to a single HDF group""" - hdf_groups = [ - k.removesuffix(".zgroup") for k in vds_refs["refs"].keys() if ".zgroup" in k - ] - if len(hdf_groups) == 1: - return vds_refs - else: - if group is None: + else: raise ValueError( - f"Multiple HDF Groups found. Must specify group= keyword to select one of {hdf_groups}" + "The input Kerchunk reference did not seem to be in Kerchunk's JSON or Parquet spec: https://fsspec.github.io/kerchunk/spec.html. 
The Kerchunk format autodetection is quite flaky, so if your reference matches the Kerchunk spec feel free to open an issue: https://github.com/zarr-developers/VirtualiZarr/issues" ) - else: - # Ensure supplied group kwarg is consistent with kerchunk keys - if not group.endswith("/"): - group += "/" - if group.startswith("/"): - group = group.removeprefix("/") - - if group not in hdf_groups: - raise ValueError(f'Group "{group}" not found in {hdf_groups}') - - # Filter by group prefix and remove prefix from all keys - groupdict = { - k.removeprefix(group): v - for k, v in vds_refs["refs"].items() - if k.startswith(group) - } - # Also remove group prefix from _ARRAY_DIMENSIONS - for k, v in groupdict.items(): - if isinstance(v, str): - groupdict[k] = v.replace("\\/", "/").replace(group, "") - - vds_refs["refs"] = groupdict - - return KerchunkStoreRefs(vds_refs) - - -def virtual_vars_from_kerchunk_refs( - refs: KerchunkStoreRefs, - drop_variables: list[str] | None = None, - virtual_array_class=ManifestArray, -) -> dict[str, Variable]: - """ - Translate a store-level kerchunk reference dict into aaset of xarray Variables containing virtualized arrays. - - Parameters - ---------- - drop_variables: list[str], default is None - Variables in the file to drop before returning. - virtual_array_class - Virtual array class to use to represent the references to the chunks in each on-disk array. - Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. - """ - - var_names = find_var_names(refs) - if drop_variables is None: - drop_variables = [] - var_names_to_keep = [ - var_name for var_name in var_names if var_name not in drop_variables - ] - - vars = { - var_name: variable_from_kerchunk_refs(refs, var_name, virtual_array_class) - for var_name in var_names_to_keep - } - return vars - - -def dataset_from_kerchunk_refs( - refs: KerchunkStoreRefs, - drop_variables: list[str] = [], - virtual_array_class: type = ManifestArray, - indexes: MutableMapping[str, Index] | None = None, -) -> Dataset: - """ - Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays. - - drop_variables: list[str], default is None - Variables in the file to drop before returning. - virtual_array_class - Virtual array class to use to represent the references to the chunks in each on-disk array. - Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. 
- """ - - vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class) - ds_attrs = fully_decode_arr_refs(refs["refs"]).get(".zattrs", {}) - coord_names = ds_attrs.pop("coordinates", []) - - if indexes is None: - indexes = {} - data_vars, coords = separate_coords(vars, indexes, coord_names) - - vds = Dataset( - data_vars, - coords=coords, - # indexes={}, # TODO should be added in a later version of xarray - attrs=ds_attrs, - ) - - return vds - - -def variable_from_kerchunk_refs( - refs: KerchunkStoreRefs, var_name: str, virtual_array_class -) -> Variable: - """Create a single xarray Variable by reading specific keys of a kerchunk references dict.""" - - arr_refs = extract_array_refs(refs, var_name) - chunk_dict, zarray, zattrs = parse_array_refs(arr_refs) - # we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs - dims = zattrs.pop("_ARRAY_DIMENSIONS") - if chunk_dict: - manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict) - varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) - else: - # This means we encountered a scalar variable of dimension 0, - # very likely that it actually has no numeric value and its only purpose - # is to communicate dataset attributes. - varr = zarray.fill_value - - return Variable(data=varr, dims=dims, attrs=zattrs) - - -def find_var_names(ds_reference_dict: KerchunkStoreRefs) -> list[str]: - """Find the names of zarr variables in this store/group.""" - - refs = ds_reference_dict["refs"] - found_var_names = {key.split("/")[0] for key in refs.keys() if "/" in key} - - return list(found_var_names) - - -def extract_array_refs( - ds_reference_dict: KerchunkStoreRefs, var_name: str -) -> KerchunkArrRefs: - """Extract only the part of the kerchunk reference dict that is relevant to this one zarr array""" - - found_var_names = find_var_names(ds_reference_dict) - - refs = ds_reference_dict["refs"] - if var_name in found_var_names: - # TODO these function probably have more loops in them than they need to... - - arr_refs = { - key.split("/")[1]: refs[key] - for key in refs.keys() - if var_name == key.split("/")[0] - } - - return fully_decode_arr_refs(arr_refs) - - else: - raise KeyError( - f"Could not find zarr array variable name {var_name}, only {found_var_names}" - ) - - -def parse_array_refs( - arr_refs: KerchunkArrRefs, -) -> tuple[dict, ZArray, ZAttrs]: - zarray = ZArray.from_kerchunk_refs(arr_refs.pop(".zarray")) - zattrs = arr_refs.pop(".zattrs", {}) - chunk_dict = arr_refs - - return chunk_dict, zarray, zattrs - - -def fully_decode_arr_refs(d: dict) -> KerchunkArrRefs: - """ - Only have to do this because kerchunk.SingleHdf5ToZarr apparently doesn't bother converting .zarray and .zattrs contents to dicts, see https://github.com/fsspec/kerchunk/issues/415 . - """ - sanitized = d.copy() - for k, v in d.items(): - if k.startswith("."): - # ensure contents of .zattrs and .zarray are python dictionaries - sanitized[k] = ujson.loads(v) - return cast(KerchunkArrRefs, sanitized) + # TODO would be more efficient to drop these before converting them into ManifestArrays, i.e. 
drop them from the kerchunk refs dict + return vds.drop_vars(drop_variables) diff --git a/virtualizarr/readers/netcdf3.py b/virtualizarr/readers/netcdf3.py new file mode 100644 index 00000000..30c6746e --- /dev/null +++ b/virtualizarr/readers/netcdf3.py @@ -0,0 +1,62 @@ +from typing import Iterable, Mapping, Optional + +from xarray import Dataset +from xarray.core.indexes import Index + +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.translators.kerchunk import ( + extract_group, + virtual_vars_and_metadata_from_kerchunk_refs, +) +from virtualizarr.utils import check_for_collisions + + +class NetCDF3VirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + from kerchunk.netCDF3 import NetCDF3ToZarr + + drop_variables, loadable_variables = check_for_collisions( + drop_variables, + loadable_variables, + ) + + refs = NetCDF3ToZarr(filepath, inline_threshold=0, **reader_options).translate() + + refs = extract_group(refs, group) + + virtual_vars, attrs, coord_names = virtual_vars_and_metadata_from_kerchunk_refs( + refs, + loadable_variables, + drop_variables, + ) + + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) diff --git a/virtualizarr/readers/tiff.py b/virtualizarr/readers/tiff.py new file mode 100644 index 00000000..bb32e647 --- /dev/null +++ b/virtualizarr/readers/tiff.py @@ -0,0 +1,73 @@ +import warnings +from typing import Iterable, Mapping, Optional + +from xarray import Dataset +from xarray.core.indexes import Index + +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.translators.kerchunk import ( + extract_group, + virtual_vars_and_metadata_from_kerchunk_refs, +) +from virtualizarr.types.kerchunk import KerchunkStoreRefs +from virtualizarr.utils import check_for_collisions + + +class TIFFVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + from kerchunk.tiff import tiff_to_zarr + + drop_variables, loadable_variables = check_for_collisions( + drop_variables=drop_variables, loadable_variables=loadable_variables + ) + + if reader_options is None: + reader_options = {} + + reader_options.pop("storage_options", {}) + warnings.warn( + "storage_options have been dropped from reader_options as they are not supported by kerchunk.tiff.tiff_to_zarr", + UserWarning, + ) + + # handle inconsistency in kerchunk, see GH issue https://github.com/zarr-developers/VirtualiZarr/issues/160 + refs = KerchunkStoreRefs({"refs": tiff_to_zarr(filepath, **reader_options)}) 
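The KerchunkVirtualBackend introduced above consumes references that were generated earlier rather than re-scanning the original file. A sketch of the JSON round trip exercised by the new `test_open_virtual_dataset_existing_kerchunk_refs` test, using hypothetical paths "air.nc" and "ref.json":

```python
# Round trip covered by the new tests: dump the references of an existing virtual
# dataset to kerchunk JSON, then re-open them through the KerchunkVirtualBackend.
import ujson

from virtualizarr import open_virtual_dataset

vds = open_virtual_dataset("air.nc", indexes={})
refs = vds.virtualize.to_kerchunk(format="dict")

with open("ref.json", "w") as json_file:
    ujson.dump(refs, json_file)

roundtripped = open_virtual_dataset("ref.json", filetype="kerchunk", indexes={})
```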
+ + refs = extract_group(refs, group) + + virtual_vars, attrs, coord_names = virtual_vars_and_metadata_from_kerchunk_refs( + refs, + loadable_variables, + drop_variables, + ) + + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) diff --git a/virtualizarr/readers/zarr.py b/virtualizarr/readers/zarr.py deleted file mode 100644 index 168faa2b..00000000 --- a/virtualizarr/readers/zarr.py +++ /dev/null @@ -1,131 +0,0 @@ -import json -from pathlib import Path -from typing import Mapping - -import numcodecs -import numpy as np -from xarray import Dataset -from xarray.core.indexes import Index -from xarray.core.variable import Variable - -from virtualizarr.backend import separate_coords -from virtualizarr.manifests import ChunkManifest, ManifestArray -from virtualizarr.zarr import ZArray - - -def open_virtual_dataset_from_v3_store( - storepath: str, - drop_variables: list[str] = [], - indexes: Mapping[str, Index] | None = None, -) -> Dataset: - """ - Read a Zarr v3 store and return an xarray Dataset containing virtualized arrays. - """ - _storepath = Path(storepath) - - ds_attrs = attrs_from_zarr_group_json(_storepath / "zarr.json") - coord_names = ds_attrs.pop("coordinates", []) - - # TODO recursive glob to create a datatree - # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it - # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166 - all_paths = _storepath.glob("*/") - directory_paths = [p for p in all_paths if not p.is_file()] - - vars = {} - for array_dir in directory_paths: - var_name = array_dir.name - if var_name in drop_variables: - break - - zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json") - manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json")) - - marr = ManifestArray(chunkmanifest=manifest, zarray=zarray) - var = Variable(data=marr, dims=dim_names, attrs=attrs) - vars[var_name] = var - - if indexes is None: - raise NotImplementedError() - elif indexes != {}: - # TODO allow manual specification of index objects - raise NotImplementedError() - else: - indexes = dict(**indexes) # for type hinting: to allow mutation - - data_vars, coords = separate_coords(vars, indexes, coord_names) - - ds = Dataset( - data_vars, - coords=coords, - # indexes={}, # TODO should be added in a later version of xarray - attrs=ds_attrs, - ) - - return ds - - -def attrs_from_zarr_group_json(filepath: Path) -> dict: - with open(filepath) as metadata_file: - attrs = json.load(metadata_file) - return attrs["attributes"] - - -def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]: - with open(filepath) as metadata_file: - metadata = json.load(metadata_file) - - if { - "name": "chunk-manifest-json", - "configuration": { - "manifest": "./manifest.json", - }, - } not in metadata.get("storage_transformers", []): - raise ValueError( - "Can only read byte ranges from Zarr v3 stores which implement the manifest storage transformer ZEP." 
- ) - - attrs = metadata.pop("attributes") - dim_names = metadata.pop("dimension_names") - - chunk_shape = tuple(metadata["chunk_grid"]["configuration"]["chunk_shape"]) - shape = tuple(metadata["shape"]) - zarr_format = metadata["zarr_format"] - - if metadata["fill_value"] is None: - raise ValueError( - "fill_value must be specified https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value" - ) - else: - fill_value = metadata["fill_value"] - - all_codecs = [ - codec - for codec in metadata["codecs"] - if codec["name"] not in ("transpose", "bytes") - ] - compressor, *filters = [ - _configurable_to_num_codec_config(_filter) for _filter in all_codecs - ] - zarray = ZArray( - chunks=chunk_shape, - compressor=compressor, - dtype=np.dtype(metadata["data_type"]), - fill_value=fill_value, - filters=filters or None, - order="C", - shape=shape, - zarr_format=zarr_format, - ) - - return zarray, dim_names, attrs - - -def _configurable_to_num_codec_config(configurable: dict) -> dict: - """ - Convert a zarr v3 configurable into a numcodecs codec. - """ - configurable_copy = configurable.copy() - codec_id = configurable_copy.pop("name") - configuration = configurable_copy.pop("configuration") - return numcodecs.get_codec({"id": codec_id, **configuration}).get_config() diff --git a/virtualizarr/readers/zarr_v3.py b/virtualizarr/readers/zarr_v3.py new file mode 100644 index 00000000..a1f4ab7d --- /dev/null +++ b/virtualizarr/readers/zarr_v3.py @@ -0,0 +1,156 @@ +import json +from pathlib import Path +from typing import Iterable, Mapping, Optional + +import numcodecs +import numpy as np +from xarray import Dataset +from xarray.core.indexes import Index +from xarray.core.variable import Variable + +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers.common import VirtualBackend, separate_coords +from virtualizarr.zarr import ZArray + + +class ZarrV3VirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + """ + Read a Zarr v3 store containing chunk manifests and return an xarray Dataset containing virtualized arrays. + + This is experimental - chunk manifests are not part of the Zarr v3 Spec. 
+ """ + storepath = Path(filepath) + + if group: + raise NotImplementedError() + + if loadable_variables or decode_times: + raise NotImplementedError() + + if reader_options: + raise NotImplementedError() + + drop_vars: list[str] + if drop_variables is None: + drop_vars = [] + else: + drop_vars = list(drop_variables) + + ds_attrs = attrs_from_zarr_group_json(storepath / "zarr.json") + coord_names = ds_attrs.pop("coordinates", []) + + # TODO recursive glob to create a datatree + # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it + # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166 + all_paths = storepath.glob("*/") + directory_paths = [p for p in all_paths if not p.is_file()] + + vars = {} + for array_dir in directory_paths: + var_name = array_dir.name + if var_name in drop_vars: + break + + zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json") + manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json")) + + marr = ManifestArray(chunkmanifest=manifest, zarray=zarray) + var = Variable(data=marr, dims=dim_names, attrs=attrs) + vars[var_name] = var + + if indexes is None: + raise NotImplementedError() + elif indexes != {}: + # TODO allow manual specification of index objects + raise NotImplementedError() + else: + indexes = dict(**indexes) # for type hinting: to allow mutation + + data_vars, coords = separate_coords(vars, indexes, coord_names) + + ds = Dataset( + data_vars, + coords=coords, + # indexes={}, # TODO should be added in a later version of xarray + attrs=ds_attrs, + ) + + return ds + + +def attrs_from_zarr_group_json(filepath: Path) -> dict: + with open(filepath) as metadata_file: + attrs = json.load(metadata_file) + return attrs["attributes"] + + +def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]: + with open(filepath) as metadata_file: + metadata = json.load(metadata_file) + + if { + "name": "chunk-manifest-json", + "configuration": { + "manifest": "./manifest.json", + }, + } not in metadata.get("storage_transformers", []): + raise ValueError( + "Can only read byte ranges from Zarr v3 stores which implement the manifest storage transformer ZEP." + ) + + attrs = metadata.pop("attributes") + dim_names = metadata.pop("dimension_names") + + chunk_shape = tuple(metadata["chunk_grid"]["configuration"]["chunk_shape"]) + shape = tuple(metadata["shape"]) + zarr_format = metadata["zarr_format"] + + if metadata["fill_value"] is None: + raise ValueError( + "fill_value must be specified https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value" + ) + else: + fill_value = metadata["fill_value"] + + all_codecs = [ + codec + for codec in metadata["codecs"] + if codec["name"] not in ("transpose", "bytes") + ] + compressor, *filters = [ + _configurable_to_num_codec_config(_filter) for _filter in all_codecs + ] + zarray = ZArray( + chunks=chunk_shape, + compressor=compressor, + dtype=np.dtype(metadata["data_type"]), + fill_value=fill_value, + filters=filters or None, + order="C", + shape=shape, + zarr_format=zarr_format, + ) + + return zarray, dim_names, attrs + + +def _configurable_to_num_codec_config(configurable: dict) -> dict: + """ + Convert a zarr v3 configurable into a numcodecs codec. 
+ """ + configurable_copy = configurable.copy() + codec_id = configurable_copy.pop("name") + if codec_id.startswith("numcodecs."): + codec_id = codec_id[len("numcodecs.") :] + configuration = configurable_copy.pop("configuration") + return numcodecs.get_codec({"id": codec_id, **configuration}).get_config() diff --git a/virtualizarr/tests/__init__.py b/virtualizarr/tests/__init__.py index 7df13d10..70f613ce 100644 --- a/virtualizarr/tests/__init__.py +++ b/virtualizarr/tests/__init__.py @@ -33,7 +33,9 @@ def _importorskip( has_astropy, requires_astropy = _importorskip("astropy") +has_kerchunk, requires_kerchunk = _importorskip("kerchunk") has_s3fs, requires_s3fs = _importorskip("s3fs") +has_scipy, requires_scipy = _importorskip("scipy") has_tifffile, requires_tifffile = _importorskip("tifffile") diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index 3feab262..43a6bbd8 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -1,7 +1,6 @@ from collections.abc import Mapping from unittest.mock import patch -import fsspec import numpy as np import pytest import xarray as xr @@ -10,12 +9,19 @@ from xarray.core.indexes import Index from virtualizarr import open_virtual_dataset -from virtualizarr.backend import FileType +from virtualizarr.backend import FileType, automatically_determine_filetype from virtualizarr.manifests import ManifestArray -from virtualizarr.readers.kerchunk import _automatically_determine_filetype -from virtualizarr.tests import has_astropy, has_tifffile, network, requires_s3fs +from virtualizarr.tests import ( + has_astropy, + has_tifffile, + network, + requires_kerchunk, + requires_s3fs, + requires_scipy, +) +@requires_scipy def test_automatically_determine_filetype_netcdf3_netcdf4(): # test the NetCDF3 vs NetCDF4 automatic file type selection @@ -27,10 +33,10 @@ def test_automatically_determine_filetype_netcdf3_netcdf4(): ds.to_netcdf(netcdf3_file_path, engine="scipy", format="NETCDF3_CLASSIC") ds.to_netcdf(netcdf4_file_path, engine="h5netcdf") - assert FileType("netcdf3") == _automatically_determine_filetype( + assert FileType("netcdf3") == automatically_determine_filetype( filepath=netcdf3_file_path ) - assert FileType("hdf5") == _automatically_determine_filetype( + assert FileType("hdf5") == automatically_determine_filetype( filepath=netcdf4_file_path ) @@ -49,7 +55,7 @@ def test_valid_filetype_bytes(tmp_path, filetype, headerbytes): filepath = tmp_path / "file.abc" with open(filepath, "wb") as f: f.write(headerbytes) - assert FileType(filetype) == _automatically_determine_filetype(filepath=filepath) + assert FileType(filetype) == automatically_determine_filetype(filepath=filepath) def test_notimplemented_filetype(tmp_path): @@ -58,7 +64,7 @@ def test_notimplemented_filetype(tmp_path): with open(filepath, "wb") as f: f.write(headerbytes) with pytest.raises(NotImplementedError): - _automatically_determine_filetype(filepath=filepath) + automatically_determine_filetype(filepath=filepath) def test_FileType(): @@ -75,6 +81,7 @@ def test_FileType(): FileType(None) +@requires_kerchunk class TestOpenVirtualDatasetIndexes: def test_no_indexes(self, netcdf4_file): vds = open_virtual_dataset(netcdf4_file, indexes={}) @@ -105,6 +112,7 @@ def index_mappings_equal(indexes1: Mapping[str, Index], indexes2: Mapping[str, I return True +@requires_kerchunk def test_cftime_index(tmpdir): """Ensure a virtual dataset contains the same indexes as an Xarray dataset""" # Note: Test was created to debug: 
https://github.com/zarr-developers/VirtualiZarr/issues/168 @@ -130,6 +138,7 @@ def test_cftime_index(tmpdir): assert vds.attrs == ds.attrs +@requires_kerchunk class TestOpenVirtualDatasetAttrs: def test_drop_array_dimensions(self, netcdf4_file): # regression test for GH issue #150 @@ -237,6 +246,8 @@ def test_read_from_url(self, filetype, url): assert isinstance(vds, xr.Dataset) def test_virtualizarr_vs_local_nisar(self): + import fsspec + # Open group directly from locally cached file with xarray url = "https://nisar.asf.earthdatacloud.nasa.gov/NISAR-SAMPLE-DATA/GCOV/ALOS1_Rosamond_20081012/NISAR_L2_PR_GCOV_001_005_A_219_4020_SHNA_A_20081012T060910_20081012T060926_P01101_F_N_J_001.h5" tmpfile = fsspec.open_local( @@ -266,6 +277,7 @@ def test_virtualizarr_vs_local_nisar(self): xrt.assert_equal(dsXR, dsV) +@requires_kerchunk class TestLoadVirtualDataset: def test_loadable_variables(self, netcdf4_file): vars_to_load = ["air", "time"] @@ -293,9 +305,9 @@ def test_explicit_filetype(self, netcdf4_file): open_virtual_dataset(netcdf4_file, filetype="grib") def test_group_kwarg(self, hdf5_groups_file): - with pytest.raises(NotImplementedError, match="Nested groups"): + with pytest.raises(ValueError, match="Multiple HDF Groups found"): open_virtual_dataset(hdf5_groups_file) - with pytest.raises(KeyError, match="doesn't exist"): + with pytest.raises(ValueError, match="not found in"): open_virtual_dataset(hdf5_groups_file, group="doesnt_exist") vars_to_load = ["air", "time"] @@ -313,19 +325,20 @@ def test_group_kwarg(self, hdf5_groups_file): if name in vars_to_load: xrt.assert_identical(vds.variables[name], full_ds.variables[name]) - @patch("virtualizarr.readers.hdf.virtual_vars_from_hdf") + @pytest.mark.xfail(reason="patches a function which no longer exists") + @patch("virtualizarr.translators.kerchunk.read_kerchunk_references_from_file") def test_open_virtual_dataset_passes_expected_args( - self, mock_read_hdf, netcdf4_file + self, mock_read_kerchunk, netcdf4_file ): reader_options = {"option1": "value1", "option2": "value2"} open_virtual_dataset(netcdf4_file, indexes={}, reader_options=reader_options) args = { - "path": netcdf4_file, + "filepath": netcdf4_file, + "filetype": None, "group": None, - "drop_variables": [], "reader_options": reader_options, } - mock_read_hdf.assert_called_once_with(**args) + mock_read_kerchunk.assert_called_once_with(**args) def test_open_dataset_with_empty(self, hdf5_empty, tmpdir): vds = open_virtual_dataset(hdf5_empty) @@ -336,3 +349,79 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir): vds = open_virtual_dataset(hdf5_scalar) assert vds.scalar.dims == () assert vds.scalar.attrs == {"scalar": "true"} + + +@requires_kerchunk +@pytest.mark.parametrize( + "reference_format", + ["json", "parquet", "invalid"], +) +def test_open_virtual_dataset_existing_kerchunk_refs( + tmp_path, netcdf4_virtual_dataset, reference_format +): + example_reference_dict = netcdf4_virtual_dataset.virtualize.to_kerchunk( + format="dict" + ) + + if reference_format == "invalid": + # Test invalid file format leads to ValueError + ref_filepath = tmp_path / "ref.csv" + with open(ref_filepath.as_posix(), mode="w") as of: + of.write("tmp") + + with pytest.raises(ValueError): + open_virtual_dataset( + filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={} + ) + + else: + # Test valid json and parquet reference formats + + if reference_format == "json": + ref_filepath = tmp_path / "ref.json" + + import ujson + + with open(ref_filepath, "w") as json_file: + 
ujson.dump(example_reference_dict, json_file) + + if reference_format == "parquet": + from kerchunk.df import refs_to_dataframe + + ref_filepath = tmp_path / "ref.parquet" + refs_to_dataframe(fo=example_reference_dict, url=ref_filepath.as_posix()) + + vds = open_virtual_dataset( + filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={} + ) + + # Inconsistent results! https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202 + # assert vds.virtualize.to_kerchunk(format='dict') == example_reference_dict + refs = vds.virtualize.to_kerchunk(format="dict") + expected_refs = netcdf4_virtual_dataset.virtualize.to_kerchunk(format="dict") + assert refs["refs"]["air/0.0.0"] == expected_refs["refs"]["air/0.0.0"] + assert refs["refs"]["lon/0"] == expected_refs["refs"]["lon/0"] + assert refs["refs"]["lat/0"] == expected_refs["refs"]["lat/0"] + assert refs["refs"]["time/0"] == expected_refs["refs"]["time/0"] + + assert list(vds) == list(netcdf4_virtual_dataset) + assert set(vds.coords) == set(netcdf4_virtual_dataset.coords) + assert set(vds.variables) == set(netcdf4_virtual_dataset.variables) + + +@requires_kerchunk +def test_notimplemented_read_inline_refs(tmp_path, netcdf4_inlined_ref): + # For now, we raise a NotImplementedError if we read existing references that have inlined data + # https://github.com/zarr-developers/VirtualiZarr/pull/251#pullrequestreview-2361916932 + + ref_filepath = tmp_path / "ref.json" + + import ujson + + with open(ref_filepath, "w") as json_file: + ujson.dump(netcdf4_inlined_ref, json_file) + + with pytest.raises(NotImplementedError): + open_virtual_dataset( + filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={} + ) diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py index 37a6c10e..63158777 100644 --- a/virtualizarr/tests/test_integration.py +++ b/virtualizarr/tests/test_integration.py @@ -4,11 +4,53 @@ import xarray.testing as xrt from virtualizarr import open_virtual_dataset -from virtualizarr.manifests.array import ManifestArray -from virtualizarr.manifests.manifest import ChunkManifest +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.tests import requires_kerchunk +from virtualizarr.translators.kerchunk import ( + dataset_from_kerchunk_refs, + find_var_names, +) from virtualizarr.zarr import ZArray +def test_kerchunk_roundtrip_in_memory_no_concat(): + # Set up example xarray dataset + chunks_dict = { + "0.0": {"path": "foo.nc", "offset": 100, "length": 100}, + "0.1": {"path": "foo.nc", "offset": 200, "length": 100}, + } + manifest = ChunkManifest(entries=chunks_dict) + marr = ManifestArray( + zarray=dict( + shape=(2, 4), + dtype=np.dtype(" Dataset: + arr = ManifestArray( + chunkmanifest=ChunkManifest( + entries={"0.0": dict(path="/test.nc", offset=6144, length=48)} + ), + zarray=dict( + shape=(2, 3), + dtype=np.dtype(" "IcechunkStore": + from icechunk import IcechunkStore, StorageConfig + + storage = StorageConfig.filesystem(str(tmpdir)) + + # TODO if icechunk exposed a synchronous version of .open then we wouldn't need to use asyncio.run here + # TODO is this the correct mode to use? + store = IcechunkStore.create(storage=storage, mode="w") + + # TODO instead yield store then store.close() ?? 
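The icechunk store fixture above pairs with the new writer added in virtualizarr/writers/icechunk.py later in this diff. A hedged sketch of the end-to-end call, assuming a hypothetical local path and source file; the store creation mirrors the fixture verbatim:

```python
# Sketch mirroring the new icechunk tests: create a local IcechunkStore (same calls
# as the fixture above) and write a virtual dataset into it. "air.nc" and
# "/tmp/icechunk-store" are hypothetical paths.
from icechunk import IcechunkStore, StorageConfig

from virtualizarr import open_virtual_dataset
from virtualizarr.writers.icechunk import dataset_to_icechunk

vds = open_virtual_dataset("air.nc", indexes={})  # variables backed by ManifestArray

storage = StorageConfig.filesystem("/tmp/icechunk-store")
store = IcechunkStore.create(storage=storage, mode="w")

dataset_to_icechunk(vds, store)  # sets one virtual ref per chunk in the store
```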
+ return store + + +def test_write_new_virtual_variable( + icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset +): + vds = vds_with_manifest_arrays + + dataset_to_icechunk(vds, icechunk_filestore) + + # check attrs + root_group = group(store=icechunk_filestore) + assert isinstance(root_group, Group) + assert root_group.attrs == {"something": 0} + + # TODO check against vds, then perhaps parametrize? + + # check array exists + assert "a" in root_group + arr = root_group["a"] + assert isinstance(arr, Array) + + # check array metadata + # TODO why doesn't a .zarr_format or .version attribute exist on zarr.Array? + # assert arr.zarr_format == 3 + assert arr.shape == (2, 3) + assert arr.chunks == (2, 3) + assert arr.dtype == np.dtype(" Dataset: - arr = ManifestArray( - chunkmanifest=ChunkManifest( - entries={"0.0": dict(path="test.nc", offset=6144, length=48)} - ), - zarray=dict( - shape=(2, 3), - dtype=np.dtype(" bool: """ Several metadata attributes in ZarrV3 use a dictionary with keys "name" : str and "configuration" : dict diff --git a/virtualizarr/tests/test_xarray.py b/virtualizarr/tests/test_xarray.py index 9db6e3a2..062eda5f 100644 --- a/virtualizarr/tests/test_xarray.py +++ b/virtualizarr/tests/test_xarray.py @@ -4,6 +4,7 @@ from virtualizarr import open_virtual_dataset from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.tests import requires_kerchunk from virtualizarr.zarr import ZArray @@ -222,6 +223,7 @@ def test_concat_dim_coords_along_existing_dim(self): assert result.data.zarray.zarr_format == zarray.zarr_format +@requires_kerchunk class TestCombineUsingIndexes: def test_combine_by_coords(self, netcdf4_files): filepath1, filepath2 = netcdf4_files @@ -255,6 +257,7 @@ def test_combine_by_coords_keeping_manifestarrays(self, netcdf4_files): assert isinstance(combined_vds["lon"].data, ManifestArray) +@requires_kerchunk class TestRenamePaths: def test_rename_to_str(self, netcdf4_file): vds = open_virtual_dataset(netcdf4_file, indexes={}) diff --git a/virtualizarr/translators/__init__.py b/virtualizarr/translators/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/virtualizarr/translators/kerchunk.py b/virtualizarr/translators/kerchunk.py new file mode 100644 index 00000000..f2d2f5df --- /dev/null +++ b/virtualizarr/translators/kerchunk.py @@ -0,0 +1,223 @@ +from typing import Any, Mapping, MutableMapping, cast + +from xarray import Dataset +from xarray.core.indexes import Index +from xarray.core.variable import Variable + +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers.common import separate_coords +from virtualizarr.types.kerchunk import ( + KerchunkArrRefs, + KerchunkStoreRefs, +) +from virtualizarr.zarr import ZArray, ZAttrs, determine_chunk_grid_shape + + +def virtual_vars_and_metadata_from_kerchunk_refs( + vds_refs: KerchunkStoreRefs, + loadable_variables, + drop_variables, + virtual_array_class=ManifestArray, +) -> tuple[Mapping[str, Variable], dict[str, Any], list[str]]: + """ + Parses all useful information from a set kerchunk references (for a single group). 
+ """ + + virtual_vars = virtual_vars_from_kerchunk_refs( + vds_refs, + drop_variables=drop_variables + loadable_variables, + virtual_array_class=virtual_array_class, + ) + ds_attrs = fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {}) + coord_names = ds_attrs.pop("coordinates", []) + + return virtual_vars, ds_attrs, coord_names + + +def extract_group(vds_refs: KerchunkStoreRefs, group: str | None) -> KerchunkStoreRefs: + """Extract only the part of the kerchunk reference dict that is relevant to a single HDF group""" + hdf_groups = [ + k.removesuffix(".zgroup") for k in vds_refs["refs"].keys() if ".zgroup" in k + ] + if len(hdf_groups) == 1: + return vds_refs + else: + if group is None: + raise ValueError( + f"Multiple HDF Groups found. Must specify group= keyword to select one of {hdf_groups}" + ) + else: + # Ensure supplied group kwarg is consistent with kerchunk keys + if not group.endswith("/"): + group += "/" + if group.startswith("/"): + group = group.removeprefix("/") + + if group not in hdf_groups: + raise ValueError(f'Group "{group}" not found in {hdf_groups}') + + # Filter by group prefix and remove prefix from all keys + groupdict = { + k.removeprefix(group): v + for k, v in vds_refs["refs"].items() + if k.startswith(group) + } + # Also remove group prefix from _ARRAY_DIMENSIONS + for k, v in groupdict.items(): + if isinstance(v, str): + groupdict[k] = v.replace("\\/", "/").replace(group, "") + + vds_refs["refs"] = groupdict + + return KerchunkStoreRefs(vds_refs) + + +def virtual_vars_from_kerchunk_refs( + refs: KerchunkStoreRefs, + drop_variables: list[str] | None = None, + virtual_array_class=ManifestArray, +) -> dict[str, Variable]: + """ + Translate a store-level kerchunk reference dict into aaset of xarray Variables containing virtualized arrays. + + Parameters + ---------- + drop_variables: list[str], default is None + Variables in the file to drop before returning. + virtual_array_class + Virtual array class to use to represent the references to the chunks in each on-disk array. + Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. + """ + + var_names = find_var_names(refs) + if drop_variables is None: + drop_variables = [] + var_names_to_keep = [ + var_name for var_name in var_names if var_name not in drop_variables + ] + + vars = { + var_name: variable_from_kerchunk_refs(refs, var_name, virtual_array_class) + for var_name in var_names_to_keep + } + return vars + + +def dataset_from_kerchunk_refs( + refs: KerchunkStoreRefs, + drop_variables: list[str] = [], + virtual_array_class: type = ManifestArray, + indexes: MutableMapping[str, Index] | None = None, +) -> Dataset: + """ + Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays. + + drop_variables: list[str], default is None + Variables in the file to drop before returning. + virtual_array_class + Virtual array class to use to represent the references to the chunks in each on-disk array. + Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. 
+ """ + + vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class) + ds_attrs = fully_decode_arr_refs(refs["refs"]).get(".zattrs", {}) + coord_names = ds_attrs.pop("coordinates", []) + + if indexes is None: + indexes = {} + data_vars, coords = separate_coords(vars, indexes, coord_names) + + vds = Dataset( + data_vars, + coords=coords, + # indexes={}, # TODO should be added in a later version of xarray + attrs=ds_attrs, + ) + + return vds + + +def variable_from_kerchunk_refs( + refs: KerchunkStoreRefs, var_name: str, virtual_array_class +) -> Variable: + """Create a single xarray Variable by reading specific keys of a kerchunk references dict.""" + + arr_refs = extract_array_refs(refs, var_name) + chunk_dict, zarray, zattrs = parse_array_refs(arr_refs) + # we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs + dims = zattrs.pop("_ARRAY_DIMENSIONS") + if chunk_dict: + manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict) + varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) + elif len(zarray.shape) != 0: + # empty variables don't have physical chunks, but zarray shows that the variable + # is at least 1D + shape = determine_chunk_grid_shape(zarray.shape, zarray.chunks) + manifest = ChunkManifest(entries={}, shape=shape) + varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) + else: + # This means we encountered a scalar variable of dimension 0, + # very likely that it actually has no numeric value and its only purpose + # is to communicate dataset attributes. + varr = zarray.fill_value + + return Variable(data=varr, dims=dims, attrs=zattrs) + + +def find_var_names(ds_reference_dict: KerchunkStoreRefs) -> list[str]: + """Find the names of zarr variables in this store/group.""" + + refs = ds_reference_dict["refs"] + found_var_names = {key.split("/")[0] for key in refs.keys() if "/" in key} + + return list(found_var_names) + + +def extract_array_refs( + ds_reference_dict: KerchunkStoreRefs, var_name: str +) -> KerchunkArrRefs: + """Extract only the part of the kerchunk reference dict that is relevant to this one zarr array""" + + found_var_names = find_var_names(ds_reference_dict) + + refs = ds_reference_dict["refs"] + if var_name in found_var_names: + # TODO these function probably have more loops in them than they need to... + + arr_refs = { + key.split("/")[1]: refs[key] + for key in refs.keys() + if var_name == key.split("/")[0] + } + + return fully_decode_arr_refs(arr_refs) + + else: + raise KeyError( + f"Could not find zarr array variable name {var_name}, only {found_var_names}" + ) + + +def parse_array_refs( + arr_refs: KerchunkArrRefs, +) -> tuple[dict, ZArray, ZAttrs]: + zarray = ZArray.from_kerchunk_refs(arr_refs.pop(".zarray")) + zattrs = arr_refs.pop(".zattrs", {}) + chunk_dict = arr_refs + + return chunk_dict, zarray, zattrs + + +def fully_decode_arr_refs(d: dict) -> KerchunkArrRefs: + """ + Only have to do this because kerchunk.SingleHdf5ToZarr apparently doesn't bother converting .zarray and .zattrs contents to dicts, see https://github.com/fsspec/kerchunk/issues/415 . 
+ """ + import ujson + + sanitized = d.copy() + for k, v in d.items(): + if k.startswith("."): + # ensure contents of .zattrs and .zarray are python dictionaries + sanitized[k] = ujson.loads(v) + + return cast(KerchunkArrRefs, sanitized) diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py index 092ddd25..c9260aa6 100644 --- a/virtualizarr/utils.py +++ b/virtualizarr/utils.py @@ -1,7 +1,7 @@ from __future__ import annotations import io -from typing import TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING, Iterable, Optional, Union if TYPE_CHECKING: import fsspec.core @@ -13,42 +13,76 @@ ] -def _fsspec_openfile_from_filepath( - *, - filepath: str, - reader_options: Optional[dict] = None, -) -> OpenFileType: - """Converts input filepath to fsspec openfile object. +from dataclasses import dataclass, field + + +@dataclass +class _FsspecFSFromFilepath: + """Class to create fsspec Filesystem from input filepath. Parameters ---------- filepath : str Input filepath reader_options : dict, optional - Dict containing kwargs to pass to file opener, by default {} - - Returns - ------- - OpenFileType - An open file-like object, specific to the protocol supplied in filepath. + dict containing kwargs to pass to file opener, by default {} + fs : Option | None + The fsspec filesystem object, created in __post_init__ - Raises - ------ - NotImplementedError - Raises a Not Implemented Error if filepath protocol is not supported. """ - import fsspec - from upath import UPath + filepath: str + reader_options: Optional[dict] = field(default_factory=dict) + fs: fsspec.AbstractFileSystem = field(init=False) + + def open_file(self) -> OpenFileType: + """Calls `.open` on fsspec.Filesystem instantiation using self.filepath as an input. + + Returns + ------- + OpenFileType + file opened with fsspec + """ + return self.fs.open(self.filepath) + + def read_bytes(self, bytes: int) -> bytes: + with self.open_file() as of: + return of.read(bytes) + + def __post_init__(self) -> None: + """Initialize the fsspec filesystem object""" + import fsspec + from upath import UPath + + universal_filepath = UPath(self.filepath) + protocol = universal_filepath.protocol + + self.reader_options = self.reader_options or {} + storage_options = self.reader_options.get("storage_options", {}) # type: ignore + + self.fs = fsspec.filesystem(protocol, **storage_options) - universal_filepath = UPath(filepath) - protocol = universal_filepath.protocol - if reader_options is None: - reader_options = {} +def check_for_collisions( + drop_variables: Iterable[str] | None, + loadable_variables: Iterable[str] | None, +) -> tuple[list[str], list[str]]: + if drop_variables is None: + drop_variables = [] + elif isinstance(drop_variables, str): + drop_variables = [drop_variables] + else: + drop_variables = list(drop_variables) - storage_options = reader_options.get("storage_options", {}) # type: ignore + if loadable_variables is None: + loadable_variables = [] + elif isinstance(loadable_variables, str): + loadable_variables = [loadable_variables] + else: + loadable_variables = list(loadable_variables) - fpath = fsspec.filesystem(protocol, **storage_options).open(filepath) + common = set(drop_variables).intersection(set(loadable_variables)) + if common: + raise ValueError(f"Cannot both load and drop variables {common}") - return fpath + return drop_variables, loadable_variables diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py new file mode 100644 index 00000000..6dadbc08 --- /dev/null +++ 
b/virtualizarr/writers/icechunk.py @@ -0,0 +1,204 @@ +from typing import TYPE_CHECKING, cast + +import numpy as np +from xarray import Dataset +from xarray.backends.zarr import encode_zarr_attr_value +from xarray.core.variable import Variable + +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.zarr import encode_dtype + +if TYPE_CHECKING: + from icechunk import IcechunkStore # type: ignore[import-not-found] + from zarr import Group # type: ignore + + +VALID_URI_PREFIXES = { + "s3://", + # "gs://", # https://github.com/earth-mover/icechunk/issues/265 + # "azure://", # https://github.com/earth-mover/icechunk/issues/266 + # "r2://", + # "cos://", + # "minio://", + "file:///", +} + + +def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: + """ + Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store. + + Currently requires all variables to be backed by ManifestArray objects. + + Parameters + ---------- + ds: xr.Dataset + store: IcechunkStore + """ + try: + from icechunk import IcechunkStore # type: ignore[import-not-found] + from zarr import Group # type: ignore[import-untyped] + except ImportError: + raise ImportError( + "The 'icechunk' and 'zarr' version 3 libraries are required to use this function" + ) + + if not isinstance(store, IcechunkStore): + raise TypeError(f"expected type IcechunkStore, but got type {type(store)}") + + if not store.supports_writes: + raise ValueError("supplied store does not support writes") + + # TODO only supports writing to the root group currently + # TODO pass zarr_format kwarg? + root_group = Group.from_store(store=store) + + # TODO this is Frozen, the API for setting attributes must be something else + # root_group.attrs = ds.attrs + # for k, v in ds.attrs.items(): + # root_group.attrs[k] = encode_zarr_attr_value(v) + + return write_variables_to_icechunk_group( + ds.variables, + ds.attrs, + store=store, + group=root_group, + ) + + +def write_variables_to_icechunk_group( + variables, + attrs, + store, + group, +): + virtual_variables = { + name: var + for name, var in variables.items() + if isinstance(var.data, ManifestArray) + } + + loadable_variables = { + name: var for name, var in variables.items() if name not in virtual_variables + } + + # First write all the non-virtual variables + # NOTE: We set the attributes of the group before writing the dataset because the dataset + # will overwrite the root group's attributes with the dataset's attributes. We take advantage + # of xarrays zarr integration to ignore having to format the attributes ourselves. 
+    ds = Dataset(loadable_variables, attrs=attrs)
+    ds.to_zarr(store, zarr_format=3, consolidated=False, mode="a")
+
+    # Then finish by writing the virtual variables to the same group
+    for name, var in virtual_variables.items():
+        write_virtual_variable_to_icechunk(
+            store=store,
+            group=group,
+            name=name,
+            var=var,
+        )
+
+
+def write_variable_to_icechunk(
+    store: "IcechunkStore",
+    group: "Group",
+    name: str,
+    var: Variable,
+) -> None:
+    """Write a single (possibly virtual) variable into an icechunk store"""
+    if isinstance(var.data, ManifestArray):
+        write_virtual_variable_to_icechunk(
+            store=store,
+            group=group,
+            name=name,
+            var=var,
+        )
+    else:
+        raise ValueError(
+            "Cannot write non-virtual variables as virtual variables to Icechunk stores"
+        )
+
+
+def write_virtual_variable_to_icechunk(
+    store: "IcechunkStore",
+    group: "Group",
+    name: str,
+    var: Variable,
+) -> None:
+    """Write a single virtual variable into an icechunk store"""
+    ma = cast(ManifestArray, var.data)
+    zarray = ma.zarray
+
+    # creates array if it doesn't already exist
+    arr = group.require_array(
+        name=name,
+        shape=zarray.shape,
+        chunk_shape=zarray.chunks,
+        dtype=encode_dtype(zarray.dtype),
+        codecs=zarray._v3_codec_pipeline(),
+        dimension_names=var.dims,
+        fill_value=zarray.fill_value,
+        # TODO fill_value?
+    )
+
+    # TODO it would be nice if we could assign directly to the .attrs property
+    for k, v in var.attrs.items():
+        arr.attrs[k] = encode_zarr_attr_value(v)
+    arr.attrs["_ARRAY_DIMENSIONS"] = encode_zarr_attr_value(var.dims)
+
+    _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"}
+    for k, v in var.encoding.items():
+        if k in _encoding_keys:
+            arr.attrs[k] = encode_zarr_attr_value(v)
+
+    write_manifest_virtual_refs(
+        store=store,
+        group=group,
+        arr_name=name,
+        manifest=ma.manifest,
+    )
+
+
+def write_manifest_virtual_refs(
+    store: "IcechunkStore",
+    group: "Group",
+    arr_name: str,
+    manifest: ChunkManifest,
+) -> None:
+    """Write all the virtual references for one array manifest at once."""
+
+    key_prefix = f"{group.name}{arr_name}"
+
+    # loop over every reference in the ChunkManifest for that array
+    # TODO inefficient: this should be replaced with something that sets all (new) references for the array at once,
+    # but Icechunk needs to expose a suitable API first
+    it = np.nditer(
+        [manifest._paths, manifest._offsets, manifest._lengths],  # type: ignore[arg-type]
+        flags=[
+            "refs_ok",
+            "multi_index",
+            "c_index",
+        ],
+        op_flags=[["readonly"]] * 3,  # type: ignore
+    )
+    for path, offset, length in it:
+        index = it.multi_index
+        chunk_key = "/".join(str(i) for i in index)
+
+        # set each reference individually
+        store.set_virtual_ref(
+            # TODO it would be marginally neater if I could pass the group and name as separate args
+            key=f"{key_prefix}/c/{chunk_key}",  # should be of form 'group/arr_name/c/0/1/2', where c stands for chunks
+            location=as_file_uri(path.item()),
+            offset=offset.item(),
+            length=length.item(),
+        )
+
+
+def as_file_uri(path):
+    # TODO a more robust solution to this requirement exists in https://github.com/zarr-developers/VirtualiZarr/pull/243
+    if not any(path.startswith(prefix) for prefix in VALID_URI_PREFIXES) and path != "":
+        # assume path is local
+        return f"file://{path}"
+    else:
+        return path
diff --git a/virtualizarr/writers/kerchunk.py b/virtualizarr/writers/kerchunk.py
index 6b4b55f8..3a0bd27b 100644
--- a/virtualizarr/writers/kerchunk.py
+++ b/virtualizarr/writers/kerchunk.py
@@ -3,7 +3,6 @@
 from typing import cast

 import numpy as np
-import ujson  # type: ignore
 from xarray import Dataset
 from xarray.coding.times import CFDatetimeCoder
 from xarray.core.variable import Variable
@@ -30,6 +29,8 @@ def dataset_to_kerchunk_refs(ds: Dataset) -> KerchunkStoreRefs:
     Create a dictionary containing kerchunk-style store references from a single xarray.Dataset (which wraps ManifestArray objects).
     """
+    import ujson
+
     all_arr_refs = {}
     for var_name, var in ds.variables.items():
         arr_refs = variable_to_kerchunk_arr_refs(var, str(var_name))
diff --git a/virtualizarr/writers/zarr.py b/virtualizarr/writers/zarr.py
index b3dc8f1a..b9529ad5 100644
--- a/virtualizarr/writers/zarr.py
+++ b/virtualizarr/writers/zarr.py
@@ -80,7 +80,6 @@ def to_zarr_json(var: Variable, array_dir: Path) -> None:
 def zarr_v3_array_metadata(zarray: ZArray, dim_names: list[str], attrs: dict) -> dict:
     """Construct a v3-compliant metadata dict from v2 zarray + information stored on the xarray variable."""
     # TODO it would be nice if we could use the zarr-python metadata.ArrayMetadata classes to do this conversion for us
-
     metadata = zarray.dict()

     # adjust to match v3 spec
@@ -95,7 +94,7 @@ def zarr_v3_array_metadata(zarray: ZArray, dim_names: list[str], attrs: dict) ->
         "name": "default",
         "configuration": {"separator": "/"},
     }
-    metadata["codecs"] = zarray._v3_codec_pipeline()
+    metadata["codecs"] = tuple(c.to_dict() for c in zarray._v3_codec_pipeline())
     metadata.pop("filters")
     metadata.pop("compressor")
     metadata.pop("order")
diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py
index f62b1269..e339a3f4 100644
--- a/virtualizarr/zarr.py
+++ b/virtualizarr/zarr.py
@@ -1,9 +1,7 @@
 import dataclasses
 from typing import TYPE_CHECKING, Any, Literal, NewType, cast

-import numcodecs
 import numpy as np
-import ujson  # type: ignore

 if TYPE_CHECKING:
     pass
@@ -74,8 +72,11 @@ def codec(self) -> Codec:
     @classmethod
     def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray":
         # coerce type of fill_value as kerchunk can be inconsistent with this
+        dtype = np.dtype(decoded_arr_refs_zarray["dtype"])
         fill_value = decoded_arr_refs_zarray["fill_value"]
-        if fill_value is None or fill_value == "NaN" or fill_value == "nan":
+        if np.issubdtype(dtype, np.floating) and (
+            fill_value is None or fill_value == "NaN" or fill_value == "nan"
+        ):
             fill_value = np.nan

         compressor = decoded_arr_refs_zarray["compressor"]
@@ -86,7 +87,7 @@ def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray":
         return ZArray(
             chunks=tuple(decoded_arr_refs_zarray["chunks"]),
             compressor=compressor,
-            dtype=np.dtype(decoded_arr_refs_zarray["dtype"]),
+            dtype=dtype,
             fill_value=fill_value,
             filters=decoded_arr_refs_zarray["filters"],
             order=decoded_arr_refs_zarray["order"],
@@ -100,6 +101,8 @@ def dict(self) -> dict[str, Any]:
         return zarray_dict

     def to_kerchunk_json(self) -> str:
+        import ujson
+
         zarray_dict = self.dict()
         if zarray_dict["fill_value"] is np.nan:
             zarray_dict["fill_value"] = None
@@ -140,7 +143,7 @@ def replace(
             replacements["zarr_format"] = zarr_format
         return dataclasses.replace(self, **replacements)

-    def _v3_codec_pipeline(self) -> list:
+    def _v3_codec_pipeline(self) -> Any:
         """
         VirtualiZarr internally uses the `filters`, `compressor`, and `order` attributes from zarr v2,
         but to create conformant zarr v3 metadata those 3 must be turned into `codecs` objects.
@@ -153,44 +156,46 @@ def _v3_codec_pipeline(self) -> list:
         post_compressor: Iterable[BytesBytesCodec] #optional
         ```
         """
-        if self.filters:
-            filter_codecs_configs = [
-                numcodecs.get_codec(filter).get_config() for filter in self.filters
-            ]
-            filters = [
-                dict(name=codec.pop("id"), configuration=codec)
-                for codec in filter_codecs_configs
-            ]
-        else:
-            filters = []
+        try:
+            from zarr.core.metadata.v3 import (  # type: ignore[import-untyped]
+                parse_codecs,
+            )
+        except ImportError:
+            raise ImportError("zarr v3 is required to generate v3 codec pipelines")

-        # Noting here that zarr v3 has very few codecs specificed in the official spec,
-        # and that there are far more codecs in `numcodecs`. We take a gamble and assume
-        # that the codec names and configuration are simply mapped into zarrv3 "configurables".
-        if self.compressor:
-            compressor = [_num_codec_config_to_configurable(self.compressor)]
-        else:
-            compressor = []
+        codec_configs = []

         # https://zarr-specs.readthedocs.io/en/latest/v3/codecs/transpose/v1.0.html#transpose-codec-v1
         # Either "C" or "F", defining the layout of bytes within each chunk of the array.
         # "C" means row-major order, i.e., the last dimension varies fastest;
         # "F" means column-major order, i.e., the first dimension varies fastest.
-        if self.order == "C":
-            order = tuple(range(len(self.shape)))
-        elif self.order == "F":
+        # For now, we only need transpose if the order is not "C"
+        if self.order == "F":
             order = tuple(reversed(range(len(self.shape))))
+            transpose = dict(name="transpose", configuration=dict(order=order))
+            codec_configs.append(transpose)

-        transpose = dict(name="transpose", configuration=dict(order=order))
         # https://github.com/zarr-developers/zarr-python/pull/1944#issuecomment-2151994097
         # "If no ArrayBytesCodec is supplied, we can auto-add a BytesCodec"
         bytes = dict(
             name="bytes", configuration={}
         )  # TODO need to handle endianness configuration
+        codec_configs.append(bytes)
+
+        # Noting here that zarr v3 has very few codecs specified in the official spec,
+        # and that there are far more codecs in `numcodecs`. We take a gamble and assume
+        # that the codec names and configuration are simply mapped into zarr v3 "configurables".
+        if self.filters:
+            codec_configs.extend(
+                [_num_codec_config_to_configurable(filter) for filter in self.filters]
+            )
+
+        if self.compressor:
+            codec_configs.append(_num_codec_config_to_configurable(self.compressor))
+
+        # convert the pipeline repr into actual v3 codec objects
+        codec_pipeline = parse_codecs(codec_configs)

-        # The order here is significant!
-        # [ArrayArray] -> ArrayBytes -> [BytesBytes]
-        codec_pipeline = [transpose, bytes] + compressor + filters
         return codec_pipeline
@@ -208,9 +213,19 @@ def ceildiv(a: int, b: int) -> int:
     return -(a // -b)


+def determine_chunk_grid_shape(
+    shape: tuple[int, ...], chunks: tuple[int, ...]
+) -> tuple[int, ...]:
+    return tuple(ceildiv(length, chunksize) for length, chunksize in zip(shape, chunks))
+
+
 def _num_codec_config_to_configurable(num_codec: dict) -> dict:
     """
     Convert a numcodecs codec into a zarr v3 configurable.
     """
+    if num_codec["id"].startswith("numcodecs."):
+        return num_codec
+
     num_codec_copy = num_codec.copy()
-    return {"name": num_codec_copy.pop("id"), "configuration": num_codec_copy}
+    name = "numcodecs." + num_codec_copy.pop("id")
+    return {"name": name, "configuration": num_codec_copy}
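Reviewer note (not part of the patch): a minimal sketch of how the two helpers added to `virtualizarr/zarr.py` behave, based only on the definitions in the hunks above. The example inputs are made up, and `_num_codec_config_to_configurable` is a private helper imported here purely for illustration.

```python
# Illustrative only -- assumes the definitions added in virtualizarr/zarr.py above.
from virtualizarr.zarr import _num_codec_config_to_configurable, determine_chunk_grid_shape

# ceildiv-based chunk grid: 100 elements in chunks of 30 -> 4 chunks; 64 in chunks of 64 -> 1
assert determine_chunk_grid_shape(shape=(100, 64), chunks=(30, 64)) == (4, 1)

# A numcodecs-style config {"id": ...} is wrapped into a zarr v3 "configurable":
# the "id" is namespaced into "name" and the remaining keys become "configuration".
assert _num_codec_config_to_configurable({"id": "zlib", "level": 1}) == {
    "name": "numcodecs.zlib",
    "configuration": {"level": 1},
}

# Configs that are already namespaced pass through unchanged.
assert _num_codec_config_to_configurable({"id": "numcodecs.zlib", "level": 1}) == {
    "id": "numcodecs.zlib",
    "level": 1,
}
```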