Skip to content

Commit

Permalink
Merge branch 'main' into seba/shrinking
Browse files Browse the repository at this point in the history
  • Loading branch information
paraseba authored Oct 13, 2024
2 parents 7c6a592 + 74821b2 commit 04a244f
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 22 deletions.
4 changes: 3 additions & 1 deletion icechunk-python/python/icechunk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from zarr.core.sync import SyncMixin

from ._icechunk_python import (
KeyNotFound,
PyIcechunkStore,
S3Credentials,
SnapshotMetadata,
Expand Down Expand Up @@ -292,9 +293,10 @@ async def get(
-------
Buffer
"""

try:
result = await self._store.get(key, byte_range)
except ValueError as _e:
except KeyNotFound as _e:
# Zarr python expects None to be returned if the key does not exist
# but an IcechunkStore returns an error if the key does not exist
return None
Expand Down
6 changes: 6 additions & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ class VirtualRefConfig:
"""
...

class KeyNotFound(Exception):
def __init__(
self,
info: Any
): ...

class StoreConfig:
# The number of concurrent requests to make when fetching partial values
get_partial_values_concurrency: int | None
Expand Down
14 changes: 13 additions & 1 deletion icechunk-python/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use icechunk::{
format::IcechunkFormatError, repository::RepositoryError, zarr::StoreError,
};
use pyo3::{exceptions::PyValueError, PyErr};
use pyo3::{
exceptions::{PyException, PyValueError},
PyErr,
};
use thiserror::Error;

/// A simple wrapper around the StoreError to make it easier to convert to a PyErr
Expand All @@ -12,6 +15,8 @@ use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub(crate) enum PyIcechunkStoreError {
#[error("key not found error: {0}")]
KeyNotFound(#[from] KeyNotFound),
#[error("store error: {0}")]
StoreError(#[from] StoreError),
#[error("repository Error: {0}")]
Expand All @@ -33,3 +38,10 @@ impl From<PyIcechunkStoreError> for PyErr {
}

pub(crate) type PyIcechunkStoreResult<T> = Result<T, PyIcechunkStoreError>;

pyo3::create_exception!(
_icechunk_python,
KeyNotFound,
PyException,
"The key is not present in the repository"
);
35 changes: 21 additions & 14 deletions icechunk-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use icechunk::{
repository::VirtualChunkLocation,
storage::virtual_ref::ObjectStoreVirtualChunkResolverConfig,
zarr::{
ConsolidatedStore, ObjectId, RepositoryConfig, StorageConfig, StoreOptions,
VersionInfo,
ConsolidatedStore, ObjectId, RepositoryConfig, StorageConfig, StoreError,
StoreOptions, VersionInfo,
},
Repository, SnapshotMetadata,
};
Expand All @@ -25,6 +25,8 @@ use storage::{PyS3Credentials, PyStorageConfig, PyVirtualRefConfig};
use streams::PyAsyncGenerator;
use tokio::sync::{Mutex, RwLock};

pub use errors::KeyNotFound;

#[pyclass]
struct PyIcechunkStore {
consolidated: ConsolidatedStore,
Expand Down Expand Up @@ -496,17 +498,20 @@ impl PyIcechunkStore {
let store = Arc::clone(&self.store);
pyo3_asyncio_0_21::tokio::future_into_py(py, async move {
let byte_range = byte_range.unwrap_or((None, None)).into();
let data = store
.read()
.await
.get(&key, &byte_range)
.await
.map_err(PyIcechunkStoreError::from)?;
let pybytes = Python::with_gil(|py| {
let bound_bytes = PyBytes::new_bound(py, &data);
bound_bytes.to_object(py)
});
Ok(pybytes)
let data = store.read().await.get(&key, &byte_range).await;
// We need to distinguish the "safe" case of trying to fetch an uninitialized key
// from other types of errors, we use KeyNotFound exception for that
match data {
Ok(data) => {
let pybytes = Python::with_gil(|py| {
let bound_bytes = PyBytes::new_bound(py, &data);
bound_bytes.to_object(py)
});
Ok(pybytes)
}
Err(StoreError::NotFound(_)) => Err(KeyNotFound::new_err(key)),
Err(err) => Err(PyIcechunkStoreError::StoreError(err).into()),
}
})
}

Expand All @@ -524,6 +529,7 @@ impl PyIcechunkStore {
.await
.map_err(PyIcechunkStoreError::StoreError)?;

// FIXME: this processing is hiding errors in certain keys
let result = partial_values_stream
.into_iter()
// If we want to error instead of returning None we can collect into
Expand Down Expand Up @@ -746,8 +752,9 @@ impl PyIcechunkStore {

/// The icechunk Python module implemented in Rust.
#[pymodule]
fn _icechunk_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
fn _icechunk_python(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add("KeyNotFound", py.get_type_bound::<KeyNotFound>())?;
m.add_class::<PyStorageConfig>()?;
m.add_class::<PyIcechunkStore>()?;
m.add_class::<PyS3Credentials>()?;
Expand Down
17 changes: 15 additions & 2 deletions icechunk-python/tests/test_virtual_ref.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import pytest
import zarr
import zarr.core
import zarr.core.buffer
Expand Down Expand Up @@ -31,7 +32,7 @@ def write_chunks_to_minio(chunks: list[tuple[str, bytes]]):
store.put(key, data)


async def test_write_minino_virtual_refs():
async def test_write_minio_virtual_refs():
write_chunks_to_minio(
[
("path/to/python/chunk-1", b"first"),
Expand All @@ -56,14 +57,18 @@ async def test_write_minino_virtual_refs():
),
)

array = zarr.Array.create(store, shape=(1, 1, 2), chunk_shape=(1, 1, 1), dtype="i4")
array = zarr.Array.create(store, shape=(1, 1, 3), chunk_shape=(1, 1, 1), dtype="i4")

await store.set_virtual_ref(
"c/0/0/0", "s3://testbucket/path/to/python/chunk-1", offset=0, length=4
)
await store.set_virtual_ref(
"c/0/0/1", "s3://testbucket/path/to/python/chunk-2", offset=1, length=4
)
# we write a ref that simulates a lost chunk
await store.set_virtual_ref(
"c/0/0/2", "s3://testbucket/path/to/python/non-existing", offset=1, length=4
)

buffer_prototype = zarr.core.buffer.default_buffer_prototype()

Expand All @@ -78,6 +83,14 @@ async def test_write_minino_virtual_refs():
assert array[0, 0, 0] == 1936877926
assert array[0, 0, 1] == 1852793701

# fetch uninitialized chunk should be None
assert await store.get("c/0/0/3", prototype=buffer_prototype) is None

# fetching a virtual ref that disappeared should be an exception
with pytest.raises(ValueError):
# TODO: we should include the key and other info in the exception
await store.get("c/0/0/2", prototype=buffer_prototype)

_snapshot_id = await store.commit("Add virtual refs")


Expand Down
62 changes: 58 additions & 4 deletions icechunk/src/zarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ pub enum KeyNotFoundError {
ChunkNotFound { key: String, path: Path, coords: ChunkIndices },
#[error("node not found at `{path}`")]
NodeNotFound { path: Path },
#[error("v2 key not found at `{key}`")]
ZarrV2KeyNotFound { key: String },
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -622,6 +624,9 @@ impl Store {
}
Ok(())
}
Key::ZarrV2(_) => Err(StoreError::Unimplemented(
"Icechunk cannot set Zarr V2 metadata keys",
)),
}
}

Expand All @@ -645,10 +650,6 @@ impl Store {
}

match Key::parse(key)? {
Key::Metadata { .. } => Err(StoreError::NotAllowed(format!(
"use .set to modify metadata for key {}",
key
))),
Key::Chunk { node_path, coords } => {
self.repository
.write()
Expand All @@ -661,6 +662,9 @@ impl Store {
.await?;
Ok(())
}
Key::Metadata { .. } | Key::ZarrV2(_) => Err(StoreError::NotAllowed(
format!("use .set to modify metadata for key {}", key),
)),
}
}

Expand Down Expand Up @@ -692,6 +696,7 @@ impl Store {
let repository = guard.deref_mut();
Ok(repository.set_chunk_ref(node_path, coords, None).await?)
}
Key::ZarrV2(_) => Ok(()),
}
}

Expand Down Expand Up @@ -956,6 +961,9 @@ async fn get_key(
Key::Chunk { node_path, coords } => {
get_chunk_bytes(key, node_path, coords, byte_range, repo).await
}
Key::ZarrV2(key) => {
Err(StoreError::NotFound(KeyNotFoundError::ZarrV2KeyNotFound { key }))
}
}?;

Ok(bytes)
Expand All @@ -973,6 +981,7 @@ async fn exists(key: &str, repo: &Repository) -> StoreResult<bool> {
enum Key {
Metadata { node_path: Path },
Chunk { node_path: Path, coords: ChunkIndices },
ZarrV2(String),
}

impl Key {
Expand All @@ -982,6 +991,18 @@ impl Key {

fn parse(key: &str) -> Result<Self, StoreError> {
fn parse_chunk(key: &str) -> Result<Key, StoreError> {
if key == ".zgroup"
|| key == ".zarray"
|| key == ".zattrs"
|| key == ".zmetadata"
|| key.ends_with("/.zgroup")
|| key.ends_with("/.zarray")
|| key.ends_with("/.zattrs")
|| key.ends_with("/.zmetadata")
{
return Ok(Key::ZarrV2(key.to_string()));
}

if key == "c" {
return Ok(Key::Chunk {
node_path: Path::root(),
Expand Down Expand Up @@ -1051,6 +1072,7 @@ impl Display for Key {
.join("/");
f.write_str(s.as_str())
}
Key::ZarrV2(key) => f.write_str(key.as_str()),
}
}
}
Expand Down Expand Up @@ -1425,6 +1447,38 @@ mod tests {
Key::parse("c/0/0"),
Ok(Key::Chunk { node_path, coords}) if node_path.to_string() == "/" && coords == ChunkIndices(vec![0,0])
));
assert!(matches!(
Key::parse(".zarray"),
Ok(Key::ZarrV2(s) ) if s == ".zarray"
));
assert!(matches!(
Key::parse(".zgroup"),
Ok(Key::ZarrV2(s) ) if s == ".zgroup"
));
assert!(matches!(
Key::parse(".zattrs"),
Ok(Key::ZarrV2(s) ) if s == ".zattrs"
));
assert!(matches!(
Key::parse(".zmetadata"),
Ok(Key::ZarrV2(s) ) if s == ".zmetadata"
));
assert!(matches!(
Key::parse("foo/.zgroup"),
Ok(Key::ZarrV2(s) ) if s == "foo/.zgroup"
));
assert!(matches!(
Key::parse("foo/bar/.zarray"),
Ok(Key::ZarrV2(s) ) if s == "foo/bar/.zarray"
));
assert!(matches!(
Key::parse("foo/.zmetadata"),
Ok(Key::ZarrV2(s) ) if s == "foo/.zmetadata"
));
assert!(matches!(
Key::parse("foo/.zattrs"),
Ok(Key::ZarrV2(s) ) if s == "foo/.zattrs"
));
}

#[test]
Expand Down

0 comments on commit 04a244f

Please sign in to comment.