diff --git a/icechunk-python/src/lib.rs b/icechunk-python/src/lib.rs index 33c8c3fd..a1b13bb0 100644 --- a/icechunk-python/src/lib.rs +++ b/icechunk-python/src/lib.rs @@ -29,7 +29,6 @@ use tokio::sync::{Mutex, RwLock}; struct PyIcechunkStore { consolidated: ConsolidatedStore, store: Arc>, - rt: tokio::runtime::Runtime, } #[pyclass(name = "StoreConfig")] @@ -166,8 +165,7 @@ impl PyIcechunkStore { let store = Store::from_consolidated(&consolidated, access_mode).await?; let store = Arc::new(RwLock::new(store)); - let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?; - Ok(Self { consolidated, store, rt }) + Ok(Self { consolidated, store }) } async fn as_consolidated(&self) -> PyIcechunkStoreResult { @@ -259,7 +257,8 @@ impl PyIcechunkStore { } fn as_bytes(&self) -> PyResult> { - let consolidated = self.rt.block_on(self.as_consolidated())?; + let consolidated = + pyo3_asyncio_0_21::tokio::get_runtime().block_on(self.as_consolidated())?; // FIXME: Use rmp_serde instead of serde_json to optimize performance let serialized = serde_json::to_vec(&consolidated) @@ -275,12 +274,10 @@ impl PyIcechunkStore { }; let readable_store = self.store.blocking_read(); - let consolidated = self.rt.block_on(self.as_consolidated())?; + let consolidated = + pyo3_asyncio_0_21::tokio::get_runtime().block_on(self.as_consolidated())?; let store = Arc::new(RwLock::new(readable_store.with_access_mode(access_mode))); - let rt = tokio::runtime::Runtime::new() - .map_err(|e| PyValueError::new_err(e.to_string()))?; - - Ok(PyIcechunkStore { consolidated, store, rt }) + Ok(PyIcechunkStore { consolidated, store }) } fn checkout_snapshot<'py>( @@ -340,7 +337,8 @@ impl PyIcechunkStore { #[getter] fn snapshot_id(&self) -> PyIcechunkStoreResult { let store = self.store.blocking_read(); - let snapshot_id = self.rt.block_on(store.snapshot_id()); + let snapshot_id = + pyo3_asyncio_0_21::tokio::get_runtime().block_on(store.snapshot_id()); Ok(snapshot_id.to_string()) } @@ -385,8 +383,7 @@ impl PyIcechunkStore { fn change_set_bytes(&self) -> PyIcechunkStoreResult> { let store = self.store.blocking_read(); - let res = self - .rt + let res = pyo3_asyncio_0_21::tokio::get_runtime() .block_on(store.change_set_bytes()) .map_err(PyIcechunkStoreError::from)?; Ok(res) @@ -402,7 +399,8 @@ impl PyIcechunkStore { #[getter] fn has_uncommitted_changes(&self) -> PyIcechunkStoreResult { let store = self.store.blocking_read(); - let has_uncommitted_changes = self.rt.block_on(store.has_uncommitted_changes()); + let has_uncommitted_changes = pyo3_asyncio_0_21::tokio::get_runtime() + .block_on(store.has_uncommitted_changes()); Ok(has_uncommitted_changes) } @@ -459,8 +457,7 @@ impl PyIcechunkStore { } fn ancestry(&self) -> PyIcechunkStoreResult { - let list = self - .rt + let list = pyo3_asyncio_0_21::tokio::get_runtime() .block_on(async move { let store = self.store.read().await; store.ancestry().await @@ -713,8 +710,7 @@ impl PyIcechunkStore { } fn list(&self) -> PyIcechunkStoreResult { - let list = self - .rt + let list = pyo3_asyncio_0_21::tokio::get_runtime() .block_on(async move { let store = self.store.read().await; store.list().await @@ -726,8 +722,7 @@ impl PyIcechunkStore { } fn list_prefix(&self, prefix: String) -> PyIcechunkStoreResult { - let list = self - .rt + let list = pyo3_asyncio_0_21::tokio::get_runtime() .block_on(async move { let store = self.store.read().await; store.list_prefix(prefix.as_str()).await @@ -738,8 +733,7 @@ impl PyIcechunkStore { } fn list_dir(&self, prefix: String) -> PyIcechunkStoreResult { - let list = self - .rt + let list = pyo3_asyncio_0_21::tokio::get_runtime() .block_on(async move { let store = self.store.read().await; store.list_dir(prefix.as_str()).await diff --git a/icechunk/src/metadata/data_type.rs b/icechunk/src/metadata/data_type.rs index c052d1be..4349c5f2 100644 --- a/icechunk/src/metadata/data_type.rs +++ b/icechunk/src/metadata/data_type.rs @@ -20,8 +20,8 @@ pub enum DataType { Float64, Complex64, Complex128, - // FIXME: serde serialization - RawBits(usize), + String, + Bytes, } impl DataType { @@ -67,17 +67,9 @@ impl TryFrom<&str> for DataType { "float64" => Ok(DataType::Float64), "complex64" => Ok(DataType::Complex64), "complex128" => Ok(DataType::Complex128), - _ => { - let mut it = value.chars(); - if it.next() == Some('r') { - it.as_str() - .parse() - .map(DataType::RawBits) - .map_err(|_| "Cannot parse RawBits size") - } else { - Err("Unknown data type, cannot parse") - } - } + "string" => Ok(DataType::String), + "bytes" => Ok(DataType::Bytes), + _ => Err("Unknown data type, cannot parse"), } } } @@ -100,7 +92,8 @@ impl Display for DataType { Float64 => f.write_str("float64"), Complex64 => f.write_str("complex64"), Complex128 => f.write_str("complex128"), - RawBits(usize) => write!(f, "r{}", usize), + String => f.write_str("string"), + Bytes => f.write_str("bytes"), } } } diff --git a/icechunk/src/metadata/fill_value.rs b/icechunk/src/metadata/fill_value.rs index 7f03f49e..31910248 100644 --- a/icechunk/src/metadata/fill_value.rs +++ b/icechunk/src/metadata/fill_value.rs @@ -1,3 +1,4 @@ +use itertools::Itertools; use serde::{Deserialize, Serialize}; use test_strategy::Arbitrary; @@ -22,7 +23,8 @@ pub enum FillValue { Float64(f64), Complex64(f32, f32), Complex128(f64, f64), - RawBits(Vec), + String(String), + Bytes(Vec), } impl FillValue { @@ -181,20 +183,29 @@ impl FillValue { } } - (DataType::RawBits(n), serde_json::Value::Array(arr)) if arr.len() == *n => { - let bits = arr + (DataType::String, serde_json::Value::String(s)) => { + Ok(FillValue::String(s.clone())) + } + + (DataType::Bytes, serde_json::Value::Array(arr)) => { + let bytes = arr .iter() .map(|b| FillValue::from_data_type_and_json(&DataType::UInt8, b)) .collect::, _>>()?; - Ok(FillValue::RawBits( - bits.iter() + Ok(FillValue::Bytes( + bytes + .iter() .map(|b| match b { - FillValue::UInt8(n) => *n, - _ => 0, + FillValue::UInt8(n) => Ok(*n), + _ => Err(IcechunkFormatError::FillValueParse { + data_type: dt.clone(), + value: value.clone(), + }), }) - .collect(), + .try_collect()?, )) } + _ => Err(IcechunkFormatError::FillValueParse { data_type: dt.clone(), value: value.clone(), @@ -218,7 +229,8 @@ impl FillValue { FillValue::Float64(_) => DataType::Float64, FillValue::Complex64(_, _) => DataType::Complex64, FillValue::Complex128(_, _) => DataType::Complex128, - FillValue::RawBits(v) => DataType::RawBits(v.len()), + FillValue::String(_) => DataType::String, + FillValue::Bytes(_) => DataType::Bytes, } } } diff --git a/icechunk/src/zarr.rs b/icechunk/src/zarr.rs index f42c3640..454b471a 100644 --- a/icechunk/src/zarr.rs +++ b/icechunk/src/zarr.rs @@ -1168,7 +1168,8 @@ impl From for ZarrArrayMetadataSerialzer { } FillValue::Complex64(r, i) => ([r, i].as_ref()).into(), FillValue::Complex128(r, i) => ([r, i].as_ref()).into(), - FillValue::RawBits(r) => r.into(), + FillValue::String(s) => s.into(), + FillValue::Bytes(b) => b.into(), } }