Skip to content

Commit

Permalink
tests passing for reading parquet references to virtual dataset, refa…
Browse files Browse the repository at this point in the history
…ctored _fsspec_open... to class
  • Loading branch information
norlandrhagen committed Oct 9, 2024
1 parent a5dcef0 commit ba7daca
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 45 deletions.
38 changes: 26 additions & 12 deletions virtualizarr/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from xarray.core.variable import IndexVariable

from virtualizarr.manifests import ManifestArray
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.types.kerchunk import KerchunkStoreRefs
from virtualizarr.utils import _FsspecFSFromFilepath

XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore

Expand Down Expand Up @@ -136,25 +137,38 @@ def open_virtual_dataset(
raise NotImplementedError()

# if filetype is user defined, convert to FileType

if filetype is not None:
filetype = FileType(filetype)

if filetype == FileType.kerchunk_parquet:
from fsspec.implementations.reference import LazyReferenceMapper

from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs

fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options).fs
lrm = LazyReferenceMapper(filepath, fs)

# build reference dict from KV pairs in LazyReferenceMapper
array_refs = {k: lrm[k] for k in lrm.keys()}

# TODO: confirm where the reference "version" should come from; it is hard-coded to 1 below.
full_reference = {"version": 1, "refs": array_refs}

return dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference))

if filetype == FileType.kerchunk_json:
import ast

from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs

fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()

refs = ast.literal_eval(fpath.read().decode("utf-8"))

vds = dataset_from_kerchunk_refs(refs)
return vds

if filetype == FileType.kerchunk_parquet:
raise NotImplementedError()
return dataset_from_kerchunk_refs(KerchunkStoreRefs(refs))

if filetype == FileType.zarr_v3:
# TODO is there a neat way of auto-detecting this?
Expand All @@ -171,9 +185,9 @@ def open_virtual_dataset(
"Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files."
)

fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()
parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp"))
vds = parser.parse_dataset()
vds.drop_vars(drop_variables)
Expand Down Expand Up @@ -209,9 +223,9 @@ def open_virtual_dataset(
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
# TODO really we probably want a dedicated xarray backend that iterates over all variables only once
fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()

# fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any.
# We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through.
Expand Down
6 changes: 3 additions & 3 deletions virtualizarr/readers/kerchunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
KerchunkArrRefs,
KerchunkStoreRefs,
)
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.utils import _FsspecFSFromFilepath
from virtualizarr.zarr import ZArray, ZAttrs


Expand All @@ -28,9 +28,9 @@ def _automatically_determine_filetype(
raise NotImplementedError()

# Read magic bytes from local or remote file
fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()
magic_bytes = fpath.read(8)
fpath.close()

Expand Down
10 changes: 8 additions & 2 deletions virtualizarr/tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir):
"reference_format",
[
"kerchunk_json",
pytest.param("kerchunk_parquet", marks=pytest.mark.skip(reason="wip")),
"kerchunk_parquet",
],
)
def test_open_virtual_dataset_existing_kerchunk_refs(
Expand All @@ -353,8 +353,14 @@ def test_open_virtual_dataset_existing_kerchunk_refs(
with open(ref_filepath, "w") as f:
f.write(repr(example_reference_dict))

if reference_format == "kerchunk_parquet":
from kerchunk.df import refs_to_dataframe

ref_filepath = tmp_path / "ref.parquet"
refs_to_dataframe(fo=example_reference_dict, url=ref_filepath.as_posix())

vds = open_virtual_dataset(
filepath=ref_filepath, filetype=reference_format, indexes={}
filepath=ref_filepath.as_posix(), filetype=reference_format, indexes={}
)

assert list(vds) == ["air"]
Expand Down
6 changes: 3 additions & 3 deletions virtualizarr/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
import xarray as xr

from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.utils import _FsspecFSFromFilepath


@pytest.fixture
Expand All @@ -21,7 +21,7 @@ def test_fsspec_openfile_from_path(tmp_path: pathlib.Path, dataset: xr.Dataset)
f = tmp_path / "dataset.nc"
dataset.to_netcdf(f)

result = _fsspec_openfile_from_filepath(filepath=f.as_posix())
result = _FsspecFSFromFilepath(filepath=f.as_posix()).open_file()
assert isinstance(result, fsspec.implementations.local.LocalFileOpener)


Expand All @@ -32,6 +32,6 @@ def test_fsspec_openfile_memory(dataset: xr.Dataset):
with fs.open("dataset.nc", mode="wb") as f:
dataset.to_netcdf(f, engine="h5netcdf")

result = _fsspec_openfile_from_filepath(filepath="memory://dataset.nc")
result = _FsspecFSFromFilepath(filepath="memory://dataset.nc").open_file()
with result:
assert isinstance(result, fsspec.implementations.memory.MemoryFile)
55 changes: 30 additions & 25 deletions virtualizarr/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import io
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Dict, Optional, Union

if TYPE_CHECKING:
import fsspec.core
Expand All @@ -13,42 +13,47 @@
]


def _fsspec_openfile_from_filepath(
*,
filepath: str,
reader_options: Optional[dict] = None,
) -> OpenFileType:
"""Converts input filepath to fsspec openfile object.
from dataclasses import dataclass, field


@dataclass
class _FsspecFSFromFilepath:
"""Class to create fsspec Filesystem from input filepath.
Parameters
----------
filepath : str
Input filepath
reader_options : dict, optional
Dict of reader options; its "storage_options" entry is passed as kwargs to fsspec.filesystem, by default {}
fs : fsspec.AbstractFileSystem
The fsspec filesystem object, created in __post_init__
Returns
-------
OpenFileType
An open file-like object, specific to the protocol supplied in filepath.
Raises
------
NotImplementedError
Raises a Not Implemented Error if filepath protocol is not supported.
"""

import fsspec
from upath import UPath
filepath: str
reader_options: Optional[Dict] = field(default_factory=dict)
fs: fsspec.AbstractFileSystem = field(init=False)

def open_file(self) -> OpenFileType:
"""Open self.filepath with the fsspec filesystem stored in self.fs.
universal_filepath = UPath(filepath)
protocol = universal_filepath.protocol
Returns
-------
OpenFileType
file opened with fsspec
"""
return self.fs.open(self.filepath)

if reader_options is None:
reader_options = {}
def __post_init__(self) -> None:
"""Create the fsspec filesystem for the protocol of self.filepath and store it in self.fs."""
import fsspec
from upath import UPath

storage_options = reader_options.get("storage_options", {}) # type: ignore
universal_filepath = UPath(self.filepath)
protocol = universal_filepath.protocol

fpath = fsspec.filesystem(protocol, **storage_options).open(filepath)
self.reader_options = self.reader_options or {}
storage_options = self.reader_options.get("storage_options", {}) # type: ignore

return fpath
self.fs = fsspec.filesystem(protocol, **storage_options)

0 comments on commit ba7daca

Please sign in to comment.