From 03560e29942225282a4ca53f99befdedb84cc825 Mon Sep 17 00:00:00 2001 From: Arun Raj M Date: Wed, 18 Oct 2023 11:55:32 +0000 Subject: [PATCH 1/6] Update connection_manager.rs --- src/connection_manager.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 5da755a..d02b750 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -77,7 +77,14 @@ where async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { let c = Connection(conn.0.clone()); self.run_blocking(move |m| { - m.is_valid(&mut *c.inner())?; + if m.has_broken(&mut *c.inner()) { + return Err(ConnectionError::Connection( + diesel::r2d2::Error::ConnectionError(BadConnection( + "connection brokenn".to_string(), + )), + )); + } + // m.is_valid(&mut *c.inner())?; Ok(()) }) .await From d697d011dfd086b4586573ad612f8dc79763b177 Mon Sep 17 00:00:00 2001 From: Arun Raj M Date: Wed, 18 Oct 2023 17:27:43 +0530 Subject: [PATCH 2/6] chore: add imports --- src/connection_manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connection_manager.rs b/src/connection_manager.rs index d02b750..c4ce305 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -3,6 +3,7 @@ use crate::{Connection, ConnectionError}; use async_trait::async_trait; use diesel::r2d2::{self, ManageConnection, R2D2Connection}; +use diesel::ConnectionError::BadConnection; use std::sync::{Arc, Mutex}; /// A connection manager which implements [`bb8::ManageConnection`] to From cd55d0c24598b9c964ce6acd15991eb6e0fefdc9 Mon Sep 17 00:00:00 2001 From: Arun Raj M Date: Wed, 18 Oct 2023 18:16:34 +0530 Subject: [PATCH 3/6] chore: panic instead of error --- src/connection_manager.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/connection_manager.rs b/src/connection_manager.rs index c4ce305..4adc256 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -79,11 +79,12 @@ where let c = Connection(conn.0.clone()); self.run_blocking(move |m| { if m.has_broken(&mut *c.inner()) { - return Err(ConnectionError::Connection( - diesel::r2d2::Error::ConnectionError(BadConnection( - "connection brokenn".to_string(), - )), - )); + // let error = Err(ConnectionError::Connection( + // diesel::r2d2::Error::ConnectionError(BadConnection( + // "connection brokenn".to_string(), + // )), + // )); + panic!("connection broken"); } // m.is_valid(&mut *c.inner())?; Ok(()) From 69d59315bef9778903abdc6d923044ca2757fd02 Mon Sep 17 00:00:00 2001 From: Arun Raj M Date: Wed, 25 Oct 2023 03:18:44 +0530 Subject: [PATCH 4/6] chore: add blocking is_valid --- Cargo.toml | 1 + src/connection_manager.rs | 55 +++++++++++++++++++++++++++++---------- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e137f9d..918c8a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ async-trait = "0.1.73" diesel = { version = "2.1.0", default-features = false, features = [ "r2d2" ] } thiserror = "1.0" tokio = { version = "1.32", default-features = false, features = [ "rt-multi-thread" ] } +tracing = "0.1.37" [dev-dependencies] diesel = { version = "2.1.0", features = [ "postgres", "r2d2" ] } diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 4adc256..254df00 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -3,7 +3,6 @@ use crate::{Connection, ConnectionError}; use async_trait::async_trait; use diesel::r2d2::{self, ManageConnection, R2D2Connection}; -use diesel::ConnectionError::BadConnection; use std::sync::{Arc, Mutex}; /// A connection manager which implements [`bb8::ManageConnection`] to @@ -58,6 +57,17 @@ impl ConnectionManager { // Intentionally panic if the inner closure panics. .unwrap() } + + fn run(&self, f: F) -> R + where + R: Send + 'static, + F: Send + 'static + FnOnce(&r2d2::ConnectionManager) -> R, + { + let cloned = self.inner.clone(); + let cloned = cloned.lock().unwrap(); + f(&*cloned) + } + } #[async_trait] @@ -75,23 +85,21 @@ where .map_err(ConnectionError::Connection) } + // async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + // let c = Connection(conn.0.clone()); + // self.run_blocking(move |m| { + // m.is_valid(&mut *c.inner())?; + // Ok(()) + // }) + // .await + // } + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { let c = Connection(conn.0.clone()); - self.run_blocking(move |m| { - if m.has_broken(&mut *c.inner()) { - // let error = Err(ConnectionError::Connection( - // diesel::r2d2::Error::ConnectionError(BadConnection( - // "connection brokenn".to_string(), - // )), - // )); - panic!("connection broken"); - } - // m.is_valid(&mut *c.inner())?; - Ok(()) - }) - .await + self.run(move |m| closure_for_is_valid_of_manager(m, c)) } + fn has_broken(&self, _: &mut Self::Connection) -> bool { // Diesel returns this value internally. We have no way of calling the // inner method without blocking as this method is not async, but `bb8` @@ -99,3 +107,22 @@ where false } } + +#[tracing::instrument(skip_all)] +fn closure_for_is_valid_of_manager( + m: &r2d2::ConnectionManager, + conn: Connection, +) -> Result<(), ConnectionError> +where + T: R2D2Connection + Send + 'static, +{ + if m.has_broken(&mut *conn.inner()) { + return Err(ConnectionError::Connection( + diesel::r2d2::Error::ConnectionError(diesel::ConnectionError::BadConnection( + "connection brokenn".to_string(), + )), + )); + } + Ok(()) +} + From 53b4ab901aab7635c8215fd1c2d542c8db443094 Mon Sep 17 00:00:00 2001 From: Arun Raj M Date: Wed, 25 Oct 2023 03:56:53 +0530 Subject: [PATCH 5/6] chore: add tracing 0.1.36 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 918c8a7..6a6dd8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ async-trait = "0.1.73" diesel = { version = "2.1.0", default-features = false, features = [ "r2d2" ] } thiserror = "1.0" tokio = { version = "1.32", default-features = false, features = [ "rt-multi-thread" ] } -tracing = "0.1.37" +tracing = "0.1.36" [dev-dependencies] diesel = { version = "2.1.0", features = [ "postgres", "r2d2" ] } From 1ebaf18439b6de2d05838959cb2d285c106900c7 Mon Sep 17 00:00:00 2001 From: Arun Raj M Date: Sat, 4 Nov 2023 16:48:01 +0530 Subject: [PATCH 6/6] chore: add feature flag to use has_broken as valid check --- Cargo.toml | 4 +++- src/connection_manager.rs | 37 ++++++++++++++++++++++++------------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6a6dd8f..d93ea94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,13 +8,15 @@ license = "MIT" repository = "https://github.com/oxidecomputer/async-bb8-diesel" keywords = ["diesel", "r2d2", "pool", "tokio", "async"] +[features] +use_has_broken_as_valid_check = [] + [dependencies] bb8 = "0.8" async-trait = "0.1.73" diesel = { version = "2.1.0", default-features = false, features = [ "r2d2" ] } thiserror = "1.0" tokio = { version = "1.32", default-features = false, features = [ "rt-multi-thread" ] } -tracing = "0.1.36" [dev-dependencies] diesel = { version = "2.1.0", features = [ "postgres", "r2d2" ] } diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 254df00..2d50029 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -58,6 +58,7 @@ impl ConnectionManager { .unwrap() } + #[cfg(feature = "use_has_broken_as_valid_check")] fn run(&self, f: F) -> R where R: Send + 'static, @@ -67,7 +68,6 @@ impl ConnectionManager { let cloned = cloned.lock().unwrap(); f(&*cloned) } - } #[async_trait] @@ -85,20 +85,20 @@ where .map_err(ConnectionError::Connection) } - // async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { - // let c = Connection(conn.0.clone()); - // self.run_blocking(move |m| { - // m.is_valid(&mut *c.inner())?; - // Ok(()) - // }) - // .await - // } - async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { let c = Connection(conn.0.clone()); - self.run(move |m| closure_for_is_valid_of_manager(m, c)) - } + #[cfg(not(feature = "use_has_broken_as_valid_check"))] + { + self.run_blocking(move |m| closure_for_is_valid_of_manager(m, c)) + .await + } + + #[cfg(feature = "use_has_broken_as_valid_check")] + { + self.run(move |m| closure_for_is_valid_of_manager(m, c)) + } + } fn has_broken(&self, _: &mut Self::Connection) -> bool { // Diesel returns this value internally. We have no way of calling the @@ -108,7 +108,7 @@ where } } -#[tracing::instrument(skip_all)] +#[cfg(feature = "use_has_broken_as_valid_check")] fn closure_for_is_valid_of_manager( m: &r2d2::ConnectionManager, conn: Connection, @@ -126,3 +126,14 @@ where Ok(()) } +#[cfg(not(feature = "use_has_broken_as_valid_check"))] +fn closure_for_is_valid_of_manager( + m: &r2d2::ConnectionManager, + conn: Connection, +) -> Result<(), ConnectionError> +where + T: R2D2Connection + Send + 'static, +{ + m.is_valid(&mut *conn.inner())?; + Ok(()) +}