Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow open_virtual_dataset to read existing Kerchunk references #251

Merged
merged 17 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ jobs:
conda env list
conda list

- name: Type check
run: |
mypy virtualizarr
# - name: Type check
# run: |
# mypy virtualizarr
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

- name: Running Tests
run: |
Expand Down
1 change: 0 additions & 1 deletion ci/doc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ dependencies:
- "sphinx_design"
- "sphinx_togglebutton"
- "sphinx-autodoc-typehints"
- -e "..[test]"
23 changes: 23 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,26 @@ def hdf5_scalar(tmpdir):
dataset = f.create_dataset("scalar", data=0.1, dtype="float32")
dataset.attrs["scalar"] = "true"
return filepath


@pytest.fixture
def example_reference_dict() -> dict:
    """In-memory Kerchunk (spec version 1) reference set for a small netCDF
    file: one data variable ``air`` plus ``lat``/``lon``/``time`` coordinates,
    each stored as a single chunk pointing into ``tmp.nc``.
    """
    # Root group metadata.
    group_refs = {
        ".zgroup": '{"zarr_format":2}',
        ".zattrs": '{"coordinates":"lat time lon"}',
    }
    # The lone data variable.
    air_refs = {
        "air/0.0.0": ["tmp.nc", 9123, 10600],
        "air/.zarray": '{"shape":[4,25,53],"chunks":[4,25,53],"dtype":"<i2","fill_value":null,"order":"C","compressor":null,"filters":null,"zarr_format":2}',
        "air/.zattrs": '{"scale_factor":0.01,"_ARRAY_DIMENSIONS":["time","lat","lon"]}',
    }
    # 1-D coordinate variables.
    coord_refs = {
        "lat/0": ["tmp.nc", 4927, 100],
        "lat/.zarray": '{"shape":[25],"chunks":[25],"dtype":"<f4","fill_value":null,"order":"C","compressor":null,"filters":null,"zarr_format":2}',
        "lat/.zattrs": '{"axis":"Y","long_name":"Latitude","standard_name":"latitude","units":"degrees_north","_ARRAY_DIMENSIONS":["lat"]}',
        "time/0": ["tmp.nc", 23396, 16],
        "time/.zarray": '{"shape":[4],"chunks":[4],"dtype":"<f4","fill_value":null,"order":"C","compressor":null,"filters":null,"zarr_format":2}',
        "time/.zattrs": '{"calendar":"standard","long_name":"Time","standard_name":"time","units":"hours since 1800-01-01","_ARRAY_DIMENSIONS":["time"]}',
        "lon/0": ["tmp.nc", 23184, 212],
        "lon/.zarray": '{"shape":[53],"chunks":[53],"dtype":"<f4","fill_value":null,"order":"C","compressor":null,"filters":null,"zarr_format":2}',
        "lon/.zattrs": '{"axis":"X","long_name":"Longitude","standard_name":"longitude","units":"degrees_east","_ARRAY_DIMENSIONS":["lon"]}',
    }
    return {"version": 1, "refs": {**group_refs, **air_refs, **coord_refs}}
5 changes: 5 additions & 0 deletions docs/releases.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ v1.0.1 (unreleased)

New Features
~~~~~~~~~~~~


- Can open `kerchunk` reference files with ``open_virtual_dataset``.
(:pull:`251`, :pull:`186`) By `Raphael Hagen <https://github.com/norlandrhagen>`_ & `Kristen Thyng <https://github.com/kthyng>`_.

- Adds defaults for `open_virtual_dataset_from_v3_store` (:pull:`234`)
By `Raphael Hagen <https://github.com/norlandrhagen>`_.

Expand Down
12 changes: 12 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,18 @@ Currently there are not yet any zarr v3 readers which understand the chunk manif
This store can however be read by {py:func}`~virtualizarr.xarray.open_virtual_dataset`, by passing `filetype="zarr_v3"`.
```

## Opening Kerchunk references from disk as virtual datasets

You can open kerchunk references from disk as virtual datasets. This may be useful in appending workflows or creating checkpoints for larger datasets.

```python

vds = open_virtual_dataset('combined.json', filetype='kerchunk_json')
# or
vds = open_virtual_dataset('combined.parquet', filetype='kerchunk_parquet')

