From 806261f8d868baeb19e7d36672e43ce12c27408e Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 29 Nov 2024 13:27:22 +0100 Subject: [PATCH] detect and act on network change When there is a network change, tonic behaves as follows: - After the keepalive timeout, reconnect automatically. - Before the keepalive timeout, any grpc call will time out. After the timeout, the connection is reestablished. This commit adds a mechanism to reconnect all grpc clients after one of the clients detects a network change. Initially it was attempted to only retry based on a keepalive timeout error, but a network change affects all grpc clients, so subsequent requests to other grpc endpoints would still fail with a timeout. Of course, those grpc clients can also add a retry-on-timeout, but since it is known at this point that the grpc clients are temporarily dead, reconnect them immediately. This ensures subsequent calls to other endpoints won't add additional time by waiting for a timeout. tokio was bumped to 1.41 to allow cloning watch::Sender. 
--- libs/Cargo.lock | 26 +++--- libs/Cargo.toml | 2 +- libs/sdk-core/src/breez_services.rs | 79 ++++++++++------ libs/sdk-core/src/greenlight/node_api.rs | 110 +++++++++++++++++++---- tools/sdk-cli/Cargo.lock | 46 ++++++---- 5 files changed, 184 insertions(+), 79 deletions(-) diff --git a/libs/Cargo.lock b/libs/Cargo.lock index aac2f2e60..7c04e128a 100644 --- a/libs/Cargo.lock +++ b/libs/Cargo.lock @@ -1610,9 +1610,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -1864,7 +1864,7 @@ version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ - "hermit-abi 0.3.3", + "hermit-abi 0.3.9", "rustix", "windows-sys 0.48.0", ] @@ -2071,13 +2071,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi 0.3.9", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -2237,7 +2238,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.3", + "hermit-abi 0.3.9", "libc", ] @@ -3744,21 +3745,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.5", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -3773,9 +3773,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", diff --git a/libs/Cargo.toml b/libs/Cargo.toml index a782c38b2..0c2a5dd6e 100644 --- a/libs/Cargo.toml +++ b/libs/Cargo.toml @@ -43,7 +43,7 @@ serde_json = "1.0" strum = "0.25" strum_macros = "0.25" thiserror = "1.0.56" -tokio = { version = "1", features = ["full"] } +tokio = { version = "1.41", features = ["full"] } tonic = "^0.8" tonic-build = "^0.8" uniffi = "0.23.0" diff --git a/libs/sdk-core/src/breez_services.rs b/libs/sdk-core/src/breez_services.rs index 72bb62486..fba0d462c 100644 --- a/libs/sdk-core/src/breez_services.rs +++ b/libs/sdk-core/src/breez_services.rs @@ -170,8 +170,9 @@ pub struct BreezServices { backup_watcher: Arc, shutdown_sender: watch::Sender<()>, shutdown_receiver: watch::Receiver<()>, - hibernation_sender: watch::Sender<()>, - hibernation_receiver: watch::Receiver<()>, + reconnect_sender: watch::Sender<()>, + reconnect_receiver: watch::Receiver<()>, + network_change_receiver: watch::Receiver<()>, } impl BreezServices { @@ -1439,6 +1440,7 @@ impl BreezServices { async fn start_background_tasks(self: &Arc) -> SdkResult<()> { // Detect hibernation self.detect_hibernation(); + self.detect_network_change(); // start the signer let (shutdown_signer_sender, signer_signer_receiver) = watch::channel(()); @@ -1491,6 
+1493,34 @@ impl BreezServices { Ok(()) } + fn detect_network_change(self: &Arc) { + let cloned = Arc::clone(self); + let mut network_change_receiver = cloned.network_change_receiver.clone(); + tokio::spawn(async move { + loop { + if network_change_receiver.changed().await.is_err() { + debug!("network change detector stopped"); + return; + } + + debug!("Network change detected."); + cloned.reconnect().await; + } + }); + } + + async fn reconnect(self: &Arc) { + // Reconnect node api before notifying anything else, to + // ensure there are no races reconnecting dependant + // services. + debug!("Reconnecting node api."); + self.node_api.reconnect().await; + + // Now notify dependant services. + debug!("Notifying services about reconnect."); + let _ = self.reconnect_sender.send(()); + } + fn detect_hibernation(self: &Arc) { let cloned = Arc::clone(self); tokio::spawn(async move { @@ -1509,25 +1539,15 @@ impl BreezServices { .saturating_sub(DETECT_HIBERNATE_SLEEP_DURATION) .ge(&DETECT_HIBERNATE_MAX_OFFSET) { - // Reconnect node api before notifying anything else, to - // ensure there are no races reconnecting dependant - // services. - debug!( - "Hibernation detected, time diff {}s, reconnecting node api.", - elapsed.as_secs_f32() - ); - cloned.node_api.reconnect().await; - - // Now notify dependant services. 
- debug!("Hibernation detected, notifying services."); - let _ = cloned.hibernation_sender.send(()); + debug!("Hibernation detected, time diff {}s", elapsed.as_secs_f32()); + cloned.reconnect().await; } } }); } async fn start_signer(self: &Arc, mut shutdown_receiver: watch::Receiver<()>) { - let mut hibernation_receiver = self.hibernation_receiver.clone(); + let mut reconnect_receiver = self.reconnect_receiver.clone(); let node_api = self.node_api.clone(); tokio::spawn(async move { @@ -1543,9 +1563,9 @@ impl BreezServices { true } - _ = hibernation_receiver.changed() => { + _ = reconnect_receiver.changed() => { // NOTE: The node api is reconnected already inside the - // detect_hibernation function, to avoid races. + // reconnect function, to avoid races. false } }; @@ -1648,7 +1668,7 @@ impl BreezServices { let cloned = self.clone(); tokio::spawn(async move { let mut shutdown_receiver = cloned.shutdown_receiver.clone(); - let mut reconnect_receiver = cloned.hibernation_receiver.clone(); + let mut reconnect_receiver = cloned.reconnect_receiver.clone(); loop { if shutdown_receiver.has_changed().unwrap_or(true) { return; @@ -1674,7 +1694,7 @@ impl BreezServices { } _ = reconnect_receiver.changed() => { - debug!("Reconnect hibernation: track invoices"); + debug!("Reconnect: track invoices"); break; } }; @@ -1733,7 +1753,7 @@ impl BreezServices { let cloned = self.clone(); tokio::spawn(async move { let mut shutdown_receiver = cloned.shutdown_receiver.clone(); - let mut reconnect_receiver = cloned.hibernation_receiver.clone(); + let mut reconnect_receiver = cloned.reconnect_receiver.clone(); loop { if shutdown_receiver.has_changed().unwrap_or(true) { return; @@ -1759,7 +1779,7 @@ impl BreezServices { } _ = reconnect_receiver.changed() => { - debug!("Reconnect hibernation: track logs"); + debug!("Reconnect: track logs"); break; } }; @@ -2394,7 +2414,8 @@ impl BreezServicesBuilder { }); } - let (hibernation_sender, hibernation_receiver) = watch::channel(()); + let 
(reconnect_sender, reconnect_receiver) = watch::channel(()); + let (network_change_sender, network_change_receiver) = watch::channel(()); // The storage is implemented via sqlite. let persister = self .persister @@ -2410,6 +2431,7 @@ impl BreezServicesBuilder { self.seed.clone().unwrap(), restore_only, persister.clone(), + network_change_sender.clone(), ) .await?; let gl_arc = Arc::new(greenlight); @@ -2468,16 +2490,16 @@ impl BreezServicesBuilder { } }); - // Reconnect breez server on hibernation. + // Reconnect breez server on when requested. let cloned_breez_server = breez_server.clone(); - let mut cloned_hibernation_receiver = hibernation_receiver.clone(); + let mut cloned_reconnect_receiver = reconnect_receiver.clone(); tokio::spawn(async move { loop { - if cloned_hibernation_receiver.changed().await.is_err() { + if cloned_reconnect_receiver.changed().await.is_err() { return; } - debug!("Reconnect hibernation: reconnecting breez server"); + debug!("Reconnect: reconnecting breez server"); let _ = cloned_breez_server.reconnect().await; } }); @@ -2572,8 +2594,9 @@ impl BreezServicesBuilder { backup_watcher: Arc::new(backup_watcher), shutdown_sender, shutdown_receiver, - hibernation_sender, - hibernation_receiver, + reconnect_sender, + reconnect_receiver, + network_change_receiver, }); Ok(breez_services) diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index f2f68d053..c5af0a606 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -1,5 +1,6 @@ use std::cmp::{min, Reverse}; use std::collections::{HashMap, HashSet}; +use std::error::Error; use std::iter::Iterator; use std::pin::Pin; use std::str::FromStr; @@ -23,7 +24,7 @@ use gl_client::pb::cln::{ ListclosedchannelsClosedchannels, ListpaysPays, ListpeerchannelsChannels, ListsendpaysPayments, PreapproveinvoiceRequest, SendpayRequest, SendpayRoute, WaitsendpayRequest, }; -use gl_client::pb::{OffChainPayment, PayStatus, 
TrampolinePayRequest}; +use gl_client::pb::{OffChainPayment, TrampolinePayRequest}; use gl_client::scheduler::Scheduler; use gl_client::signer::model::greenlight::{amount, scheduler}; use gl_client::signer::Signer; @@ -72,6 +73,7 @@ pub(crate) struct Greenlight { node_client: Mutex>, persister: Arc, inprogress_payments: AtomicU16, + network_change_detector: watch::Sender<()>, } #[derive(Serialize, Deserialize)] @@ -100,6 +102,7 @@ impl Greenlight { seed: Vec, restore_only: Option, persister: Arc, + network_change_detector: watch::Sender<()>, ) -> NodeResult { // Derive the encryption key from the seed let temp_signer = Arc::new(Signer::new( @@ -161,7 +164,13 @@ impl Greenlight { match encrypted_creds { Some(c) => { persister.set_gl_credentials(c)?; - Greenlight::new(config, seed, creds.clone(), persister) + Greenlight::new( + config, + seed, + creds.clone(), + persister, + network_change_detector, + ) } None => Err(NodeError::generic("Failed to encrypt credentials")), } @@ -175,6 +184,7 @@ impl Greenlight { seed: Vec, device: Device, persister: Arc, + network_change_detector: watch::Sender<()>, ) -> NodeResult { let greenlight_network = sdk_config.network.into(); let signer = Signer::new(seed.clone(), greenlight_network, device.clone())?; @@ -187,6 +197,7 @@ impl Greenlight { node_client: Mutex::new(None), persister, inprogress_payments: AtomicU16::new(0), + network_change_detector, }) } @@ -704,6 +715,61 @@ impl Greenlight { res } + async fn with_connection_fallback( + &self, + main: M, + fallback: impl FnOnce() -> F, + ) -> Result + where + M: Future>, + F: Future>, + T: std::fmt::Debug, + { + let res = main.await; + let status = match res { + Ok(t) => return Ok(t), + Err(s) => s, + }; + info!("AAAAAAAAAAAAAAA got status"); + + let source = match status.source() { + Some(source) => source, + None => return Err(status), + }; + info!("AAAAAAAAAAAAAAA got source"); + + let error: &tonic::transport::Error = match source.downcast_ref() { + Some(error) => error, + None 
=> return Err(status), + }; + info!("AAAAAAAAAAAAAAA got error"); + + if error.to_string() != "transport error" { + return Err(status); + } + info!("AAAAAAAAAAAAAAA is transport error"); + + let source = match error.source() { + Some(source) => source, + None => return Err(status), + }; + info!("AAAAAAAAAAAAAAA got source"); + info!("AAAAAAAAAAAAAAA {}", source.to_string()); + if !source.to_string().contains("keep-alive timed out") { + return Err(status); + } + + info!("AAAAAAAAAAAAAAA is connection error"); + if self.network_change_detector.send(()).is_err() { + warn!("greenlight network change detector died."); + } + + let res = fallback().await; + + info!("AAAAAAAAAAAAAAA fallback returned {:?}", res); + res + } + // pulls transactions from greenlight based on last sync timestamp. // greenlight gives us the payments via API and for received payments we are looking for settled invoices. async fn pull_transactions( @@ -997,14 +1063,14 @@ struct SyncState { #[tonic::async_trait] impl NodeAPI for Greenlight { async fn reconnect(&self) { - debug!("Reconnect hibernation: request received"); + debug!("Reconnect: request received"); // Force refresh existing grpc clients *self.gl_client.lock().await = None; *self.node_client.lock().await = None; // Create a new signer - debug!("Reconnect hibernation: creating new signer"); + debug!("Reconnect: creating new signer"); let new_signer = match Signer::new( self.seed.clone(), self.sdk_config.network.into(), @@ -1013,7 +1079,7 @@ impl NodeAPI for Greenlight { Ok(new_signer) => new_signer, Err(e) => { error!( - "Reconnect hibernation: failed to create new signer after reconnect request: {:?}", + "Reconnect: failed to create new signer after reconnect request: {:?}", e ); return; @@ -1062,6 +1128,7 @@ impl NodeAPI for Greenlight { async fn create_invoice(&self, request: CreateInvoiceRequest) -> NodeResult { let mut client = self.get_node_client().await?; + let mut client_clone = self.get_node_client().await?; let label = 
serde_json::to_string(&InvoiceLabel { unix_milli: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(), payer_amount_msat: request.payer_amount_msat, @@ -1081,7 +1148,12 @@ impl NodeAPI for Greenlight { cltv: request.cltv, }; - let res = client.invoice(cln_request).await?.into_inner(); + let res = self + .with_connection_fallback(client.invoice(cln_request.clone()), || { + client_clone.invoice(cln_request) + }) + .await? + .into_inner(); Ok(res.bolt11) } @@ -1359,7 +1431,8 @@ impl NodeAPI for Greenlight { description = invoice.description; } - let mut client: node::ClnClient = self.get_node_client().await?; + let mut client = self.get_node_client().await?; + let mut client_clone = self.get_node_client().await?; let request = cln::PayRequest { bolt11, amount_msat: amount_msat.map(|amt| cln::Amount { msat: amt }), @@ -1377,7 +1450,11 @@ impl NodeAPI for Greenlight { }), }; let result: cln::PayResponse = self - .with_keep_alive(client.pay(request)) + .with_keep_alive( + self.with_connection_fallback(client.pay(request.clone()), || { + client_clone.pay(request) + }), + ) .await? .into_inner(); @@ -1407,6 +1484,7 @@ impl NodeAPI for Greenlight { let fee_percent = ((fee_msat as f64 / amount_msat as f64) * 100.) as f32; debug!("using fee msat {} fee percent {}", fee_msat, fee_percent); let mut client = self.get_client().await?; + let mut client_clone = client.clone(); let request = TrampolinePayRequest { bolt11, trampoline_node_id, @@ -1417,7 +1495,11 @@ impl NodeAPI for Greenlight { maxfeepercent: fee_percent, }; let result = self - .with_keep_alive(client.trampoline_pay(request)) + .with_keep_alive( + self.with_connection_fallback(client.trampoline_pay(request.clone()), || { + client_clone.trampoline_pay(request) + }), + ) .await? 
.into_inner(); @@ -2292,16 +2374,6 @@ impl TryFrom for Payment { } } -impl From for PaymentStatus { - fn from(value: PayStatus) -> Self { - match value { - PayStatus::Pending => PaymentStatus::Pending, - PayStatus::Complete => PaymentStatus::Complete, - PayStatus::Failed => PaymentStatus::Failed, - } - } -} - /// Construct a lightning transaction from an invoice impl TryFrom for Payment { type Error = NodeError; diff --git a/tools/sdk-cli/Cargo.lock b/tools/sdk-cli/Cargo.lock index 6223d9895..d00a78541 100644 --- a/tools/sdk-cli/Cargo.lock +++ b/tools/sdk-cli/Cargo.lock @@ -1468,9 +1468,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.1" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -1568,7 +1568,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -1691,7 +1691,7 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "libc", "windows-sys 0.48.0", ] @@ -1708,7 +1708,7 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ - "hermit-abi 0.3.1", + "hermit-abi 0.3.9", "io-lifetimes", "rustix 0.37.27", "windows-sys 0.48.0", @@ -1912,13 +1912,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies 
= [ + "hermit-abi 0.3.9", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -2379,9 +2380,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" [[package]] name = "pin-utils" @@ -3287,6 +3288,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "spin" version = "0.5.2" @@ -3509,21 +3520,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.2" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ - "autocfg", + "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.8", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -3538,9 +3548,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote",