diff --git a/pyproject.toml b/pyproject.toml index 9887c824ca..97746e2d36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -181,11 +181,6 @@ check_untyped_defs = false [[tool.mypy.overrides]] module = [ "zarr.v2.*", - "zarr.abc.codec", - "zarr.codecs.bytes", - "zarr.codecs.pipeline", - "zarr.codecs.sharding", - "zarr.codecs.transpose", "zarr.array_v2", "zarr.array", "zarr.sync", @@ -207,9 +202,6 @@ module = [ "zarr.array", "zarr.common", "zarr.store.local", - "zarr.codecs.blosc", - "zarr.codecs.gzip", - "zarr.codecs.zstd", ] disallow_untyped_calls = false diff --git a/src/zarr/codecs/__init__.py b/src/zarr/codecs/__init__.py index 8fa0c9f7b0..9afca97d67 100644 --- a/src/zarr/codecs/__init__.py +++ b/src/zarr/codecs/__init__.py @@ -7,3 +7,4 @@ from zarr.codecs.sharding import ShardingCodec, ShardingCodecIndexLocation # noqa: F401 from zarr.codecs.transpose import TransposeCodec # noqa: F401 from zarr.codecs.zstd import ZstdCodec # noqa: F401 +import zarr.codecs.numcodecs_ # noqa: F401 diff --git a/src/zarr/codecs/numcodecs_.py b/src/zarr/codecs/numcodecs_.py new file mode 100644 index 0000000000..c7803d5caf --- /dev/null +++ b/src/zarr/codecs/numcodecs_.py @@ -0,0 +1,253 @@ +from dataclasses import dataclass +from functools import cached_property +import math +from typing_extensions import Self +from warnings import warn + +import numpy as np +from zarr.abc.codec import ArrayArrayCodec, BytesBytesCodec, Codec + +import numcodecs +from zarr.codecs.registry import register_codec +from zarr.common import JSON, ArraySpec, BytesLike, parse_named_configuration, product, to_thread +from zarr.config import RuntimeConfiguration +from zarr.metadata import ArrayMetadata + +CODEC_PREFIX = "https://zarr.dev/numcodecs/" + + +def parse_codec_configuration(data: dict[str, JSON], expected_name_prefix: str) -> dict[str, JSON]: + parsed_name, parsed_configuration = parse_named_configuration(data) + if not parsed_name.startswith(expected_name_prefix): + raise ValueError( + f"Expected name to start with '{expected_name_prefix}'. Got {parsed_name} instead." + ) + id = parsed_name[len(expected_name_prefix):] + return {"id": id, **parsed_configuration} + + +@dataclass(frozen=True) +class NumcodecsCodec(Codec): + codec_config: dict[str, JSON] + + def __init__(self, *, codec_id: str | None = None, codec_config: dict[str, JSON]) -> None: + if "id" not in codec_config: + if not codec_id: + raise ValueError( + "The codec id needs to be supplied either through the id attribute " + "of the codec_config or through the codec_id argument." + ) + codec_config = {"id": codec_id, **codec_config} + elif codec_id and codec_config["id"] != codec_id: + raise ValueError(f"Codec id does not match {codec_id}. Got: {codec_config['id']}.") + + object.__setattr__(self, "codec_config", codec_config) + warn( + "Numcodecs codecs are not in the Zarr version 3 specification and " + "may not be supported by other zarr implementations.", + category=UserWarning, + ) + + @cached_property + def _codec(self) -> numcodecs.abc.Codec: + return numcodecs.get_codec(self.codec_config) + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> Self: + codec_config = parse_codec_configuration(data, CODEC_PREFIX) + assert isinstance(codec_config["id"], str) # for mypy + return cls(codec_config=codec_config) + + def to_dict(self) -> JSON: + codec_config = self.codec_config.copy() + codec_id = codec_config.pop("id") + return { + "name": f"{CODEC_PREFIX}{codec_id}", + "configuration": codec_config, + } + + def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int: + return input_byte_length + + +class NumcodecsBytesBytesCodec(NumcodecsCodec, BytesBytesCodec): + def __init__(self, *, codec_id: str, codec_config: dict[str, JSON]) -> None: + super().__init__(codec_id=codec_id, codec_config=codec_config) + + async def decode( + self, + chunk_bytes: BytesLike, + _chunk_spec: ArraySpec, + _runtime_configuration: RuntimeConfiguration, + ) -> BytesLike: + return await to_thread(self._codec.decode, chunk_bytes) + + def _encode(self, chunk_bytes: BytesLike) -> BytesLike: + encoded = self._codec.encode(chunk_bytes) + if isinstance(encoded, np.ndarray): # Required for checksum codecs + return encoded.tobytes() + return encoded + + async def encode( + self, + chunk_bytes: BytesLike, + _chunk_spec: ArraySpec, + _runtime_configuration: RuntimeConfiguration, + ) -> BytesLike: + return await to_thread(self._encode, chunk_bytes) + + +class NumcodecsArrayArrayCodec(NumcodecsCodec, ArrayArrayCodec): + def __init__(self, *, codec_id: str, codec_config: dict[str, JSON]) -> None: + super().__init__(codec_id=codec_id, codec_config=codec_config) + + async def decode( + self, + chunk_array: np.ndarray, + chunk_spec: ArraySpec, + _runtime_configuration: RuntimeConfiguration, + ) -> np.ndarray: + out = await to_thread(self._codec.decode, chunk_array) + return out.reshape(chunk_spec.shape) + + async def encode( + self, + chunk_array: np.ndarray, + _chunk_spec: ArraySpec, + _runtime_configuration: RuntimeConfiguration, + ) -> np.ndarray: + return await to_thread(self._codec.encode, chunk_array) + + +def make_bytes_bytes_codec(codec_id: str) -> type[NumcodecsBytesBytesCodec]: + # rename for class scope + _codec_id = codec_id + + class _Codec(NumcodecsBytesBytesCodec): + def __init__(self, codec_config: dict[str, JSON] = {}) -> None: + super().__init__(codec_id=_codec_id, codec_config=codec_config) + + return _Codec + + +def make_array_array_codec(codec_id: str) -> type[NumcodecsArrayArrayCodec]: + # rename for class scope + _codec_id = codec_id + + class _Codec(NumcodecsArrayArrayCodec): + def __init__(self, codec_config: dict[str, JSON] = {}) -> None: + super().__init__(codec_id=_codec_id, codec_config=codec_config) + + return _Codec + + +def make_checksum_codec(codec_id: str) -> type[NumcodecsBytesBytesCodec]: + # rename for class scope + _codec_id = codec_id + + class _ChecksumCodec(NumcodecsBytesBytesCodec): + def __init__(self, codec_config: dict[str, JSON] = {}) -> None: + super().__init__(codec_id=_codec_id, codec_config=codec_config) + + def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int: + return input_byte_length + 4 + + return _ChecksumCodec + + +class ShuffleCodec(NumcodecsBytesBytesCodec): + def __init__(self, codec_config: dict[str, JSON] = {}) -> None: + super().__init__(codec_id="shuffle", codec_config=codec_config) + + def evolve(self, array_spec: ArraySpec) -> Self: + if array_spec.dtype.itemsize != self.codec_config.get("elementsize"): + return self.__class__({**self.codec_config, "elementsize": array_spec.dtype.itemsize}) + return self + + +class FixedScaleOffsetCodec(NumcodecsArrayArrayCodec): + def __init__(self, codec_config: dict[str, JSON] = {}) -> None: + super().__init__(codec_id="fixedscaleoffset", codec_config=codec_config) + + def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: + if astype := self.codec_config.get("astype"): + return ArraySpec( + chunk_spec.shape, + np.dtype(astype), + chunk_spec.fill_value, + ) + return chunk_spec + + def evolve(self, array_spec: ArraySpec) -> Self: + if str(array_spec.dtype) != self.codec_config.get("dtype"): + return self.__class__({**self.codec_config, "dtype": str(array_spec.dtype)}) + return self + + +class QuantizeCodec(NumcodecsArrayArrayCodec): + def __init__(self, codec_config: dict[str, JSON] = {}) -> None: + super().__init__(codec_id="quantize", codec_config=codec_config) + + def evolve(self, array_spec: ArraySpec) -> Self: + if str(array_spec.dtype) != self.codec_config.get("dtype"): + return self.__class__({**self.codec_config, "dtype": str(array_spec.dtype)}) + return self + + +class AsTypeCodec(NumcodecsArrayArrayCodec): + def __init__(self, codec_config: dict[str, JSON] = {}) -> None: + super().__init__(codec_id="astype", codec_config=codec_config) + + def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: + return ArraySpec( + chunk_spec.shape, + np.dtype(self.codec_config["encode_dtype"]), + chunk_spec.fill_value, + ) + + def evolve(self, array_spec: ArraySpec) -> Self: + decode_dtype = self.codec_config.get("decode_dtype") + if str(array_spec.dtype) != decode_dtype: + return self.__class__({**self.codec_config, "decode_dtype": str(array_spec.dtype)}) + return self + + +class PackbitsCodec(NumcodecsArrayArrayCodec): + def __init__(self, codec_config: dict[str, JSON] = {}) -> None: + super().__init__(codec_id="packbits", codec_config=codec_config) + + def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: + return ArraySpec( + (1 + math.ceil(product(chunk_spec.shape) / 8),), + np.dtype("uint8"), + chunk_spec.fill_value, + ) + + def validate(self, array_metadata: ArrayMetadata) -> None: + if array_metadata.dtype != np.dtype("bool"): + raise ValueError(f"Packbits filter requires bool dtype. Got {array_metadata.dtype}.") + + +# bytes-to-bytes codecs +register_codec(f"{CODEC_PREFIX}blosc", make_bytes_bytes_codec("blosc")) +register_codec(f"{CODEC_PREFIX}lz4", make_bytes_bytes_codec("lz4")) +register_codec(f"{CODEC_PREFIX}zstd", make_bytes_bytes_codec("zstd")) +register_codec(f"{CODEC_PREFIX}zlib", make_bytes_bytes_codec("zlib")) +register_codec(f"{CODEC_PREFIX}gzip", make_bytes_bytes_codec("gzip")) +register_codec(f"{CODEC_PREFIX}bz2", make_bytes_bytes_codec("bz2")) +register_codec(f"{CODEC_PREFIX}lzma", make_bytes_bytes_codec("lzma")) +register_codec(f"{CODEC_PREFIX}shuffle", ShuffleCodec) + +# array-to-array codecs ("filters") +register_codec(f"{CODEC_PREFIX}delta", make_array_array_codec("delta")) +register_codec(f"{CODEC_PREFIX}bitround", make_array_array_codec("bitround")) +register_codec(f"{CODEC_PREFIX}fixedscaleoffset", FixedScaleOffsetCodec) +register_codec(f"{CODEC_PREFIX}quantize", QuantizeCodec) +register_codec(f"{CODEC_PREFIX}packbits", PackbitsCodec) +register_codec(f"{CODEC_PREFIX}astype", AsTypeCodec) + +# bytes-to-bytes checksum codecs +register_codec(f"{CODEC_PREFIX}crc32", make_checksum_codec("crc32")) +register_codec(f"{CODEC_PREFIX}adler32", make_checksum_codec("adler32")) +register_codec(f"{CODEC_PREFIX}fletcher32", make_checksum_codec("fletcher32")) +register_codec(f"{CODEC_PREFIX}jenkins_lookup3", make_checksum_codec("jenkins_lookup3")) diff --git a/src/zarr/codecs/registry.py b/src/zarr/codecs/registry.py index 140e1372ef..707331efe4 100644 --- a/src/zarr/codecs/registry.py +++ b/src/zarr/codecs/registry.py @@ -2,33 +2,27 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import Dict, Type from zarr.abc.codec import Codec from importlib.metadata import EntryPoint, entry_points as get_entry_points -__codec_registry: Dict[str, Type[Codec]] = {} -__lazy_load_codecs: Dict[str, EntryPoint] = {} +__codec_registry: dict[str, type[Codec]] = {} +__lazy_load_codecs: dict[str, EntryPoint] = {} -def _collect_entrypoints() -> None: +def _collect_entrypoints() -> dict[str, EntryPoint]: entry_points = get_entry_points() - if hasattr(entry_points, "select"): - # If entry_points() has a select method, use that. Python 3.10+ - for e in entry_points.select(group="zarr.codecs"): - __lazy_load_codecs[e.name] = e - else: - # Otherwise, fallback to using get - for e in entry_points.get("zarr.codecs", []): - __lazy_load_codecs[e.name] = e + for e in entry_points.select(group="zarr.codecs"): + __lazy_load_codecs[e.name] = e + return __lazy_load_codecs -def register_codec(key: str, codec_cls: Type[Codec]) -> None: +def register_codec(key: str, codec_cls: type[Codec]) -> None: __codec_registry[key] = codec_cls -def get_codec_class(key: str) -> Type[Codec]: +def get_codec_class(key: str) -> type[Codec]: item = __codec_registry.get(key) if item is None: if key in __lazy_load_codecs: diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index d4f8b7dfc9..0436c96f1f 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -18,6 +18,8 @@ from zarr.codecs.registry import register_codec from zarr.common import ( ArraySpec, + ChunkCoords, + BytesLike, ChunkCoordsLike, concurrent_map, parse_enum, @@ -45,13 +47,12 @@ from zarr.store import StorePath from zarr.common import ( JSON, - ChunkCoords, - BytesLike, SliceSelection, ) from zarr.config import RuntimeConfiguration MAX_UINT_64 = 2**64 - 1 +ShardMapping = Mapping[ChunkCoords, BytesLike | None] class ShardingCodecIndexLocation(Enum): @@ -126,7 +127,7 @@ def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardIndex: return cls(offsets_and_lengths) -class _ShardProxy(Mapping): +class _ShardProxy(ShardMapping): index: _ShardIndex buf: BytesLike @@ -175,7 +176,7 @@ def merge_with_morton_order( cls, chunks_per_shard: ChunkCoords, tombstones: Set[ChunkCoords], - *shard_dicts: Mapping[ChunkCoords, BytesLike], + *shard_dicts: ShardMapping, ) -> _ShardBuilder: obj = cls.create_empty(chunks_per_shard) for chunk_coords in morton_order_iter(chunks_per_shard): @@ -375,7 +376,7 @@ async def decode_partial( all_chunk_coords = set(chunk_coords for chunk_coords, _, _ in indexed_chunks) # reading bytes of all requested chunks - shard_dict: Mapping[ChunkCoords, BytesLike] = {} + shard_dict: ShardMapping = {} if self._is_total_shard(all_chunk_coords, chunks_per_shard): # read entire shard shard_dict_maybe = await self._load_full_shard_maybe(store_path, chunks_per_shard) @@ -417,7 +418,7 @@ async def decode_partial( async def _read_chunk( self, - shard_dict: Mapping[ChunkCoords, Optional[BytesLike]], + shard_dict: ShardMapping, chunk_coords: ChunkCoords, chunk_selection: SliceSelection, out_selection: SliceSelection, diff --git a/src/zarr/common.py b/src/zarr/common.py index 7d8431f97e..8581f8f8cd 100644 --- a/src/zarr/common.py +++ b/src/zarr/common.py @@ -1,5 +1,16 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Union, Tuple, Iterable, Dict, List, TypeVar, overload, Any +from typing import ( + TYPE_CHECKING, + ParamSpec, + Union, + Tuple, + Iterable, + Dict, + List, + TypeVar, + overload, + Any, +) import asyncio import contextvars from dataclasses import dataclass @@ -48,7 +59,11 @@ async def run(item): return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items]) -async def to_thread(func, /, *args, **kwargs): +P = ParamSpec("P") +U = TypeVar("U") + + +async def to_thread(func: Callable[P, U], /, *args: P.args, **kwargs: P.kwargs) -> U: loop = asyncio.get_running_loop() ctx = contextvars.copy_context() func_call = functools.partial(ctx.run, func, *args, **kwargs) diff --git a/tests/v3/test_codecs.py b/tests/v3/test_codecs.py index ffd225668b..e250d46d54 100644 --- a/tests/v3/test_codecs.py +++ b/tests/v3/test_codecs.py @@ -7,10 +7,11 @@ import numpy as np import pytest +from zarr.codecs.registry import get_codec_class import zarr.v2 from zarr.abc.codec import Codec from zarr.array import Array, AsyncArray -from zarr.common import Selection +from zarr.common import JSON, Selection from zarr.indexing import morton_order_iter from zarr.codecs import ( ShardingCodec, @@ -1042,3 +1043,151 @@ def test_update_attributes_array(store: Store): a = Array.open(store / "update_attributes") assert a.attrs["hello"] == "zarrita" + + +@pytest.mark.parametrize( + "codec_id", ["blosc", "lz4", "zstd", "zlib", "gzip", "bz2", "lzma", "shuffle"] +) +def test_generic_codec(store: Store, codec_id: str): + data = np.arange(0, 256, dtype="uint16").reshape((16, 16)) + + with pytest.warns(UserWarning, match="Numcodecs.*"): + a = Array.create( + store / "generic", + shape=data.shape, + chunk_shape=(16, 16), + dtype=data.dtype, + fill_value=0, + codecs=[ + BytesCodec(), + get_codec_class(f"https://zarr.dev/numcodecs/{codec_id}")({"id": codec_id}), + ], + ) + + a[:, :] = data.copy() + assert np.array_equal(data, a[:, :]) + + +@pytest.mark.parametrize( + "codec_config", + [ + {"id": "delta", "dtype": "float32"}, + {"id": "fixedscaleoffset", "offset": 0, "scale": 25.5}, + {"id": "fixedscaleoffset", "offset": 0, "scale": 51, "astype": "uint16"}, + {"id": "astype", "encode_dtype": "float32", "decode_dtype": "float64"}, + ], + ids=[ + "delta", + "fixedscaleoffset", + "fixedscaleoffset2", + "astype", + ], +) +def test_generic_filter(store: Store, codec_config: dict[str, JSON]): + data = np.linspace(0, 10, 256, dtype="float32").reshape((16, 16)) + + codec_id = codec_config["id"] + del codec_config["id"] + + with pytest.warns(UserWarning, match="Numcodecs.*"): + a = Array.create( + store / "generic", + shape=data.shape, + chunk_shape=(16, 16), + dtype=data.dtype, + fill_value=0, + codecs=[ + get_codec_class(f"https://zarr.dev/numcodecs/{codec_id}")(codec_config), + BytesCodec(), + ], + ) + + a[:, :] = data.copy() + a = Array.open(store / "generic") + assert np.array_equal(data, a[:, :]) + + +def test_generic_filter_bitround(store: Store): + data = np.linspace(0, 1, 256, dtype="float32").reshape((16, 16)) + + with pytest.warns(UserWarning, match="Numcodecs.*"): + a = Array.create( + store / "generic_bitround", + shape=data.shape, + chunk_shape=(16, 16), + dtype=data.dtype, + fill_value=0, + codecs=[ + get_codec_class("https://zarr.dev/numcodecs/bitround")({"keepbits": 3}), + BytesCodec(), + ], + ) + + a[:, :] = data.copy() + a = Array.open(store / "generic_bitround") + assert np.allclose(data, a[:, :], atol=0.1) + + +def test_generic_filter_quantize(store: Store): + data = np.linspace(0, 10, 256, dtype="float32").reshape((16, 16)) + + with pytest.warns(UserWarning, match="Numcodecs.*"): + a = Array.create( + store / "generic_quantize", + shape=data.shape, + chunk_shape=(16, 16), + dtype=data.dtype, + fill_value=0, + codecs=[ + get_codec_class("https://zarr.dev/numcodecs/quantize")({"digits": 3}), + BytesCodec(), + ], + ) + + a[:, :] = data.copy() + a = Array.open(store / "generic_quantize") + assert np.allclose(data, a[:, :], atol=0.001) + + +def test_generic_filter_packbits(store: Store): + data = np.zeros((16, 16), dtype="bool") + data[0:4, :] = True + + with pytest.warns(UserWarning, match="Numcodecs.*"): + a = Array.create( + store / "generic_packbits", + shape=data.shape, + chunk_shape=(16, 16), + dtype=data.dtype, + fill_value=0, + codecs=[ + get_codec_class("https://zarr.dev/numcodecs/packbits")(), + BytesCodec(), + ], + ) + + a[:, :] = data.copy() + a = Array.open(store / "generic_packbits") + assert np.array_equal(data, a[:, :]) + + +@pytest.mark.parametrize("codec_id", ["crc32", "adler32", "fletcher32", "jenkins_lookup3"]) +def test_generic_checksum(store: Store, codec_id: str): + data = np.linspace(0, 10, 256, dtype="float32").reshape((16, 16)) + + with pytest.warns(UserWarning, match="Numcodecs.*"): + a = Array.create( + store / "generic_checksum", + shape=data.shape, + chunk_shape=(16, 16), + dtype=data.dtype, + fill_value=0, + codecs=[ + BytesCodec(), + get_codec_class(f"https://zarr.dev/numcodecs/{codec_id}")(), + ], + ) + + a[:, :] = data.copy() + a = Array.open(store / "generic_checksum") + assert np.array_equal(data, a[:, :])