Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow open_virtual_dataset to read existing Kerchunk references #251

Merged
merged 17 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ci/doc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ dependencies:
- "sphinx_design"
- "sphinx_togglebutton"
- "sphinx-autodoc-typehints"
- -e "..[test]"
14 changes: 14 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ def netcdf4_file(tmpdir):
return filepath


@pytest.fixture
def netcdf4_virtual_dataset(netcdf4_file):
from virtualizarr import open_virtual_dataset

return open_virtual_dataset(netcdf4_file, indexes={})


@pytest.fixture
def netcdf4_inlined_ref(netcdf4_file):
from kerchunk.hdf import SingleHdf5ToZarr

return SingleHdf5ToZarr(netcdf4_file, inline_threshold=1000).translate()


@pytest.fixture
def hdf5_groups_file(tmpdir):
# Set up example xarray dataset
Expand Down
5 changes: 5 additions & 0 deletions docs/releases.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ v1.0.1 (unreleased)

New Features
~~~~~~~~~~~~


- Can open `kerchunk` reference files with ``open_virtual_dataset``.
(:pull:`251`, :pull:`186`) By `Raphael Hagen <https://github.com/norlandrhagen>`_ & `Kristen Thyng <https://github.com/kthyng>`_.

- Adds defaults for `open_virtual_dataset_from_v3_store` in (:pull:`234`)
By `Raphael Hagen <https://github.com/norlandrhagen>`_.

Expand Down
13 changes: 13 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,19 @@ Currently there are not yet any zarr v3 readers which understand the chunk manif
This store can however be read by {py:func}`~virtualizarr.xarray.open_virtual_dataset`, by passing `filetype="zarr_v3"`.
```

## Opening Kerchunk references from disk as virtual datasets

You can open `json` or `paruqet` Kerchunk references from disk as virtual datasets. This may be useful in appending workflows or creating checkpoints for larger datasets.
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

```python

vds = open_virtual_dataset('combined.json', format='kerchunk')
# or
vds = open_virtual_dataset('combined.parquet', format='kerchunk')

norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

