This repository has been archived by the owner on Nov 12, 2024. It is now read-only.

Write virtual references to Icechunk #1

Status: Closed · 43 commits (diff shown from 17 commits)
7b00e41
move vds_with_manifest_arrays fixture up
TomNicholas Sep 27, 2024
c82221c
sketch implementation
TomNicholas Sep 27, 2024
d29362b
test that we can create an icechunk store
TomNicholas Sep 27, 2024
2aa3cb5
fixture to create icechunk filestore in temporary directory
TomNicholas Sep 27, 2024
f2c095c
get the async fixture working properly
TomNicholas Sep 27, 2024
6abe32d
split into more functions
TomNicholas Sep 27, 2024
93080b3
change mode
TomNicholas Sep 27, 2024
bebf370
try creating zarr group and arrays explicitly
TomNicholas Sep 27, 2024
833e5f0
create root group from store
TomNicholas Sep 28, 2024
9853140
todos
TomNicholas Sep 28, 2024
030a96e
do away with the async pytest fixtures/functions
TomNicholas Sep 28, 2024
90ca5cf
successfully writes root group attrs
TomNicholas Sep 28, 2024
b138dde
check array metadata is correct
TomNicholas Sep 28, 2024
6631102
try to write array attributes
TomNicholas Sep 28, 2024
e92b56c
sketch test for checking virtual references have been set correctly
TomNicholas Sep 28, 2024
2c8c0ee
test setting single virtual ref
TomNicholas Sep 30, 2024
a2ce1ed
use async properly
TomNicholas Sep 30, 2024
9393995
better separation of handling of loadable variables
TomNicholas Oct 1, 2024
956e324
fix chunk key format
TomNicholas Oct 1, 2024
2d7d5f6
use require_array
TomNicholas Oct 1, 2024
8726e23
check that store supports writes
TomNicholas Oct 1, 2024
387f345
removed outdated note about awaiting
TomNicholas Oct 1, 2024
b2a0700
fix incorrect chunk key in test
TomNicholas Oct 2, 2024
4ffb55e
absolute path
TomNicholas Oct 2, 2024
f929fcb
convert to file URI before handing to icechunk
TomNicholas Oct 2, 2024
e9c1287
test that without encoding we can definitely read one chunk
TomNicholas Oct 2, 2024
2fe3012
Work on encoding test
mpiannucci Oct 2, 2024
33d8ce8
Merge remote-tracking branch 'origin/icechunk' into matt/icechunk-enc…
mpiannucci Oct 2, 2024
8aa6034
Update test to match
mpiannucci Oct 2, 2024
aa2d415
Quick comment
mpiannucci Oct 2, 2024
7e4e2ce
more comprehensive
mpiannucci Oct 2, 2024
9a03245
add attribute encoding
mpiannucci Oct 3, 2024
9676485
Merge pull request #2 from earth-mover/matt/icechunk-encoding
TomNicholas Oct 4, 2024
bbaf3ba
Fix array dimensions
mpiannucci Oct 10, 2024
31945cd
Merge pull request #3 from earth-mover/matt/array-dims
mpiannucci Oct 11, 2024
49daa7e
Fix v3 codec pipeline
mpiannucci Oct 11, 2024
756ff92
Put xarray dep back
mpiannucci Oct 11, 2024
8c7242e
Handle codecs, but get bad results
mpiannucci Oct 12, 2024
666b676
Gzip and zlib are not directly working
mpiannucci Oct 12, 2024
9076ad7
Get up working with numcodecs zarr 3 codecs
mpiannucci Oct 13, 2024
7a160fd
Update codec pipeline
mpiannucci Oct 14, 2024
286a9b5
Merge pull request #4 from earth-mover/matt/v3-codecs
mpiannucci Oct 15, 2024
8f1f96e
Update to latest icechunk using sync api
mpiannucci Oct 15, 2024
27 changes: 27 additions & 0 deletions virtualizarr/tests/test_writers/conftest.py
@@ -0,0 +1,27 @@
import numpy as np
import pytest
from xarray import Dataset
from xarray.core.variable import Variable

from virtualizarr.manifests import ChunkManifest, ManifestArray


@pytest.fixture
def vds_with_manifest_arrays() -> Dataset:
arr = ManifestArray(
chunkmanifest=ChunkManifest(
entries={"0.0": dict(path="test.nc", offset=6144, length=48)}
),
zarray=dict(
shape=(2, 3),
dtype=np.dtype("<i8"),
chunks=(2, 3),
compressor={"id": "zlib", "level": 1},
filters=None,
fill_value=0,
order="C",
zarr_format=3,
),
)
var = Variable(dims=["x", "y"], data=arr, attrs={"units": "km"})
return Dataset({"a": var}, attrs={"something": 0})
120 changes: 120 additions & 0 deletions virtualizarr/tests/test_writers/test_icechunk.py
@@ -0,0 +1,120 @@
import asyncio
from pathlib import Path
from typing import TYPE_CHECKING

import pytest

pytest.importorskip("icechunk")

import numpy as np
import numpy.testing as npt
from xarray import Dataset, open_dataset
from xarray.core.variable import Variable
from zarr import Array, Group, group

from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.writers.icechunk import dataset_to_icechunk
from virtualizarr.zarr import ZArray

if TYPE_CHECKING:
from icechunk import IcechunkStore


@pytest.fixture
def icechunk_filestore(tmpdir) -> "IcechunkStore":
from icechunk import IcechunkStore, StorageConfig

storage = StorageConfig.filesystem(str(tmpdir))

# TODO if icechunk exposed a synchronous version of .open then we wouldn't need to use asyncio.run here
# TODO is this the correct mode to use?
store = asyncio.run(IcechunkStore.open(storage=storage, mode="r+"))

# TODO instead yield store then store.close() ??
return store


class TestWriteVirtualRefs:
def test_write_new_virtual_variable(
self, icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset
):
vds = vds_with_manifest_arrays

dataset_to_icechunk(vds, icechunk_filestore)

# check attrs
root_group = group(store=icechunk_filestore)
assert isinstance(root_group, Group)
assert root_group.attrs == {"something": 0}

# TODO check against vds, then perhaps parametrize?

# check array exists
assert "a" in root_group
arr = root_group["a"]
assert isinstance(arr, Array)

# check array metadata
# TODO why doesn't a .zarr_format or .version attribute exist on zarr.Array?
# assert arr.zarr_format == 3
assert arr.shape == (2, 3)
assert arr.chunks == (2, 3)
assert arr.dtype == np.dtype("<i8")
assert arr.order == "C"
assert arr.fill_value == 0
# TODO check compressor, filters?

# check array attrs
# TODO somehow this is broken by setting the dimension names???
# assert dict(arr.attrs) == {"units": "km"}

# check dimensions
assert arr.attrs["DIMENSION_NAMES"] == ["x", "y"]

def test_set_single_virtual_ref(
self, icechunk_filestore: "IcechunkStore", netcdf4_file: Path
):
# TODO kerchunk doesn't work with zarr-python v3 yet so we can't use open_virtual_dataset and icechunk together!
# vds = open_virtual_dataset(netcdf4_file, indexes={})

# instead for now just write out byte ranges explicitly
manifest = ChunkManifest(
{"0.0": {"path": netcdf4_file, "offset": 15419, "length": 7738000}}
)
zarray = ZArray(
shape=(2920, 25, 53),
chunks=(2920, 25, 53),
dtype=np.dtype("int16"),
compressor=None,
filters=None,
fill_value=None,
)
ma = ManifestArray(
chunkmanifest=manifest,
zarray=zarray,
)
air = Variable(data=ma, dims=["time", "lat", "lon"])
vds = Dataset(
{"air": air},
)

dataset_to_icechunk(vds, icechunk_filestore)

root_group = group(store=icechunk_filestore)
air_array = root_group["air"]
print(air_array)

# check chunk references
# TODO we can't explicitly check that the path/offset/length is correct because icechunk doesn't yet expose any get_virtual_refs method

expected_ds = open_dataset(netcdf4_file)
expected_air_array = expected_ds["air"].to_numpy()
npt.assert_equal(air_array, expected_air_array)

# note: we don't need to test that committing works, because now we have confirmed
# the refs are in the store (even uncommitted) it's icechunk's problem to manage them now.


# TODO test writing loadable variables

# TODO roundtripping tests - requires icechunk compatibility with xarray
25 changes: 1 addition & 24 deletions virtualizarr/tests/test_writers/test_zarr.py
@@ -1,37 +1,14 @@
import json

import numpy as np
import pytest
import xarray.testing as xrt
from xarray import Dataset

from virtualizarr import ManifestArray, open_virtual_dataset
from virtualizarr import open_virtual_dataset
from virtualizarr.backend import FileType
from virtualizarr.manifests.manifest import ChunkManifest
from virtualizarr.readers.zarr import metadata_from_zarr_json
from virtualizarr.writers.zarr import dataset_to_zarr


@pytest.fixture
def vds_with_manifest_arrays() -> Dataset:
arr = ManifestArray(
chunkmanifest=ChunkManifest(
entries={"0.0": dict(path="test.nc", offset=6144, length=48)}
),
zarray=dict(
shape=(2, 3),
dtype=np.dtype("<i8"),
chunks=(2, 3),
compressor={"id": "zlib", "level": 1},
filters=None,
fill_value=0,
order="C",
zarr_format=3,
),
)
return Dataset({"a": (["x", "y"], arr)}, attrs={"something": 0})


def isconfigurable(value: dict) -> bool:
"""
Several metadata attributes in ZarrV3 use a dictionary with keys "name" : str and "configuration" : dict
152 changes: 152 additions & 0 deletions virtualizarr/writers/icechunk.py
@@ -0,0 +1,152 @@
import asyncio
from typing import TYPE_CHECKING

import numpy as np
from xarray import Dataset
from xarray.core.variable import Variable
from zarr import Group

from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.zarr import encode_dtype

if TYPE_CHECKING:
from icechunk import IcechunkStore


def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None:
"""
Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store.

Currently requires all variables to be backed by ManifestArray objects.

Parameters
----------
ds: xr.Dataset
store: IcechunkStore
"""
from icechunk import IcechunkStore

if not isinstance(store, IcechunkStore):
raise TypeError(f"expected type IcechunkStore, but got type {type(store)}")

# TODO should we check that the store supports writes at this point?

# TODO only supports writing to the root group currently
# TODO pass zarr_format kwarg?
root_group = Group.from_store(store=store)

# TODO this is Frozen, the API for setting attributes must be something else
# root_group.attrs = ds.attrs
for k, v in ds.attrs.items():
root_group.attrs[k] = v

# we should be able to write references for each variable concurrently
asyncio.run(
write_variables_to_icechunk_group(
ds.variables,
store=store,
group=root_group,
)
)


async def write_variables_to_icechunk_group(
variables,
store,
group,
):
await asyncio.gather(

Review comment: Looks like the right approach to me. It's work stealing, not parallel, so remember that to keep perf expectations in check.

*(
write_variable_to_icechunk(
store=store,
group=group,
arr_name=name,
var=var,
)
for name, var in variables.items()
)
)


async def write_variable_to_icechunk(
store: "IcechunkStore",
group: Group,
arr_name: str,
var: Variable,
) -> None:
if not isinstance(var.data, ManifestArray):
# TODO is writing loadable_variables just normal xarray ds.to_zarr?
raise NotImplementedError()

ma = var.data
zarray = ma.zarray

# TODO should I be checking that this array doesn't already exist? Or is that icechunks' job?
arr = group.create_array(
arr_name,
shape=zarray.shape,
chunk_shape=zarray.chunks,
dtype=encode_dtype(zarray.dtype),
# TODO fill_value?
# TODO order?
# TODO zarr format?
# TODO compressors?
)

# TODO it would be nice if we could assign directly to the .attrs property
for k, v in var.attrs.items():
arr.attrs[k] = v
# TODO we should probably be doing some encoding of those attributes?
arr.attrs["DIMENSION_NAMES"] = var.dims

await write_manifest_virtual_refs(
store=store,
group=group,
arr_name=arr_name,
manifest=ma.manifest,
)


async def write_manifest_virtual_refs(
store: "IcechunkStore",
group: Group,
arr_name: str,
manifest: ChunkManifest,
) -> None:
"""Write all the virtual references for one array manifest at once."""

key_prefix = f"{group.name}{arr_name}"
if key_prefix.startswith("/"):
# remove preceding / character
# TODO unsure if this is correct
key_prefix = key_prefix[1:]

# loop over every reference in the ChunkManifest for that array
# TODO inefficient: this should be replaced with something that sets all (new) references for the array at once
# but Icechunk need to expose a suitable API first
it = np.nditer(
[manifest._paths, manifest._offsets, manifest._lengths],
flags=[
"refs_ok",
"multi_index",
"c_index", # TODO is "c_index" correct? what's the convention for zarr chunk keys?
],
op_flags=[["readonly"]] * 3,
)
for path, offset, length in it:

Review comment: This works, but it will set them in serial. You can do this in parallel, creating the tasks with asyncio.ensure_future instead of calling await. Then you just gather them, as you do above, into a single future to return out and await later, or you can await the gathered future in the function, depending on the level of concurrency you desire.
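The pattern the reviewer describes can be sketched as follows. Note that `set_virtual_ref` below is a stand-in coroutine written for this example, not icechunk's actual method: the point is only to show scheduling all the calls up front and awaiting them together, rather than awaiting each one inside the loop.

```python
import asyncio


async def set_virtual_ref(key: str, location: str, offset: int, length: int) -> str:
    # Stand-in for store.set_virtual_ref: pretend to do async I/O, return the key.
    await asyncio.sleep(0)
    return key


async def write_refs_concurrently(refs: list[tuple[str, str, int, int]]) -> list[str]:
    # Create all tasks up front instead of awaiting each call inside the loop,
    # then gather them so they run concurrently on the event loop.
    tasks = [
        asyncio.ensure_future(set_virtual_ref(key, loc, offset, length))
        for key, loc, offset, length in refs
    ]
    # gather preserves input order in its results
    return await asyncio.gather(*tasks)


refs = [
    ("air/0/0", "test.nc", 6144, 48),
    ("air/0/1", "test.nc", 6192, 48),
]
written = asyncio.run(write_refs_concurrently(refs))
# written == ["air/0/0", "air/0/1"]
```

As the earlier comment notes, this buys concurrency on a single event loop (useful when each call does I/O), not CPU parallelism.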

TomNicholas (Author), Oct 1, 2024:

Interesting, but wouldn't it be better for this iteration to just happen on the icechunk end instead? That would both simplify this code and presumably be more efficient overall.

e.g. can icechunk expose a method for setting the virtual refs of an entire array at once like

async def set_array_as_virtual_refs(
    self,
    key_prefix: str,
    paths: np.ndarray[Any, np.dtypes.StringDType],
    offsets: np.ndarray[Any, np.dtype[np.uint64]],
    lengths: np.ndarray[Any, np.dtype[np.uint64]],
):
    ...

then do the loop over the chunks in rust? Or do you think that's departing from the zarr-like abstraction that icechunk presents?

mpiannucci, Oct 1, 2024:

On a technical level, supporting this on the rust side would be fast, but I am a little worried about departing from being a Zarr store first and foremost and leaking the abstraction. This would most likely require a new dependency on the numpy rust bindings to be efficient enough to make a difference.

@rabernat also mentioned the possibility of adding another package specifically to make icechunk/virtualizarr more efficient, which is possible as another option if we don't want to put it in the main python bindings.

I think in the short term, let's focus on getting it to work as is and leave this as a future optimization after we have some idea of the real-world performance. Adding bulk support will not be as hard as rounding out all the initial support things IMO.

TomNicholas (Author):

> I am a little worried about departure from being a Zarr store

> require a new dependency on the numpy rust bindings

This is reasonable, but the alternative requires me to iterate in python over many millions of elements of numpy arrays, and send every single one off to icechunk as a separate async request. That seems like unnecessary gymnastics when we already have all the elements arranged very neatly in-memory.

> another package specifically to make icechunk/virtualizarr more efficient

Seems kinda over-complicated, but could definitely solve the problem.

> I think in the short term, lets focus on getting it to work as is and leave this as future optimization after we have some idea of the real world performance? Adding bulk will not be as hard as rounding out all the initial support things IMO

Agree that getting it to work correctly with a nice user API is the priority for now, and we can worry about this again after measuring performance.

TomNicholas (Author), Oct 2, 2024:

Another idea to think about would be if virtualizarr's implementation of the chunk manifest actually used icechunk's rust implementation

zarr-developers/VirtualiZarr#23

This would be a stronger argument for a separate package IMO - a rust crate implementing the Manifest class that both icechunk and virtualizarr depended on, and could be used to exchange references efficiently between the two libraries.

cc @rabernat

index = it.multi_index
chunk_key = "/".join(str(i) for i in index)

key = f"{key_prefix}/{chunk_key}" # should be of form 'group/name/0/1/2'

print(key)

# TODO this needs to be awaited or something
# set each reference individually
await store.set_virtual_ref(
# TODO it would be marginally neater if I could pass the group and name as separate args
key=key,
location=path.item(),
offset=offset.item(),
length=length.item(),
)
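The chunk-key construction in `write_manifest_virtual_refs` (the key prefix joined with the "/"-separated multi-index) can be illustrated with a small pure-python sketch. `chunk_keys` here is a hypothetical helper written only for this example, not part of the module:

```python
from itertools import product


def chunk_keys(key_prefix: str, grid_shape: tuple[int, ...]) -> list[str]:
    # Enumerate chunk indices in C order (last axis varies fastest), matching
    # the multi_index order of the writer's np.nditer loop, and join each
    # index with "/" to get keys of the form 'group/name/0/1/2'.
    return [
        f"{key_prefix}/" + "/".join(str(i) for i in index)
        for index in product(*(range(n) for n in grid_shape))
    ]


keys = chunk_keys("air", (1, 2))
# keys == ["air/0/0", "air/0/1"]
```

For a 3-d chunk grid the same helper yields keys like "group/name/0/0/0", which is the layout the writer's comment describes.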