Skip to content

Commit

Permalink
Merge branch 'main' into feature/xarray-demo
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci authored Oct 9, 2024
2 parents 8e64a9a + b183edd commit a9c40d7
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 45 deletions.
36 changes: 15 additions & 21 deletions icechunk-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use tokio::sync::{Mutex, RwLock};
struct PyIcechunkStore {
consolidated: ConsolidatedStore,
store: Arc<RwLock<Store>>,
rt: tokio::runtime::Runtime,
}

#[pyclass(name = "StoreConfig")]
Expand Down Expand Up @@ -166,8 +165,7 @@ impl PyIcechunkStore {

let store = Store::from_consolidated(&consolidated, access_mode).await?;
let store = Arc::new(RwLock::new(store));
let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
Ok(Self { consolidated, store, rt })
Ok(Self { consolidated, store })
}

async fn as_consolidated(&self) -> PyIcechunkStoreResult<ConsolidatedStore> {
Expand Down Expand Up @@ -259,7 +257,8 @@ impl PyIcechunkStore {
}

fn as_bytes(&self) -> PyResult<Cow<[u8]>> {
let consolidated = self.rt.block_on(self.as_consolidated())?;
let consolidated =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(self.as_consolidated())?;

// FIXME: Use rmp_serde instead of serde_json to optimize performance
let serialized = serde_json::to_vec(&consolidated)
Expand All @@ -275,12 +274,10 @@ impl PyIcechunkStore {
};

let readable_store = self.store.blocking_read();
let consolidated = self.rt.block_on(self.as_consolidated())?;
let consolidated =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(self.as_consolidated())?;
let store = Arc::new(RwLock::new(readable_store.with_access_mode(access_mode)));
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyValueError::new_err(e.to_string()))?;

Ok(PyIcechunkStore { consolidated, store, rt })
Ok(PyIcechunkStore { consolidated, store })
}

fn checkout_snapshot<'py>(
Expand Down Expand Up @@ -340,7 +337,8 @@ impl PyIcechunkStore {
#[getter]
fn snapshot_id(&self) -> PyIcechunkStoreResult<String> {
let store = self.store.blocking_read();
let snapshot_id = self.rt.block_on(store.snapshot_id());
let snapshot_id =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(store.snapshot_id());
Ok(snapshot_id.to_string())
}

Expand Down Expand Up @@ -385,8 +383,7 @@ impl PyIcechunkStore {

fn change_set_bytes(&self) -> PyIcechunkStoreResult<Vec<u8>> {
let store = self.store.blocking_read();
let res = self
.rt
let res = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(store.change_set_bytes())
.map_err(PyIcechunkStoreError::from)?;
Ok(res)
Expand All @@ -402,7 +399,8 @@ impl PyIcechunkStore {
#[getter]
fn has_uncommitted_changes(&self) -> PyIcechunkStoreResult<bool> {
let store = self.store.blocking_read();
let has_uncommitted_changes = self.rt.block_on(store.has_uncommitted_changes());
let has_uncommitted_changes = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(store.has_uncommitted_changes());
Ok(has_uncommitted_changes)
}

Expand Down Expand Up @@ -459,8 +457,7 @@ impl PyIcechunkStore {
}

fn ancestry(&self) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.ancestry().await
Expand Down Expand Up @@ -713,8 +710,7 @@ impl PyIcechunkStore {
}

fn list(&self) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list().await
Expand All @@ -726,8 +722,7 @@ impl PyIcechunkStore {
}

fn list_prefix(&self, prefix: String) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list_prefix(prefix.as_str()).await
Expand All @@ -738,8 +733,7 @@ impl PyIcechunkStore {
}

fn list_dir(&self, prefix: String) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list_dir(prefix.as_str()).await
Expand Down
21 changes: 7 additions & 14 deletions icechunk/src/metadata/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub enum DataType {
Float64,
Complex64,
Complex128,
// FIXME: serde serialization
RawBits(usize),
String,
Bytes,
}

impl DataType {
Expand Down Expand Up @@ -67,17 +67,9 @@ impl TryFrom<&str> for DataType {
"float64" => Ok(DataType::Float64),
"complex64" => Ok(DataType::Complex64),
"complex128" => Ok(DataType::Complex128),
_ => {
let mut it = value.chars();
if it.next() == Some('r') {
it.as_str()
.parse()
.map(DataType::RawBits)
.map_err(|_| "Cannot parse RawBits size")
} else {
Err("Unknown data type, cannot parse")
}
}
"string" => Ok(DataType::String),
"bytes" => Ok(DataType::Bytes),
_ => Err("Unknown data type, cannot parse"),
}
}
}
Expand All @@ -100,7 +92,8 @@ impl Display for DataType {
Float64 => f.write_str("float64"),
Complex64 => f.write_str("complex64"),
Complex128 => f.write_str("complex128"),
RawBits(usize) => write!(f, "r{}", usize),
String => f.write_str("string"),
Bytes => f.write_str("bytes"),
}
}
}
30 changes: 21 additions & 9 deletions icechunk/src/metadata/fill_value.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use test_strategy::Arbitrary;

Expand All @@ -22,7 +23,8 @@ pub enum FillValue {
Float64(f64),
Complex64(f32, f32),
Complex128(f64, f64),
RawBits(Vec<u8>),
String(String),
Bytes(Vec<u8>),
}

impl FillValue {
Expand Down Expand Up @@ -181,20 +183,29 @@ impl FillValue {
}
}

(DataType::RawBits(n), serde_json::Value::Array(arr)) if arr.len() == *n => {
let bits = arr
(DataType::String, serde_json::Value::String(s)) => {
Ok(FillValue::String(s.clone()))
}

(DataType::Bytes, serde_json::Value::Array(arr)) => {
let bytes = arr
.iter()
.map(|b| FillValue::from_data_type_and_json(&DataType::UInt8, b))
.collect::<Result<Vec<_>, _>>()?;
Ok(FillValue::RawBits(
bits.iter()
Ok(FillValue::Bytes(
bytes
.iter()
.map(|b| match b {
FillValue::UInt8(n) => *n,
_ => 0,
FillValue::UInt8(n) => Ok(*n),
_ => Err(IcechunkFormatError::FillValueParse {
data_type: dt.clone(),
value: value.clone(),
}),
})
.collect(),
.try_collect()?,
))
}

_ => Err(IcechunkFormatError::FillValueParse {
data_type: dt.clone(),
value: value.clone(),
Expand All @@ -218,7 +229,8 @@ impl FillValue {
FillValue::Float64(_) => DataType::Float64,
FillValue::Complex64(_, _) => DataType::Complex64,
FillValue::Complex128(_, _) => DataType::Complex128,
FillValue::RawBits(v) => DataType::RawBits(v.len()),
FillValue::String(_) => DataType::String,
FillValue::Bytes(_) => DataType::Bytes,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion icechunk/src/zarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,8 @@ impl From<ZarrArrayMetadata> for ZarrArrayMetadataSerialzer {
}
FillValue::Complex64(r, i) => ([r, i].as_ref()).into(),
FillValue::Complex128(r, i) => ([r, i].as_ref()).into(),
FillValue::RawBits(r) => r.into(),
FillValue::String(s) => s.into(),
FillValue::Bytes(b) => b.into(),
}
}

Expand Down

0 comments on commit a9c40d7

Please sign in to comment.