Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Write to parquet #110

Merged
merged 7 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ test = [
"scipy",
"pooch",
"ruff",

"fastparquet",
]


Expand Down
42 changes: 42 additions & 0 deletions virtualizarr/tests/test_kerchunk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import pandas as pd
import pytest
import ujson # type: ignore
import xarray as xr
Expand Down Expand Up @@ -127,6 +128,47 @@ def test_accessor_to_kerchunk_json(self, tmp_path):
}
assert loaded_refs == expected_ds_refs

def test_accessor_to_kerchunk_parquet(self, tmp_path):
    """Write virtual refs to kerchunk parquet and verify the on-disk layout.

    Builds a two-chunk manifest and serializes it with ``record_size=2`` so
    both refs land in a single parquet record file, then checks the
    ``.zmetadata`` sidecar and the reference dataframe contents.
    """
    # Two chunks pointing into the same source file.
    chunks_dict = {
        "0.0": {"path": "foo.nc", "offset": 100, "length": 100},
        "0.1": {"path": "foo.nc", "offset": 200, "length": 100},
    }
    manifest = ChunkManifest(entries=chunks_dict)
    arr = ManifestArray(
        chunkmanifest=manifest,
        zarray=dict(
            shape=(2, 4),
            dtype=np.dtype("<i8"),
            chunks=(2, 2),
            compressor=None,
            filters=None,
            fill_value=None,
            order="C",
        ),
    )
    ds = xr.Dataset({"a": (["x", "y"], arr)})

    filepath = tmp_path / "refs"

    ds.virtualize.to_kerchunk(filepath, format="parquet", record_size=2)

    # kerchunk writes a .zmetadata JSON holding global metadata plus the
    # record size used to shard the reference files.
    with open(tmp_path / "refs" / ".zmetadata") as f:
        meta = ujson.load(f)
    assert list(meta) == ["metadata", "record_size"]
    assert meta["record_size"] == 2

    # First (and, with record_size=2, only) record file for variable "a".
    df0 = pd.read_parquet(filepath / "a" / "refs.0.parq")

    # NOTE: per kerchunk's own tests, "raw" holds inlined (in-memory) chunk
    # data — e.g. a ref like `"a/6": b"data"` produces a non-null "raw";
    # plain on-disk references leave it None.
    assert df0.to_dict() == {
        "offset": {0: 100, 1: 200},
        "path": {
            0: "foo.nc",
            1: "foo.nc",
        },
        "size": {0: 100, 1: 100},
        "raw": {0: None, 1: None},
    }


def test_kerchunk_roundtrip_in_memory_no_concat():
# Set up example xarray dataset
Expand Down
35 changes: 31 additions & 4 deletions virtualizarr/xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,23 @@ def to_kerchunk(
) -> KerchunkStoreRefs: ...

@overload
def to_kerchunk(self, filepath: str, format: Literal["json"]) -> None: ...
def to_kerchunk(self, filepath: str | Path, format: Literal["json"]) -> None: ...

@overload
def to_kerchunk(self, filepath: str, format: Literal["parquet"]) -> None: ...
def to_kerchunk(
self,
filepath: str | Path,
format: Literal["parquet"],
record_size: int = 100_000,
categorical_threshold: int = 10,
) -> None: ...

def to_kerchunk(
self,
filepath: Optional[str] = None,
filepath: str | Path | None = None,
format: Union[Literal["dict"], Literal["json"], Literal["parquet"]] = "dict",
record_size: int = 100_000,
categorical_threshold: int = 10,
) -> Union[KerchunkStoreRefs, None]:
"""
Serialize all virtualized arrays in this xarray dataset into the kerchunk references format.
Expand All @@ -379,6 +387,13 @@ def to_kerchunk(
format : 'dict', 'json', or 'parquet'
Format to serialize the kerchunk references as.
If 'json' or 'parquet' then the 'filepath' argument is required.
record_size (parquet only): int
Number of references to store in each reference file (default 100,000). Bigger values
mean fewer read requests but larger memory footprint.
categorical_threshold (parquet only) : int
Encode urls as pandas.Categorical to reduce memory footprint if the ratio
of the number of unique urls to total number of refs for each variable
is greater than or equal to this number. (default 10)

References
----------
Expand All @@ -397,6 +412,18 @@ def to_kerchunk(

return None
elif format == "parquet":
raise NotImplementedError()
from kerchunk.df import refs_to_dataframe

if isinstance(filepath, Path):
url = str(filepath)
elif isinstance(filepath, str):
url = filepath

return refs_to_dataframe(
jsignell marked this conversation as resolved.
Show resolved Hide resolved
refs,
TomNicholas marked this conversation as resolved.
Show resolved Hide resolved
url=url,
record_size=record_size,
categorical_threshold=categorical_threshold,
)
else:
raise ValueError(f"Unrecognized output format: {format}")
Loading