diff --git a/libs/gl-plugin/src/awaitables.rs b/libs/gl-plugin/src/awaitables.rs index 3e4d3807a..4a3963c47 100644 --- a/libs/gl-plugin/src/awaitables.rs +++ b/libs/gl-plugin/src/awaitables.rs @@ -1,13 +1,25 @@ -use std::time::Duration; - use cln_rpc::{ - model::requests::{ - ConnectRequest, GetinfoRequest, GetrouteRequest, ListpeerchannelsRequest, ListpeersRequest, + model::{ + requests::{ + ConnectRequest, GetinfoRequest, GetrouteRequest, ListpeerchannelsRequest, + ListpeersRequest, + }, + responses::GetrouteResponse, }, primitives::{Amount, PublicKey, ShortChannelId}, ClnRpc, }; +use std::{ + future::Future, + path::{Path, PathBuf}, + pin::Pin, + time::Duration, +}; use thiserror; +use tokio::time::Instant; + +// The delay between consecutive rpc calls of the same type. +const RPC_CALL_DELAY_MSEC: u64 = 250; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -17,201 +29,244 @@ pub enum Error { Channel(&'static str), #[error("RPC error: {0}")] Rpc(#[from] cln_rpc::RpcError), + #[error("Error talking to a GL service: {0}")] + Service(String), } /// A struct to track the status of a peer connection. pub struct AwaitablePeer { peer_id: PublicKey, - rpc: ClnRpc, + rpc_path: PathBuf, + + ensure_peer_connection: Option> + Send>>>, } impl AwaitablePeer { - pub fn new(peer_id: PublicKey, rpc: ClnRpc) -> Self { - AwaitablePeer { peer_id, rpc } + pub fn new(peer_id: PublicKey, rpc_path: PathBuf) -> Self { + AwaitablePeer { + peer_id, + rpc_path, + ensure_peer_connection: None, + } } pub async fn wait(&mut self) -> Result<(), Error> { - ensure_peer_connection(&mut self.rpc, self.peer_id).await + ensure_peer_connection(&self.rpc_path, self.peer_id).await + } +} + +impl Future for AwaitablePeer { + type Output = Result<(), Error>; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + // Ensure that the peer is connected. + + if self.ensure_peer_connection.is_none() { + let fut = Box::pin(ensure_peer_connection( + self.rpc_path.clone(), + self.peer_id.clone(), + )); + self.ensure_peer_connection = Some(fut); + } + + let ensure_peer_connection = self.ensure_peer_connection.as_mut().unwrap(); + match ensure_peer_connection.as_mut().poll(cx) { + std::task::Poll::Ready(result) => std::task::Poll::Ready(result), + std::task::Poll::Pending => std::task::Poll::Pending, + } } } -/// A struct to track the status of a channel. +/// A struct to track the status of a channel. It implements `Future` to +/// await an operable channel state before returning the spendable amount +/// on this channel. pub struct AwaitableChannel { scid: ShortChannelId, peer_id: PublicKey, - rpc: ClnRpc, - version: String, + rpc_path: PathBuf, + + version: Option, + peer_connected: bool, + channel_ready: bool, + route_found: bool, + + last_check: Option, + rpc_call_delay: Duration, + + get_version: Option> + Send>>>, + ensure_peer_connection: Option> + Send>>>, + billboard: Option, Error>> + Send>>>, + get_route: Option> + Send>>>, + spendable_msat: Option> + Send>>>, } -impl AwaitableChannel { - pub async fn new( - peer_id: PublicKey, - scid: ShortChannelId, - mut rpc: ClnRpc, - ) -> Result { - let info = rpc - .call_typed(&GetinfoRequest {}) - .await - .map_err(|_| Error::Peer("unable to connect"))?; - let version = info.version; - Ok(AwaitableChannel { - peer_id, - scid, - rpc, - version, - }) - } +impl Future for AwaitableChannel { + type Output = Result; - pub async fn wait(&mut self) -> Result { - use tokio::time::sleep; - // Ensure that we are connected to the peer, return an Error if - // we can not connect to the peer. - ensure_peer_connection(&mut self.rpc, self.peer_id).await?; - - // Next step is to wait for the channel to be - // re-established. For this we look into the billboard and - // wait for some magic strings to appear (yeah, I know...) - log::debug!("Checking if channel {} is ready", self.scid); - while !self.billboard().await?.into_iter().any(|s| { - s.find("Channel ready").is_some() || s.find("Reconnected, and reestablished").is_some() - }) { - sleep(Duration::from_millis(250)).await; + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let now = Instant::now(); + + if let Some(last_check) = self.last_check { + // We already checked and still need to wait before we retry + let elapsed = now.duration_since(last_check); + if elapsed < self.rpc_call_delay { + return std::task::Poll::Pending; + } } - log::debug!("Channel {} is established", self.scid); - - // Finally, we need to check that we have the gossip required - // to route through the channel. We could check for channels - // individually, but we can check them all at once by using - // `getroute` to the peer. - loop { - let route = self - .rpc - .call_typed(&GetrouteRequest { - id: self.peer_id, - amount_msat: cln_rpc::primitives::Amount::from_msat(1), - riskfactor: 1, - cltv: None, - fromid: None, - fuzzpercent: Some(0), - exclude: None, - maxhops: Some(1), - }) - .await; - - if route.is_ok() { - log::debug!("Peer {:?} is routable", self.peer_id.to_string()); - break; - } else { - sleep(Duration::from_millis(500)).await; + + // Get version if not set already. + if self.version.is_none() { + if self.get_version.is_none() { + let fut = Box::pin(get_version(self.rpc_path.clone())); + self.get_version = Some(fut); + } + + let get_version = self.get_version.as_mut().unwrap(); + match get_version.as_mut().poll(cx) { + std::task::Poll::Ready(v) => { + self.version = Some(v?); + } + std::task::Poll::Pending => return std::task::Poll::Pending, } } - self.spendable_msat().await - } + // Ensure that the peer is connected. + if !self.peer_connected { + if self.ensure_peer_connection.is_none() { + let fut = Box::pin(ensure_peer_connection( + self.rpc_path.clone(), + self.peer_id.clone(), + )); + self.ensure_peer_connection = Some(fut); + } + + let ensure_peer_connection = self.ensure_peer_connection.as_mut().unwrap(); + match ensure_peer_connection.as_mut().poll(cx) { + std::task::Poll::Ready(result) => { + result?; + log::debug!("Peer {} is connected", self.peer_id.to_string()); + self.peer_connected = true; + } + std::task::Poll::Pending => return std::task::Poll::Pending, + } + } + + // Ensure that the channel is reestablished. + if !self.channel_ready { + if self.billboard.is_none() { + let fut = Box::pin(billboard( + self.rpc_path.clone(), + self.version.as_ref().unwrap().clone(), + self.peer_id.clone(), + self.scid, + )); + self.billboard = Some(fut); + } + + let billboard = self.billboard.as_mut().unwrap(); + match billboard.as_mut().poll(cx) { + std::task::Poll::Ready(result) => { + let result = result?; + if !result.into_iter().any(|s| { + s.find("Channel ready").is_some() + || s.find("Reconnected, and reestablished").is_some() + }) { + // Reset billboard and last_check to back-off for a bit. + self.last_check = Some(Instant::now()); + self.billboard = None; + return std::task::Poll::Pending; + } + log::debug!("Channel {} is established", self.scid); + self.channel_ready = true; + } + std::task::Poll::Pending => return std::task::Poll::Pending, + } + } + + // Ensure that the channel can be used to route an htlc to the peer. + if !self.route_found { + if self.get_route.is_none() { + let fut = Box::pin(get_route(self.rpc_path.clone(), self.peer_id.clone())); + self.get_route = Some(fut); + } + + let get_route = self.get_route.as_mut().unwrap(); + match get_route.as_mut().poll(cx) { + std::task::Poll::Ready(route) => { + if route.is_ok() { + log::debug!("Peer {:?} is routable", self.peer_id.to_string()); + self.route_found = true; + } else { + // Reset get_route and last_check to back-off for a bit. + self.last_check = Some(Instant::now()); + self.get_route = None; + return std::task::Poll::Pending; + }; + } + std::task::Poll::Pending => return std::task::Poll::Pending, + } + } + + // Return the amount that can be send via this channel. + if self.spendable_msat.is_none() { + let fut = Box::pin(spendable_msat( + self.rpc_path.clone(), + self.version.as_ref().unwrap().clone(), + self.peer_id.clone(), + self.scid, + )); + self.spendable_msat = Some(fut); + } - /// Retrieve the spendable amount for the channel. - async fn spendable_msat(&mut self) -> Result { - if *self.version >= *"v23.05gl1" { - Ok(self - .rpc - .call_typed(&ListpeerchannelsRequest { - id: Some(self.peer_id), - }) - .await - .map_err(|e| Error::Rpc(e))? - .channels - .ok_or(Error::Channel("No channels found"))? - .into_iter() - .filter(|c| { - c.short_channel_id == Some(self.scid) - || c.alias.clone().and_then(|a| a.local) == Some(self.scid) - }) - .nth(0) - .ok_or(Error::Channel( - "Could not find the channel in listpeerchannels", - ))? - .spendable_msat - .ok_or(Error::Channel("No amount found"))?) - } else { - #[allow(deprecated)] - Ok(self - .rpc - .call_typed(&ListpeersRequest { - id: Some(self.peer_id), - level: None, - }) - .await - .map_err(|e| Error::Rpc(e))? - .peers - .into_iter() - .nth(0) - .ok_or(Error::Peer("Has no peerlist"))? - .channels - .into_iter() - .nth(0) - .ok_or(Error::Channel("Empty channel list"))? - .into_iter() - .filter(|c| c.short_channel_id == Some(self.scid)) - .nth(0) - .ok_or(Error::Channel("No channel with scid"))? - .spendable_msat - .ok_or(Error::Channel("No amount found"))?) + let spendable_msat = self.spendable_msat.as_mut().unwrap(); + match spendable_msat.as_mut().poll(cx) { + std::task::Poll::Ready(amount) => std::task::Poll::Ready(amount), + std::task::Poll::Pending => std::task::Poll::Pending, } } +} - async fn billboard(&mut self) -> Result, Error> { - if *self.version >= *"v23.05gl1" { - Ok(self - .rpc - .call_typed(&ListpeerchannelsRequest { - id: Some(self.peer_id), - }) - .await - .map_err(|e| Error::Rpc(e))? - .channels - .unwrap() - .into_iter() - .filter(|c| { - c.short_channel_id == Some(self.scid) - || c.alias.clone().and_then(|a| a.local) == Some(self.scid) - }) - .nth(0) - .ok_or(Error::Channel( - "Could not find the channel in listpeerchannels", - ))? - .status - .unwrap()) - } else { - #[allow(deprecated)] - Ok(self - .rpc - .call_typed(&ListpeersRequest { - id: Some(self.peer_id), - level: None, - }) - .await - .map_err(|e| Error::Rpc(e))? - .peers - .into_iter() - .nth(0) - .unwrap() - .channels - .into_iter() - .nth(0) - .unwrap() - .into_iter() - .filter(|c| c.short_channel_id == Some(self.scid)) - .nth(0) - .unwrap() - .status - .unwrap()) +impl AwaitableChannel { + pub async fn new(peer_id: PublicKey, scid: ShortChannelId, rpc_path: PathBuf) -> Self { + AwaitableChannel { + peer_id, + scid, + rpc_path, + version: None, + peer_connected: false, + channel_ready: false, + route_found: false, + last_check: None, + rpc_call_delay: Duration::from_millis(RPC_CALL_DELAY_MSEC), + get_version: None, + ensure_peer_connection: None, + billboard: None, + get_route: None, + spendable_msat: None, } } } +async fn connect(rpc_path: impl AsRef) -> Result { + ClnRpc::new(rpc_path) + .await + .map_err(|e| Error::Service(format!("cant connect to rpc {}", e.to_string()))) +} + /// Try to connect to the peer if we are not already connected. -async fn ensure_peer_connection(rpc: &mut ClnRpc, peer_id: PublicKey) -> Result<(), Error> { +async fn ensure_peer_connection( + rpc_path: impl AsRef, + peer_id: PublicKey, +) -> Result<(), Error> { log::debug!("Checking if peer {} is connected", peer_id); + + let mut rpc = connect(rpc_path).await?; let res = rpc .call_typed(&cln_rpc::model::requests::ListpeersRequest { id: Some(peer_id), @@ -236,3 +291,135 @@ async fn ensure_peer_connection(rpc: &mut ClnRpc, peer_id: PublicKey) -> Result< } Ok(()) } + +async fn get_version(rpc_path: impl AsRef) -> Result { + let mut rpc = connect(rpc_path).await?; + let info = rpc + .call_typed(&GetinfoRequest {}) + .await + .map_err(|_| Error::Peer("unable to connect"))?; + Ok(info.version) +} + +async fn billboard( + rpc_path: impl AsRef, + version: String, + peer_id: PublicKey, + scid: ShortChannelId, +) -> Result, Error> { + let mut rpc = connect(rpc_path).await?; + if *version >= *"v23.05gl1" { + Ok(rpc + .call_typed(&ListpeerchannelsRequest { id: Some(peer_id) }) + .await + .map_err(|e| Error::Rpc(e))? + .channels + .ok_or(Error::Channel("No channels found"))? + .into_iter() + .filter(|c| { + c.short_channel_id == Some(scid) + || c.alias.clone().and_then(|a| a.local) == Some(scid) + }) + .nth(0) + .ok_or(Error::Channel( + "Could not find the channel in listpeerchannels", + ))? + .status + .ok_or(Error::Channel("Status not found"))?) + } else { + #[allow(deprecated)] + Ok(rpc + .call_typed(&ListpeersRequest { + id: Some(peer_id), + level: None, + }) + .await + .map_err(|e| Error::Rpc(e))? + .peers + .into_iter() + .nth(0) + .ok_or(Error::Channel("Has no peers list"))? + .channels + .into_iter() + .nth(0) + .ok_or(Error::Channel("Has no channels list"))? + .into_iter() + .filter(|c| c.short_channel_id == Some(scid)) + .nth(0) + .ok_or(Error::Channel("No channel with scid"))? + .status + .ok_or(Error::Channel("No amount found"))?) + } +} + +async fn get_route( + rpc_path: impl AsRef, + peer_id: PublicKey, +) -> Result { + let mut rpc = connect(rpc_path).await?; + Ok(rpc + .call_typed(&GetrouteRequest { + id: peer_id, + amount_msat: cln_rpc::primitives::Amount::from_msat(1), + riskfactor: 1, + cltv: None, + fromid: None, + fuzzpercent: Some(0), + exclude: None, + maxhops: Some(1), + }) + .await?) +} + +async fn spendable_msat( + rpc_path: impl AsRef, + version: String, + peer_id: PublicKey, + scid: ShortChannelId, +) -> Result { + let mut rpc = connect(rpc_path).await?; + if *version >= *"v23.05gl1" { + Ok(rpc + .call_typed(&ListpeerchannelsRequest { id: Some(peer_id) }) + .await + .map_err(|e| Error::Rpc(e))? + .channels + .ok_or(Error::Channel("No channels found"))? + .into_iter() + .filter(|c| { + c.short_channel_id == Some(scid) + || c.alias.clone().and_then(|a| a.local) == Some(scid) + }) + .nth(0) + .ok_or(Error::Channel( + "Could not find the channel in listpeerchannels", + ))? + .spendable_msat + .ok_or(Error::Channel("No amount found"))?) + } else { + #[allow(deprecated)] + Ok(rpc + .call_typed(&ListpeersRequest { + id: Some(peer_id), + level: None, + }) + .await + .map_err(|e| Error::Rpc(e))? + .peers + .into_iter() + .nth(0) + .ok_or(Error::Channel("Has no peers list"))? + .channels + .into_iter() + .nth(0) + .ok_or(Error::Channel("Has no channels list"))? + .into_iter() + .filter(|c| c.short_channel_id == Some(scid)) + .nth(0) + .ok_or(Error::Channel("No channel with scid"))? + .spendable_msat + .ok_or(Error::Channel("No amount found"))?) + } +} + +pub fn assert_send(_: T) {} diff --git a/libs/gl-plugin/src/tramp.rs b/libs/gl-plugin/src/tramp.rs index 7b4dcad8d..e2b34bfa7 100644 --- a/libs/gl-plugin/src/tramp.rs +++ b/libs/gl-plugin/src/tramp.rs @@ -9,6 +9,8 @@ use futures::{future::join_all, FutureExt}; use gl_client::bitcoin::hashes::hex::ToHex; use log::{debug, warn}; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::str::FromStr; use std::{path::Path, time::Duration}; use tokio::time::{timeout_at, Instant}; @@ -71,7 +73,7 @@ pub async fn trampolinepay( // Wait for the peer connection to re-establish. log::debug!("Await peer connection to {}", node_id.to_hex()); - AwaitablePeer::new(node_id, ClnRpc::new(&rpc_path).await?) + AwaitablePeer::new(node_id, rpc_path.as_ref().to_path_buf()) .wait() .await?; @@ -171,7 +173,9 @@ pub async fn trampolinepay( // Await and filter out re-established channels. let deadline = Instant::now() + Duration::from_secs(AWAIT_CHANNELS_TIMEOUT_SEC); - let mut channels = reestablished_channels(channels, node_id, &rpc_path, deadline).await?; + let mut channels = + reestablished_channels(channels, node_id, rpc_path.as_ref().to_path_buf(), deadline) + .await?; // Note: We can also do this inside the reestablished_channels function // but as we want to be greedy picking our channels we don't want to @@ -354,24 +358,30 @@ async fn do_pay( async fn reestablished_channels( channels: Vec, node_id: PublicKey, - rpc_path: impl AsRef, + rpc_path: PathBuf, deadline: Instant, ) -> Result> { // Wait for channels to re-establish. - let mut awaitables = Vec::new(); + crate::awaitables::assert_send(AwaitableChannel::new( + node_id, + ShortChannelId::from_str("1x1x1")?, + rpc_path.clone(), + )); + let mut futures = Vec::new(); for c in &channels { - awaitables.push( - AwaitableChannel::new(node_id, c.short_channel_id, ClnRpc::new(&rpc_path).await?) - .await?, + let rp = rpc_path.clone(); + futures.push( + async move { + timeout_at( + deadline, + AwaitableChannel::new(node_id, c.short_channel_id, rp), + ) + .await + } + .boxed(), ); } - // Wait with timeout for channel to re-establish. - let futures: Vec<_> = awaitables - .into_iter() - .map(|mut aw| async move { timeout_at(deadline, aw.wait()).await }.boxed()) - .collect(); - log::info!( "Starting {} tasks to wait for channels to be ready", futures.len()