From 26f27233a6739b17e4d3094fa5bff3ae90445c14 Mon Sep 17 00:00:00 2001
From: Julia Signell
Date: Mon, 13 May 2024 14:40:20 -0400
Subject: [PATCH 1/5] Add writing refs to parquet

---
 pyproject.toml                      |  2 +-
 virtualizarr/tests/test_kerchunk.py | 42 +++++++++++++++++++++++++++++
 virtualizarr/xarray.py              | 35 +++++++++++++++++++++---
 3 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 57aa46f4..859eafef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,7 +41,7 @@ test = [
     "scipy",
     "pooch",
     "ruff",
-
+    "fastparquet",
 ]
diff --git a/virtualizarr/tests/test_kerchunk.py b/virtualizarr/tests/test_kerchunk.py
index 7a10f465..c6f7c6ee 100644
--- a/virtualizarr/tests/test_kerchunk.py
+++ b/virtualizarr/tests/test_kerchunk.py
@@ -1,4 +1,5 @@
 import numpy as np
+import pandas as pd
 import pytest
 import ujson  # type: ignore
 import xarray as xr
@@ -127,6 +128,47 @@ def test_accessor_to_kerchunk_json(self, tmp_path):
         }
         assert loaded_refs == expected_ds_refs
 
+    def test_accessor_to_kerchunk_parquet(self, tmp_path):
+        chunks_dict = {
+            "0.0": {"path": "foo.nc", "offset": 100, "length": 100},
+            "0.1": {"path": "foo.nc", "offset": 200, "length": 100},
+        }
+        manifest = ChunkManifest(entries=chunks_dict)
+        arr = ManifestArray(
+            chunkmanifest=manifest,
+            zarray=dict(
+                shape=(2, 4),
+                dtype=np.dtype("<i8"),
+                chunks=(2, 2),
+                compressor=None,
+                filters=None,
+                fill_value=np.nan,
+                order="C",
+            ),
+        )
+        ds = xr.Dataset({"a": (["x", "y"], arr)})
+
+        filepath = tmp_path / "refs"
+
+        ds.virtualize.to_kerchunk(filepath, format="parquet", record_size=2)
+
+        with open(tmp_path / "refs" / ".zmetadata") as f:
+            meta = ujson.load(f)
+        assert list(meta) == ["metadata", "record_size"]
+        assert meta["record_size"] == 2
+
+        df = pd.read_parquet(tmp_path / "refs" / "a" / "refs.0.parq")
+
+        assert df.to_dict() == {
+            "offset": {0: 100, 1: 200},
+            "path": {0: "foo.nc", 1: "foo.nc"},
+            "size": {0: 100, 1: 100},
+        }
+
diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py
--- a/virtualizarr/xarray.py
+++ b/virtualizarr/xarray.py
@@ -359,15 +359,23 @@ def to_kerchunk(
     ) -> KerchunkStoreRefs: ...
 
     @overload
-    def to_kerchunk(self, filepath: str, format: Literal["json"]) -> None: ...
+    def to_kerchunk(self, filepath: str | Path, format: Literal["json"]) -> None: ...
 
     @overload
-    def to_kerchunk(self, filepath: str, format: Literal["parquet"]) -> None: ...
+    def to_kerchunk(
+        self,
+        filepath: str | Path,
+        format: Literal["parquet"],
+        record_size: int = 100_000,
+        categorical_threshold: int = 10,
+    ) -> None: ...
 
     def to_kerchunk(
         self,
-        filepath: Optional[str] = None,
+        filepath: str | Path | None = None,
         format: Union[Literal["dict"], Literal["json"], Literal["parquet"]] = "dict",
+        record_size: int = 100_000,
+        categorical_threshold: int = 10,
     ) -> Union[KerchunkStoreRefs, None]:
         """
         Serialize all virtualized arrays in this xarray dataset into the kerchunk references format.
@@ -379,6 +387,13 @@ def to_kerchunk(
         format : 'dict', 'json', or 'parquet'
             Format to serialize the kerchunk references as.
             If 'json' or 'parquet' then the 'filepath' argument is required.
+        record_size (parquet only) : int
+            Number of references to store in each reference file (default 100,000).
+            Bigger values mean fewer read requests but a larger memory footprint.
+        categorical_threshold (parquet only) : int
+            Encode urls as pandas.Categorical to reduce the memory footprint if the
+            ratio of the total number of refs to unique urls for each variable is
+            greater than or equal to this number (default 10).
 
         References
         ----------
@@ -397,6 +412,18 @@ def to_kerchunk(
             return None
 
         elif format == "parquet":
-            raise NotImplementedError()
+            from kerchunk.df import refs_to_dataframe
+
+            if isinstance(filepath, Path):
+                url = str(filepath)
+            elif isinstance(filepath, str):
+                url = filepath
+
+            return refs_to_dataframe(
+                refs,
+                url=url,
+                record_size=record_size,
+                categorical_threshold=categorical_threshold,
+            )
 
         else:
            raise ValueError(f"Unrecognized output format: {format}")
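End to end, the API this first patch adds looks as follows — a minimal sketch, not part of the patch itself; `air.nc` and the output paths are hypothetical, and the two new keyword arguments are simply forwarded to `kerchunk.df.refs_to_dataframe`:

```python
from virtualizarr import open_virtual_dataset

# hypothetical input file; anything open_virtual_dataset understands works here
vds = open_virtual_dataset("air.nc")

# 'json' writes a single human-readable reference file, as before
vds.virtualize.to_kerchunk("refs.json", format="json")

# 'parquet' now writes kerchunk's parquet reference layout instead of raising
# NotImplementedError; both knobs are passed straight through to kerchunk
vds.virtualize.to_kerchunk(
    "refs.parquet",
    format="parquet",
    record_size=100_000,       # max references per parquet reference file
    categorical_threshold=10,  # when urls get encoded as pandas.Categorical
)
```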
From e371fb08622821dae4eda67bb94d1f2ba4e0cf1e Mon Sep 17 00:00:00 2001
From: Julia Signell
Date: Mon, 13 May 2024 15:25:41 -0400
Subject: [PATCH 2/5] Add comment and fix type unions for Python 3.9

---
 virtualizarr/xarray.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py
index 125b9608..ae853b33 100644
--- a/virtualizarr/xarray.py
+++ b/virtualizarr/xarray.py
@@ -359,12 +359,14 @@ def to_kerchunk(
     ) -> KerchunkStoreRefs: ...
 
     @overload
-    def to_kerchunk(self, filepath: str | Path, format: Literal["json"]) -> None: ...
+    def to_kerchunk(
+        self, filepath: Union[str, Path], format: Literal["json"]
+    ) -> None: ...
 
     @overload
     def to_kerchunk(
         self,
-        filepath: str | Path,
+        filepath: Union[str, Path],
         format: Literal["parquet"],
         record_size: int = 100_000,
         categorical_threshold: int = 10,
@@ -372,9 +374,9 @@ def to_kerchunk(
 
     def to_kerchunk(
         self,
-        filepath: str | Path | None = None,
+        filepath: Optional[Union[str, Path]] = None,
         format: Union[Literal["dict"], Literal["json"], Literal["parquet"]] = "dict",
         record_size: int = 100_000,
         categorical_threshold: int = 10,
     ) -> Union[KerchunkStoreRefs, None]:
@@ -419,11 +421,14 @@ def to_kerchunk(
             elif isinstance(filepath, str):
                 url = filepath
 
-            return refs_to_dataframe(
+            # refs_to_dataframe is responsible for writing to parquet.
+            # at no point does it create a full in-memory dataframe.
+            refs_to_dataframe(
                 refs,
                 url=url,
                 record_size=record_size,
                 categorical_threshold=categorical_threshold,
            )
+            return None
 
         else:
             raise ValueError(f"Unrecognized output format: {format}")

From abfed2fb62da8aa5e90160617f2e89373fbd3e69 Mon Sep 17 00:00:00 2001
From: Julia Signell
Date: Tue, 14 May 2024 16:49:41 -0400
Subject: [PATCH 3/5] Add docs

---
 docs/usage.md | 25 +++++++++++++++++--------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/docs/usage.md b/docs/usage.md
index 4fc5411a..02acbf7e 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -311,27 +311,36 @@ Once we've combined references to all the chunks of all our legacy files into one
 
 The [kerchunk library](https://github.com/fsspec/kerchunk) has its own [specification](https://fsspec.github.io/kerchunk/spec.html) for how byte range references should be serialized (either as a JSON or parquet file).
 
-To write out all the references in the virtual dataset as a single kerchunk-compliant JSON file, you can use the {py:meth}`ds.virtualize.to_kerchunk <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_kerchunk>` accessor method.
+To write out all the references in the virtual dataset as a single kerchunk-compliant JSON or parquet file, you can use the {py:meth}`ds.virtualize.to_kerchunk <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_kerchunk>` accessor method.
 
 ```python
 combined_vds.virtualize.to_kerchunk('combined.json', format='json')
 ```
 
-These references can now be interpreted like they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (so you need kerchunk to be installed to use `engine='kerchunk'`).
+These references can now be interpreted as if they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (kerchunk must be installed to use `engine='kerchunk'`).
 
 ```python
-import fsspec
-
-fs = fsspec.filesystem("reference", fo=f"combined.json")
-mapper = fs.get_mapper("")
-
-combined_ds = xr.open_dataset(mapper, engine="kerchunk")
+combined_ds = xr.open_dataset('combined.json', engine="kerchunk")
 ```
 
 ```{note}
 Currently you can only serialize virtual variables backed by `ManifestArray` objects to kerchunk reference files, not real in-memory numpy-backed variables.
 ```
 
+When you have many chunks, the reference file can become large enough to be unwieldy as JSON. In that case the references can instead be stored as parquet. Again, this uses kerchunk internally.
+
+```python
+combined_vds.virtualize.to_kerchunk('combined.parq', format='parquet')
+```
+
+And again we can read these references using the "kerchunk" backend as if they were a regular Zarr store:
+
+```python
+combined_ds = xr.open_dataset('combined.parq', engine="kerchunk")
+```
+
+By default, references are split across multiple parquet files once the total number of references exceeds `record_size`. If the ratio of total references to unique urls for a variable is at least `categorical_threshold`, the urls for that variable are stored as a pandas categorical to reduce the memory footprint.
+
 ### Writing as Zarr
 
 Alternatively, we can write these references out as an actual Zarr store, at least one that is compliant with the [proposed "Chunk Manifest" ZEP](https://github.com/zarr-developers/zarr-specs/issues/287). To do this we simply use the {py:meth}`ds.virtualize.to_zarr <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_zarr>` accessor method.
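To make the `categorical_threshold` trade-off described above concrete, here is a small self-contained sketch (synthetic urls, not taken from the patch) of why repeated urls shrink so much as a `pandas.Categorical`:

```python
import pandas as pd

# 100,000 references pointing at just 2 unique urls: a refs-to-unique ratio
# of 50,000, far above the default threshold of 10, so the url column would
# be stored categorically
urls = pd.Series(
    ["s3://bucket/file1.nc"] * 50_000 + ["s3://bucket/file2.nc"] * 50_000
)

object_bytes = urls.memory_usage(deep=True)
categorical_bytes = urls.astype("category").memory_usage(deep=True)

# the categorical stores each unique url once plus small integer codes,
# so it comes out orders of magnitude smaller
print(object_bytes, categorical_bytes)
```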
From 5b8b9a6aa7ee616c9e8a83dbec0968e71d0c729b Mon Sep 17 00:00:00 2001
From: Julia Signell
Date: Tue, 14 May 2024 17:14:54 -0400
Subject: [PATCH 4/5] Add roundtrip test

---
 virtualizarr/tests/test_integration.py | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py
index 42e2467a..2ada793c 100644
--- a/virtualizarr/tests/test_integration.py
+++ b/virtualizarr/tests/test_integration.py
@@ -1,4 +1,6 @@
+import pytest
 import xarray as xr
+import xarray.testing as xrt
 
 from virtualizarr import open_virtual_dataset
 
@@ -11,3 +13,24 @@ def test_open_scalar_variable(tmpdir):
     vds = open_virtual_dataset(f"{tmpdir}/scalar.nc")
 
     assert vds["a"].shape == ()
+
+
+@pytest.mark.parametrize("format", ["json", "parquet"])
+def test_kerchunk_roundtrip(tmpdir, format):
+    # set up example xarray dataset
+    ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)
+
+    # save it to disk as netCDF (in temporary directory)
+    ds.to_netcdf(f"{tmpdir}/air.nc")
+
+    # use open_virtual_dataset to read it as references
+    vds = open_virtual_dataset(f"{tmpdir}/air.nc")
+
+    # write those references to disk as kerchunk references (json or parquet)
+    vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)
+
+    # read the dataset back from disk via the kerchunk engine
+    roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")
+
+    # assert equal to original dataset
+    xrt.assert_equal(roundtrip, ds)
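For anyone checking the parquet half of this roundtrip by hand, a hedged sketch of inspecting the output. The directory layout (a top-level `.zmetadata` plus per-variable `refs.N.parq` files) follows kerchunk's `refs_to_dataframe` convention; treat the exact file and variable names here as assumptions:

```python
import os

import pandas as pd

# after vds.virtualize.to_kerchunk("refs.parquet", format="parquet"),
# the target is a directory of parquet files rather than a single file
for root, _dirs, files in os.walk("refs.parquet"):
    for name in sorted(files):
        print(os.path.join(root, name))

# each variable's references are tabular: one row per chunk, with columns
# such as path, offset and size ("air" is the tutorial dataset's variable)
df = pd.read_parquet(os.path.join("refs.parquet", "air", "refs.0.parq"))
print(df.head())
```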
From 6fea195b04dc4914b86aac3400da03b96c93e261 Mon Sep 17 00:00:00 2001
From: Tom Nicholas
Date: Wed, 15 May 2024 08:42:40 -0600
Subject: [PATCH 5/5] Avoid creating indexes

---
 virtualizarr/tests/test_integration.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py
index 2ada793c..578bfab1 100644
--- a/virtualizarr/tests/test_integration.py
+++ b/virtualizarr/tests/test_integration.py
@@ -24,7 +24,7 @@ def test_kerchunk_roundtrip(tmpdir, format):
     ds.to_netcdf(f"{tmpdir}/air.nc")
 
     # use open_virtual_dataset to read it as references
-    vds = open_virtual_dataset(f"{tmpdir}/air.nc")
+    vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})
 
     # write those references to disk as kerchunk references (json or parquet)
     vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)
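What this final patch changes in behaviour, sketched below: passing `indexes={}` asks `open_virtual_dataset` not to build pandas indexes for 1D dimension coordinates, so those coordinates stay as virtual `ManifestArray`s rather than being loaded into memory (the file name is hypothetical):

```python
from virtualizarr import open_virtual_dataset

# with the default behaviour, loading may create in-memory pandas indexes
# for the dimension coordinates; an empty mapping skips that entirely
vds = open_virtual_dataset("air.nc", indexes={})

print(vds.indexes)  # expected to be an empty Indexes mapping
```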