Commit: Linting
abarciauskas-bgse committed Oct 28, 2024
1 parent 0365a45 commit 5846d7e
Showing 2 changed files with 177 additions and 31 deletions.
123 changes: 111 additions & 12 deletions virtualizarr/tests/test_writers/test_icechunk_append.py
@@ -67,7 +67,7 @@ def generate_chunk_manifest(
def gen_virtual_dataset(
file_uri: str,
shape: tuple[int, int] = (3, 4),
chunks: tuple[int, int] = (3, 4),
chunk_shape: tuple[int, int] = (3, 4),
dtype: np.dtype = np.dtype("int32"),
compressor: str = None,
filters: str = None,
@@ -81,13 +81,13 @@ def gen_virtual_dataset(
manifest = generate_chunk_manifest(
file_uri,
shape=shape,
chunks=chunks,
chunks=chunk_shape,
base_offset=base_offset,
length=length,
)
zarray = ZArray(
shape=shape,
chunks=chunks,
chunks=chunk_shape,
dtype=dtype,
compressor=compressor,
filters=filters,
@@ -107,7 +107,11 @@ def gen_virtual_dataset(
)


def test_set_single_virtual_ref_without_encoding(
# Success cases


## When appending to a single virtual ref without encoding, it succeeds
def test_append_virtual_ref_without_encoding(
icechunk_storage: "StorageConfig", simple_netcdf4: str
):
import xarray as xr
@@ -139,7 +143,7 @@ def test_set_single_virtual_ref_without_encoding(
npt.assert_equal(array, expected_array)


# encoding is applied
## When appending to a virtual ref with encoding, it succeeds
def test_append_virtual_ref_with_encoding(
icechunk_storage: "StorageConfig", netcdf4_files: tuple[str, str]
):
@@ -153,7 +157,7 @@ def test_append_virtual_ref_with_encoding(
gen_virtual_dataset(
file_uri=filepath1,
shape=(1460, 25, 53),
chunks=(1460, 25, 53),
chunk_shape=(1460, 25, 53),
dims=["time", "lat", "lon"],
dtype=np.dtype("int16"),
variable_name="air",
@@ -164,7 +168,7 @@ def test_append_virtual_ref_with_encoding(
gen_virtual_dataset(
file_uri=filepath2,
shape=(1460, 25, 53),
chunks=(1460, 25, 53),
chunk_shape=(1460, 25, 53),
dims=["time", "lat", "lon"],
dtype=np.dtype("int16"),
variable_name="air",
@@ -196,8 +200,103 @@ def test_append_virtual_ref_with_encoding(
npt.assert_equal(array.get_basic_selection() * scale_factor, expected_array)


# when chunking is different it fails
# There's a whole beartrap here: if the last chunk along the append axis is smaller than the other chunks, we should throw, because zarr can't support that without variable-length chunks (see the sketch below).
# when encoding is different it fails
# when using compression it works
# Should also test that it raises a clear error if you try to append chunks of a different dtype, etc. I would hope zarr-python would throw that for us. (A sketch of such a test appears at the end of this file.)
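# A sketch (not part of this commit) of the guard the beartrap comment above
# calls for: refuse to append when the existing array's final chunk along the
# append axis is partial. The helper name is hypothetical.
def _check_no_partial_final_chunk(existing_array, append_axis: int) -> None:
    if existing_array.shape[append_axis] % existing_array.chunks[append_axis] != 0:
        raise ValueError(
            "cannot append along an axis whose final chunk is smaller than the "
            "chunk shape; zarr cannot represent this without variable-length chunks"
        )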
## When appending to a virtual ref with compression, it succeeds
@pytest.mark.skip(reason="working on this")
def test_append_with_compression_succeeds(
icechunk_storage: "StorageConfig", simple_netcdf4: str
):
from icechunk import IcechunkStore

# Generate compressed dataset
vds = gen_virtual_dataset(
file_uri=simple_netcdf4, compressor="zlib", dtype=np.dtype("int16")
)

# Create icechunk store and commit the compressed dataset
icechunk_filestore = IcechunkStore.create(storage=icechunk_storage)
dataset_to_icechunk(vds, icechunk_filestore)
icechunk_filestore.commit("test commit")

# Append another dataset with compatible compression
icechunk_filestore_append = IcechunkStore.open_existing(
storage=icechunk_storage, mode="a"
)
dataset_to_icechunk(vds, icechunk_filestore_append, append_dim="x")
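
    # Sketch (not in this commit) of an assertion to add once this test is
    # unskipped, mirroring the uncompressed append test above. The read-back
    # pattern and the "foo" variable name are assumptions, not shown in this diff:
    #
    #   import zarr
    #   root_group = zarr.open_group(store=icechunk_filestore_append)
    #   assert root_group["foo"].shape[0] == 2 * vds.sizes["x"]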


## When chunk shapes are different it fails
@pytest.mark.skip(reason="working on this")
def test_append_with_different_chunking_fails(
icechunk_storage: "StorageConfig", simple_netcdf4: str
):
from icechunk import IcechunkStore

# Generate a virtual dataset with specific chunking
vds = gen_virtual_dataset(file_uri=simple_netcdf4, chunk_shape=(3, 4))

# Create icechunk store and commit the dataset
icechunk_filestore = IcechunkStore.create(storage=icechunk_storage)
dataset_to_icechunk(vds, icechunk_filestore)
icechunk_filestore.commit("test commit")

# Try to append dataset with different chunking, expect failure
vds_different_chunking = gen_virtual_dataset(
file_uri=simple_netcdf4, chunk_shape=(1, 1)
)
icechunk_filestore_append = IcechunkStore.open_existing(
storage=icechunk_storage, mode="a"
)
with pytest.raises(ValueError, match="incompatible chunking"):
dataset_to_icechunk(
vds_different_chunking, icechunk_filestore_append, append_dim="x"
)


## When encoding is different it fails
@pytest.mark.skip(reason="working on this")
def test_append_with_different_encoding_fails(
icechunk_storage: "StorageConfig", simple_netcdf4: str
):
from icechunk import IcechunkStore

# Generate datasets with different encoding
vds1 = gen_virtual_dataset(file_uri=simple_netcdf4, encoding={"scale_factor": 0.1})
vds2 = gen_virtual_dataset(file_uri=simple_netcdf4, encoding={"scale_factor": 0.01})

# Create icechunk store and commit the first dataset
icechunk_filestore = IcechunkStore.create(storage=icechunk_storage)
dataset_to_icechunk(vds1, icechunk_filestore)
icechunk_filestore.commit("test commit")

# Try to append with different encoding, expect failure
icechunk_filestore_append = IcechunkStore.open_existing(
storage=icechunk_storage, mode="a"
)
with pytest.raises(ValueError, match="incompatible encoding"):
dataset_to_icechunk(vds2, icechunk_filestore_append, append_dim="x")


## When sizes of other dimensions are different, it fails
@pytest.mark.skip(reason="working on this")
def test_other_dimensions_different_length_fails(
icechunk_storage: "StorageConfig", simple_netcdf4: str
):
from icechunk import IcechunkStore

# Generate datasets with different lengths in non-append dimensions
    vds1 = gen_virtual_dataset(file_uri=simple_netcdf4, shape=(5, 4))
    vds2 = gen_virtual_dataset(file_uri=simple_netcdf4, shape=(6, 4))

# Create icechunk store and commit the first dataset
icechunk_filestore = IcechunkStore.create(storage=icechunk_storage)
dataset_to_icechunk(vds1, icechunk_filestore)
icechunk_filestore.commit("test commit")

# Attempt to append dataset with different length in non-append dimension, expect failure
icechunk_filestore_append = IcechunkStore.open_existing(
storage=icechunk_storage, mode="a"
)
with pytest.raises(
ValueError, match="incompatible lengths in non-append dimensions"
):
dataset_to_icechunk(vds2, icechunk_filestore_append, append_dim="x")
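

## When dtypes are different it fails
# A sketch (not part of this commit) of the dtype test suggested in the old
# comment above; the matched error message is a guess at what the library
# would raise, not an implemented check.
@pytest.mark.skip(reason="sketch only")
def test_append_with_different_dtype_fails(
    icechunk_storage: "StorageConfig", simple_netcdf4: str
):
    from icechunk import IcechunkStore

    # Generate two virtual datasets that differ only in dtype
    vds1 = gen_virtual_dataset(file_uri=simple_netcdf4, dtype=np.dtype("int32"))
    vds2 = gen_virtual_dataset(file_uri=simple_netcdf4, dtype=np.dtype("int16"))

    # Create icechunk store and commit the first dataset
    icechunk_filestore = IcechunkStore.create(storage=icechunk_storage)
    dataset_to_icechunk(vds1, icechunk_filestore)
    icechunk_filestore.commit("test commit")

    # Appending with a different dtype should fail
    icechunk_filestore_append = IcechunkStore.open_existing(
        storage=icechunk_storage, mode="a"
    )
    with pytest.raises(ValueError, match="dtype"):
        dataset_to_icechunk(vds2, icechunk_filestore_append, append_dim="x")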
85 changes: 66 additions & 19 deletions virtualizarr/writers/icechunk.py
@@ -6,6 +6,7 @@
from xarray.core.variable import Variable

from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.manifests import array_api as manifest_api
from virtualizarr.zarr import encode_dtype

if TYPE_CHECKING:
@@ -131,6 +132,50 @@ def write_variable_to_icechunk(
)


def num_chunks(
array,
axis: int,
):
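    # Assumes the array length along `axis` is an exact multiple of the chunk
    # length; floor division would silently ignore a smaller trailing chunk
    # (the "beartrap" flagged in the tests).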
return array.shape[axis] // array.chunks[axis]


def resize_array(
group: "Group",
name: str,
var: Variable,
append_axis: int,
): # -> "Array":
existing_array = group[name]
new_shape = list(existing_array.shape)
new_shape[append_axis] += var.shape[append_axis]
return existing_array.resize(tuple(new_shape))


def get_axis(
dims: list[str],
dim_name: str,
) -> int:
return dims.index(dim_name)


import zarr


def _check_compatible_arrays(
ma: ManifestArray, existing_array: zarr.core.array.Array, append_axis: int
):
manifest_api._check_same_dtypes([ma.dtype, existing_array.dtype])
    # _v3_codec_pipeline() returns a tuple, so convert it to a list for the comparison
    # Question: Does anything need to be done to apply the codecs to the new manifest array?
manifest_api._check_same_codecs(
[list(ma.zarray._v3_codec_pipeline()), existing_array.metadata.codecs]
)
manifest_api._check_same_chunk_shapes([ma.chunks, existing_array.chunks])
manifest_api._check_same_ndims([ma.ndim, existing_array.ndim])
arr_shapes = [ma.shape, existing_array.shape]
manifest_api._check_same_shapes_except_on_concat_axis(arr_shapes, append_axis)


def write_virtual_variable_to_icechunk(
store: "IcechunkStore",
group: "Group",
@@ -141,34 +186,36 @@ def write_virtual_variable_to_icechunk(
"""Write a single virtual variable into an icechunk store"""
ma = cast(ManifestArray, var.data)
zarray = ma.zarray
shape = zarray.shape
mode = store.mode.str

# Aimee: resize the array if it already exists
# TODO: assert chunking and encoding is the same
existing_keys = tuple(group.array_keys())
dims = var.dims
append_axis, existing_num_chunks, arr = None, None, None
if name in existing_keys and mode == "a":
# resize
dims = var.dims
if append_dim in dims:
append_axis = dims.index(append_dim)
existing_array = group[name]
existing_size = existing_array.shape[append_axis]
existing_num_chunks = int(
existing_size / existing_array.chunks[append_axis]
)
new_shape = list(existing_array.shape)
new_shape[append_axis] += var.shape[append_axis]
# Tom: I wonder if some axis-handling logic from the concat function I wrote for ManifestArray could be re-used here.
shape = tuple(new_shape)
# this doesn't seem to actually resize the array
arr = existing_array.resize(shape)
if mode == "a" and append_dim in dims:
existing_array = group[name]
append_axis = get_axis(dims, append_dim)
# check if arrays can be concatenated
        _check_compatible_arrays(ma, existing_array, append_axis)

# determine number of existing chunks along the append axis
existing_num_chunks = num_chunks(
            array=existing_array,
axis=append_axis,
)

# resize the array
arr = resize_array(
group=group,
name=name,
var=var,
append_axis=append_axis,
)
else:
# create array if it doesn't already exist
arr = group.require_array(
name=name,
shape=shape,
shape=zarray.shape,
chunk_shape=zarray.chunks,
dtype=encode_dtype(zarray.dtype),
codecs=zarray._v3_codec_pipeline(),
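
# The remainder of this function is elided from the diff. A sketch (names and
# calls are illustrative, not the actual implementation) of how
# `existing_num_chunks` would typically be used downstream: when writing the
# appended chunk references, each new chunk's index along the append axis is
# shifted past the chunks already in the store, e.g.
#
#   for chunk_indices, entry in new_manifest_entries:   # hypothetical iterable
#       shifted = list(chunk_indices)
#       shifted[append_axis] += existing_num_chunks or 0
#       write_virtual_ref(store, name, shifted, entry)   # hypothetical helper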
