Skip to content

Commit

Permalink
Write to parquet (#110)
Browse files Browse the repository at this point in the history
* Add writing refs to parquet

* Add comment and fix type unions for Python 3.9

* Add docs

* Add roundtrip test

* Avoid creating indexes

---------

Co-authored-by: Tom Nicholas <[email protected]>
  • Loading branch information
jsignell and TomNicholas authored May 15, 2024
1 parent 8923b8c commit f9ca667
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 13 deletions.
25 changes: 17 additions & 8 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,27 +321,36 @@ Once we've combined references to all the chunks of all our legacy files into on

The [kerchunk library](https://github.com/fsspec/kerchunk) has its own [specification](https://fsspec.github.io/kerchunk/spec.html) for how byte range references should be serialized (either as a JSON or parquet file).

To write out all the references in the virtual dataset as a single kerchunk-compliant JSON file, you can use the {py:meth}`ds.virtualize.to_kerchunk <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_kerchunk>` accessor method.
To write out all the references in the virtual dataset as a single kerchunk-compliant JSON or parquet file, you can use the {py:meth}`ds.virtualize.to_kerchunk <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_kerchunk>` accessor method.

```python
combined_vds.virtualize.to_kerchunk('combined.json', format='json')
```

These references can now be interpreted like they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (so you need kerchunk to be installed to use `engine='kerchunk'`).
These references can now be interpreted like they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (kerchunk must be installed to use `engine='kerchunk'`).

```python
import fsspec

fs = fsspec.filesystem("reference", fo=f"combined.json")
mapper = fs.get_mapper("")

combined_ds = xr.open_dataset(mapper, engine="kerchunk")
combined_ds = xr.open_dataset('combined.json', engine="kerchunk")
```

```{note}
Currently you can only serialize virtual variables backed by `ManifestArray` objects to kerchunk reference files, not real in-memory numpy-backed variables.
```

When you have many chunks, the reference file can get large enough to be unwieldy as json. In that case the references can be instead stored as parquet. Again this uses kerchunk internally.

```python
combined_vds.virtualize.to_kerchunk('combined.parq', format='parquet')
```

And again we can read these references using the "kerchunk" backend as if it were a regular Zarr store

```python
combined_ds = xr.open_dataset('combined.parq', engine="kerchunk")
```

By default references are placed in separate parquet file when the total number of references exceeds `record_size`. If there are fewer than `categorical_threshold` unique urls referenced by a particular variable, url will be stored as a categorical variable.

### Writing as Zarr

Alternatively, we can write these references out as an actual Zarr store, at least one that is compliant with the [proposed "Chunk Manifest" ZEP](https://github.com/zarr-developers/zarr-specs/issues/287). To do this we simply use the {py:meth}`ds.virtualize.to_zarr <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_zarr>` accessor method.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ test = [
"scipy",
"pooch",
"ruff",
"fastparquet",
"s3fs"

]


Expand Down
23 changes: 23 additions & 0 deletions virtualizarr/tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import pytest
import xarray as xr
import xarray.testing as xrt

from virtualizarr import open_virtual_dataset

Expand All @@ -11,3 +13,24 @@ def test_open_scalar_variable(tmpdir):

vds = open_virtual_dataset(f"{tmpdir}/scalar.nc")
assert vds["a"].shape == ()


@pytest.mark.parametrize("format", ["json", "parquet"])
def test_kerchunk_roundtrip(tmpdir, format):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

# save it to disk as netCDF (in temporary directory)
ds.to_netcdf(f"{tmpdir}/air.nc")

# use open_virtual_dataset to read it as references
vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# read the dataset from disk via the zarr store
roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")

# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)
42 changes: 42 additions & 0 deletions virtualizarr/tests/test_kerchunk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import pandas as pd
import pytest
import ujson # type: ignore
import xarray as xr
Expand Down Expand Up @@ -127,6 +128,47 @@ def test_accessor_to_kerchunk_json(self, tmp_path):
}
assert loaded_refs == expected_ds_refs

def test_accessor_to_kerchunk_parquet(self, tmp_path):
chunks_dict = {
"0.0": {"path": "foo.nc", "offset": 100, "length": 100},
"0.1": {"path": "foo.nc", "offset": 200, "length": 100},
}
manifest = ChunkManifest(entries=chunks_dict)
arr = ManifestArray(
chunkmanifest=manifest,
zarray=dict(
shape=(2, 4),
dtype=np.dtype("<i8"),
chunks=(2, 2),
compressor=None,
filters=None,
fill_value=None,
order="C",
),
)
ds = xr.Dataset({"a": (["x", "y"], arr)})

filepath = tmp_path / "refs"

ds.virtualize.to_kerchunk(filepath, format="parquet", record_size=2)

with open(tmp_path / "refs" / ".zmetadata") as f:
meta = ujson.load(f)
assert list(meta) == ["metadata", "record_size"]
assert meta["record_size"] == 2

df0 = pd.read_parquet(filepath / "a" / "refs.0.parq")

assert df0.to_dict() == {
"offset": {0: 100, 1: 200},
"path": {
0: "foo.nc",
1: "foo.nc",
},
"size": {0: 100, 1: 100},
"raw": {0: None, 1: None},
}


def test_kerchunk_roundtrip_in_memory_no_concat():
# Set up example xarray dataset
Expand Down
38 changes: 34 additions & 4 deletions virtualizarr/xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,23 @@ def to_kerchunk(
) -> KerchunkStoreRefs: ...

@overload
def to_kerchunk(self, filepath: str, format: Literal["json"]) -> None: ...
def to_kerchunk(self, filepath: str | Path, format: Literal["json"]) -> None: ...

@overload
def to_kerchunk(self, filepath: str, format: Literal["parquet"]) -> None: ...
def to_kerchunk(
self,
filepath: str | Path,
format: Literal["parquet"],
record_size: int = 100_000,
categorical_threshold: int = 10,
) -> None: ...

def to_kerchunk(
self,
filepath: str | None = None,
filepath: str | Path | None = None,
format: Literal["dict", "json", "parquet"] = "dict",
record_size: int = 100_000,
categorical_threshold: int = 10,
) -> KerchunkStoreRefs | None:
"""
Serialize all virtualized arrays in this xarray dataset into the kerchunk references format.
Expand All @@ -386,6 +394,13 @@ def to_kerchunk(
format : 'dict', 'json', or 'parquet'
Format to serialize the kerchunk references as.
If 'json' or 'parquet' then the 'filepath' argument is required.
record_size (parquet only): int
Number of references to store in each reference file (default 100,000). Bigger values
mean fewer read requests but larger memory footprint.
categorical_threshold (parquet only) : int
Encode urls as pandas.Categorical to reduce memory footprint if the ratio
of the number of unique urls to total number of refs for each variable
is greater than or equal to this number. (default 10)
References
----------
Expand All @@ -404,6 +419,21 @@ def to_kerchunk(

return None
elif format == "parquet":
raise NotImplementedError()
from kerchunk.df import refs_to_dataframe

if isinstance(filepath, Path):
url = str(filepath)
elif isinstance(filepath, str):
url = filepath

# refs_to_dataframe is responsible for writing to parquet.
# at no point does it create a full in-memory dataframe.
refs_to_dataframe(
refs,
url=url,
record_size=record_size,
categorical_threshold=categorical_threshold,
)
return None
else:
raise ValueError(f"Unrecognized output format: {format}")

0 comments on commit f9ca667

Please sign in to comment.