Skip to content

Commit

Permalink
Switch tokio runtime to use bundled pyo3_async (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci authored Oct 9, 2024
1 parent e38c35b commit b183edd
Showing 1 changed file with 15 additions and 21 deletions.
36 changes: 15 additions & 21 deletions icechunk-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use tokio::sync::{Mutex, RwLock};
struct PyIcechunkStore {
consolidated: ConsolidatedStore,
store: Arc<RwLock<Store>>,
rt: tokio::runtime::Runtime,
}

#[pyclass(name = "StoreConfig")]
Expand Down Expand Up @@ -166,8 +165,7 @@ impl PyIcechunkStore {

let store = Store::from_consolidated(&consolidated, access_mode).await?;
let store = Arc::new(RwLock::new(store));
let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
Ok(Self { consolidated, store, rt })
Ok(Self { consolidated, store })
}

async fn as_consolidated(&self) -> PyIcechunkStoreResult<ConsolidatedStore> {
Expand Down Expand Up @@ -259,7 +257,8 @@ impl PyIcechunkStore {
}

fn as_bytes(&self) -> PyResult<Cow<[u8]>> {
let consolidated = self.rt.block_on(self.as_consolidated())?;
let consolidated =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(self.as_consolidated())?;

// FIXME: Use rmp_serde instead of serde_json to optimize performance
let serialized = serde_json::to_vec(&consolidated)
Expand All @@ -275,12 +274,10 @@ impl PyIcechunkStore {
};

let readable_store = self.store.blocking_read();
let consolidated = self.rt.block_on(self.as_consolidated())?;
let consolidated =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(self.as_consolidated())?;
let store = Arc::new(RwLock::new(readable_store.with_access_mode(access_mode)));
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyValueError::new_err(e.to_string()))?;

Ok(PyIcechunkStore { consolidated, store, rt })
Ok(PyIcechunkStore { consolidated, store })
}

fn checkout_snapshot<'py>(
Expand Down Expand Up @@ -340,7 +337,8 @@ impl PyIcechunkStore {
#[getter]
fn snapshot_id(&self) -> PyIcechunkStoreResult<String> {
let store = self.store.blocking_read();
let snapshot_id = self.rt.block_on(store.snapshot_id());
let snapshot_id =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(store.snapshot_id());
Ok(snapshot_id.to_string())
}

Expand Down Expand Up @@ -385,8 +383,7 @@ impl PyIcechunkStore {

fn change_set_bytes(&self) -> PyIcechunkStoreResult<Vec<u8>> {
let store = self.store.blocking_read();
let res = self
.rt
let res = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(store.change_set_bytes())
.map_err(PyIcechunkStoreError::from)?;
Ok(res)
Expand All @@ -402,7 +399,8 @@ impl PyIcechunkStore {
#[getter]
fn has_uncommitted_changes(&self) -> PyIcechunkStoreResult<bool> {
let store = self.store.blocking_read();
let has_uncommitted_changes = self.rt.block_on(store.has_uncommitted_changes());
let has_uncommitted_changes = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(store.has_uncommitted_changes());
Ok(has_uncommitted_changes)
}

Expand Down Expand Up @@ -459,8 +457,7 @@ impl PyIcechunkStore {
}

fn ancestry(&self) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.ancestry().await
Expand Down Expand Up @@ -713,8 +710,7 @@ impl PyIcechunkStore {
}

fn list(&self) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list().await
Expand All @@ -726,8 +722,7 @@ impl PyIcechunkStore {
}

fn list_prefix(&self, prefix: String) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list_prefix(prefix.as_str()).await
Expand All @@ -738,8 +733,7 @@ impl PyIcechunkStore {
}

fn list_dir(&self, prefix: String) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list_dir(prefix.as_str()).await
Expand Down

0 comments on commit b183edd

Please sign in to comment.