Skip to content

Commit

Permalink
wrap breez server with connection fallbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Nov 29, 2024
1 parent 42b591e commit e4fa424
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 51 deletions.
17 changes: 12 additions & 5 deletions libs/sdk-common/src/breez_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::grpc::support_client::SupportClient;
use crate::grpc::swapper_client::SwapperClient;
use crate::grpc::{ChainApiServersRequest, PingRequest};
use crate::prelude::{ServiceConnectivityError, ServiceConnectivityErrorKind};
use crate::tonic_wrap::with_connection_fallback;

pub static PRODUCTION_BREEZSERVER_URL: &str = "https://bs1.breez.technology:443";
pub static STAGING_BREEZSERVER_URL: &str = "https://bs1-st.breez.technology:443";
Expand Down Expand Up @@ -112,9 +113,11 @@ impl BreezServer {

pub async fn fetch_mempoolspace_urls(&self) -> Result<Vec<String>, ServiceConnectivityError> {
let mut client = self.get_information_client().await;

let chain_api_servers = client
.chain_api_servers(ChainApiServersRequest {})
let mut client_clone = client.clone();
let chain_api_servers =
with_connection_fallback(client.chain_api_servers(ChainApiServersRequest {}), || {
client_clone.chain_api_servers(ChainApiServersRequest {})
})
.await
.map_err(|e| {
ServiceConnectivityError::new(
Expand All @@ -138,9 +141,12 @@ impl BreezServer {

pub async fn fetch_boltz_swapper_urls(&self) -> Result<Vec<String>, ServiceConnectivityError> {
let mut client = self.get_information_client().await;
let mut client_clone = client.clone();

let chain_api_servers = client
.chain_api_servers(ChainApiServersRequest {})
let chain_api_servers =
with_connection_fallback(client.chain_api_servers(ChainApiServersRequest {}), || {
client_clone.chain_api_servers(ChainApiServersRequest {})
})
.await
.map_err(|e| {
ServiceConnectivityError::new(
Expand All @@ -163,6 +169,7 @@ impl BreezServer {
}
}

#[derive(Clone)]
pub struct ApiKeyInterceptor {
api_key_metadata: Option<MetadataValue<Ascii>>,
}
Expand Down
16 changes: 9 additions & 7 deletions libs/sdk-common/src/fiat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::collections::HashMap;

use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use tonic::Request;

use crate::grpc::RatesRequest;
use crate::prelude::BreezServer;
use crate::tonic_wrap::with_connection_fallback;

/// Trait covering fiat-related functionality
#[tonic::async_trait]
Expand Down Expand Up @@ -97,12 +97,14 @@ impl FiatAPI for BreezServer {

async fn fetch_fiat_rates(&self) -> Result<Vec<Rate>> {
let mut client = self.get_information_client().await;

let request = Request::new(RatesRequest {});
let response = client
.rates(request)
.await
.map_err(|e| anyhow!("Fetch rates request failed: {e}"))?;
let mut client_clone = client.clone();

let request = RatesRequest {};
let response = with_connection_fallback(client.rates(request.clone()), || {
client_clone.rates(request)
})
.await
.map_err(|e| anyhow!("Fetch rates request failed: {e}"))?;

let mut rates = response.into_inner().rates;
rates.sort_by(|a, b| a.coin.cmp(&b.coin));
Expand Down
42 changes: 32 additions & 10 deletions libs/sdk-core/src/lsp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use sdk_common::grpc::{
SubscribeNotificationsRequest, UnsubscribeNotificationsRequest,
};
use sdk_common::prelude::BreezServer;
use sdk_common::tonic_wrap::with_connection_fallback;
use serde::{Deserialize, Serialize};
use tonic::Request;

/// Details of supported LSP
#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -94,9 +94,13 @@ impl LspInformation {
impl LspAPI for BreezServer {
async fn list_lsps(&self, pubkey: String) -> SdkResult<Vec<LspInformation>> {
let mut client = self.get_channel_opener_client().await?;
let mut client_clone = client.clone();

let request = Request::new(LspListRequest { pubkey });
let response = client.lsp_list(request).await?;
let request = LspListRequest { pubkey };
let response = with_connection_fallback(client.lsp_list(request.clone()), || {
client_clone.lsp_list(request)
})
.await?;
let mut lsp_list: Vec<LspInformation> = Vec::new();
for (lsp_id, lsp_info) in response.into_inner().lsps.into_iter() {
match LspInformation::try_from(&lsp_id, lsp_info) {
Expand All @@ -110,9 +114,13 @@ impl LspAPI for BreezServer {

async fn list_used_lsps(&self, pubkey: String) -> SdkResult<Vec<LspInformation>> {
let mut client = self.get_channel_opener_client().await?;
let mut client_clone = client.clone();

let request = Request::new(LspFullListRequest { pubkey });
let response = client.lsp_full_list(request).await?;
let request = LspFullListRequest { pubkey };
let response = with_connection_fallback(client.lsp_full_list(request.clone()), || {
client_clone.lsp_full_list(request)
})
.await?;
let mut lsp_list: Vec<LspInformation> = Vec::new();
for grpc_lsp_info in response.into_inner().lsps.into_iter() {
let lsp_id = grpc_lsp_info.id.clone();
Expand All @@ -138,6 +146,7 @@ impl LspAPI for BreezServer {
};

let mut client = self.get_payment_notifier_client().await;
let mut client_clone = client.clone();

let mut buf = Vec::with_capacity(subscribe_request.encoded_len());
subscribe_request
Expand All @@ -150,7 +159,11 @@ impl LspAPI for BreezServer {
lsp_id,
blob: encrypt(lsp_pubkey, buf)?,
};
let response = client.register_payment_notification(request).await?;
let response = with_connection_fallback(
client.register_payment_notification(request.clone()),
|| client_clone.register_payment_notification(request),
)
.await?;

Ok(response.into_inner())
}
Expand All @@ -168,6 +181,7 @@ impl LspAPI for BreezServer {
};

let mut client = self.get_payment_notifier_client().await;
let mut client_clone = client.clone();

let mut buf = Vec::with_capacity(unsubscribe_request.encoded_len());
unsubscribe_request
Expand All @@ -180,7 +194,11 @@ impl LspAPI for BreezServer {
lsp_id,
blob: encrypt(lsp_pubkey, buf)?,
};
let response = client.remove_payment_notification(request).await?;
let response =
with_connection_fallback(client.remove_payment_notification(request.clone()), || {
client_clone.remove_payment_notification(request)
})
.await?;

Ok(response.into_inner())
}
Expand All @@ -192,6 +210,7 @@ impl LspAPI for BreezServer {
payment_info: PaymentInformation,
) -> SdkResult<RegisterPaymentReply> {
let mut client = self.get_channel_opener_client().await?;
let mut client_clone = client.clone();

let mut buf = Vec::with_capacity(payment_info.encoded_len());
payment_info
Expand All @@ -200,11 +219,14 @@ impl LspAPI for BreezServer {
err: format!("(LSP {lsp_id}) Failed to encode payment info: {e}"),
})?;

let request = Request::new(RegisterPaymentRequest {
let request = RegisterPaymentRequest {
lsp_id,
blob: encrypt(lsp_pubkey, buf)?,
});
let response = client.register_payment(request).await?;
};
let response = with_connection_fallback(client.register_payment(request.clone()), || {
client_clone.register_payment(request)
})
.await?;

Ok(response.into_inner())
}
Expand Down
16 changes: 10 additions & 6 deletions libs/sdk-core/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use sdk_common::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use strum_macros::{Display, EnumString};
use tonic_wrap::with_connection_fallback;

use crate::bitcoin::blockdata::opcodes;
use crate::bitcoin::blockdata::script::Builder;
Expand Down Expand Up @@ -400,12 +401,15 @@ pub(crate) trait ReverseSwapperRoutingAPI: Send + Sync {
#[tonic::async_trait]
impl ReverseSwapperRoutingAPI for BreezServer {
async fn fetch_reverse_routing_node(&self) -> ReverseSwapResult<Vec<u8>> {
Ok(self
.get_swapper_client()
.await
.get_reverse_routing_node(grpc::GetReverseRoutingNodeRequest::default())
.await
.map(|reply| reply.into_inner().node_id)?)
let mut client = self.get_swapper_client().await;
let mut client_clone = client.clone();

Ok(with_connection_fallback(
client.get_reverse_routing_node(grpc::GetReverseRoutingNodeRequest::default()),
|| client_clone.get_reverse_routing_node(grpc::GetReverseRoutingNodeRequest::default()),
)
.await
.map(|reply| reply.into_inner().node_id)?)
}
}

Expand Down
35 changes: 20 additions & 15 deletions libs/sdk-core/src/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use anyhow::anyhow;
use chrono::{DateTime, Utc};
use sdk_common::grpc::{BreezStatusRequest, ReportPaymentFailureRequest};
use sdk_common::prelude::BreezServer;
use sdk_common::tonic_wrap::with_connection_fallback;
use serde::{Deserialize, Serialize};
use tonic::Request;

#[derive(Serialize, Deserialize)]
struct PaymentFailureReport {
Expand All @@ -33,15 +33,16 @@ impl TryFrom<i32> for HealthCheckStatus {
impl SupportAPI for BreezServer {
async fn service_health_check(&self) -> SdkResult<ServiceHealthCheckResponse> {
let mut client = self.get_support_client().await?;
let mut client_clone = client.clone();

let request = Request::new(BreezStatusRequest {});
let response =
client
.breez_status(request)
.await
.map_err(|e| SdkError::ServiceConnectivity {
err: format!("(Breez) Fetch status failed: {e}"),
})?;
let request = BreezStatusRequest {};
let response = with_connection_fallback(client.breez_status(request.clone()), || {
client_clone.breez_status(request)
})
.await
.map_err(|e| SdkError::ServiceConnectivity {
err: format!("(Breez) Fetch status failed: {e}"),
})?;
let status = response.into_inner().status.try_into()?;
Ok(ServiceHealthCheckResponse { status })
}
Expand All @@ -54,13 +55,15 @@ impl SupportAPI for BreezServer {
comment: Option<String>,
) -> SdkResult<()> {
let mut client = self.get_support_client().await?;
let mut client_clone = client.clone();

let timestamp: DateTime<Utc> = SystemTime::now().into();
let report = PaymentFailureReport {
node_state: node_state.clone(),
payment,
};

let request = Request::new(ReportPaymentFailureRequest {
let request = ReportPaymentFailureRequest {
sdk_version: option_env!("CARGO_PKG_VERSION")
.unwrap_or_default()
.to_string(),
Expand All @@ -70,11 +73,13 @@ impl SupportAPI for BreezServer {
timestamp: timestamp.to_rfc3339(),
comment: comment.unwrap_or_default(),
report: serde_json::to_string(&report)?,
});
_ = client.report_payment_failure(request).await.map_err(|e| {
SdkError::ServiceConnectivity {
err: format!("(Breez) Report payment failure failed: {e}"),
}
};
_ = with_connection_fallback(client.report_payment_failure(request.clone()), || {
client_clone.report_payment_failure(request)
})
.await
.map_err(|e| SdkError::ServiceConnectivity {
err: format!("(Breez) Report payment failure failed: {e}"),
})?;
Ok(())
}
Expand Down
24 changes: 16 additions & 8 deletions libs/sdk-core/src/swap_in/swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rand::Rng;
use ripemd::{Digest, Ripemd160};
use sdk_common::grpc::{AddFundInitRequest, GetSwapPaymentRequest};
use sdk_common::prelude::BreezServer;
use sdk_common::tonic_wrap::with_connection_fallback;
use tokio::sync::broadcast;

use crate::bitcoin::blockdata::constants::WITNESS_SCALE_FACTOR;
Expand Down Expand Up @@ -44,15 +45,21 @@ impl SwapperAPI for BreezServer {
payer_pubkey: Vec<u8>,
node_id: String,
) -> SwapResult<Swap> {
let mut fund_client = self.get_swapper_client().await;
let mut client = self.get_swapper_client().await;
let mut client_clone = client.clone();

let req = AddFundInitRequest {
hash: hash.clone(),
pubkey: payer_pubkey.clone(),
node_id,
notification_token: "".to_string(),
};

let result = fund_client.add_fund_init(req).await?.into_inner();
let result = with_connection_fallback(client.add_fund_init(req.clone()), || {
client_clone.add_fund_init(req)
})
.await?
.into_inner();
Ok(Swap {
bitcoin_address: result.address,
swapper_pubkey: result.pubkey,
Expand All @@ -65,15 +72,16 @@ impl SwapperAPI for BreezServer {
}

async fn complete_swap(&self, bolt11: String) -> Result<()> {
let mut client = self.get_swapper_client().await;
let mut client_clone = client.clone();
let req = GetSwapPaymentRequest {
payment_request: bolt11,
};
let resp = self
.get_swapper_client()
.await
.get_swap_payment(req)
.await?
.into_inner();
let resp = with_connection_fallback(client.get_swap_payment(req.clone()), || {
client_clone.get_swap_payment(req)
})
.await?
.into_inner();

match resp.swap_error() {
crate::grpc::get_swap_payment_reply::SwapError::NoError => Ok(()),
Expand Down

0 comments on commit e4fa424

Please sign in to comment.