Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch tokio runtime to use bundled pyo3_async #167

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions icechunk-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use tokio::sync::{Mutex, RwLock};
struct PyIcechunkStore {
consolidated: ConsolidatedStore,
store: Arc<RwLock<Store>>,
rt: tokio::runtime::Runtime,
}

#[pyclass(name = "StoreConfig")]
Expand Down Expand Up @@ -166,8 +165,7 @@ impl PyIcechunkStore {

let store = Store::from_consolidated(&consolidated, access_mode).await?;
let store = Arc::new(RwLock::new(store));
let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
Ok(Self { consolidated, store, rt })
Ok(Self { consolidated, store })
}

async fn as_consolidated(&self) -> PyIcechunkStoreResult<ConsolidatedStore> {
Expand Down Expand Up @@ -259,7 +257,8 @@ impl PyIcechunkStore {
}

fn as_bytes(&self) -> PyResult<Cow<[u8]>> {
let consolidated = self.rt.block_on(self.as_consolidated())?;
let consolidated =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(self.as_consolidated())?;

// FIXME: Use rmp_serde instead of serde_json to optimize performance
let serialized = serde_json::to_vec(&consolidated)
Expand All @@ -275,12 +274,10 @@ impl PyIcechunkStore {
};

let readable_store = self.store.blocking_read();
let consolidated = self.rt.block_on(self.as_consolidated())?;
let consolidated =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(self.as_consolidated())?;
let store = Arc::new(RwLock::new(readable_store.with_access_mode(access_mode)));
let rt = tokio::runtime::Runtime::new()
.map_err(|e| PyValueError::new_err(e.to_string()))?;

Ok(PyIcechunkStore { consolidated, store, rt })
Ok(PyIcechunkStore { consolidated, store })
}

fn checkout_snapshot<'py>(
Expand Down Expand Up @@ -340,7 +337,8 @@ impl PyIcechunkStore {
#[getter]
fn snapshot_id(&self) -> PyIcechunkStoreResult<String> {
let store = self.store.blocking_read();
let snapshot_id = self.rt.block_on(store.snapshot_id());
let snapshot_id =
pyo3_asyncio_0_21::tokio::get_runtime().block_on(store.snapshot_id());
Ok(snapshot_id.to_string())
}

Expand Down Expand Up @@ -385,8 +383,7 @@ impl PyIcechunkStore {

fn change_set_bytes(&self) -> PyIcechunkStoreResult<Vec<u8>> {
let store = self.store.blocking_read();
let res = self
.rt
let res = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(store.change_set_bytes())
.map_err(PyIcechunkStoreError::from)?;
Ok(res)
Expand All @@ -402,7 +399,8 @@ impl PyIcechunkStore {
#[getter]
fn has_uncommitted_changes(&self) -> PyIcechunkStoreResult<bool> {
let store = self.store.blocking_read();
let has_uncommitted_changes = self.rt.block_on(store.has_uncommitted_changes());
let has_uncommitted_changes = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(store.has_uncommitted_changes());
Ok(has_uncommitted_changes)
}

Expand Down Expand Up @@ -459,8 +457,7 @@ impl PyIcechunkStore {
}

fn ancestry(&self) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.ancestry().await
Expand Down Expand Up @@ -713,8 +710,7 @@ impl PyIcechunkStore {
}

fn list(&self) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list().await
Expand All @@ -726,8 +722,7 @@ impl PyIcechunkStore {
}

fn list_prefix(&self, prefix: String) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list_prefix(prefix.as_str()).await
Expand All @@ -738,8 +733,7 @@ impl PyIcechunkStore {
}

fn list_dir(&self, prefix: String) -> PyIcechunkStoreResult<PyAsyncGenerator> {
let list = self
.rt
let list = pyo3_asyncio_0_21::tokio::get_runtime()
.block_on(async move {
let store = self.store.read().await;
store.list_dir(prefix.as_str()).await
Expand Down
Loading