diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 769f59e5..0550236f 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -46,10 +46,6 @@ jobs:
           conda env list
           conda list
 
-      - name: Type check
-        run: |
-          mypy virtualizarr
-
       - name: Running Tests
         run: |
           python -m pytest ./virtualizarr --run-network-tests --cov=./ --cov-report=xml --verbose
diff --git a/.github/workflows/typing.yml b/.github/workflows/typing.yml
new file mode 100644
index 00000000..0540801b
--- /dev/null
+++ b/.github/workflows/typing.yml
@@ -0,0 +1,38 @@
+name: Typing
+
+on:
+  push:
+    branches: [ "main" ]
+    paths-ignore:
+    - 'docs/**'
+  pull_request:
+    branches: [ "main" ]
+    paths-ignore:
+    - 'docs/**'
+  schedule:
+    - cron: "0 0 * * *"
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  mypy:
+    name: mypy
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.12'
+
+      - name: Install deps
+        run: |
+          # We need the test optional deps so that all the library stubs are installed
+          pip install -e '.[test]'
+
+      - name: Type check
+        run: |
+          mypy virtualizarr
diff --git a/.gitignore b/.gitignore
index d360cfa4..d6720a7a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -160,3 +160,4 @@ cython_debug/
 #.idea/
 virtualizarr/_version.py
 docs/generated/
+examples/
diff --git a/ci/doc.yml b/ci/doc.yml
index 7d7e9224..ccc3ded6 100644
--- a/ci/doc.yml
+++ b/ci/doc.yml
@@ -13,4 +13,3 @@ dependencies:
     - "sphinx_design"
     - "sphinx_togglebutton"
     - "sphinx-autodoc-typehints"
-    - -e "..[test]"
diff --git a/conftest.py b/conftest.py
index 32b3581f..3af4bf06 100644
--- a/conftest.py
+++ b/conftest.py
@@ -33,6 +33,20 @@ def netcdf4_file(tmpdir):
     return filepath
 
 
+@pytest.fixture
+def netcdf4_virtual_dataset(netcdf4_file):
+    from virtualizarr import open_virtual_dataset
+
+    return open_virtual_dataset(netcdf4_file, indexes={})
+
+
+@pytest.fixture
+def netcdf4_inlined_ref(netcdf4_file):
+    from kerchunk.hdf import SingleHdf5ToZarr
+
+    return SingleHdf5ToZarr(netcdf4_file, inline_threshold=1000).translate()
+
+
 @pytest.fixture
 def hdf5_groups_file(tmpdir):
     # Set up example xarray dataset
diff --git a/docs/releases.rst b/docs/releases.rst
index ec057807..622f01e0 100644
--- a/docs/releases.rst
+++ b/docs/releases.rst
@@ -8,6 +8,11 @@ v1.0.1 (unreleased)
 
 New Features
 ~~~~~~~~~~~~
+
+
+- Can open `kerchunk` reference files with ``open_virtual_dataset``.
+  (:pull:`251`, :pull:`186`) By `Raphael Hagen `_ & `Kristen Thyng `_.
+
 - Adds defaults for `open_virtual_dataset_from_v3_store` in (:pull:`234`)
   By `Raphael Hagen `_.
diff --git a/docs/usage.md b/docs/usage.md
index 40071b8a..a0f9d058 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -421,6 +421,18 @@ Currently there are not yet any zarr v3 readers which understand the chunk manif
 This store can however be read by {py:func}`~virtualizarr.xarray.open_virtual_dataset`, by passing `filetype="zarr_v3"`.
 ```
 
+## Opening Kerchunk references as virtual datasets
+
+You can open existing Kerchunk `json` or `parquet` references as VirtualiZarr virtual datasets. This may be useful for converting existing Kerchunk-formatted references to storage formats like [Icechunk](https://icechunk.io/).
+
+```python
+
+vds = open_virtual_dataset('combined.json', filetype='kerchunk')
+# or
+vds = open_virtual_dataset('combined.parquet', filetype='kerchunk')
+
+```
+
 ## Rewriting existing manifests
 
 Sometimes it can be useful to rewrite the contents of an already-generated manifest or virtual dataset.
diff --git a/pyproject.toml b/pyproject.toml
index 6b0efe89..5af632ce 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -87,6 +87,10 @@ ignore_missing_imports = true
 module = "kerchunk.*"
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = "ujson.*"
+ignore_missing_imports = true
+
 [tool.ruff]
 # Same as Black.
 line-length = 88
diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py
index 904cad77..4da9e896 100644
--- a/virtualizarr/backend.py
+++ b/virtualizarr/backend.py
@@ -16,7 +16,8 @@ from xarray.core.variable import IndexVariable
 
 from virtualizarr.manifests import ManifestArray
-from virtualizarr.utils import _fsspec_openfile_from_filepath
+from virtualizarr.types.kerchunk import KerchunkStoreRefs
+from virtualizarr.utils import _FsspecFSFromFilepath
 
 XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore
@@ -39,6 +40,7 @@ class FileType(AutoName):
     zarr = auto()
     dmrpp = auto()
     zarr_v3 = auto()
+    kerchunk = auto()
 
 
 class ManifestBackendArray(ManifestArray, BackendArray):
@@ -67,13 +69,14 @@ def open_virtual_dataset(
 
     Xarray indexes can optionally be created (the default behaviour). To avoid creating any xarray indexes pass ``indexes={}``.
 
+
     Parameters
     ----------
     filepath : str, default None
        File path to open as a set of virtualized zarr arrays.
     filetype : FileType, default None
        Type of file to be opened. Used to determine which kerchunk file format backend to use.
-        Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3'}.
+        Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3', 'kerchunk'}.
        If not provided will attempt to automatically infer the correct filetype from header bytes.
     group : str, default is None
        Path to the HDF5/netCDF4 group in the given file to open. Given as a str, supported by filetypes “netcdf4” and “hdf5”.
@@ -133,9 +136,44 @@ def open_virtual_dataset(
         raise NotImplementedError()
 
     # if filetype is user defined, convert to FileType
+
     if filetype is not None:
         filetype = FileType(filetype)
 
+    if filetype == FileType.kerchunk:
+        from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs
+
+        fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options)
+
+        # The kerchunk .parquet storage format isn't actually a single Parquet file, but a directory that contains named Parquet files for each group/variable.
+        if fs.filepath.endswith("ref.parquet"):
+            from fsspec.implementations.reference import LazyReferenceMapper
+
+            lrm = LazyReferenceMapper(filepath, fs.fs)
+
+            # build reference dict from KV pairs in LazyReferenceMapper
+            # is there a better / more performant way to extract this?
+ array_refs = {k: lrm[k] for k in lrm.keys()} + + full_reference = {"refs": array_refs} + + return dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference)) + + # JSON has no magic bytes, but the Kerchunk version 1 spec starts with 'version': + # https://fsspec.github.io/kerchunk/spec.html + elif fs.read_bytes(9).startswith(b'{"version'): + import ujson + + with fs.open_file() as of: + refs = ujson.load(of) + + return dataset_from_kerchunk_refs(KerchunkStoreRefs(refs)) + + else: + raise ValueError( + "The input Kerchunk reference did not seem to be in Kerchunk's JSON or Parquet spec: https://fsspec.github.io/kerchunk/spec.html. The Kerchunk format autodetection is quite flaky, so if your reference matches the Kerchunk spec feel free to open an issue: https://github.com/zarr-developers/VirtualiZarr/issues" + ) + if filetype == FileType.zarr_v3: # TODO is there a neat way of auto-detecting this? from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store @@ -151,9 +189,9 @@ def open_virtual_dataset( "Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files." ) - fpath = _fsspec_openfile_from_filepath( + fpath = _FsspecFSFromFilepath( filepath=filepath, reader_options=reader_options - ) + ).open_file() parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp")) vds = parser.parse_dataset() vds.drop_vars(drop_variables) @@ -189,9 +227,9 @@ def open_virtual_dataset( # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables... # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references # TODO really we probably want a dedicated xarray backend that iterates over all variables only once - fpath = _fsspec_openfile_from_filepath( + fpath = _FsspecFSFromFilepath( filepath=filepath, reader_options=reader_options - ) + ).open_file() # fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any. # We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through. 
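For orientation, the `FileType.kerchunk` branch added to `open_virtual_dataset` above can be exercised roughly as follows. This is a minimal sketch based only on the API visible in this diff; the reference paths are placeholders:

```python
from virtualizarr import open_virtual_dataset

# A Kerchunk JSON reference is detected via the b'{"version' header checked above.
vds = open_virtual_dataset("ref.json", filetype="kerchunk", indexes={})

# A Kerchunk parquet store is a directory ending in "ref.parquet";
# it is loaded through fsspec's LazyReferenceMapper instead.
vds_pq = open_virtual_dataset("ref.parquet", filetype="kerchunk", indexes={})

# Either result round-trips back to an in-memory Kerchunk reference dict.
refs = vds.virtualize.to_kerchunk(format="dict")
```

Anything that is neither Kerchunk JSON nor a `ref.parquet` directory falls through to the `ValueError` raised above.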
diff --git a/virtualizarr/manifests/array_api.py b/virtualizarr/manifests/array_api.py
index 09606978..18f15933 100644
--- a/virtualizarr/manifests/array_api.py
+++ b/virtualizarr/manifests/array_api.py
@@ -1,4 +1,4 @@
-from typing import TYPE_CHECKING, Callable, Iterable
+from typing import TYPE_CHECKING, Any, Callable, Iterable, cast
 
 import numpy as np
 
@@ -217,9 +217,12 @@ def stack(
     new_shape.insert(axis, length_along_new_stacked_axis)
 
     # do stacking of entries in manifest
-    stacked_paths = np.stack(
-        [arr.manifest._paths for arr in arrays],
-        axis=axis,
+    stacked_paths = cast(  # `np.stack` apparently is type hinted as if the output could have Any dtype
+        np.ndarray[Any, np.dtypes.StringDType],
+        np.stack(
+            [arr.manifest._paths for arr in arrays],
+            axis=axis,
+        ),
     )
     stacked_offsets = np.stack(
         [arr.manifest._offsets for arr in arrays],
         axis=axis,
@@ -296,10 +299,14 @@ def broadcast_to(x: "ManifestArray", /, shape: tuple[int, ...]) -> "ManifestArra
     )
 
     # do broadcasting of entries in manifest
-    broadcasted_paths = np.broadcast_to(
-        x.manifest._paths,
-        shape=new_chunk_grid_shape,
+    broadcasted_paths = cast(  # `np.broadcast_to` apparently is type hinted as if the output could have Any dtype
+        np.ndarray[Any, np.dtypes.StringDType],
+        np.broadcast_to(
+            x.manifest._paths,
+            shape=new_chunk_grid_shape,
+        ),
     )
+
     broadcasted_offsets = np.broadcast_to(
         x.manifest._offsets,
         shape=new_chunk_grid_shape,
diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py
index 3aaebb41..a6d160ed 100644
--- a/virtualizarr/manifests/manifest.py
+++ b/virtualizarr/manifests/manifest.py
@@ -84,7 +84,7 @@ class ChunkManifest:
     so it's not possible to have a ChunkManifest object that does not represent a valid grid of chunks.
     """
 
-    _paths: np.ndarray[Any, np.dtypes.StringDType]  # type: ignore[name-defined]
+    _paths: np.ndarray[Any, np.dtypes.StringDType]
     _offsets: np.ndarray[Any, np.dtype[np.uint64]]
     _lengths: np.ndarray[Any, np.dtype[np.uint64]]
 
@@ -113,7 +113,10 @@ def __init__(self, entries: dict) -> None:
         shape = get_chunk_grid_shape(entries.keys())
 
         # Initializing to empty implies that entries with path='' are treated as missing chunks
-        paths = np.empty(shape=shape, dtype=np.dtypes.StringDType())  # type: ignore[attr-defined]
+        paths = cast(  # `np.empty` apparently is type hinted as if the output could have Any dtype
+            np.ndarray[Any, np.dtypes.StringDType],
+            np.empty(shape=shape, dtype=np.dtypes.StringDType()),
+        )
         offsets = np.empty(shape=shape, dtype=np.dtype("uint64"))
         lengths = np.empty(shape=shape, dtype=np.dtype("uint64"))
 
@@ -141,7 +144,7 @@ def __init__(self, entries: dict) -> None:
     @classmethod
     def from_arrays(
         cls,
-        paths: np.ndarray[Any, np.dtype[np.dtypes.StringDType]],  # type: ignore[name-defined]
+        paths: np.ndarray[Any, np.dtypes.StringDType],
         offsets: np.ndarray[Any, np.dtype[np.uint64]],
         lengths: np.ndarray[Any, np.dtype[np.uint64]],
     ) -> "ChunkManifest":
@@ -306,7 +309,9 @@ def _from_kerchunk_chunk_dict(
         chunk_entries: dict[ChunkKey, ChunkDictEntry] = {}
         for k, v in kerchunk_chunk_dict.items():
             if isinstance(v, (str, bytes)):
-                raise NotImplementedError("TODO: handle inlined data")
+                raise NotImplementedError(
+                    "Reading inlined reference data is currently not supported."
+                )
             elif not isinstance(v, (tuple, list)):
                 raise TypeError(f"Unexpected type {type(v)} for chunk value: {v}")
             chunk_entries[k] = ChunkEntry.from_kerchunk(v).dict()
diff --git a/virtualizarr/readers/kerchunk.py b/virtualizarr/readers/kerchunk.py
index 19a8c28d..c274ee5a 100644
--- a/virtualizarr/readers/kerchunk.py
+++ b/virtualizarr/readers/kerchunk.py
@@ -13,7 +13,7 @@
     KerchunkArrRefs,
     KerchunkStoreRefs,
 )
-from virtualizarr.utils import _fsspec_openfile_from_filepath
+from virtualizarr.utils import _FsspecFSFromFilepath
 from virtualizarr.zarr import ZArray, ZAttrs
 
 
@@ -28,9 +28,9 @@ def _automatically_determine_filetype(
         raise NotImplementedError()
 
     # Read magic bytes from local or remote file
-    fpath = _fsspec_openfile_from_filepath(
+    fpath = _FsspecFSFromFilepath(
         filepath=filepath, reader_options=reader_options
-    )
+    ).open_file()
     magic_bytes = fpath.read(8)
     fpath.close()
 
diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py
index e42ad9ac..731c4acc 100644
--- a/virtualizarr/tests/test_backend.py
+++ b/virtualizarr/tests/test_backend.py
@@ -336,3 +336,77 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir):
         vds = open_virtual_dataset(hdf5_scalar)
         assert vds.scalar.dims == ()
         assert vds.scalar.attrs == {"scalar": "true"}
+
+
+@pytest.mark.parametrize(
+    "reference_format",
+    ["json", "parquet", "invalid"],
+)
+def test_open_virtual_dataset_existing_kerchunk_refs(
+    tmp_path, netcdf4_virtual_dataset, reference_format
+):
+    example_reference_dict = netcdf4_virtual_dataset.virtualize.to_kerchunk(
+        format="dict"
+    )
+
+    if reference_format == "invalid":
+        # Test that an invalid file format leads to a ValueError
+        ref_filepath = tmp_path / "ref.csv"
+        with open(ref_filepath.as_posix(), mode="w") as of:
+            of.write("tmp")
+
+        with pytest.raises(ValueError):
+            open_virtual_dataset(
+                filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={}
+            )
+
+    else:
+        # Test valid json and parquet reference formats
+
+        if reference_format == "json":
+            ref_filepath = tmp_path / "ref.json"
+
+            import ujson
+
+            with open(ref_filepath, "w") as json_file:
+                ujson.dump(example_reference_dict, json_file)
+
+        if reference_format == "parquet":
+            from kerchunk.df import refs_to_dataframe
+
+            ref_filepath = tmp_path / "ref.parquet"
+            refs_to_dataframe(fo=example_reference_dict, url=ref_filepath.as_posix())
+
+        vds = open_virtual_dataset(
+            filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={}
+        )
+
+        # Inconsistent results! https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202
+        # assert vds.virtualize.to_kerchunk(format='dict') == example_reference_dict
+        refs = vds.virtualize.to_kerchunk(format="dict")
+        expected_refs = netcdf4_virtual_dataset.virtualize.to_kerchunk(format="dict")
+        assert refs["refs"]["air/0.0.0"] == expected_refs["refs"]["air/0.0.0"]
+        assert refs["refs"]["lon/0"] == expected_refs["refs"]["lon/0"]
+        assert refs["refs"]["lat/0"] == expected_refs["refs"]["lat/0"]
+        assert refs["refs"]["time/0"] == expected_refs["refs"]["time/0"]
+
+        assert list(vds) == list(netcdf4_virtual_dataset)
+        assert set(vds.coords) == set(netcdf4_virtual_dataset.coords)
+        assert set(vds.variables) == set(netcdf4_virtual_dataset.variables)
+
+
+def test_notimplemented_read_inline_refs(tmp_path, netcdf4_inlined_ref):
+    # For now, we raise a NotImplementedError if we read existing references that have inlined data
+    # https://github.com/zarr-developers/VirtualiZarr/pull/251#pullrequestreview-2361916932
+
+    ref_filepath = tmp_path / "ref.json"
+
+    import ujson
+
+    with open(ref_filepath, "w") as json_file:
+        ujson.dump(netcdf4_inlined_ref, json_file)
+
+    with pytest.raises(NotImplementedError):
+        open_virtual_dataset(
+            filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={}
+        )
diff --git a/virtualizarr/tests/test_utils.py b/virtualizarr/tests/test_utils.py
index ed204c16..d42c2288 100644
--- a/virtualizarr/tests/test_utils.py
+++ b/virtualizarr/tests/test_utils.py
@@ -7,7 +7,7 @@
 import pytest
 import xarray as xr
 
-from virtualizarr.utils import _fsspec_openfile_from_filepath
+from virtualizarr.utils import _FsspecFSFromFilepath
 
 
 @pytest.fixture
@@ -21,7 +21,7 @@ def test_fsspec_openfile_from_path(tmp_path: pathlib.Path, dataset: xr.Dataset)
     f = tmp_path / "dataset.nc"
     dataset.to_netcdf(f)
 
-    result = _fsspec_openfile_from_filepath(filepath=f.as_posix())
+    result = _FsspecFSFromFilepath(filepath=f.as_posix()).open_file()
 
     assert isinstance(result, fsspec.implementations.local.LocalFileOpener)
 
@@ -32,6 +32,6 @@ def test_fsspec_openfile_memory(dataset: xr.Dataset):
     with fs.open("dataset.nc", mode="wb") as f:
         dataset.to_netcdf(f, engine="h5netcdf")
 
-    result = _fsspec_openfile_from_filepath(filepath="memory://dataset.nc")
+    result = _FsspecFSFromFilepath(filepath="memory://dataset.nc").open_file()
     with result:
         assert isinstance(result, fsspec.implementations.memory.MemoryFile)
diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py
index 092ddd25..1721a3e7 100644
--- a/virtualizarr/utils.py
+++ b/virtualizarr/utils.py
@@ -13,42 +13,51 @@
 ]
 
 
-def _fsspec_openfile_from_filepath(
-    *,
-    filepath: str,
-    reader_options: Optional[dict] = None,
-) -> OpenFileType:
-    """Converts input filepath to fsspec openfile object.
+from dataclasses import dataclass, field
+
+
+@dataclass
+class _FsspecFSFromFilepath:
+    """Class to create fsspec Filesystem from input filepath.
 
     Parameters
     ----------
     filepath : str
         Input filepath
     reader_options : dict, optional
-        Dict containing kwargs to pass to file opener, by default {}
-
-    Returns
-    -------
-    OpenFileType
-        An open file-like object, specific to the protocol supplied in filepath.
+        Dict containing kwargs to pass to file opener, by default {}
+    fs : fsspec.AbstractFileSystem
+        The fsspec filesystem object, created in __post_init__
 
-    Raises
-    ------
-    NotImplementedError
-        Raises a Not Implemented Error if filepath protocol is not supported.
     """
 
-    import fsspec
-    from upath import UPath
+    filepath: str
+    reader_options: Optional[dict] = field(default_factory=dict)
+    fs: fsspec.AbstractFileSystem = field(init=False)
+
+    def open_file(self) -> OpenFileType:
+        """Calls `.open` on the fsspec filesystem, using self.filepath as input.
+
+        Returns
+        -------
+        OpenFileType
+            File-like object opened with fsspec
+        """
+        return self.fs.open(self.filepath)
 
-    universal_filepath = UPath(filepath)
-    protocol = universal_filepath.protocol
+    def read_bytes(self, bytes: int) -> bytes:
+        with self.open_file() as of:
+            return of.read(bytes)
 
-    if reader_options is None:
-        reader_options = {}
+    def __post_init__(self) -> None:
+        """Initialize the fsspec filesystem object"""
+        import fsspec
+        from upath import UPath
 
-    storage_options = reader_options.get("storage_options", {})  # type: ignore
+        universal_filepath = UPath(self.filepath)
+        protocol = universal_filepath.protocol
 
-    fpath = fsspec.filesystem(protocol, **storage_options).open(filepath)
+        self.reader_options = self.reader_options or {}
+        storage_options = self.reader_options.get("storage_options", {})  # type: ignore
 
-    return fpath
+        self.fs = fsspec.filesystem(protocol, **storage_options)