Skip to content

Commit

Permalink
WIP on ChunkManifest.from_arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
norlandrhagen committed Dec 20, 2024
1 parent c1a5218 commit 6af84b4
Showing 1 changed file with 55 additions and 2 deletions.
57 changes: 55 additions & 2 deletions virtualizarr/readers/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def _parse_zarr_v3_metadata(zarr_array: zarr.Array) -> ZArray:
)


async def build_chunk_manifest(
async def build_chunk_manifest_from_dict_mapping(
store_path: str, chunk_mapping_dict: dict, array_name: str, zarr_format: int
) -> ChunkManifest:
chunk_manifest_dict = {}
Expand All @@ -88,12 +88,59 @@ async def build_chunk_manifest(
return ChunkManifest(chunk_manifest_dict)


async def build_chunk_manifest(
zarr_array: zarr.AsyncArray, prefix: str, filepath: str
) -> ChunkManifest:
"""Build a ChunkManifest from arrays
keys will be chunks
add in filepath to chunks to make '_paths'
offsets are array of 0 of length (len(keys)) or len(paths)) np.ndarray[Any, np.dtype[np.uint64]]
sizes are '_lengths'
"""
import numpy as np

keys = [x async for x in zarr_array.store.list_prefix(prefix)]
filepath_list = [filepath] * len(keys)

# can this be lambda'ed?
# stolen from manifest.py
def combine_path_chunk(filepath: str, chunk_key: str):
return filepath + chunk_key

vectorized_chunk_path_combine_func = np.vectorize(
combine_path_chunk, otypes=[np.dtypes.StringDType()]
)

# _paths: np.ndarray[Any, np.dtypes.StringDType]
_paths = vectorized_chunk_path_combine_func(filepath_list, keys)

# _offsets: np.ndarray[Any, np.dtype[np.uint64]]
# this seems like a very overly complicated way to make a list of len n of 0s with a
# certain dtype... I might have gotten carried away on the np.vectorize hypetrain
_offsets = np.vectorize(lambda x: [x] * len(_paths), otypes=[np.uint64])(0)

# _lengths: np.ndarray[Any, np.dtype[np.uint64]]
# maybe concurrent_map isn't the way to go, I think it expects tuples...
_lengths = await concurrent_map((keys), zarr_array.store.getsize)

import ipdb

ipdb.set_trace()
return ChunkManifest.from_arrays(
paths=_paths, # type: ignore
offsets=_offsets,
lengths=_lengths,
)


async def get_chunk_mapping_prefix(zarr_array: zarr.AsyncArray, prefix: str) -> dict:
"""Create a chunk map"""

keys = [(x,) async for x in zarr_array.store.list_prefix(prefix)]

sizes = await concurrent_map(keys, zarr_array.store.getsize)

return {key[0]: size for key, size in zip(keys, sizes)}


Expand Down Expand Up @@ -122,11 +169,17 @@ async def virtual_variable_from_zarr_array(zarr_array: zarr.AsyncArray, filepath
array_name = zarr_array.basename
zarray_array = await build_zarray_metadata(zarr_array=zarr_array)

## TEST - build chunk manifest from arrays
chunk_manifest = await build_chunk_manifest(
zarr_array, prefix=f"{array_name}/c", filepath=filepath
)

# build mapping between chunks and # of bytes (size)
# FIXME!!!!: This is hardcoded for v3!
chunk_map = await get_chunk_mapping_prefix(zarr_array, prefix=f"{array_name}/c")

# transform chunk_map into ChunkManifest that fits into ManifestArray
chunk_manifest = await build_chunk_manifest(
chunk_manifest = await build_chunk_manifest_from_dict_mapping(
store_path=filepath,
chunk_mapping_dict=chunk_map,
array_name=array_name,
Expand Down

0 comments on commit 6af84b4

Please sign in to comment.