Skip to content

Commit

Permalink
Async R2D2 connection methods (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
smklein authored May 15, 2024
1 parent b13845a commit 718226e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
36 changes: 28 additions & 8 deletions src/async_traits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Async versions of traits for issuing Diesel queries.
use crate::connection::Connection as SingleConnection;
use crate::connection::Connection;
use async_trait::async_trait;
use diesel::{
connection::{
Expand All @@ -12,6 +12,7 @@ use diesel::{
methods::{ExecuteDsl, LimitDsl, LoadQuery},
RunQueryDsl,
},
r2d2::R2D2Connection,
result::Error as DieselError,
};
use futures::future::BoxFuture;
Expand Down Expand Up @@ -40,6 +41,25 @@ fn retryable_error(err: &DieselError) -> bool {
}
}

/// An async variant of [`diesel::r2d2::R2D2Connection`].
#[async_trait]
pub trait AsyncR2D2Connection<Conn>: AsyncConnection<Conn>
where
Conn: 'static + DieselConnection + R2D2Connection,
Self: Send + Sized + 'static,
{
async fn ping_async(&mut self) -> diesel::result::QueryResult<()> {
self.as_async_conn().run(|conn| conn.ping()).await
}

async fn is_broken_async(&mut self) -> bool {
self.as_async_conn()
.run(|conn| Ok::<bool, ()>(conn.is_broken()))
.await
.unwrap()
}
}

/// An async variant of [`diesel::connection::Connection`].
#[async_trait]
pub trait AsyncConnection<Conn>: AsyncSimpleConnection<Conn>
Expand All @@ -52,7 +72,7 @@ where
#[doc(hidden)]
fn as_sync_conn(&self) -> MutexGuard<'_, Conn>;
#[doc(hidden)]
fn as_async_conn(&self) -> &SingleConnection<Conn>;
fn as_async_conn(&self) -> &Connection<Conn>;

/// Runs the function `f` in an context where blocking is safe.
async fn run<R, E, Func>(&self, f: Func) -> Result<R, E>
Expand Down Expand Up @@ -169,7 +189,7 @@ where
where
R: Any + Send + 'static,
Fut: FutureExt<Output = Result<R, DieselError>> + Send,
Func: (Fn(SingleConnection<Conn>) -> Fut) + Send + Sync,
Func: (Fn(Connection<Conn>) -> Fut) + Send + Sync,
RetryFut: FutureExt<Output = bool> + Send,
RetryFunc: Fn() -> RetryFut + Send + Sync,
{
Expand Down Expand Up @@ -201,7 +221,7 @@ where
#[cfg(feature = "cockroach")]
async fn transaction_async_with_retry_inner(
&self,
f: &(dyn Fn(SingleConnection<Conn>) -> BoxFuture<'_, Result<Box<dyn Any + Send>, DieselError>>
f: &(dyn Fn(Connection<Conn>) -> BoxFuture<'_, Result<Box<dyn Any + Send>, DieselError>>
+ Send
+ Sync),
retry: &(dyn Fn() -> BoxFuture<'_, bool> + Send + Sync),
Expand Down Expand Up @@ -231,7 +251,7 @@ where
// Add a SAVEPOINT to which we can later return.
Self::add_retry_savepoint(&conn).await?;

let async_conn = SingleConnection(Self::as_async_conn(&conn).0.clone());
let async_conn = Connection(Self::as_async_conn(&conn).0.clone());
match f(async_conn).await {
Ok(value) => {
// The user-level operation succeeded: try to commit the
Expand Down Expand Up @@ -288,7 +308,7 @@ where
R: Send + 'static,
E: From<DieselError> + Send + 'static,
Fut: Future<Output = Result<R, E>> + Send,
Func: FnOnce(SingleConnection<Conn>) -> Fut + Send,
Func: FnOnce(Connection<Conn>) -> Fut + Send,
{
// This function sure has a bunch of generic parameters, which can cause
// a lot of code to be generated, and can slow down compile-time.
Expand All @@ -314,7 +334,7 @@ where
async fn transaction_async_inner<'a, E>(
&'a self,
f: Box<
dyn FnOnce(SingleConnection<Conn>) -> BoxFuture<'a, Result<Box<dyn Any + Send>, E>>
dyn FnOnce(Connection<Conn>) -> BoxFuture<'a, Result<Box<dyn Any + Send>, E>>
+ Send
+ 'a,
>,
Expand Down Expand Up @@ -348,7 +368,7 @@ where
// enough to be referenceable by a Future, but short enough that we can
// guarantee it doesn't live persist after this function returns, feel
// free to make that change.
let async_conn = SingleConnection(Self::as_async_conn(&conn).0.clone());
let async_conn = Connection(Self::as_async_conn(&conn).0.clone());
match f(async_conn).await {
Ok(value) => {
conn.run_with_shared_connection(|conn| {
Expand Down
1 change: 0 additions & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ where
self.inner()
}

// TODO: Consider removing me.
fn as_async_conn(&self) -> &Connection<Conn> {
self
}
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ mod connection_manager;
mod error;

pub use async_traits::{
AsyncConnection, AsyncRunQueryDsl, AsyncSaveChangesDsl, AsyncSimpleConnection,
AsyncConnection, AsyncR2D2Connection, AsyncRunQueryDsl, AsyncSaveChangesDsl,
AsyncSimpleConnection,
};
pub use connection::Connection;
pub use connection_manager::ConnectionManager;
Expand Down

0 comments on commit 718226e

Please sign in to comment.