From 0d44831458d2ac7c2d3a6e5644501d3c3d4c6ea1 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 29 Nov 2024 21:40:03 +0100 Subject: [PATCH] wrap node api with connection fallbacks --- libs/sdk-core/src/greenlight/node_api.rs | 400 +++++++++++++---------- 1 file changed, 230 insertions(+), 170 deletions(-) diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index 0ba278d74..4b63dc42a 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -1039,23 +1039,27 @@ impl NodeAPI for Greenlight { async fn configure_node(&self, close_to_address: Option) -> NodeResult<()> { match close_to_address { Some(close_to_addr) => { - self.get_client() - .await? - .configure(gl_client::pb::GlConfig { close_to_addr }) - .await - .map_err(|e| NodeError::Generic(format!("Unable to set node config: {}", e)))?; + let mut client = self.get_client().await?; + let mut client_clone = client.clone(); + let req = gl_client::pb::GlConfig { close_to_addr }; + with_connection_fallback(client.configure(req.clone()), || { + client_clone.configure(req) + }) + .await + .map_err(|e| NodeError::Generic(format!("Unable to set node config: {}", e)))?; } None => { - self.get_node_client() - .await? - .del_datastore(cln::DeldatastoreRequest { - key: vec!["glconf".to_string(), "request".to_string()], - generation: None, - }) - .await - .map_err(|e| { - NodeError::Generic(format!("Unable to delete node config: {}", e)) - })?; + let mut client = self.get_node_client().await?; + let mut client_clone = client.clone(); + let req = cln::DeldatastoreRequest { + key: vec!["glconf".to_string(), "request".to_string()], + generation: None, + }; + with_connection_fallback(client.del_datastore(req.clone()), || { + client_clone.del_datastore(req) + }) + .await + .map_err(|e| NodeError::Generic(format!("Unable to delete node config: {}", e)))?; } } Ok(()) @@ -1063,7 +1067,7 @@ impl NodeAPI for Greenlight { async fn create_invoice(&self, request: CreateInvoiceRequest) -> NodeResult { let mut client = self.get_node_client().await?; - let mut client_clone = self.get_node_client().await?; + let mut client_clone = client.clone(); let label = serde_json::to_string(&InvoiceLabel { unix_milli: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(), payer_amount_msat: request.payer_amount_msat, @@ -1092,29 +1096,30 @@ impl NodeAPI for Greenlight { } async fn fetch_bolt11(&self, payment_hash: Vec) -> NodeResult> { + let mut client = self.get_node_client().await?; + let mut client_clone = client.clone(); let request = cln::ListinvoicesRequest { payment_hash: Some(payment_hash), ..Default::default() }; - let result = self - .get_node_client() - .await? - .list_invoices(request) - .await? - .into_inner() - .invoices - .first() - .cloned() - .and_then(|invoice| { - invoice.bolt11.map(|bolt11| FetchBolt11Result { - bolt11, - payer_amount_msat: serde_json::from_str::(&invoice.label) - .map(|label| label.payer_amount_msat) - .ok() - .flatten(), - }) - }); + let result = with_connection_fallback(client.list_invoices(request.clone()), || { + client_clone.list_invoices(request) + }) + .await? + .into_inner() + .invoices + .first() + .cloned() + .and_then(|invoice| { + invoice.bolt11.map(|bolt11| FetchBolt11Result { + bolt11, + payer_amount_msat: serde_json::from_str::(&invoice.label) + .map(|label| label.payer_amount_msat) + .ok() + .flatten(), + }) + }); Ok(result) } @@ -1130,24 +1135,31 @@ impl NodeAPI for Greenlight { None => SyncState::default(), }; - let node_client = self.get_node_client().await?; + let client = self.get_node_client().await?; // get node info - let mut node_info_client = node_client.clone(); - let node_info_future = node_info_client.getinfo(cln::GetinfoRequest::default()); + let mut client_clone1 = client.clone(); + let mut client_clone2 = client.clone(); + let node_info_future = with_connection_fallback( + client_clone1.getinfo(cln::GetinfoRequest::default()), + || client_clone2.getinfo(cln::GetinfoRequest::default()), + ); // list both off chain funds and on chain fudns let funds_future = self.list_funds(); // Fetch closed channels from greenlight - let mut closed_channels_client = node_client.clone(); - let closed_channels_future = closed_channels_client - .list_closed_channels(cln::ListclosedchannelsRequest { id: None }); + let mut client_clone3 = client.clone(); + let mut client_clone4 = client.clone(); + let closed_channels_future = with_connection_fallback( + client_clone3.list_closed_channels(cln::ListclosedchannelsRequest { id: None }), + || client_clone4.list_closed_channels(cln::ListclosedchannelsRequest { id: None }), + ); // calculate the node new balance and in case the caller signals balance has changed // keep polling until the balance is updated let balance_future = Greenlight::fetch_channels_and_balance_with_retry( - node_client.clone(), + client.clone(), self.persister.clone(), match_local_balance, ); @@ -1242,7 +1254,8 @@ impl NodeAPI for Greenlight { async fn send_pay(&self, bolt11: String, max_hops: u32) -> NodeResult { let invoice = parse_invoice(&bolt11)?; let last_hop = invoice.routing_hints.first().and_then(|rh| rh.hops.first()); - let mut client: node::ClnClient = self.get_node_client().await?; + let mut client = self.get_node_client().await?; + let mut client_clone = client.clone(); // Valid the invoice network against the config network validate_network(invoice.clone(), self.sdk_config.network)?; @@ -1309,20 +1322,20 @@ impl NodeAPI for Greenlight { ); self.wait_channel_reestablished(&max.path).await?; // We send the part using the node API - client - .send_pay(SendpayRequest { - route, - payment_hash: hex::decode(invoice.payment_hash.clone())?, - label: None, - amount_msat: Some(Amount { - msat: amount_to_pay_msat, - }), - bolt11: Some(bolt11.clone()), - payment_secret: Some(invoice.payment_secret.clone()), - partid: Some(part_id), - localinvreqid: None, - groupid: Some(group_id), - }) + let req = SendpayRequest { + route, + payment_hash: hex::decode(invoice.payment_hash.clone())?, + label: None, + amount_msat: Some(Amount { + msat: amount_to_pay_msat, + }), + bolt11: Some(bolt11.clone()), + payment_secret: Some(invoice.payment_secret.clone()), + partid: Some(part_id), + localinvreqid: None, + groupid: Some(group_id), + }; + with_connection_fallback(client.send_pay(req.clone()), || client_clone.send_pay(req)) .await?; part_id += 1; amount_sent_msat += sent_msat; @@ -1334,13 +1347,17 @@ impl NodeAPI for Greenlight { // Now we wait for the first part to be completed as a way to wait for the payment // to complete. + let req = WaitsendpayRequest { + payment_hash: hex::decode(invoice.payment_hash.clone())?, + partid: Some(1), + timeout: Some(self.sdk_config.payment_timeout_sec), + groupid: Some(group_id), + }; let response = self - .with_keep_alive(client.wait_send_pay(WaitsendpayRequest { - payment_hash: hex::decode(invoice.payment_hash.clone())?, - partid: Some(1), - timeout: Some(self.sdk_config.payment_timeout_sec), - groupid: Some(group_id), - })) + .with_keep_alive(with_connection_fallback( + client.wait_send_pay(req.clone()), + || client_clone.wait_send_pay(req), + )) .await? .into_inner(); Ok(PaymentResponse { @@ -1366,7 +1383,7 @@ impl NodeAPI for Greenlight { } let mut client = self.get_node_client().await?; - let mut client_clone = self.get_node_client().await?; + let mut client_clone = client.clone(); let request = cln::PayRequest { bolt11, amount_msat: amount_msat.map(|amt| cln::Amount { msat: amt }), @@ -1476,6 +1493,8 @@ impl NodeAPI for Greenlight { retry_for: Some(self.sdk_config.payment_timeout_sec), maxdelay: None, }; + + // Not wrapped with connection retry, in case it causes to send twice. let result = self .with_keep_alive(client.key_send(request)) .await? @@ -1497,6 +1516,7 @@ impl NodeAPI for Greenlight { sat_per_vbyte: u32, ) -> NodeResult> { let mut client = self.get_node_client().await?; + let mut client_clone = client.clone(); let request = cln::WithdrawRequest { feerate: Some(cln::Feerate { @@ -1510,7 +1530,14 @@ impl NodeAPI for Greenlight { utxos: vec![], }; - Ok(client.withdraw(request).await?.into_inner().txid) + Ok( + with_connection_fallback(client.withdraw(request.clone()), || { + client_clone.withdraw(request) + }) + .await? + .into_inner() + .txid, + ) } async fn prepare_redeem_onchain_funds( @@ -1613,12 +1640,16 @@ impl NodeAPI for Greenlight { async fn connect_peer(&self, id: String, addr: String) -> NodeResult<()> { let mut client = self.get_node_client().await?; + let mut client_clone = client.clone(); let connect_req = cln::ConnectRequest { id: format!("{id}@{addr}"), host: None, port: None, }; - client.connect_peer(connect_req).await?; + with_connection_fallback(client.connect_peer(connect_req.clone()), || { + client_clone.connect_peer(connect_req) + }) + .await?; Ok(()) } @@ -1678,9 +1709,13 @@ impl NodeAPI for Greenlight { async fn close_peer_channels(&self, node_id: String) -> NodeResult> { let mut client = self.get_node_client().await?; - let closed_channels = client - .list_peer_channels(cln::ListpeerchannelsRequest { - id: Some(hex::decode(node_id)?), + let mut client_clone = client.clone(); + let req = cln::ListpeerchannelsRequest { + id: Some(hex::decode(node_id)?), + }; + let closed_channels = + with_connection_fallback(client.list_peer_channels(req.clone()), || { + client_clone.list_peer_channels(req) }) .await? .into_inner(); @@ -1703,17 +1738,18 @@ impl NodeAPI for Greenlight { if should_close { let chan_id = channel.channel_id.ok_or(anyhow!("Empty channel id"))?; - let response = client - .close(cln::CloseRequest { - id: hex::encode(chan_id), - unilateraltimeout: None, - destination: None, - fee_negotiation_step: None, - wrong_funding: None, - force_lease_closed: None, - feerange: vec![], - }) - .await; + let req = cln::CloseRequest { + id: hex::encode(chan_id), + unilateraltimeout: None, + destination: None, + fee_negotiation_step: None, + wrong_funding: None, + force_lease_closed: None, + feerange: vec![], + }; + let response = + with_connection_fallback(client.close(req.clone()), || client_clone.close(req)) + .await; match response { Ok(res) => { tx_ids.push(hex::encode( @@ -1733,10 +1769,13 @@ impl NodeAPI for Greenlight { &self, ) -> NodeResult> { let mut client = self.get_client().await?; - let stream = client - .stream_incoming(gl_client::signer::model::greenlight::StreamIncomingFilter {}) - .await? - .into_inner(); + let mut client_clone = client.clone(); + let req = gl_client::signer::model::greenlight::StreamIncomingFilter {}; + let stream = with_connection_fallback(client.stream_incoming(req.clone()), || { + client_clone.stream_incoming(req) + }) + .await? + .into_inner(); Ok(stream) } @@ -1744,19 +1783,25 @@ impl NodeAPI for Greenlight { &self, ) -> NodeResult> { let mut client = self.get_client().await?; - let stream = client - .stream_log(gl_client::signer::model::greenlight::StreamLogRequest {}) - .await? - .into_inner(); + let mut client_clone = client.clone(); + let req = gl_client::signer::model::greenlight::StreamLogRequest {}; + let stream = with_connection_fallback(client.stream_log(req.clone()), || { + client_clone.stream_log(req) + }) + .await? + .into_inner(); Ok(stream) } async fn static_backup(&self) -> NodeResult> { let mut client = self.get_node_client().await?; - let res = client - .static_backup(cln::StaticbackupRequest {}) - .await? - .into_inner(); + let mut client_clone = client.clone(); + let req = cln::StaticbackupRequest {}; + let res = with_connection_fallback(client.static_backup(req.clone()), || { + client_clone.static_backup(req) + }) + .await? + .into_inner(); let hex_vec: Vec = res.scb.into_iter().map(hex::encode).collect(); Ok(hex_vec) } @@ -1785,82 +1830,85 @@ impl NodeAPI for Greenlight { async fn execute_command(&self, command: String) -> NodeResult { let node_cmd = NodeCommand::from_str(&command).map_err(|_| anyhow!("Command not found: {command}"))?; + + let mut client = self.get_node_client().await?; + let mut client_clone = client.clone(); match node_cmd { NodeCommand::ListPeers => { - let resp = self - .get_node_client() - .await? - .list_peers(cln::ListpeersRequest::default()) - .await? - .into_inner(); + let req = cln::ListpeersRequest::default(); + let resp = with_connection_fallback(client.list_peers(req.clone()), || { + client_clone.list_peers(req) + }) + .await? + .into_inner(); Ok(crate::serializer::value::to_value(&resp)?) } NodeCommand::ListPeerChannels => { - let resp = self - .get_node_client() - .await? - .list_peer_channels(cln::ListpeerchannelsRequest::default()) - .await? - .into_inner(); + let req = cln::ListpeerchannelsRequest::default(); + let resp = with_connection_fallback(client.list_peer_channels(req.clone()), || { + client_clone.list_peer_channels(req.clone()) + }) + .await? + .into_inner(); Ok(crate::serializer::value::to_value(&resp)?) } NodeCommand::ListFunds => { - let resp = self - .get_node_client() - .await? - .list_funds(cln::ListfundsRequest::default()) - .await? - .into_inner(); + let req = cln::ListfundsRequest::default(); + let resp = with_connection_fallback(client.list_funds(req.clone()), || { + client_clone.list_funds(req) + }) + .await? + .into_inner(); Ok(crate::serializer::value::to_value(&resp)?) } NodeCommand::ListPayments => { - let resp = self - .get_node_client() - .await? - .list_pays(cln::ListpaysRequest::default()) - .await? - .into_inner(); + let req = cln::ListpaysRequest::default(); + let resp = with_connection_fallback(client.list_pays(req.clone()), || { + client_clone.list_pays(req) + }) + .await? + .into_inner(); Ok(crate::serializer::value::to_value(&resp)?) } NodeCommand::ListInvoices => { - let resp = self - .get_node_client() - .await? - .list_invoices(cln::ListinvoicesRequest::default()) - .await? - .into_inner(); + let req = cln::ListinvoicesRequest::default(); + let resp = with_connection_fallback(client.list_invoices(req.clone()), || { + client_clone.list_invoices(req) + }) + .await? + .into_inner(); + Ok(crate::serializer::value::to_value(&resp)?) } NodeCommand::CloseAllChannels => { - let peers_res = self - .get_node_client() - .await? - .list_peers(cln::ListpeersRequest::default()) - .await? - .into_inner(); - for p in peers_res.peers { + let req = cln::ListpeersRequest::default(); + let resp = with_connection_fallback(client.list_peers(req.clone()), || { + client_clone.list_peers(req) + }) + .await? + .into_inner(); + for p in resp.peers { self.close_peer_channels(hex::encode(p.id)).await?; } Ok(Value::String("All channels were closed".to_string())) } NodeCommand::GetInfo => { - let resp = self - .get_node_client() - .await? - .getinfo(cln::GetinfoRequest::default()) - .await? - .into_inner(); + let req = cln::GetinfoRequest::default(); + let resp = with_connection_fallback(client.getinfo(req.clone()), || { + client_clone.getinfo(req) + }) + .await? + .into_inner(); Ok(crate::serializer::value::to_value(&resp)?) } NodeCommand::Stop => { - let resp = self - .get_node_client() - .await? - .stop(cln::StopRequest::default()) - .await? - .into_inner(); + let req = cln::StopRequest::default(); + let resp = + with_connection_fallback(client.stop(req.clone()), || client_clone.stop(req)) + .await? + .into_inner(); Ok(crate::serializer::value::to_value(&resp)?) } } @@ -1873,20 +1921,23 @@ impl NodeAPI for Greenlight { last_hop_hint: Option<&RouteHintHop>, ) -> NodeResult> { let mut client = self.get_node_client().await?; + let mut client_clone = self.get_node_client().await?; let mut peers = HashMap::new(); - client - .list_peer_channels(cln::ListpeerchannelsRequest::default()) - .await? - .into_inner() - .channels - .into_iter() - .for_each(|channel| { - peers - .entry(channel.peer_id().to_vec()) - .or_insert(Vec::new()) - .push(channel) - }); + let req = cln::ListpeerchannelsRequest::default(); + with_connection_fallback(client.list_peer_channels(req.clone()), || { + client_clone.list_peer_channels(req) + }) + .await? + .into_inner() + .channels + .into_iter() + .for_each(|channel| { + peers + .entry(channel.peer_id().to_vec()) + .or_insert(Vec::new()) + .push(channel) + }); let mut max_channel_amounts = vec![]; for (peer, channels) in peers { @@ -1920,10 +1971,12 @@ impl NodeAPI for Greenlight { Ok(c) => Ok(c), Err(e) => Err(anyhow!("{}", e)), }?; - - match client - .stream_custommsg(gl_client::signer::model::greenlight::StreamCustommsgRequest {}) - .await + let mut client_clone = client.clone(); + let req = gl_client::signer::model::greenlight::StreamCustommsgRequest {}; + match with_connection_fallback(client.stream_custommsg(req.clone()), || { + client_clone.stream_custommsg(req) + }) + .await { Ok(s) => Ok(s), Err(e) => Err(anyhow!("{}", e)), @@ -1956,17 +2009,20 @@ impl NodeAPI for Greenlight { } async fn send_custom_message(&self, message: CustomMessage) -> NodeResult<()> { + let mut client = self.get_node_client().await?; + let mut client_clone = client.clone(); + let mut msg = message.message_type.to_be_bytes().to_vec(); msg.extend(message.payload); - let resp = self - .get_node_client() - .await? - .send_custom_msg(cln::SendcustommsgRequest { - msg, - node_id: message.peer_id, - }) - .await? - .into_inner(); + let req = cln::SendcustommsgRequest { + msg, + node_id: message.peer_id, + }; + let resp = with_connection_fallback(client.send_custom_msg(req.clone()), || { + client_clone.send_custom_msg(req) + }) + .await? + .into_inner(); debug!("send_custom_message returned status {:?}", resp.status); Ok(()) } @@ -1976,7 +2032,8 @@ impl NodeAPI for Greenlight { &self, lsp_info: &LspInformation, ) -> NodeResult<(Vec, bool)> { - let mut node_client = self.get_node_client().await?; + let mut client = self.get_node_client().await?; + let mut client_clone = client.clone(); let open_peer_channels = self.get_open_peer_channels_pb().await?; let (open_peer_channels_private, open_peer_channels_public): ( @@ -1995,10 +2052,13 @@ impl NodeAPI for Greenlight { .get_node_state()? .map(|n| n.id) .ok_or(NodeError::generic("Node info not found"))?; - let channels: HashMap, cln::ListchannelsChannels> = node_client - .list_channels(cln::ListchannelsRequest { - destination: Some(hex::decode(pubkey)?), - ..Default::default() + let req = cln::ListchannelsRequest { + destination: Some(hex::decode(pubkey)?), + ..Default::default() + }; + let channels: HashMap, cln::ListchannelsChannels> = + with_connection_fallback(client.list_channels(req.clone()), || { + client_clone.list_channels(req) }) .await? .into_inner()