Inline loaded variables into kerchunk references (#73)
* add test vs kerchunk inlining

* encode inlined data correctly (at least for standard numpy arrays)

* don't test time variable for now

* store fill_value as np.NaN, and always coerce it

* test passing for lat and lon variables

* formatting

* encode numpy types

* tidy internal import

* parametrize test to test inlining different variables

* raise when encoding encountered during serialization of numpy arrays

* see if closing the netcdf files in the fixtures fixes the kerchunk error

* update docs

* ensure inline_threshold is an int

* formatting

* specified NetCDF4 for netcdf4_file fixture & added netcdf4 to pyproject.toml [test] (#116)

* organize tests

* don't unnecessarily slice dataset

---------

Co-authored-by: Raphael Hagen <[email protected]>
TomNicholas and norlandrhagen authored May 16, 2024
1 parent ca99d5a commit 2d61d1a
Showing 9 changed files with 171 additions and 95 deletions.
4 changes: 3 additions & 1 deletion docs/usage.md
@@ -333,8 +333,10 @@ These references can now be interpreted like they were a Zarr store by [fsspec](
combined_ds = xr.open_dataset('combined.json', engine="kerchunk")
```

In-memory ("loadable") variables backed by numpy arrays can also be written out to kerchunk reference files, with the values serialized as bytes. This is equivalent to kerchunk's concept of "inlining", but done on a per-array basis using the `loadable_variables` kwarg rather than a per-chunk basis using kerchunk's `inline_threshold` kwarg.

```{note}
Currently you can only serialize virtual variables backed by `ManifestArray` objects to kerchunk reference files, not real in-memory numpy-backed variables.
Currently you can only serialize in-memory variables to kerchunk references if they do not have any encoding.
```

When you have many chunks, the reference file can become large enough to be unwieldy as JSON. In that case the references can instead be stored as parquet. Again this uses kerchunk internally.
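
As a rough sketch of the loadable-variables workflow described above (the file path and variable names here are purely illustrative, and `lat`/`lon` are assumed to be small, unencoded coordinate variables):

```python
import xarray as xr

from virtualizarr import open_virtual_dataset

# load lat/lon into memory so their values are inlined into the references,
# while the remaining variables stay virtual (ManifestArray-backed)
vds = open_virtual_dataset("air.nc", loadable_variables=["lat", "lon"], indexes={})

# write the combined virtual + inlined references out as kerchunk json
vds.virtualize.to_kerchunk("refs.json", format="json")

# the references can then be opened like any other kerchunk reference file
ds = xr.open_dataset("refs.json", engine="kerchunk")
```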
7 changes: 4 additions & 3 deletions pyproject.toml
@@ -35,15 +35,16 @@ dependencies = [
test = [
"codecov",
"pre-commit",
"ruff",
"pytest-mypy",
"pytest-cov",
"pytest",
"fsspec",
"pooch",
"scipy",
"ruff",
"netcdf4",
"fsspec",
"s3fs",
"fastparquet",
"s3fs"
]


83 changes: 66 additions & 17 deletions virtualizarr/kerchunk.py
@@ -1,9 +1,14 @@
import base64
import json
from enum import Enum, auto
from pathlib import Path
from typing import NewType, Optional, cast

import numpy as np
import ujson # type: ignore
import xarray as xr

from virtualizarr.manifests.manifest import join
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.zarr import ZArray, ZAttrs

@@ -19,9 +24,6 @@
) # lower-level dict containing just the information for one zarr array


from enum import Enum, auto


class AutoName(Enum):
# Recommended by official Python docs for auto naming:
# https://docs.python.org/3/library/enum.html#using-automatic-values
@@ -38,6 +40,16 @@ class FileType(AutoName):
zarr = auto()


class NumpyEncoder(json.JSONEncoder):
# TODO I don't understand how kerchunk gets around this problem of encoding numpy types (in the zattrs) whilst only using ujson
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist() # Convert NumPy array to Python list
elif isinstance(obj, np.generic):
return obj.item() # Convert NumPy scalar to Python scalar
return json.JSONEncoder.default(self, obj)


def read_kerchunk_references_from_file(
filepath: str,
filetype: FileType | None,
@@ -194,7 +206,7 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs:

all_arr_refs = {}
for var_name, var in ds.variables.items():
arr_refs = variable_to_kerchunk_arr_refs(var)
arr_refs = variable_to_kerchunk_arr_refs(var, var_name)

prepended_with_var_name = {
f"{var_name}/{key}": val for key, val in arr_refs.items()
@@ -206,38 +218,75 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs:
"version": 1,
"refs": {
".zgroup": '{"zarr_format":2}',
".zattrs": ujson.dumps(ds.attrs),
**all_arr_refs,
},
}

return cast(KerchunkStoreRefs, ds_refs)


def variable_to_kerchunk_arr_refs(var: xr.Variable) -> KerchunkArrRefs:
def variable_to_kerchunk_arr_refs(var: xr.Variable, var_name: str) -> KerchunkArrRefs:
"""
Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps a ManifestArray).
Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps either a ManifestArray or a numpy array).
Partially encodes the inner dicts to json to match kerchunk behaviour (see https://github.com/fsspec/kerchunk/issues/415).
"""
from virtualizarr.manifests import ManifestArray

marr = var.data
if isinstance(var.data, ManifestArray):
marr = var.data

if not isinstance(marr, ManifestArray):
raise TypeError(
f"Can only serialize wrapped arrays of type ManifestArray, but got type {type(marr)}"
)
arr_refs: dict[str, str | list[str | int]] = {
str(chunk_key): chunk_entry.to_kerchunk()
for chunk_key, chunk_entry in marr.manifest.entries.items()
}

arr_refs: dict[str, str | list[str | int]] = {
str(chunk_key): chunk_entry.to_kerchunk()
for chunk_key, chunk_entry in marr.manifest.entries.items()
}
zarray = marr.zarray

else:
try:
np_arr = var.to_numpy()
except AttributeError as e:
raise TypeError(
f"Can only serialize wrapped arrays of type ManifestArray or numpy.ndarray, but got type {type(var.data)}"
) from e

if var.encoding:
if "scale_factor" in var.encoding:
raise NotImplementedError(
f"Cannot serialize loaded variable {var_name}, as it is encoded with a scale_factor"
)
if "offset" in var.encoding:
raise NotImplementedError(
f"Cannot serialize loaded variable {var_name}, as it is encoded with an offset"
)
if "calendar" in var.encoding:
raise NotImplementedError(
f"Cannot serialize loaded variable {var_name}, as it is encoded with a calendar"
)

# This encoding is what kerchunk does when it "inlines" data, see https://github.com/fsspec/kerchunk/blob/a0c4f3b828d37f6d07995925b324595af68c4a19/kerchunk/hdf.py#L472
byte_data = np_arr.tobytes()
# TODO do I really need to encode then decode like this?
inlined_data = (b"base64:" + base64.b64encode(byte_data)).decode("utf-8")

# TODO can this be generalized to save individual chunks of a dask array?
# TODO will this fail for a scalar?
arr_refs = {join(0 for _ in np_arr.shape): inlined_data}

zarray = ZArray(
chunks=np_arr.shape,
shape=np_arr.shape,
dtype=np_arr.dtype,
order="C",
)

zarray_dict = marr.zarray.to_kerchunk_json()
zarray_dict = zarray.to_kerchunk_json()
arr_refs[".zarray"] = zarray_dict

zattrs = var.attrs
zattrs["_ARRAY_DIMENSIONS"] = list(var.dims)
arr_refs[".zattrs"] = ujson.dumps(zattrs)
arr_refs[".zattrs"] = json.dumps(zattrs, separators=(",", ":"), cls=NumpyEncoder)

return cast(KerchunkArrRefs, arr_refs)
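
For context, the `base64:`-prefixed string written by `variable_to_kerchunk_arr_refs` follows kerchunk's own inlining convention (raw array bytes, base64-encoded). A minimal standalone sketch of that encode/decode round-trip, separate from the changed code and purely illustrative:

```python
import base64

import numpy as np

arr = np.array([1.5, 2.5, 3.5])

# encode: this string is what gets stored as the chunk value in the reference dict
inlined = (b"base64:" + base64.b64encode(arr.tobytes())).decode("utf-8")

# decode: strip the prefix, base64-decode, and reinterpret with the original dtype/shape
raw = base64.b64decode(inlined.removeprefix("base64:"))
roundtripped = np.frombuffer(raw, dtype=arr.dtype).reshape(arr.shape)

assert np.array_equal(roundtripped, arr)
```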
5 changes: 4 additions & 1 deletion virtualizarr/tests/conftest.py
@@ -9,7 +9,8 @@ def netcdf4_file(tmpdir):

# Save it to disk as netCDF (in temporary directory)
filepath = f"{tmpdir}/air.nc"
ds.to_netcdf(filepath)
ds.to_netcdf(filepath, format="NETCDF4")
ds.close()

return filepath

@@ -28,5 +29,7 @@ def netcdf4_files(tmpdir):
filepath2 = f"{tmpdir}/air2.nc"
ds1.to_netcdf(filepath1)
ds2.to_netcdf(filepath2)
ds1.close()
ds2.close()

return filepath1, filepath2
132 changes: 69 additions & 63 deletions virtualizarr/tests/test_integration.py
@@ -1,68 +1,95 @@
import fsspec
import pytest
import xarray as xr
import xarray.testing as xrt

from virtualizarr import open_virtual_dataset


def test_kerchunk_roundtrip_no_concat(tmpdir):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

# save it to disk as netCDF (in temporary directory)
ds.to_netcdf(f"{tmpdir}/air.nc")
@pytest.mark.parametrize(
"inline_threshold, vars_to_inline",
[
(5e2, ["lat", "lon"]),
pytest.param(
5e4, ["lat", "lon", "time"], marks=pytest.mark.xfail(reason="time encoding")
),
pytest.param(
5e7,
["lat", "lon", "time", "air"],
marks=pytest.mark.xfail(reason="scale factor encoding"),
),
],
)
def test_numpy_arrays_to_inlined_kerchunk_refs(
netcdf4_file, inline_threshold, vars_to_inline
):
from kerchunk.hdf import SingleHdf5ToZarr

# inline_threshold is chosen to test inlining only the variables listed in vars_to_inline
expected = SingleHdf5ToZarr(
netcdf4_file, inline_threshold=int(inline_threshold)
).translate()

# loading the variables should produce same result as inlining them using kerchunk
vds = open_virtual_dataset(
netcdf4_file, loadable_variables=vars_to_inline, indexes={}
)
refs = vds.virtualize.to_kerchunk(format="dict")

# use open_dataset_via_kerchunk to read it as references
vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})
# TODO I would just compare the entire dicts but kerchunk returns inconsistent results - see https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202
# assert refs == expected
assert refs["refs"]["air/0.0.0"] == expected["refs"]["air/0.0.0"]
assert refs["refs"]["lon/0"] == expected["refs"]["lon/0"]
assert refs["refs"]["lat/0"] == expected["refs"]["lat/0"]
assert refs["refs"]["time/0"] == expected["refs"]["time/0"]

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.json", format="json")

# use fsspec to read the dataset from disk via the zarr store
fs = fsspec.filesystem("reference", fo=f"{tmpdir}/refs.json")
m = fs.get_mapper("")
@pytest.mark.parametrize("format", ["json", "parquet"])
class TestKerchunkRoundtrip:
def test_kerchunk_roundtrip_no_concat(self, tmpdir, format):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

roundtrip = xr.open_dataset(m, engine="kerchunk")
# save it to disk as netCDF (in temporary directory)
ds.to_netcdf(f"{tmpdir}/air.nc")

# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)
# use open_dataset_via_kerchunk to read it as references
vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

def test_kerchunk_roundtrip_concat(tmpdir):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False).isel(
time=slice(None, 2000)
)
# use fsspec to read the dataset from disk via the zarr store
roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")

# split into two datasets
ds1, ds2 = ds.isel(time=slice(None, 1000)), ds.isel(time=slice(1000, None))
# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)

# save it to disk as netCDF (in temporary directory)
ds1.to_netcdf(f"{tmpdir}/air1.nc")
ds2.to_netcdf(f"{tmpdir}/air2.nc")
def test_kerchunk_roundtrip_concat(self, tmpdir, format):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

# use open_dataset_via_kerchunk to read it as references
vds1 = open_virtual_dataset(f"{tmpdir}/air1.nc", indexes={})
vds2 = open_virtual_dataset(f"{tmpdir}/air2.nc", indexes={})
# split into two datasets
ds1, ds2 = ds.isel(time=slice(None, 1460)), ds.isel(time=slice(1460, None))

# concatenate virtually along time
vds = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override")
print(vds["air"].variable._data)
# save it to disk as netCDF (in temporary directory)
ds1.to_netcdf(f"{tmpdir}/air1.nc")
ds2.to_netcdf(f"{tmpdir}/air2.nc")

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.json", format="json")
# use open_dataset_via_kerchunk to read it as references
vds1 = open_virtual_dataset(f"{tmpdir}/air1.nc", indexes={})
vds2 = open_virtual_dataset(f"{tmpdir}/air2.nc", indexes={})

# use fsspec to read the dataset from disk via the zarr store
fs = fsspec.filesystem("reference", fo=f"{tmpdir}/refs.json")
m = fs.get_mapper("")
# concatenate virtually along time
vds = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override")

roundtrip = xr.open_dataset(m, engine="kerchunk")
# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# user does analysis here
# use fsspec to read the dataset from disk via the zarr store
roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")

# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)
# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)


def test_open_scalar_variable(tmpdir):
@@ -73,24 +100,3 @@ def test_open_scalar_variable(tmpdir):

vds = open_virtual_dataset(f"{tmpdir}/scalar.nc")
assert vds["a"].shape == ()


@pytest.mark.parametrize("format", ["json", "parquet"])
def test_kerchunk_roundtrip(tmpdir, format):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

# save it to disk as netCDF (in temporary directory)
ds.to_netcdf(f"{tmpdir}/air.nc")

# use open_virtual_dataset to read it as references
vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# read the dataset from disk via the zarr store
roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")

# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)