```

## Rewriting existing manifests

Sometimes it can be useful to rewrite the contents of an already-generated manifest or virtual dataset.
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ ignore_missing_imports = true
module = "kerchunk.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "ujson.*"
ignore_missing_imports = true

[tool.ruff]
# Same as Black.
line-length = 88
Expand Down
47 changes: 41 additions & 6 deletions virtualizarr/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from xarray.core.variable import IndexVariable

from virtualizarr.manifests import ManifestArray
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.types.kerchunk import KerchunkStoreRefs
from virtualizarr.utils import _FsspecFSFromFilepath

XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore

Expand All @@ -39,6 +40,7 @@ class FileType(AutoName):
zarr = auto()
dmrpp = auto()
zarr_v3 = auto()
kerchunk = auto()


class ManifestBackendArray(ManifestArray, BackendArray):
Expand Down Expand Up @@ -67,13 +69,14 @@ def open_virtual_dataset(

Xarray indexes can optionally be created (the default behaviour). To avoid creating any xarray indexes pass ``indexes={}``.


Parameters
----------
filepath : str, default None
File path to open as a set of virtualized zarr arrays.
filetype : FileType, default None
Type of file to be opened. Used to determine which kerchunk file format backend to use.
Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3'}.
Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3', 'kerchunk'}.
If not provided will attempt to automatically infer the correct filetype from header bytes.
group : str, default is None
Path to the HDF5/netCDF4 group in the given file to open. Given as a str, supported by filetypes “netcdf4” and “hdf5”.
Expand Down Expand Up @@ -133,9 +136,41 @@ def open_virtual_dataset(
raise NotImplementedError()

# if filetype is user defined, convert to FileType

if filetype is not None:
filetype = FileType(filetype)

if filetype == FileType.kerchunk:
# add parquet path
from fsspec.implementations.reference import LazyReferenceMapper

from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options)

# yuck, but JSON doesn't have magic bytes
if fs.filepath.endswith(".json"):
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved
import ujson

from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs

fpath = fs.open_file()

refs = ujson.load(fpath)
return dataset_from_kerchunk_refs(KerchunkStoreRefs(refs))

# also yuck, but the .parquet isn't actually a parquet it's a directory that contains named parquets for each group.
if fs.filepath.endswith("ref.parquet"):
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved
lrm = LazyReferenceMapper(filepath, fs.fs)

# build reference dict from KV pairs in LazyReferenceMapper
# is there a better / more preformant way to extract this?
array_refs = {k: lrm[k] for k in lrm.keys()}

full_reference = {"refs": array_refs}

return dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference))

norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved
if filetype == FileType.zarr_v3:
# TODO is there a neat way of auto-detecting this?
from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store
Expand All @@ -151,9 +186,9 @@ def open_virtual_dataset(
"Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files."
)

fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()
parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp"))
vds = parser.parse_dataset()
vds.drop_vars(drop_variables)
Expand Down Expand Up @@ -189,9 +224,9 @@ def open_virtual_dataset(
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
# TODO really we probably want a dedicated xarray backend that iterates over all variables only once
fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()

# fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any.
# We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through.
Expand Down
4 changes: 3 additions & 1 deletion virtualizarr/manifests/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ def _from_kerchunk_chunk_dict(
chunk_entries: dict[ChunkKey, ChunkDictEntry] = {}
for k, v in kerchunk_chunk_dict.items():
if isinstance(v, (str, bytes)):
raise NotImplementedError("TODO: handle inlined data")
raise NotImplementedError(
"Reading inlined reference data is currently not supported. [ToDo]"
)
elif not isinstance(v, (tuple, list)):
raise TypeError(f"Unexpected type {type(v)} for chunk value: {v}")
chunk_entries[k] = ChunkEntry.from_kerchunk(v).dict()
Expand Down
6 changes: 3 additions & 3 deletions virtualizarr/readers/kerchunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
KerchunkArrRefs,
KerchunkStoreRefs,
)
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.utils import _FsspecFSFromFilepath
from virtualizarr.zarr import ZArray, ZAttrs


Expand All @@ -28,9 +28,9 @@ def _automatically_determine_filetype(
raise NotImplementedError()

# Read magic bytes from local or remote file
fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()
magic_bytes = fpath.read(8)
fpath.close()

Expand Down
62 changes: 62 additions & 0 deletions virtualizarr/tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,65 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir):
vds = open_virtual_dataset(hdf5_scalar)
assert vds.scalar.dims == ()
assert vds.scalar.attrs == {"scalar": "true"}


@pytest.mark.parametrize(
"reference_format",
[
"json",
"parquet",
],
)
def test_open_virtual_dataset_existing_kerchunk_refs(
tmp_path, netcdf4_virtual_dataset, reference_format
):
example_reference_dict = netcdf4_virtual_dataset.virtualize.to_kerchunk(
format="dict"
)
if reference_format == "json":
ref_filepath = tmp_path / "ref.json"

import ujson

with open(ref_filepath, "w") as json_file:
ujson.dump(example_reference_dict, json_file)

if reference_format == "parquet":
from kerchunk.df import refs_to_dataframe

ref_filepath = tmp_path / "ref.parquet"
refs_to_dataframe(fo=example_reference_dict, url=ref_filepath.as_posix())
vds = open_virtual_dataset(
filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={}
)

# Inconsistent results! https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202
# assert vds.virtualize.to_kerchunk(format='dict') == example_reference_dict

refs = vds.virtualize.to_kerchunk(format="dict")
expected_refs = netcdf4_virtual_dataset.virtualize.to_kerchunk(format="dict")
assert refs["refs"]["air/0.0.0"] == expected_refs["refs"]["air/0.0.0"]
assert refs["refs"]["lon/0"] == expected_refs["refs"]["lon/0"]
assert refs["refs"]["lat/0"] == expected_refs["refs"]["lat/0"]
assert refs["refs"]["time/0"] == expected_refs["refs"]["time/0"]

assert list(vds) == list(netcdf4_virtual_dataset)
assert set(vds.coords) == set(netcdf4_virtual_dataset.coords)
assert set(vds.variables) == set(netcdf4_virtual_dataset.variables)


def test_notimplemented_read_inline_refs(tmp_path, netcdf4_inlined_ref):
# For now, we raise a NotImplementedError if we read existing references that have inlined data
# https://github.com/zarr-developers/VirtualiZarr/pull/251#pullrequestreview-2361916932

ref_filepath = tmp_path / "ref.json"

import ujson

with open(ref_filepath, "w") as json_file:
ujson.dump(netcdf4_inlined_ref, json_file)

with pytest.raises(NotImplementedError):
open_virtual_dataset(
filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={}
)
6 changes: 3 additions & 3 deletions virtualizarr/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
import xarray as xr

from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.utils import _FsspecFSFromFilepath


@pytest.fixture
Expand All @@ -21,7 +21,7 @@ def test_fsspec_openfile_from_path(tmp_path: pathlib.Path, dataset: xr.Dataset)
f = tmp_path / "dataset.nc"
dataset.to_netcdf(f)

result = _fsspec_openfile_from_filepath(filepath=f.as_posix())
result = _FsspecFSFromFilepath(filepath=f.as_posix()).open_file()
assert isinstance(result, fsspec.implementations.local.LocalFileOpener)


Expand All @@ -32,6 +32,6 @@ def test_fsspec_openfile_memory(dataset: xr.Dataset):
with fs.open("dataset.nc", mode="wb") as f:
dataset.to_netcdf(f, engine="h5netcdf")

result = _fsspec_openfile_from_filepath(filepath="memory://dataset.nc")
result = _FsspecFSFromFilepath(filepath="memory://dataset.nc").open_file()
with result:
assert isinstance(result, fsspec.implementations.memory.MemoryFile)
55 changes: 30 additions & 25 deletions virtualizarr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,47 @@
]


def _fsspec_openfile_from_filepath(
*,
filepath: str,
reader_options: Optional[dict] = None,
) -> OpenFileType:
"""Converts input filepath to fsspec openfile object.
from dataclasses import dataclass, field


@dataclass
class _FsspecFSFromFilepath:
"""Class to create fsspec Filesystem from input filepath.

Parameters
----------
filepath : str
Input filepath
reader_options : dict, optional
Dict containing kwargs to pass to file opener, by default {}
dict containing kwargs to pass to file opener, by default {}
fs : Option | None
The fsspec filesystem object, created in __post_init__

Returns
-------
OpenFileType
An open file-like object, specific to the protocol supplied in filepath.

Raises
------
NotImplementedError
Raises a Not Implemented Error if filepath protocol is not supported.
"""

import fsspec
from upath import UPath
filepath: str
reader_options: Optional[dict] = field(default_factory=dict)
fs: fsspec.AbstractFileSystem = field(init=False)

def open_file(self) -> OpenFileType:
"""Calls `.open` on fsspec.Filesystem instantiation using self.filepath as an input.

universal_filepath = UPath(filepath)
protocol = universal_filepath.protocol
Returns
-------
OpenFileType
file opened with fsspec
"""
return self.fs.open(self.filepath)

if reader_options is None:
reader_options = {}
def __post_init__(self) -> None:
"""Initialize the fsspec filesystem object"""
import fsspec
from upath import UPath

storage_options = reader_options.get("storage_options", {}) # type: ignore
universal_filepath = UPath(self.filepath)
protocol = universal_filepath.protocol

fpath = fsspec.filesystem(protocol, **storage_options).open(filepath)
self.reader_options = self.reader_options or {}
storage_options = self.reader_options.get("storage_options", {}) # type: ignore

return fpath
self.fs = fsspec.filesystem(protocol, **storage_options)
Loading