Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

253: Implement fault tolerance mechanisms (e.g. retries, dead letter queues, circuit breakers) e5 #280

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub(crate) enum ForwardError {
UncoordinatedSender,
#[error("Internal server error")]
InternalServerError,
#[error("Service unavailable")]
CircuitOpen,
}

impl IntoResponse for ForwardError {
Expand All @@ -18,6 +20,7 @@ impl IntoResponse for ForwardError {
ForwardError::MalformedBody => StatusCode::BAD_REQUEST,
ForwardError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
ForwardError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
ForwardError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use mongodb::bson::doc;
use serde_json::{json, Value};
use shared::{
repository::entity::{Connection, RoutedMessage},
retry::{retry_async, RetryOptions},
state::{AppState, AppStateRepository},
};
use std::sync::Arc;
use std::time::Duration;

/// Mediator receives forwarded messages, extracts the next field from the message body and the attachments from the message,
/// then stores the attachment with the next field as key for pickup
Expand All @@ -24,6 +26,11 @@ pub(crate) async fn mediator_forward_process(
.as_ref()
.ok_or_else(|| ForwardError::InternalServerError)?;

let circuit_breaker = state.circuit_breaker.clone();
if circuit_breaker.is_open() {
return Err(ForwardError::CircuitOpen);
}
ndefokou marked this conversation as resolved.
Show resolved Hide resolved

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
Expand All @@ -36,15 +43,38 @@ pub(crate) async fn mediator_forward_process(
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did: next.as_ref().unwrap().to_owned(),
})
.await
.map_err(|_| ForwardError::InternalServerError)?;

let result = retry_async(
|| {
let attached = attached.clone();
let recipient_did = next.as_ref().unwrap().to_owned();

async move {
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did,
})
.await
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1)),
)
.await;
ndefokou marked this conversation as resolved.
Show resolved Hide resolved

match result {
Ok(_) => circuit_breaker.record_success(),
Err(_) => {
circuit_breaker.record_failure();
return Err(ForwardError::InternalServerError);
}
};
}

Ok(None)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
use shared::{
midlw::ensure_transport_return_route_is_decorated_all,
repository::entity::Connection,
retry::{retry_async, RetryOptions},
state::{AppState, AppStateRepository},
CircuitBreaker::CircuitBreaker,

Check warning on line 26 in crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs

View workflow job for this annotation

GitHub Actions / Build and test

unused import: `CircuitBreaker::CircuitBreaker`

Check warning on line 26 in crates/web-plugins/didcomm-messaging/protocols/mediator-coordination/src/handler/stateful.rs

View workflow job for this annotation

GitHub Actions / Build and test

unused import: `CircuitBreaker::CircuitBreaker`
ndefokou marked this conversation as resolved.
Show resolved Hide resolved
};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use uuid::Uuid;

/// Process a DIDComm mediate request
Expand Down Expand Up @@ -53,10 +55,24 @@
.ok_or(MediationError::InternalServerError)?;

// If there is already mediation, send mediate deny
if let Some(_connection) = connection_repository
.find_one_by(doc! { "client_did": sender_did})
.await
.map_err(|_| MediationError::InternalServerError)?
if let Some(_connection) = retry_async(
|| {
let sender_did = sender_did.clone();
let connection_repository = connection_repository.clone();

async move {
connection_repository
.find_one_by(doc! { "client_did": sender_did })
.await
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1)),
)
.await
.map_err(|_| MediationError::InternalServerError)?
{
tracing::info!("Sending mediate deny.");
return Ok(Some(
Expand Down Expand Up @@ -85,15 +101,24 @@
.as_ref()
.ok_or(MediationError::InternalServerError)?;

let diddoc = state
.did_resolver
.resolve(&routing_did)
.await
.map_err(|err| {
tracing::error!("Failed to resolve DID: {:?}", err);
MediationError::InternalServerError
})?
.ok_or(MediationError::InternalServerError)?;
let diddoc = retry_async(
|| {
let did_resolver = state.did_resolver.clone();
let routing_did = routing_did.clone();

async move { did_resolver.resolve(&routing_did).await.map_err(|_| ()) }
},
ndefokou marked this conversation as resolved.
Show resolved Hide resolved
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2)),
)
.await
.map_err(|err| {
tracing::error!("Failed to resolve DID: {:?}", err);
MediationError::InternalServerError
})?
.ok_or(MediationError::InternalServerError)?;

let agreem_keys_jwk: Jwk = agreem_keys.try_into().unwrap();

Expand Down Expand Up @@ -229,11 +254,29 @@

// Find connection for this keylist update

let connection = connection_repository
.find_one_by(doc! { "client_did": &sender })
.await
.unwrap()
.ok_or_else(|| MediationError::UncoordinatedSender)?;
let connection = retry_async(
|| {
let connection_repository = connection_repository.clone();
let sender = sender.clone();

async move {
connection_repository
.find_one_by(doc! { "client_did": &sender })
.await
.map_err(|_| ())
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2)),
)
.await
.map_err(|err| {
tracing::error!("Failed to find connection after retries: {:?}", err);
MediationError::InternalServerError
})?
.ok_or_else(|| MediationError::UncoordinatedSender)?;

// Prepare handles to relevant collections

Expand Down Expand Up @@ -347,11 +390,29 @@
.as_ref()
.ok_or(MediationError::InternalServerError)?;

let connection = connection_repository
.find_one_by(doc! { "client_did": &sender })
.await
.unwrap()
.ok_or_else(|| MediationError::UncoordinatedSender)?;
let connection = retry_async(
|| {
let connection_repository = connection_repository.clone();
let sender = sender.clone();

async move {
connection_repository
.find_one_by(doc! { "client_did": &sender })
.await
.map_err(|_| ())
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2)),
)
.await
.map_err(|err| {
tracing::error!("Failed to find connection after retries: {:?}", err);
MediationError::InternalServerError
})?
.ok_or_else(|| MediationError::UncoordinatedSender)?;

println!("keylist: {:?}", connection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#[error("Malformed request. {0}")]
MalformedRequest(String),

#[error("Service unavailable")]
CircuitOpen,

Check warning on line 19 in crates/web-plugins/didcomm-messaging/protocols/pickup/src/error.rs

View workflow job for this annotation

GitHub Actions / Build and test

variant `CircuitOpen` is never constructed

Check warning on line 19 in crates/web-plugins/didcomm-messaging/protocols/pickup/src/error.rs

View workflow job for this annotation

GitHub Actions / Build and test

variant `CircuitOpen` is never constructed
}

impl IntoResponse for PickupError {
Expand All @@ -22,6 +25,7 @@
PickupError::MissingSenderDID | PickupError::MalformedRequest(_) => {
StatusCode::BAD_REQUEST
}
PickupError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
PickupError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PickupError::MissingClientConnection => StatusCode::UNAUTHORIZED,
};
Expand Down
Loading
Loading