Skip to content

Commit

Permalink
Merge branch 'main' into dmrpp_root_group_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushnag committed Oct 19, 2024
2 parents b8fbb15 + 29ca4ac commit 42fb6a2
Show file tree
Hide file tree
Showing 26 changed files with 1,268 additions and 680 deletions.
9 changes: 7 additions & 2 deletions docs/releases.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ New Features
- Load scalar variables by default. (:pull:`205`)
By `Gustavo Hidalgo <https://github.com/ghidalgo3>`_.

- Support empty files (:pull:`260`)
By `Justus Magin <https://github.com/keewis>`_.

Breaking changes
~~~~~~~~~~~~~~~~

- Serialize valid ZarrV3 metadata and require full compressor numcodec config (for :pull:`193`)
By `Gustavo Hidalgo <https://github.com/ghidalgo3>`_.
- VirtualiZarr's `ZArray`, `ChunkEntry`, and `Codec` no longer subclass
`pydantic.BaseModel` (:pull:`210`)
- `ZArray`'s `__init__` signature has changed to match `zarr.Array`'s (:pull:`xxx`)
- `ZArray`'s `__init__` signature has changed to match `zarr.Array`'s (:pull:`210`)

Deprecations
~~~~~~~~~~~~
Expand All @@ -55,7 +58,7 @@ Bug fixes
Documentation
~~~~~~~~~~~~~

- Adds virtualizarr + coiled serverless example notebook (:pull`223`)
- Adds virtualizarr + coiled serverless example notebook (:pull:`223`)
By `Raphael Hagen <https://github.com/norlandrhagen>`_.


Expand All @@ -64,6 +67,8 @@ Internal Changes

- Refactored internal structure significantly to separate everything to do with reading references from everything to do with writing references.
(:issue:`229`) (:pull:`231`) By `Tom Nicholas <https://github.com/TomNicholas>`_.
- Refactored readers to consider every filetype as a separate reader, all standardized to present the same `open_virtual_dataset` interface internally.
(:pull:`261`) By `Tom Nicholas <https://github.com/TomNicholas>`_.

.. _v1.0.0:

Expand Down
311 changes: 92 additions & 219 deletions virtualizarr/backend.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,39 @@
import os
import warnings
from collections.abc import Iterable, Mapping, MutableMapping
from collections.abc import Iterable, Mapping
from enum import Enum, auto
from io import BufferedIOBase
from pathlib import Path
from typing import (
Any,
Hashable,
Optional,
cast,
)

import xarray as xr
from xarray.backends import AbstractDataStore, BackendArray
from xarray.core.indexes import Index, PandasIndex
from xarray.core.variable import IndexVariable
from xarray import Dataset
from xarray.core.indexes import Index

from virtualizarr.manifests import ManifestArray
from virtualizarr.types.kerchunk import KerchunkStoreRefs
from virtualizarr.utils import _FsspecFSFromFilepath

XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore
from virtualizarr.readers import (
DMRPPVirtualBackend,
FITSVirtualBackend,
HDF5VirtualBackend,
KerchunkVirtualBackend,
NetCDF3VirtualBackend,
TIFFVirtualBackend,
ZarrV3VirtualBackend,
)
from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions

# TODO add entrypoint to allow external libraries to add to this mapping
# Registry mapping a lower-case filetype name to the reader backend class for it.
# `open_virtual_dataset` looks a class up here via `filetype.name.lower()` and then
# calls that class's `open_virtual_dataset`; a missing key is reported by the caller
# as an unsupported file type.
VIRTUAL_BACKENDS = {
    "kerchunk": KerchunkVirtualBackend,
    "zarr_v3": ZarrV3VirtualBackend,
    "dmrpp": DMRPPVirtualBackend,
    # all the below call one of the kerchunk backends internally (https://fsspec.github.io/kerchunk/reference.html#file-format-backends)
    "netcdf3": NetCDF3VirtualBackend,
    "hdf5": HDF5VirtualBackend,
    "netcdf4": HDF5VirtualBackend,  # note this is the same as for hdf5
    "tiff": TIFFVirtualBackend,
    "fits": FITSVirtualBackend,
}


class AutoName(Enum):
Expand All @@ -43,10 +57,49 @@ class FileType(AutoName):
kerchunk = auto()


