diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 00000000..6adaaa7b --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,6 @@ + + +- [ ] Closes #xxxx +- [ ] Tests added +- [ ] Changes are documented in `docs/releases.rst` +- [ ] New functions/methods are listed in `api.rst` diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2a40e646..3ca9cf32 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -22,7 +22,7 @@ jobs: shell: bash -l {0} strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12"] + python-version: ["3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v4 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a1af21fa..a5670c75 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,7 +11,7 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: "v0.4.3" + rev: "v0.4.7" hooks: # Run the linter. - id: ruff @@ -37,10 +37,8 @@ repos: ] # run this occasionally, ref discussion https://github.com/pydata/xarray/pull/3194 # - repo: https://github.com/asottile/pyupgrade - # rev: v1.22.1 + # rev: v3.15.2 # hooks: # - id: pyupgrade # args: - # - "--py3-only" - # # remove on f-strings in Py3.7 - # - "--keep-percent-format" + # - "--py310-plus" diff --git a/ci/doc.yml b/ci/doc.yml index c2f72e7c..7d7e9224 100644 --- a/ci/doc.yml +++ b/ci/doc.yml @@ -3,7 +3,7 @@ channels: - conda-forge - nodefaults dependencies: - - python>=3.9 + - python>=3.10 - "sphinx" - pip - pip: @@ -13,4 +13,4 @@ dependencies: - "sphinx_design" - "sphinx_togglebutton" - "sphinx-autodoc-typehints" - - -e .. + - -e "..[test]" diff --git a/docs/index.md b/docs/index.md index 65460243..98264964 100644 --- a/docs/index.md +++ b/docs/index.md @@ -82,5 +82,5 @@ installation usage faq api - +releases ``` diff --git a/docs/releases.rst b/docs/releases.rst new file mode 100644 index 00000000..dfacf533 --- /dev/null +++ b/docs/releases.rst @@ -0,0 +1,33 @@ +Release notes +============= + +.. _v0.1: + +v0.1 (unreleased) +----------------- + +v0.1 is the first release of VirtualiZarr!! It contains functionality for using kerchunk to find byte ranges in netCDF files, +constructing an xarray.Dataset containing ManifestArray objects, then writing out such a dataset to kerchunk references as either json or parquet. + +New Features +~~~~~~~~~~~~ + + +Breaking changes +~~~~~~~~~~~~~~~~ + + +Deprecations +~~~~~~~~~~~~ + + +Bug fixes +~~~~~~~~~ + + +Documentation +~~~~~~~~~~~~~ + + +Internal Changes +~~~~~~~~~~~~~~~~ diff --git a/docs/usage.md b/docs/usage.md index 4fc5411a..691e69eb 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -27,6 +27,7 @@ vds = open_virtual_dataset('air.nc') (Notice we did not have to explicitly indicate the file format, as {py:func}`open_virtual_dataset ` will attempt to automatically infer it.) + ```{note} In future we would like for it to be possible to just use `xr.open_dataset`, e.g. @@ -61,6 +62,15 @@ Attributes: These {py:class}`ManifestArray ` objects are each a virtual reference to some data in the `air.nc` netCDF file, with the references stored in the form of "Chunk Manifests". +### Opening remote files + +To open remote files as virtual datasets pass the `reader_options` options, e.g. 
+ +```python +aws_credentials = {"key": ..., "secret": ...} +vds = open_virtual_dataset("s3://some-bucket/file.nc", reader_options={'storage_options': aws_credentials}) +``` + ## Chunk Manifests In the Zarr model N-dimensional arrays are stored as a series of compressed chunks, each labelled by a chunk key which indicates its position in the array. Whilst conventionally each of these Zarr chunks are a separate compressed binary file stored within a Zarr Store, there is no reason why these chunks could not actually already exist as part of another file (e.g. a netCDF file), and be loaded by reading a specific byte range from this pre-existing file. @@ -311,27 +321,38 @@ Once we've combined references to all the chunks of all our legacy files into on The [kerchunk library](https://github.com/fsspec/kerchunk) has its own [specification](https://fsspec.github.io/kerchunk/spec.html) for how byte range references should be serialized (either as a JSON or parquet file). -To write out all the references in the virtual dataset as a single kerchunk-compliant JSON file, you can use the {py:meth}`ds.virtualize.to_kerchunk ` accessor method. +To write out all the references in the virtual dataset as a single kerchunk-compliant JSON or parquet file, you can use the {py:meth}`ds.virtualize.to_kerchunk ` accessor method. ```python combined_vds.virtualize.to_kerchunk('combined.json', format='json') ``` -These references can now be interpreted like they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (so you need kerchunk to be installed to use `engine='kerchunk'`). +These references can now be interpreted as if they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (kerchunk must be installed to use `engine='kerchunk'`). ```python -import fsspec +combined_ds = xr.open_dataset('combined.json', engine="kerchunk") +``` + +In-memory ("loadable") variables backed by numpy arrays can also be written out to kerchunk reference files, with the values serialized as bytes. This is equivalent to kerchunk's concept of "inlining", but done on a per-array basis using the `loadable_variables` kwarg rather than a per-chunk basis using kerchunk's `inline_threshold` kwarg. -fs = fsspec.filesystem("reference", fo=f"combined.json") -mapper = fs.get_mapper("") +```{note} +Currently you can only serialize in-memory variables to kerchunk references if they do not have any encoding. +``` -combined_ds = xr.open_dataset(mapper, engine="kerchunk") +When you have many chunks, the reference file can get large enough to be unwieldy as JSON. In that case the references can instead be stored as parquet. Again this uses kerchunk internally. + +```python +combined_vds.virtualize.to_kerchunk('combined.parq', format='parquet') ``` -```{note} -Currently you can only serialize virtual variables backed by `ManifestArray` objects to kerchunk reference files, not real in-memory numpy-backed variables. +And again we can read these references using the "kerchunk" backend as if they were a regular Zarr store: + +```python +combined_ds = xr.open_dataset('combined.parq', engine="kerchunk") ``` +By default references are split across separate parquet files when the total number of references exceeds `record_size`. If there are fewer than `categorical_threshold` unique URLs referenced by a particular variable, the URLs will be stored as a categorical variable.
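+
+For example, both options can be passed straight through the accessor when writing parquet references. The values shown below are simply the defaults from the `to_kerchunk` signature, so this is an illustrative sketch rather than a tuning recommendation:
+
+```python
+combined_vds.virtualize.to_kerchunk(
+    'combined.parq',
+    format='parquet',
+    record_size=100_000,       # maximum number of references stored per parquet file
+    categorical_threshold=10,  # controls when urls are encoded as pandas categoricals
+)
+```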
+ ### Writing as Zarr Alternatively, we can write these references out as an actual Zarr store, at least one that is compliant with the [proposed "Chunk Manifest" ZEP](https://github.com/zarr-developers/zarr-specs/issues/287). To do this we simply use the {py:meth}`ds.virtualize.to_zarr ` accessor method. diff --git a/pyproject.toml b/pyproject.toml index 26975cdf..6a947142 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,12 +14,11 @@ classifiers = [ "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Programming Language :: Python", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", ] -requires-python = ">=3.9" +requires-python = ">=3.10" dynamic = ["version"] dependencies = [ "xarray>=2024.5.0", @@ -29,19 +28,23 @@ dependencies = [ "numpy>=2.0.0rc1", "ujson", "packaging", + "universal-pathlib", ] [project.optional-dependencies] test = [ "codecov", "pre-commit", + "ruff", "pytest-mypy", "pytest-cov", "pytest", - "scipy", "pooch", - "ruff", - + "scipy", + "netcdf4", + "fsspec", + "s3fs", + "fastparquet", ] diff --git a/virtualizarr/kerchunk.py b/virtualizarr/kerchunk.py index 1ed9c574..f0aecda4 100644 --- a/virtualizarr/kerchunk.py +++ b/virtualizarr/kerchunk.py @@ -1,9 +1,15 @@ +import base64 +import json +from enum import Enum, auto from pathlib import Path -from typing import List, NewType, Optional, Tuple, Union, cast +from typing import NewType, Optional, cast +import numpy as np import ujson # type: ignore import xarray as xr +from virtualizarr.manifests.manifest import join +from virtualizarr.utils import _fsspec_openfile_from_filepath from virtualizarr.zarr import ZArray, ZAttrs # Distinguishing these via type hints makes it a lot easier to mentally keep track of what the opaque kerchunk "reference dicts" actually mean @@ -18,9 +24,6 @@ ) # lower-level dict containing just the information for one zarr array -from enum import Enum, auto - - class AutoName(Enum): # Recommended by official Python docs for auto naming: # https://docs.python.org/3/library/enum.html#using-automatic-values @@ -37,8 +40,22 @@ class FileType(AutoName): zarr = auto() +class NumpyEncoder(json.JSONEncoder): + # TODO I don't understand how kerchunk gets around this problem of encoding numpy types (in the zattrs) whilst only using ujson + def default(self, obj): + if isinstance(obj, np.ndarray): + return obj.tolist() # Convert NumPy array to Python list + elif isinstance(obj, np.generic): + return obj.item() # Convert NumPy scalar to Python scalar + return json.JSONEncoder.default(self, obj) + + def read_kerchunk_references_from_file( - filepath: str, filetype: Optional[FileType] + filepath: str, + filetype: FileType | None, + reader_options: Optional[dict] = { + "storage_options": {"key": "", "secret": "", "anon": True} + }, ) -> KerchunkStoreRefs: """ Read a single legacy file and return kerchunk references to its contents. @@ -50,10 +67,15 @@ def read_kerchunk_references_from_file( filetype : FileType, default: None Type of file to be opened. Used to determine which kerchunk file format backend to use. If not provided will attempt to automatically infer the correct filetype from the the filepath's extension. + reader_options: dict, default {'storage_options':{'key':'', 'secret':'', 'anon':True}} + Dict passed into Kerchunk file readers. 
Note: Each Kerchunk file reader has distinct arguments, + so ensure reader_options match selected Kerchunk reader arguments. """ if filetype is None: - filetype = _automatically_determine_filetype(filepath) + filetype = _automatically_determine_filetype( + filepath=filepath, reader_options=reader_options + ) # if filetype is user defined, convert to FileType filetype = FileType(filetype) @@ -61,12 +83,14 @@ def read_kerchunk_references_from_file( if filetype.name.lower() == "netcdf3": from kerchunk.netCDF3 import NetCDF3ToZarr - refs = NetCDF3ToZarr(filepath, inline_threshold=0).translate() + refs = NetCDF3ToZarr(filepath, inline_threshold=0, **reader_options).translate() elif filetype.name.lower() == "netcdf4": from kerchunk.hdf import SingleHdf5ToZarr - refs = SingleHdf5ToZarr(filepath, inline_threshold=0).translate() + refs = SingleHdf5ToZarr( + filepath, inline_threshold=0, **reader_options + ).translate() elif filetype.name.lower() == "grib": # TODO Grib files should be handled as a DataTree object # see https://github.com/TomNicholas/VirtualiZarr/issues/11 @@ -74,11 +98,11 @@ def read_kerchunk_references_from_file( elif filetype.name.lower() == "tiff": from kerchunk.tiff import tiff_to_zarr - refs = tiff_to_zarr(filepath, inline_threshold=0) + refs = tiff_to_zarr(filepath, inline_threshold=0, **reader_options) elif filetype.name.lower() == "fits": from kerchunk.fits import process_file - refs = process_file(filepath, inline_threshold=0) + refs = process_file(filepath, inline_threshold=0, **reader_options) else: raise NotImplementedError(f"Unsupported file type: {filetype.name}") @@ -86,20 +110,24 @@ def read_kerchunk_references_from_file( return refs -def _automatically_determine_filetype(filepath: str) -> FileType: +def _automatically_determine_filetype( + *, filepath: str, reader_options: Optional[dict] = {} +) -> FileType: file_extension = Path(filepath).suffix + fpath = _fsspec_openfile_from_filepath( + filepath=filepath, reader_options=reader_options + ) if file_extension == ".nc": # based off of: https://github.com/TomNicholas/VirtualiZarr/pull/43#discussion_r1543415167 - with open(filepath, "rb") as f: - magic = f.read() + magic = fpath.read() + if magic[0:3] == b"CDF": filetype = FileType.netcdf3 elif magic[1:4] == b"HDF": filetype = FileType.netcdf4 else: raise ValueError(".nc file does not appear to be NETCDF3 OR NETCDF4") - elif file_extension == ".zarr": # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one... 
raise NotImplementedError() @@ -112,6 +140,7 @@ def _automatically_determine_filetype(filepath: str) -> FileType: else: raise NotImplementedError(f"Unrecognised file extension: {file_extension}") + fpath.close() return filetype @@ -149,7 +178,7 @@ def extract_array_refs( def parse_array_refs( arr_refs: KerchunkArrRefs, -) -> Tuple[dict, ZArray, ZAttrs]: +) -> tuple[dict, ZArray, ZAttrs]: zarray = ZArray.from_kerchunk_refs(arr_refs.pop(".zarray")) zattrs = arr_refs.pop(".zattrs", {}) chunk_dict = arr_refs @@ -177,7 +206,7 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs: all_arr_refs = {} for var_name, var in ds.variables.items(): - arr_refs = variable_to_kerchunk_arr_refs(var) + arr_refs = variable_to_kerchunk_arr_refs(var, var_name) prepended_with_var_name = { f"{var_name}/{key}": val for key, val in arr_refs.items() @@ -189,6 +218,7 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs: "version": 1, "refs": { ".zgroup": '{"zarr_format":2}', + ".zattrs": ujson.dumps(ds.attrs), **all_arr_refs, }, } @@ -196,31 +226,67 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs: return cast(KerchunkStoreRefs, ds_refs) -def variable_to_kerchunk_arr_refs(var: xr.Variable) -> KerchunkArrRefs: +def variable_to_kerchunk_arr_refs(var: xr.Variable, var_name: str) -> KerchunkArrRefs: """ - Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps a ManifestArray). + Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps either a ManifestArray or a numpy array). Partially encodes the inner dicts to json to match kerchunk behaviour (see https://github.com/fsspec/kerchunk/issues/415). """ from virtualizarr.manifests import ChunkEntry, ManifestArray - marr = var.data + if isinstance(var.data, ManifestArray): + marr = var.data - if not isinstance(marr, ManifestArray): - raise TypeError( - f"Can only serialize wrapped arrays of type ManifestArray, but got type {type(marr)}" - ) + arr_refs: dict[str, str | list[str | int]] = { + str(chunk_key): [entry['path'], entry['offset'], entry['length']] + for chunk_key, entry in marr.manifest.dict().items() + } - arr_refs: dict[str, Union[str, List[Union[str, int]]]] = { - str(chunk_key): ChunkEntry(**chunk_entry).to_kerchunk() - for chunk_key, chunk_entry in marr.manifest.dict().items() - } + zarray = marr.zarray + + else: + try: + np_arr = var.to_numpy() + except AttributeError as e: + raise TypeError( + f"Can only serialize wrapped arrays of type ManifestArray or numpy.ndarray, but got type {type(var.data)}" + ) from e + + if var.encoding: + if "scale_factor" in var.encoding: + raise NotImplementedError( + f"Cannot serialize loaded variable {var_name}, as it is encoded with a scale_factor" + ) + if "offset" in var.encoding: + raise NotImplementedError( + f"Cannot serialize loaded variable {var_name}, as it is encoded with an offset" + ) + if "calendar" in var.encoding: + raise NotImplementedError( + f"Cannot serialize loaded variable {var_name}, as it is encoded with a calendar" + ) + + # This encoding is what kerchunk does when it "inlines" data, see https://github.com/fsspec/kerchunk/blob/a0c4f3b828d37f6d07995925b324595af68c4a19/kerchunk/hdf.py#L472 + byte_data = np_arr.tobytes() + # TODO do I really need to encode then decode like this? + inlined_data = (b"base64:" + base64.b64encode(byte_data)).decode("utf-8") + + # TODO can this be generalized to save individual chunks of a dask array? 
+ # TODO will this fail for a scalar? + arr_refs = {join(0 for _ in np_arr.shape): inlined_data} + + zarray = ZArray( + chunks=np_arr.shape, + shape=np_arr.shape, + dtype=np_arr.dtype, + order="C", + ) - zarray_dict = marr.zarray.to_kerchunk_json() + zarray_dict = zarray.to_kerchunk_json() arr_refs[".zarray"] = zarray_dict zattrs = var.attrs zattrs["_ARRAY_DIMENSIONS"] = list(var.dims) - arr_refs[".zattrs"] = ujson.dumps(zattrs) + arr_refs[".zattrs"] = json.dumps(zattrs, separators=(",", ":"), cls=NumpyEncoder) return cast(KerchunkArrRefs, arr_refs) diff --git a/virtualizarr/manifests/array.py b/virtualizarr/manifests/array.py index 5d132bc2..ed07d8e8 100644 --- a/virtualizarr/manifests/array.py +++ b/virtualizarr/manifests/array.py @@ -1,5 +1,5 @@ import warnings -from typing import Any, Tuple, Union +from typing import Any, Union import numpy as np @@ -26,8 +26,8 @@ class ManifestArray: def __init__( self, - zarray: Union[ZArray, dict], - chunkmanifest: Union[dict, ChunkManifest], + zarray: ZArray | dict, + chunkmanifest: dict | ChunkManifest, ) -> None: """ Create a ManifestArray directly from the .zarray information of a zarr array and the manifest of chunks. @@ -83,7 +83,7 @@ def zarray(self) -> ZArray: return self._zarray @property - def chunks(self) -> Tuple[int, ...]: + def chunks(self) -> tuple[int, ...]: return tuple(self.zarray.chunks) @property diff --git a/virtualizarr/manifests/array_api.py b/virtualizarr/manifests/array_api.py index 10561f3c..41e8a3e0 100644 --- a/virtualizarr/manifests/array_api.py +++ b/virtualizarr/manifests/array_api.py @@ -1,5 +1,5 @@ import itertools -from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Tuple, Union +from typing import TYPE_CHECKING, Callable, Iterable import numpy as np @@ -11,7 +11,7 @@ from .array import ManifestArray -MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS: Dict[ +MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS: dict[ str, Callable ] = {} # populated by the @implements decorators below @@ -52,7 +52,7 @@ def _check_same_dtypes(dtypes: list[np.dtype]) -> None: ) -def _check_same_codecs(codecs: List[Codec]) -> None: +def _check_same_codecs(codecs: list[Codec]) -> None: first_codec, *other_codecs = codecs for codec in other_codecs: if codec != first_codec: @@ -63,7 +63,7 @@ def _check_same_codecs(codecs: List[Codec]) -> None: ) -def _check_same_chunk_shapes(chunks_list: List[Tuple[int, ...]]) -> None: +def _check_same_chunk_shapes(chunks_list: list[tuple[int, ...]]) -> None: """Check all the chunk shapes are the same""" first_chunks, *other_chunks_list = chunks_list @@ -78,7 +78,7 @@ def _check_same_chunk_shapes(chunks_list: List[Tuple[int, ...]]) -> None: @implements(np.result_type) def result_type(*arrays_and_dtypes) -> np.dtype: """Called by xarray to ensure all arguments to concat have the same dtype.""" - first_dtype, *other_dtypes = [np.dtype(obj) for obj in arrays_and_dtypes] + first_dtype, *other_dtypes = (np.dtype(obj) for obj in arrays_and_dtypes) for other_dtype in other_dtypes: if other_dtype != first_dtype: raise ValueError("dtypes not all consistent") @@ -87,10 +87,10 @@ def result_type(*arrays_and_dtypes) -> np.dtype: @implements(np.concatenate) def concatenate( - arrays: Union[tuple["ManifestArray", ...], list["ManifestArray"]], + arrays: tuple["ManifestArray", ...] | list["ManifestArray"], /, *, - axis: Union[int, None] = 0, + axis: int | None = 0, ) -> "ManifestArray": """ Concatenate ManifestArrays by merging their chunk manifests. 
@@ -183,7 +183,7 @@ def _remove_element_at_position(t: tuple[int, ...], pos: int) -> tuple[int, ...] @implements(np.stack) def stack( - arrays: Union[tuple["ManifestArray", ...], list["ManifestArray"]], + arrays: tuple["ManifestArray", ...] | list["ManifestArray"], /, *, axis: int = 0, @@ -248,7 +248,7 @@ def stack( return ManifestArray(chunkmanifest=stacked_manifest, zarray=new_zarray) -def _check_same_shapes(shapes: List[Tuple[int, ...]]) -> None: +def _check_same_shapes(shapes: list[tuple[int, ...]]) -> None: first_shape, *other_shapes = shapes for other_shape in other_shapes: if other_shape != first_shape: @@ -265,7 +265,7 @@ def expand_dims(x: "ManifestArray", /, *, axis: int = 0) -> "ManifestArray": @implements(np.broadcast_to) -def broadcast_to(x: "ManifestArray", /, shape: Tuple[int, ...]) -> "ManifestArray": +def broadcast_to(x: "ManifestArray", /, shape: tuple[int, ...]) -> "ManifestArray": """ Broadcasts a ManifestArray to a specified shape, by either adjusting chunk keys or copying chunk manifest entries. """ @@ -320,7 +320,7 @@ def broadcast_to(x: "ManifestArray", /, shape: Tuple[int, ...]) -> "ManifestArra @implements(np.full_like) def full_like( - x: "ManifestArray", /, fill_value: bool, *, dtype: Union[np.dtype, None] + x: "ManifestArray", /, fill_value: bool, *, dtype: np.dtype | None ) -> np.ndarray: """ Returns a new array filled with fill_value and having the same shape as an input array x. diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py index 242c1db0..2eee6b0e 100644 --- a/virtualizarr/manifests/manifest.py +++ b/virtualizarr/manifests/manifest.py @@ -1,6 +1,7 @@ import json import re -from typing import Any, Iterable, Iterator, List, NewType, Tuple, Union, cast +from collections.abc import Iterable, Iterator +from typing import Any, NewType, Tuple, Union, cast import numpy as np from pydantic import BaseModel, ConfigDict @@ -34,13 +35,11 @@ def __repr__(self) -> str: return f"ChunkEntry(path='{self.path}', offset={self.offset}, length={self.length})" @classmethod - def from_kerchunk( - cls, path_and_byte_range_info: List[Union[str, int]] - ) -> "ChunkEntry": + def from_kerchunk(cls, path_and_byte_range_info: list[str | int]) -> "ChunkEntry": path, offset, length = path_and_byte_range_info return ChunkEntry(path=path, offset=offset, length=length) - def to_kerchunk(self) -> List[Union[str, int]]: + def to_kerchunk(self) -> list[str | int]: """Write out in the format that kerchunk uses for chunk entries.""" return [self.path, self.offset, self.length] @@ -199,7 +198,7 @@ def ndim_chunk_grid(self) -> int: return self._paths.ndim @property - def shape_chunk_grid(self) -> Tuple[int, ...]: + def shape_chunk_grid(self) -> tuple[int, ...]: """ Number of separate chunks along each dimension. 
@@ -271,6 +270,7 @@ def __eq__(self, other: Any) -> bool: @classmethod def from_zarr_json(cls, filepath: str) -> "ChunkManifest": """Create a ChunkManifest from a Zarr manifest.json file.""" + with open(filepath, "r") as manifest_file: entries = json.load(manifest_file) @@ -321,7 +321,7 @@ def validate_chunk_keys(chunk_keys: Iterable[ChunkKey]): ) -def get_chunk_grid_shape(chunk_keys: Iterable[ChunkKey]) -> Tuple[int, ...]: +def get_chunk_grid_shape(chunk_keys: Iterable[ChunkKey]) -> tuple[int, ...]: # find max chunk index along each dimension zipped_indices = zip(*[split(key) for key in chunk_keys]) chunk_grid_shape = tuple( diff --git a/virtualizarr/tests/conftest.py b/virtualizarr/tests/conftest.py index 51d672d7..08651353 100644 --- a/virtualizarr/tests/conftest.py +++ b/virtualizarr/tests/conftest.py @@ -9,7 +9,8 @@ def netcdf4_file(tmpdir): # Save it to disk as netCDF (in temporary directory) filepath = f"{tmpdir}/air.nc" - ds.to_netcdf(filepath) + ds.to_netcdf(filepath, format="NETCDF4") + ds.close() return filepath @@ -28,5 +29,7 @@ def netcdf4_files(tmpdir): filepath2 = f"{tmpdir}/air2.nc" ds1.to_netcdf(filepath1) ds2.to_netcdf(filepath2) + ds1.close() + ds2.close() return filepath1, filepath2 diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py index 42e2467a..064968b3 100644 --- a/virtualizarr/tests/test_integration.py +++ b/virtualizarr/tests/test_integration.py @@ -1,8 +1,97 @@ +import pytest import xarray as xr +import xarray.testing as xrt from virtualizarr import open_virtual_dataset +@pytest.mark.parametrize( + "inline_threshold, vars_to_inline", + [ + (5e2, ["lat", "lon"]), + pytest.param( + 5e4, ["lat", "lon", "time"], marks=pytest.mark.xfail(reason="time encoding") + ), + pytest.param( + 5e7, + ["lat", "lon", "time", "air"], + marks=pytest.mark.xfail(reason="scale factor encoding"), + ), + ], +) +def test_numpy_arrays_to_inlined_kerchunk_refs( + netcdf4_file, inline_threshold, vars_to_inline +): + from kerchunk.hdf import SingleHdf5ToZarr + + # inline_threshold is chosen to test inlining only the variables listed in vars_to_inline + expected = SingleHdf5ToZarr( + netcdf4_file, inline_threshold=int(inline_threshold) + ).translate() + + # loading the variables should produce same result as inlining them using kerchunk + vds = open_virtual_dataset( + netcdf4_file, loadable_variables=vars_to_inline, indexes={} + ) + refs = vds.virtualize.to_kerchunk(format="dict") + + # TODO I would just compare the entire dicts but kerchunk returns inconsistent results - see https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202 + # assert refs == expected + assert refs["refs"]["air/0.0.0"] == expected["refs"]["air/0.0.0"] + assert refs["refs"]["lon/0"] == expected["refs"]["lon/0"] + assert refs["refs"]["lat/0"] == expected["refs"]["lat/0"] + assert refs["refs"]["time/0"] == expected["refs"]["time/0"] + + +@pytest.mark.parametrize("format", ["json", "parquet"]) +class TestKerchunkRoundtrip: + def test_kerchunk_roundtrip_no_concat(self, tmpdir, format): + # set up example xarray dataset + ds = xr.tutorial.open_dataset("air_temperature", decode_times=False) + + # save it to disk as netCDF (in temporary directory) + ds.to_netcdf(f"{tmpdir}/air.nc") + + # use open_dataset_via_kerchunk to read it as references + vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={}) + + # write those references to disk as kerchunk json + vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format) + + # use fsspec to read the dataset from 
disk via the zarr store + roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk") + + # assert equal to original dataset + xrt.assert_equal(roundtrip, ds) + + def test_kerchunk_roundtrip_concat(self, tmpdir, format): + # set up example xarray dataset + ds = xr.tutorial.open_dataset("air_temperature", decode_times=False) + + # split into two datasets + ds1, ds2 = ds.isel(time=slice(None, 1460)), ds.isel(time=slice(1460, None)) + + # save it to disk as netCDF (in temporary directory) + ds1.to_netcdf(f"{tmpdir}/air1.nc") + ds2.to_netcdf(f"{tmpdir}/air2.nc") + + # use open_dataset_via_kerchunk to read it as references + vds1 = open_virtual_dataset(f"{tmpdir}/air1.nc", indexes={}) + vds2 = open_virtual_dataset(f"{tmpdir}/air2.nc", indexes={}) + + # concatenate virtually along time + vds = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override") + + # write those references to disk as kerchunk json + vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format) + + # use fsspec to read the dataset from disk via the zarr store + roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk") + + # assert equal to original dataset + xrt.assert_equal(roundtrip, ds) + + def test_open_scalar_variable(tmpdir): # regression test for GH issue #100 diff --git a/virtualizarr/tests/test_kerchunk.py b/virtualizarr/tests/test_kerchunk.py index f3ff312f..a623bc02 100644 --- a/virtualizarr/tests/test_kerchunk.py +++ b/virtualizarr/tests/test_kerchunk.py @@ -1,4 +1,5 @@ import numpy as np +import pandas as pd import pytest import ujson # type: ignore import xarray as xr @@ -39,7 +40,7 @@ def test_dataset_from_df_refs(): assert da.data.zarray.compressor is None assert da.data.zarray.filters is None - assert da.data.zarray.fill_value is None + assert da.data.zarray.fill_value is np.nan assert da.data.zarray.order == "C" assert da.data.manifest.dict() == { @@ -78,7 +79,7 @@ def test_accessor_to_kerchunk_dict(self): chunks=(2, 3), compressor=None, filters=None, - fill_value=None, + fill_value=np.nan, order="C", ), ) @@ -88,6 +89,7 @@ def test_accessor_to_kerchunk_dict(self): "version": 1, "refs": { ".zgroup": '{"zarr_format":2}', + ".zattrs": "{}", "a/.zarray": '{"chunks":[2,3],"compressor":null,"dtype":" xr.Dataset: + return xr.Dataset( + {"x": xr.DataArray([10, 20, 30], dims="a", coords={"a": [0, 1, 2]})} + ) + + +def test_fsspec_openfile_from_path(tmp_path: pathlib.Path, dataset: xr.Dataset) -> None: + f = tmp_path / "dataset.nc" + dataset.to_netcdf(f) + + result = _fsspec_openfile_from_filepath(filepath=f.as_posix()) + assert isinstance(result, fsspec.implementations.local.LocalFileOpener) + + +def test_fsspec_openfile_memory(dataset: xr.Dataset): + fs = fsspec.filesystem("memory") + with contextlib.redirect_stderr(None): + # Suppress "Exception ignored in: " + with fs.open("dataset.nc", mode="wb") as f: + dataset.to_netcdf(f, engine="h5netcdf") + + result = _fsspec_openfile_from_filepath(filepath="memory://dataset.nc") + with result: + assert isinstance(result, fsspec.implementations.memory.MemoryFile) diff --git a/virtualizarr/tests/test_xarray.py b/virtualizarr/tests/test_xarray.py index a13977ff..695759bd 100644 --- a/virtualizarr/tests/test_xarray.py +++ b/virtualizarr/tests/test_xarray.py @@ -1,6 +1,8 @@ -from typing import Mapping +from collections.abc import Mapping +from unittest.mock import patch import numpy as np +import pytest import xarray as xr import xarray.testing as xrt from xarray.core.indexes import Index @@ -268,6 +270,24 @@ def 
test_combine_by_coords(self, netcdf4_files): assert combined_vds.xindexes["time"].to_pandas_index().is_monotonic_increasing +pytest.importorskip("s3fs") + + +@pytest.mark.parametrize( + "filetype", ["netcdf4", None], ids=["netcdf4 filetype", "None filetype"] +) +@pytest.mark.parametrize("indexes", [None, {}], ids=["None index", "empty dict index"]) +def test_anon_read_s3(filetype, indexes): + """Parameterized tests for empty vs supplied indexes and filetypes.""" + # TODO: Switch away from this s3 url after minIO is implemented. + fpath = "s3://carbonplan-share/virtualizarr/local.nc" + vds = open_virtual_dataset(fpath, filetype=filetype, indexes=indexes) + + assert vds.dims == {"time": 2920, "lat": 25, "lon": 53} + for var in vds.variables: + assert isinstance(vds[var].data, ManifestArray), var + + class TestLoadVirtualDataset: def test_loadable_variables(self, netcdf4_file): vars_to_load = ["air", "time"] @@ -280,6 +300,20 @@ def test_loadable_variables(self, netcdf4_file): assert isinstance(vds[name].data, ManifestArray), name full_ds = xr.open_dataset(netcdf4_file) + for name in full_ds.variables: if name in vars_to_load: xrt.assert_identical(vds.variables[name], full_ds.variables[name]) + + @patch("virtualizarr.kerchunk.read_kerchunk_references_from_file") + def test_open_virtual_dataset_passes_expected_args( + self, mock_read_kerchunk, netcdf4_file + ): + reader_options = {"option1": "value1", "option2": "value2"} + open_virtual_dataset(netcdf4_file, indexes={}, reader_options=reader_options) + args = { + "filepath": netcdf4_file, + "filetype": None, + "reader_options": reader_options, + } + mock_read_kerchunk.assert_called_once_with(**args) diff --git a/virtualizarr/tests/test_zarr.py b/virtualizarr/tests/test_zarr.py index 866fcc92..80d04b9c 100644 --- a/virtualizarr/tests/test_zarr.py +++ b/virtualizarr/tests/test_zarr.py @@ -17,7 +17,7 @@ def test_zarr_v3_roundtrip(tmpdir): chunks=(2, 3), compressor=None, filters=None, - fill_value=None, + fill_value=np.nan, order="C", zarr_format=3, ), diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py new file mode 100644 index 00000000..e18993c3 --- /dev/null +++ b/virtualizarr/utils.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import io +from typing import TYPE_CHECKING, Optional, Union + +if TYPE_CHECKING: + import fsspec.core + import fsspec.spec + + # See pangeo_forge_recipes.storage + OpenFileType = Union[ + fsspec.core.OpenFile, fsspec.spec.AbstractBufferedFile, io.IOBase + ] + + +def _fsspec_openfile_from_filepath( + *, + filepath: str, + reader_options: Optional[dict] = {}, +) -> OpenFileType: + """Converts input filepath to fsspec openfile object. + + Parameters + ---------- + filepath : str + Input filepath + reader_options : _type_, optional + Dict containing kwargs to pass to file opener, by default {'storage_options':{'key':'', 'secret':'', 'anon':True}} + + Returns + ------- + OpenFileType + An open file-like object, specific to the protocol supplied in filepath. + + Raises + ------ + NotImplementedError + Raises a Not Implemented Error if filepath protocol is not supported. 
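+
+    Examples
+    --------
+    A hypothetical call opening an object anonymously on S3 (the bucket path below
+    is illustrative only, not a real dataset)::
+
+        fpath = _fsspec_openfile_from_filepath(
+            filepath="s3://some-bucket/file.nc",
+            reader_options={"storage_options": {"anon": True}},
+        )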
+ """ + + import fsspec + from upath import UPath + + universal_filepath = UPath(filepath) + protocol = universal_filepath.protocol + + if protocol == "s3": + protocol_defaults = {"key": "", "secret": "", "anon": True} + else: + protocol_defaults = {} + + storage_options = reader_options.get("storage_options", {}) # type: ignore + + # using dict merge operator to add in defaults if keys are not specified + storage_options = protocol_defaults | storage_options + fpath = fsspec.filesystem(protocol, **storage_options).open(filepath) + + return fpath diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 8ab69d09..9ea01709 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -1,12 +1,8 @@ +from collections.abc import Iterable, Mapping, MutableMapping from pathlib import Path from typing import ( - Iterable, - List, Literal, - Mapping, - MutableMapping, Optional, - Union, overload, ) @@ -20,6 +16,7 @@ import virtualizarr.kerchunk as kerchunk from virtualizarr.kerchunk import FileType, KerchunkStoreRefs from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.utils import _fsspec_openfile_from_filepath from virtualizarr.zarr import ( attrs_from_zarr_group_json, dataset_to_zarr, @@ -35,11 +32,14 @@ class ManifestBackendArray(ManifestArray, BackendArray): def open_virtual_dataset( filepath: str, - filetype: Optional[FileType] = None, - drop_variables: Optional[Iterable[str]] = None, - loadable_variables: Optional[Iterable[str]] = None, - indexes: Optional[Mapping[str, Index]] = None, + filetype: FileType | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + indexes: Mapping[str, Index] | None = None, virtual_array_class=ManifestArray, + reader_options: Optional[dict] = { + "storage_options": {"key": "", "secret": "", "anon": True} + }, ) -> xr.Dataset: """ Open a file or store as an xarray Dataset wrapping virtualized zarr arrays. @@ -68,6 +68,9 @@ def open_virtual_dataset( virtual_array_class Virtual array class to use to represent the references to the chunks in each on-disk array. Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. + reader_options: dict, default {'storage_options':{'key':'', 'secret':'', 'anon':True}} + Dict passed into Kerchunk file readers. Note: Each Kerchunk file reader has distinct arguments, + so ensure reader_options match selected Kerchunk reader arguments. Returns ------- @@ -105,6 +108,7 @@ def open_virtual_dataset( vds_refs = kerchunk.read_kerchunk_references_from_file( filepath=filepath, filetype=filetype, + reader_options=reader_options, ) virtual_vars = virtual_vars_from_kerchunk_refs( vds_refs, @@ -117,7 +121,11 @@ def open_virtual_dataset( # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables... 
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references # TODO really we probably want a dedicated xarray backend that iterates over all variables only once - ds = xr.open_dataset(filepath, drop_variables=drop_variables) + fpath = _fsspec_openfile_from_filepath( + filepath=filepath, reader_options=reader_options + ) + + ds = xr.open_dataset(fpath, drop_variables=drop_variables) if indexes is None: # add default indexes by reading data from file @@ -159,8 +167,8 @@ def open_virtual_dataset( def open_virtual_dataset_from_v3_store( storepath: str, - drop_variables: List[str], - indexes: Optional[Mapping[str, Index]], + drop_variables: list[str], + indexes: Mapping[str, Index] | None, ) -> xr.Dataset: """ Read a Zarr v3 store and return an xarray Dataset containing virtualized arrays. @@ -210,7 +218,7 @@ def open_virtual_dataset_from_v3_store( def virtual_vars_from_kerchunk_refs( refs: KerchunkStoreRefs, - drop_variables: Optional[List[str]] = None, + drop_variables: list[str] | None = None, virtual_array_class=ManifestArray, ) -> Mapping[str, xr.Variable]: """ @@ -240,9 +248,9 @@ def virtual_vars_from_kerchunk_refs( def dataset_from_kerchunk_refs( refs: KerchunkStoreRefs, - drop_variables: List[str] = [], + drop_variables: list[str] = [], virtual_array_class: type = ManifestArray, - indexes: Optional[MutableMapping[str, Index]] = None, + indexes: MutableMapping[str, Index] | None = None, ) -> xr.Dataset: """ Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays. @@ -299,7 +307,7 @@ def separate_coords( """ # this would normally come from CF decoding, let's hope the fact we're skipping that doesn't cause any problems... - coord_names: List[str] = [] + coord_names: list[str] = [] # split data and coordinate variables (promote dimension coordinates) data_vars = {} @@ -359,16 +367,24 @@ def to_kerchunk( ) -> KerchunkStoreRefs: ... @overload - def to_kerchunk(self, filepath: str, format: Literal["json"]) -> None: ... + def to_kerchunk(self, filepath: str | Path, format: Literal["json"]) -> None: ... @overload - def to_kerchunk(self, filepath: str, format: Literal["parquet"]) -> None: ... + def to_kerchunk( + self, + filepath: str | Path, + format: Literal["parquet"], + record_size: int = 100_000, + categorical_threshold: int = 10, + ) -> None: ... def to_kerchunk( self, - filepath: Optional[str] = None, - format: Union[Literal["dict"], Literal["json"], Literal["parquet"]] = "dict", - ) -> Union[KerchunkStoreRefs, None]: + filepath: str | Path | None = None, + format: Literal["dict", "json", "parquet"] = "dict", + record_size: int = 100_000, + categorical_threshold: int = 10, + ) -> KerchunkStoreRefs | None: """ Serialize all virtualized arrays in this xarray dataset into the kerchunk references format. @@ -379,6 +395,13 @@ def to_kerchunk( format : 'dict', 'json', or 'parquet' Format to serialize the kerchunk references as. If 'json' or 'parquet' then the 'filepath' argument is required. + record_size (parquet only): int + Number of references to store in each reference file (default 100,000). Bigger values + mean fewer read requests but larger memory footprint. + categorical_threshold (parquet only) : int + Encode urls as pandas.Categorical to reduce memory footprint if the ratio + of the number of unique urls to total number of refs for each variable + is greater than or equal to this number. 
(default 10) References ---------- @@ -397,6 +420,21 @@ def to_kerchunk( return None elif format == "parquet": - raise NotImplementedError() + from kerchunk.df import refs_to_dataframe + + if isinstance(filepath, Path): + url = str(filepath) + elif isinstance(filepath, str): + url = filepath + + # refs_to_dataframe is responsible for writing to parquet. + # at no point does it create a full in-memory dataframe. + refs_to_dataframe( + refs, + url=url, + record_size=record_size, + categorical_threshold=categorical_threshold, + ) + return None else: raise ValueError(f"Unrecognized output format: {format}") diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index bc703df9..dbba794a 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -3,13 +3,9 @@ from typing import ( TYPE_CHECKING, Any, - Dict, - List, Literal, NewType, Optional, - Tuple, - Union, ) import numpy as np @@ -29,8 +25,8 @@ class Codec(BaseModel): - compressor: Optional[str] = None - filters: Optional[List[Dict]] = None + compressor: str | None = None + filters: list[dict] | None = None def __repr__(self) -> str: return f"Codec(compressor={self.compressor}, filters={self.filters})" @@ -45,14 +41,14 @@ class ZArray(BaseModel): arbitrary_types_allowed=True, # only here so pydantic doesn't complain about the numpy dtype field ) - chunks: Tuple[int, ...] - compressor: Optional[str] = None + chunks: tuple[int, ...] + compressor: str | None = None dtype: np.dtype - fill_value: Optional[float] = None # float or int? - filters: Optional[List[Dict]] = None - order: Union[Literal["C"], Literal["F"]] - shape: Tuple[int, ...] - zarr_format: Union[Literal[2], Literal[3]] = 2 + fill_value: float | int | None = np.nan # float or int? + filters: list[dict] | None = None + order: Literal["C", "F"] + shape: tuple[int, ...] + zarr_format: Literal[2, 3] = 2 @field_validator("dtype") @classmethod @@ -78,13 +74,16 @@ def __repr__(self) -> str: @classmethod def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": - # TODO should we be doing some type coercion on the 'fill_value' here? + # coerce type of fill_value as kerchunk can be inconsistent with this + fill_value = decoded_arr_refs_zarray["fill_value"] + if fill_value is None or fill_value == "NaN" or fill_value == "nan": + fill_value = np.nan return ZArray( chunks=tuple(decoded_arr_refs_zarray["chunks"]), compressor=decoded_arr_refs_zarray["compressor"], dtype=np.dtype(decoded_arr_refs_zarray["dtype"]), - fill_value=decoded_arr_refs_zarray["fill_value"], + fill_value=fill_value, filters=decoded_arr_refs_zarray["filters"], order=decoded_arr_refs_zarray["order"], shape=tuple(decoded_arr_refs_zarray["shape"]), @@ -93,7 +92,12 @@ def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": def dict(self) -> dict[str, Any]: zarray_dict = dict(self) + zarray_dict["dtype"] = encode_dtype(zarray_dict["dtype"]) + + if zarray_dict["fill_value"] is np.nan: + zarray_dict["fill_value"] = None + return zarray_dict def to_kerchunk_json(self) -> str: @@ -105,10 +109,10 @@ def replace( compressor: Optional[str] = None, dtype: Optional[np.dtype] = None, fill_value: Optional[float] = None, # float or int? 
- filters: Optional[List[Dict]] = None, - order: Optional[Union[Literal["C"], Literal["F"]]] = None, - shape: Optional[Tuple[int, ...]] = None, - zarr_format: Optional[Union[Literal[2], Literal[3]]] = None, + filters: Optional[list[dict]] = None, + order: Optional[Literal["C"] | Literal["F"]] = None, + shape: Optional[tuple[int, ...]] = None, + zarr_format: Optional[Literal[2] | Literal[3]] = None, ) -> "ZArray": """ Convenience method to create a new ZArray from an existing one by altering only certain attributes. @@ -206,7 +210,7 @@ def to_zarr_json(var: xr.Variable, array_dir: Path) -> None: metadata_file.write(json_dumps(metadata)) -def zarr_v3_array_metadata(zarray: ZArray, dim_names: List[str], attrs: dict) -> dict: +def zarr_v3_array_metadata(zarray: ZArray, dim_names: list[str], attrs: dict) -> dict: """Construct a v3-compliant metadata dict from v2 zarray + information stored on the xarray variable.""" # TODO it would be nice if we could use the zarr-python metadata.ArrayMetadata classes to do this conversion for us @@ -244,13 +248,13 @@ def zarr_v3_array_metadata(zarray: ZArray, dim_names: List[str], attrs: dict) -> def attrs_from_zarr_group_json(filepath: Path) -> dict: - with open(filepath, "r") as metadata_file: + with open(filepath) as metadata_file: attrs = json.load(metadata_file) return attrs["attributes"] -def metadata_from_zarr_json(filepath: Path) -> Tuple[ZArray, List[str], dict]: - with open(filepath, "r") as metadata_file: +def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]: + with open(filepath) as metadata_file: metadata = json.load(metadata_file) if { @@ -268,11 +272,16 @@ def metadata_from_zarr_json(filepath: Path) -> Tuple[ZArray, List[str], dict]: chunk_shape = metadata["chunk_grid"]["configuration"]["chunk_shape"] + if metadata["fill_value"] is None: + fill_value = np.nan + else: + fill_value = metadata["fill_value"] + zarray = ZArray( chunks=metadata["chunk_grid"]["configuration"]["chunk_shape"], compressor=metadata["codecs"], dtype=np.dtype(metadata["data_type"]), - fill_value=metadata["fill_value"], + fill_value=fill_value, filters=metadata.get("filters", None), order="C", shape=chunk_shape,