bit of restructure
norlandrhagen committed Dec 19, 2024
1 parent 1c5e42d commit 89d8555
Showing 1 changed file with 53 additions and 51 deletions.
104 changes: 53 additions & 51 deletions virtualizarr/readers/zarr.py
@@ -44,14 +44,9 @@ async def _parse_zarr_v3_metadata(zarr_array: zarr.Array) -> ZArray:
else:
fill_value = zarr_array.metadata.fill_value

# Codecs from the test look like: (BytesCodec(endian=<Endian.little: 'little'>),)
# Question: What do we do with the endian info?
codecs = get_codecs(zarr_array)

# Question: How should we parse the values from get_codecs?
# typing: Union[Codec, tuple["ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec", ...]]
# mypy: ... is not indexable [index]
# added a temporary bypass for mypy
compressor = getattr(codecs[0], "compressor", None) # type: ignore
filters = getattr(codecs[0], "filters", None) # type: ignore

@@ -71,7 +66,7 @@ async def build_chunk_manifest(
store_path: str, chunk_mapping_dict: dict, array_name: str, zarr_format: int
) -> ChunkManifest:
chunk_manifest_dict = {}

# TODO: We could skip the dict creation and build the Manifest from arrays directly
for key, value in chunk_mapping_dict.items():
if zarr_format == 2:
# split on array name + trailing slash
@@ -83,7 +78,7 @@ chunk_key = (
chunk_key = (
key.split(array_name + "/")[-1].split("c/")[-1].replace("/", ".")
)
# key.split('/c/')[-1]

chunk_manifest_dict[chunk_key] = {
"path": store_path + "/" + key,
"offset": 0,
@@ -124,12 +119,12 @@ async def build_zarray_metadata(zarr_array: zarr.AsyncArray):


async def virtual_variable_from_zarr_array(zarr_array: zarr.AsyncArray, filepath: str):
# keys: array_zarray & array_dims
array_name = zarr_array.basename
zarray_array = await build_zarray_metadata(zarr_array=zarr_array)

array_name = zarr_array.basename
# build a mapping between chunk keys and their size in bytes
chunk_map = await get_chunk_mapping_prefix(zarr_array, prefix=f"{array_name}/c")

# transform chunk_map into ChunkManifest that fits into ManifestArray
chunk_manifest = await build_chunk_manifest(
store_path=filepath,
@@ -151,23 +146,18 @@ async def virtual_variable_from_zarr_array(zarr_array: zarr.AsyncArray, filepath


async def virtual_dataset_from_zarr_group(
zarr_group: zarr.AsyncGroup,
filepath: str,
group: str | None = None,
group: str,
drop_variables: Iterable[str] | None = [],
virtual_variables: Iterable[str] | None = [],
loadable_variables: Iterable[str] | None = [],
decode_times: bool | None = None,
indexes: Mapping[str, Index] | None = None,
reader_options: dict = {},
):
import zarr

zg = await zarr.api.asynchronous.open_group(
filepath, storage_options=reader_options.get("storage_options"), mode="r"
)

virtual_zarr_arrays = await asyncio.gather(
*[zg.getitem(var) for var in virtual_variables]
*[zarr_group.getitem(var) for var in virtual_variables]
)

virtual_variable_arrays = await asyncio.gather(
@@ -211,7 +201,7 @@ async def virtual_dataset_from_zarr_group(
loadable_vars=loadable_vars,
indexes=indexes,
coord_names=coord_names,
attrs=zg.attrs,
attrs=zarr_group.attrs,
)


@@ -228,11 +218,6 @@ def open_virtual_dataset(
reader_options: Optional[dict] = None,
) -> Dataset:
# Question: Is this something we want to pass through?
if virtual_backend_kwargs:
raise NotImplementedError(
"Zarr reader does not understand any virtual_backend_kwargs"
)

import asyncio

import zarr
@@ -241,40 +226,56 @@
if version.parse(zarr.__version__).major < 3:
raise ImportError("Zarr V3 is required")

drop_variables, loadable_variables = check_for_collisions(
drop_variables,
loadable_variables,
)
async def _open_virtual_dataset(
filepath=filepath,
group=group,
drop_variables=drop_variables,
loadable_variables=loadable_variables,
decode_times=decode_times,
indexes=indexes,
virtual_backend_kwargs=virtual_backend_kwargs,
reader_options=reader_options,
):
if virtual_backend_kwargs:
raise NotImplementedError(
"Zarr reader does not understand any virtual_backend_kwargs"
)

drop_variables, loadable_variables = check_for_collisions(
drop_variables,
loadable_variables,
)

filepath = validate_and_normalize_path_to_uri(
filepath, fs_root=Path.cwd().as_uri()
)
# This currently fails for local filepaths (i.e. tests):
# *** TypeError: Filesystem needs to support async operations.
# https://github.com/zarr-developers/zarr-python/issues/2554
filepath = validate_and_normalize_path_to_uri(
filepath, fs_root=Path.cwd().as_uri()
)
# This currently fails for local filepaths (i.e. tests):
# *** TypeError: Filesystem needs to support async operations.
# https://github.com/zarr-developers/zarr-python/issues/2554

if reader_options is None:
reader_options = {}
if reader_options is None:
reader_options = {}

# This is just to grab array keys, so is sync alright?
zg = zarr.open_group(
filepath, storage_options=reader_options.get("storage_options"), mode="r"
)
zg = await zarr.api.asynchronous.open_group(
filepath,
storage_options=reader_options.get("storage_options"),
mode="r",
)

zarr_array_keys = [val for val in zg.array_keys()]
zarr_array_keys = [key async for key in zg.array_keys()]

missing_vars = set(loadable_variables) - set(zarr_array_keys)
if missing_vars:
raise ValueError(
f"Some loadable variables specified are not present in this zarr store: {missing_vars}"
missing_vars = set(loadable_variables) - set(zarr_array_keys)
if missing_vars:
raise ValueError(
f"Some loadable variables specified are not present in this zarr store: {missing_vars}"
)
virtual_vars = list(
set(zarr_array_keys) - set(loadable_variables) - set(drop_variables)
)
virtual_vars = list(
set(zarr_array_keys) - set(loadable_variables) - set(drop_variables)
)

# How does this asyncio.run call interact with zarr-python's async event loop?
return asyncio.run(
virtual_dataset_from_zarr_group(
# How does this asyncio.run call interact with zarr-python's async event loop?
return await virtual_dataset_from_zarr_group(
zarr_group=zg,
filepath=filepath,
group=group,
virtual_variables=virtual_vars,
@@ -284,4 +285,5 @@ def open_virtual_dataset(
indexes=indexes,
reader_options=reader_options,
)
)

return asyncio.run(_open_virtual_dataset())
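
For reference, a minimal usage sketch of the reader after this restructure. It assumes the reader is reached through virtualizarr's top-level open_virtual_dataset; the store path, variable names, and storage options below are hypothetical:

# Minimal usage sketch (hypothetical store path, variable names, and options).
# Requires zarr >= 3 and a store written in the Zarr v3 format.
from virtualizarr import open_virtual_dataset

vds = open_virtual_dataset(
    "s3://some-bucket/store.zarr",                        # hypothetical Zarr v3 store
    loadable_variables=["time"],                          # loaded eagerly via xarray
    drop_variables=["scratch"],                           # excluded from the result
    reader_options={"storage_options": {"anon": True}},
)
# vds is an xarray.Dataset whose virtual variables are backed by ManifestArrays
# pointing at the original chunks in the store.

Wrapping the body in the inner _open_virtual_dataset coroutine lets the group be opened with zarr.api.asynchronous.open_group and the whole flow run under a single asyncio.run call, instead of mixing a synchronous zarr.open_group with a separate asyncio.run as before.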