class ManifestBackendArray(ManifestArray, BackendArray):
"""Using this prevents xarray from wrapping the KerchunkArray in ExplicitIndexingAdapter etc."""
def automatically_determine_filetype(
    *,
    filepath: str,
    reader_options: Optional[dict[str, Any]] = None,
) -> FileType:
    """
    Attempt to automatically infer the correct reader for this filetype.

    Uses magic bytes and file / directory suffixes.

    Parameters
    ----------
    filepath : str
        Location of the local or remote file to inspect.
    reader_options : dict, optional
        Forwarded to ``_FsspecFSFromFilepath`` when opening the file.
        ``None`` (the default) is treated as an empty dict.

    Returns
    -------
    FileType
        The member matching the recognised header bytes.

    Raises
    ------
    NotImplementedError
        If ``filepath`` has a ``.zarr`` suffix, if the header identifies an
        HDF4 file, or if the header bytes are not recognised at all.
    """
    # Build a fresh dict per call instead of sharing one mutable default
    # object between every invocation.
    if reader_options is None:
        reader_options = {}

    # TODO this should ideally handle every filetype that we have a reader for, not just kerchunk

    # TODO how do we handle kerchunk json / parquet here?
    if Path(filepath).suffix == ".zarr":
        # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one...
        raise NotImplementedError()

    # Read magic bytes from local or remote file
    fpath = _FsspecFSFromFilepath(
        filepath=filepath, reader_options=reader_options
    ).open_file()
    try:
        magic_bytes = fpath.read(8)
    finally:
        # Release the handle even when the read itself fails.
        fpath.close()

    if magic_bytes.startswith(b"CDF"):
        return FileType.netcdf3
    if magic_bytes.startswith(b"\x0e\x03\x13\x01"):
        raise NotImplementedError("HDF4 formatted files not supported")
    if magic_bytes.startswith(b"\x89HDF"):
        return FileType.hdf5
    if magic_bytes.startswith(b"GRIB"):
        return FileType.grib
    # TIFF headers begin with a byte-order mark followed by the number 42 in
    # that byte order: b"II*" (little-endian) or b"MM\x00*" (big-endian).
    if magic_bytes.startswith((b"II*", b"MM\x00*")):
        return FileType.tiff
    if magic_bytes.startswith(b"SIMPLE"):
        return FileType.fits

    raise NotImplementedError(
        f"Unrecognised file based on header bytes: {magic_bytes}"
    )