```

## Rewriting existing manifests

Sometimes it can be useful to rewrite the contents of an already-generated manifest or virtual dataset.
Expand Down
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ test = [
"pytest",
"ruff",
"s3fs",
"scipy",
]
"scipy"]
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved


[project.urls]
Expand Down Expand Up @@ -87,6 +86,10 @@ ignore_missing_imports = true
module = "kerchunk.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "ujson.*"
ignore_missing_imports = true

[tool.ruff]
# Same as Black.
line-length = 88
Expand Down
46 changes: 40 additions & 6 deletions virtualizarr/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from xarray.core.variable import IndexVariable

from virtualizarr.manifests import ManifestArray
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.types.kerchunk import KerchunkStoreRefs
from virtualizarr.utils import _FsspecFSFromFilepath

XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore

Expand All @@ -39,6 +40,8 @@ class FileType(AutoName):
zarr = auto()
dmrpp = auto()
zarr_v3 = auto()
kerchunk_json = auto()
kerchunk_parquet = auto()
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved


class ManifestBackendArray(ManifestArray, BackendArray):
Expand Down Expand Up @@ -67,13 +70,14 @@ def open_virtual_dataset(

Xarray indexes can optionally be created (the default behaviour). To avoid creating any xarray indexes pass ``indexes={}``.


Parameters
----------
filepath : str, default None
File path to open as a set of virtualized zarr arrays.
filetype : FileType, default None
Type of file to be opened. Used to determine which kerchunk file format backend to use.
Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3'}.
Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3', 'kerchunk_json', 'kerchunk_parquet'}.
If not provided will attempt to automatically infer the correct filetype from header bytes.
group : str, default is None
Path to the HDF5/netCDF4 group in the given file to open. Given as a str, supported by filetypes “netcdf4” and “hdf5”.
Expand Down Expand Up @@ -133,9 +137,39 @@ def open_virtual_dataset(
raise NotImplementedError()

# if filetype is user defined, convert to FileType

if filetype is not None:
filetype = FileType(filetype)

if filetype == FileType.kerchunk_parquet:
from fsspec.implementations.reference import LazyReferenceMapper

from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options).fs
lrm = LazyReferenceMapper(filepath, fs)

# build reference dict from KV pairs in LazyReferenceMapper
array_refs = {k: lrm[k] for k in lrm.keys()}

# where does version come from? Is it always 1?
full_reference = {"version": 1, "refs": array_refs}
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

return dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference))
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

if filetype == FileType.kerchunk_json:
import ast

from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs

fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
).open_file()

refs = ast.literal_eval(fpath.read().decode("utf-8"))
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

return dataset_from_kerchunk_refs(KerchunkStoreRefs(refs))

norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved
if filetype == FileType.zarr_v3:
# TODO is there a neat way of auto-detecting this?
from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store
Expand All @@ -151,9 +185,9 @@ def open_virtual_dataset(
"Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files."
)

fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()
parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp"))
vds = parser.parse_dataset()
vds.drop_vars(drop_variables)
Expand Down Expand Up @@ -189,9 +223,9 @@ def open_virtual_dataset(
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
# TODO really we probably want a dedicated xarray backend that iterates over all variables only once
fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()

# fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any.
# We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through.
Expand Down
6 changes: 3 additions & 3 deletions virtualizarr/readers/kerchunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
KerchunkArrRefs,
KerchunkStoreRefs,
)
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.utils import _FsspecFSFromFilepath
from virtualizarr.zarr import ZArray, ZAttrs


Expand All @@ -28,9 +28,9 @@ def _automatically_determine_filetype(
raise NotImplementedError()

# Read magic bytes from local or remote file
fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()
magic_bytes = fpath.read(8)
fpath.close()

Expand Down
30 changes: 30 additions & 0 deletions virtualizarr/tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,33 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir):
vds = open_virtual_dataset(hdf5_scalar)
assert vds.scalar.dims == ()
assert vds.scalar.attrs == {"scalar": "true"}


@pytest.mark.parametrize(
    "reference_format",
    [
        "kerchunk_json",
        "kerchunk_parquet",
    ],
)
def test_open_virtual_dataset_existing_kerchunk_refs(
    tmp_path, example_reference_dict, reference_format
):
    """Round-trip an in-memory Kerchunk reference dict: persist it to disk
    (JSON or parquet) and reopen it with ``open_virtual_dataset``.
    """
    if reference_format == "kerchunk_json":
        import json

        ref_filepath = tmp_path / "ref.json"
        # Write valid JSON. (repr() would emit a single-quoted Python literal,
        # which is not JSON even though the reader's ast.literal_eval happens
        # to accept it; json.dump output is parseable by both.)
        with open(ref_filepath, "w") as f:
            json.dump(example_reference_dict, f)

    if reference_format == "kerchunk_parquet":
        from kerchunk.df import refs_to_dataframe

        ref_filepath = tmp_path / "ref.parquet"
        refs_to_dataframe(fo=example_reference_dict, url=ref_filepath.as_posix())

    vds = open_virtual_dataset(
        filepath=ref_filepath.as_posix(), filetype=reference_format, indexes={}
    )

    # Only `air` is a data variable; lat/lon/time come back as coordinates.
    assert list(vds) == ["air"]
    assert set(vds.coords) == {"lat", "lon", "time"}
    assert set(vds.variables) == {"lat", "air", "lon", "time"}
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 3 additions & 3 deletions virtualizarr/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
import xarray as xr

from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.utils import _FsspecFSFromFilepath


@pytest.fixture
Expand All @@ -21,7 +21,7 @@ def test_fsspec_openfile_from_path(tmp_path: pathlib.Path, dataset: xr.Dataset)
f = tmp_path / "dataset.nc"
dataset.to_netcdf(f)

result = _fsspec_openfile_from_filepath(filepath=f.as_posix())
result = _FsspecFSFromFilepath(filepath=f.as_posix()).open_file()
assert isinstance(result, fsspec.implementations.local.LocalFileOpener)


Expand All @@ -32,6 +32,6 @@ def test_fsspec_openfile_memory(dataset: xr.Dataset):
with fs.open("dataset.nc", mode="wb") as f:
dataset.to_netcdf(f, engine="h5netcdf")

result = _fsspec_openfile_from_filepath(filepath="memory://dataset.nc")
result = _FsspecFSFromFilepath(filepath="memory://dataset.nc").open_file()
with result:
assert isinstance(result, fsspec.implementations.memory.MemoryFile)
55 changes: 30 additions & 25 deletions virtualizarr/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import io
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Dict, Optional, Union
norlandrhagen marked this conversation as resolved.
Show resolved Hide resolved

if TYPE_CHECKING:
import fsspec.core
Expand All @@ -13,42 +13,47 @@
]


def _fsspec_openfile_from_filepath(
*,
filepath: str,
reader_options: Optional[dict] = None,
) -> OpenFileType:
"""Converts input filepath to fsspec openfile object.
from dataclasses import dataclass, field


@dataclass
class _FsspecFSFromFilepath:
"""Class to create fsspec Filesystem from input filepath.

Parameters
----------
filepath : str
Input filepath
reader_options : dict, optional
Dict containing kwargs to pass to file opener, by default {}
fs : Option | None
The fsspec filesystem object, created in __post_init__

Returns
-------
OpenFileType
An open file-like object, specific to the protocol supplied in filepath.

Raises
------
NotImplementedError
Raises a Not Implemented Error if filepath protocol is not supported.
"""

import fsspec
from upath import UPath
filepath: str
reader_options: Optional[Dict] = field(default_factory=dict)
fs: fsspec.AbstractFileSystem = field(init=False)

def open_file(self) -> OpenFileType:
"""Calls `.open` on fsspec.Filesystem instantiation using self.filepath as an input.

universal_filepath = UPath(filepath)
protocol = universal_filepath.protocol
Returns
-------
OpenFileType
file opened with fsspec
"""
return self.fs.open(self.filepath)

if reader_options is None:
reader_options = {}
def __post_init__(self) -> None:
"""Initialize the fsspec filesystem object"""
import fsspec
from upath import UPath

storage_options = reader_options.get("storage_options", {}) # type: ignore
universal_filepath = UPath(self.filepath)
protocol = universal_filepath.protocol

fpath = fsspec.filesystem(protocol, **storage_options).open(filepath)
self.reader_options = self.reader_options or {}
storage_options = self.reader_options.get("storage_options", {}) # type: ignore

return fpath
self.fs = fsspec.filesystem(protocol, **storage_options)
Loading