def open_virtual_dataset(
Expand All @@ -61,15 +114,14 @@ def open_virtual_dataset(
indexes: Mapping[str, Index] | None = None,
virtual_array_class=ManifestArray,
reader_options: Optional[dict] = None,
) -> xr.Dataset:
) -> Dataset:
"""
Open a file or store as an xarray Dataset wrapping virtualized zarr arrays.
No data variables will be loaded unless specified in the ``loadable_variables`` kwarg (in which case they will be xarray lazily indexed arrays).
Xarray indexes can optionally be created (the default behaviour). To avoid creating any xarray indexes pass ``indexes={}``.
Parameters
----------
filepath : str, default None
Expand Down Expand Up @@ -112,217 +164,38 @@ def open_virtual_dataset(
stacklevel=2,
)

loadable_vars: dict[str, xr.Variable]
virtual_vars: dict[str, xr.Variable]
vars: dict[str, xr.Variable]

if drop_variables is None:
drop_variables = []
elif isinstance(drop_variables, str):
drop_variables = [drop_variables]
else:
drop_variables = list(drop_variables)
if loadable_variables is None:
loadable_variables = []
elif isinstance(loadable_variables, str):
loadable_variables = [loadable_variables]
else:
loadable_variables = list(loadable_variables)
common = set(drop_variables).intersection(set(loadable_variables))
if common:
raise ValueError(f"Cannot both load and drop variables {common}")
drop_variables, loadable_variables = check_for_collisions(
drop_variables,
loadable_variables,
)

if virtual_array_class is not ManifestArray:
raise NotImplementedError()

# if filetype is user defined, convert to FileType
if reader_options is None:
reader_options = {}

if filetype is not None:
# if filetype is user defined, convert to FileType
filetype = FileType(filetype)

if filetype == FileType.kerchunk:
from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs

fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options)

# The kerchunk .parquet storage format isn't actually a parquet, but a directory that contains named parquets for each group/variable.
if fs.filepath.endswith("ref.parquet"):
from fsspec.implementations.reference import LazyReferenceMapper

lrm = LazyReferenceMapper(filepath, fs.fs)

# build reference dict from KV pairs in LazyReferenceMapper
# is there a better / more performant way to extract this?
array_refs = {k: lrm[k] for k in lrm.keys()}

full_reference = {"refs": array_refs}

return dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference))

# JSON has no magic bytes, but the Kerchunk version 1 spec starts with 'version':
# https://fsspec.github.io/kerchunk/spec.html
elif fs.read_bytes(9).startswith(b'{"version'):
import ujson

with fs.open_file() as of:
refs = ujson.load(of)

return dataset_from_kerchunk_refs(KerchunkStoreRefs(refs))

else:
raise ValueError(
"The input Kerchunk reference did not seem to be in Kerchunk's JSON or Parquet spec: https://fsspec.github.io/kerchunk/spec.html. The Kerchunk format autodetection is quite flaky, so if your reference matches the Kerchunk spec feel free to open an issue: https://github.com/zarr-developers/VirtualiZarr/issues"
)

if filetype == FileType.zarr_v3:
# TODO is there a neat way of auto-detecting this?
from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store

return open_virtual_dataset_from_v3_store(
storepath=filepath, drop_variables=drop_variables, indexes=indexes
)
elif filetype == FileType.dmrpp:
from virtualizarr.readers.dmrpp import DMRParser

if loadable_variables != [] or indexes is None:
raise NotImplementedError(
"Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files."
)

fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
).open_file()
parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp"))
vds = parser.parse_dataset()
vds.drop_vars(drop_variables)
return vds
else:
# we currently read every other filetype using kerchunk's various file format backends
from virtualizarr.readers.kerchunk import (
fully_decode_arr_refs,
read_kerchunk_references_from_file,
virtual_vars_from_kerchunk_refs,
)

if reader_options is None:
reader_options = {}

# this is the only place we actually always need to use kerchunk directly
# TODO avoid even reading byte ranges for variables that will be dropped later anyway?
vds_refs = read_kerchunk_references_from_file(
filepath=filepath,
filetype=filetype,
group=group,
reader_options=reader_options,
)
virtual_vars = virtual_vars_from_kerchunk_refs(
vds_refs,
drop_variables=drop_variables + loadable_variables,
virtual_array_class=virtual_array_class,
)
ds_attrs = fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {})
coord_names = ds_attrs.pop("coordinates", [])

if indexes is None or len(loadable_variables) > 0:
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
# TODO really we probably want a dedicated xarray backend that iterates over all variables only once
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
).open_file()

# fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any.
# We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through.

ds = xr.open_dataset(
cast(XArrayOpenT, fpath),
drop_variables=drop_variables,
group=group,
decode_times=decode_times,
)

if indexes is None:
warnings.warn(
"Specifying `indexes=None` will create in-memory pandas indexes for each 1D coordinate, but concatenation of ManifestArrays backed by pandas indexes is not yet supported (see issue #18)."
"You almost certainly want to pass `indexes={}` to `open_virtual_dataset` instead."
)

# add default indexes by reading data from file
indexes = {name: index for name, index in ds.xindexes.items()}
elif indexes != {}:
# TODO allow manual specification of index objects
raise NotImplementedError()
else:
indexes = dict(**indexes) # for type hinting: to allow mutation

loadable_vars = {
str(name): var
for name, var in ds.variables.items()
if name in loadable_variables
}

# if we only read the indexes we can just close the file right away as nothing is lazy
if loadable_vars == {}:
ds.close()
else:
loadable_vars = {}
indexes = {}

vars = {**virtual_vars, **loadable_vars}

data_vars, coords = separate_coords(vars, indexes, coord_names)

vds = xr.Dataset(
data_vars,
coords=coords,
# indexes={}, # TODO should be added in a later version of xarray
attrs=ds_attrs,
filetype = automatically_determine_filetype(
filepath=filepath, reader_options=reader_options
)

# TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened

return vds


def separate_coords(
vars: Mapping[str, xr.Variable],
indexes: MutableMapping[str, Index],
coord_names: Iterable[str] | None = None,
) -> tuple[dict[str, xr.Variable], xr.Coordinates]:
"""
Try to generate a set of coordinates that won't cause xarray to automatically build a pandas.Index for the 1D coordinates.
Currently requires this function as a workaround unless xarray PR #8124 is merged.
Will also preserve any loaded variables and indexes it is passed.
"""

if coord_names is None:
coord_names = []

# split data and coordinate variables (promote dimension coordinates)
data_vars = {}
coord_vars: dict[
str, tuple[Hashable, Any, dict[Any, Any], dict[Any, Any]] | xr.Variable
] = {}
for name, var in vars.items():
if name in coord_names or var.dims == (name,):
# use workaround to avoid creating IndexVariables described here https://github.com/pydata/xarray/pull/8107#discussion_r1311214263
if len(var.dims) == 1:
dim1d, *_ = var.dims
coord_vars[name] = (dim1d, var.data, var.attrs, var.encoding)
backend_cls = VIRTUAL_BACKENDS.get(filetype.name.lower())

if isinstance(var, IndexVariable):
# unless variable actually already is a loaded IndexVariable,
# in which case we need to keep it and add the corresponding indexes explicitly
coord_vars[str(name)] = var
# TODO this seems suspect - will it handle datetimes?
indexes[name] = PandasIndex(var, dim1d)
else:
coord_vars[name] = var
else:
data_vars[name] = var
if backend_cls is None:
raise NotImplementedError(f"Unsupported file type: {filetype.name}")

coords = xr.Coordinates(coord_vars, indexes=indexes)
vds = backend_cls.open_virtual_dataset(
filepath,
group=group,
drop_variables=drop_variables,
loadable_variables=loadable_variables,
decode_times=decode_times,
indexes=indexes,
reader_options=reader_options,
)

return data_vars, coords
return vds
Loading

0 comments on commit 42fb6a2

Please sign in to comment.