From 2b23f4ae3e95029dd080f166d3d50104ff48370d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zoe=20Faltib=C3=A0?= Date: Fri, 10 Nov 2023 17:35:23 +0100 Subject: [PATCH] avoid locking mutex in async methods --- src/ldk.rs | 308 +++++++++++++++++++++-------------------- src/main.rs | 17 ++- src/rgb.rs | 205 +++++++++++++++++++++++---- src/routes.rs | 374 ++++++++++++++++++++++++-------------------------- src/utils.rs | 37 +---- 5 files changed, 536 insertions(+), 405 deletions(-) diff --git a/src/ldk.rs b/src/ldk.rs index 0295351..707ff21 100644 --- a/src/ldk.rs +++ b/src/ldk.rs @@ -59,7 +59,7 @@ use std::io::{Read, Seek, SeekFrom}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::time::{Duration, SystemTime}; use strict_encoding::{FieldName, TypeName}; use tokio::sync::watch::Sender; @@ -88,6 +88,7 @@ pub(crate) struct LdkBackgroundServices { background_processor: Option>>, } +#[derive(Clone)] pub(crate) struct PaymentInfo { pub(crate) preimage: Option, pub(crate) secret: Option, @@ -118,6 +119,103 @@ impl_writeable_tlv_based!(OutboundPaymentInfoStorage, { (0, payments, required), }); +impl UnlockedAppState { + pub(crate) fn add_inbound_payment(&self, payment_hash: PaymentHash, payment_info: PaymentInfo) { + let mut inbound = self.get_inbound_payments(); + inbound.payments.insert(payment_hash, payment_info); + self.save_inbound_payments(inbound); + } + + pub(crate) fn add_outbound_payment(&self, payment_id: PaymentId, payment_info: PaymentInfo) { + let mut outbound = self.get_outbound_payments(); + outbound.payments.insert(payment_id, payment_info); + self.save_outbound_payments(outbound); + } + + fn fail_outbound_pending_payments(&self, recent_payments_payment_ids: Vec) { + let mut outbound = self.get_outbound_payments(); + for (payment_id, payment_info) in outbound + .payments + .iter_mut() + .filter(|(_, i)| matches!(i.status, HTLCStatus::Pending)) + { + if !recent_payments_payment_ids.contains(payment_id) { + payment_info.status = HTLCStatus::Failed; + } + } + self.save_outbound_payments(outbound); + } + + pub(crate) fn inbound_payments(&self) -> HashMap { + self.get_inbound_payments().payments.clone() + } + + pub(crate) fn outbound_payments(&self) -> HashMap { + self.get_outbound_payments().payments.clone() + } + + fn save_inbound_payments(&self, inbound: MutexGuard) { + self.fs_store + .write("", "", INBOUND_PAYMENTS_FNAME, &inbound.encode()) + .unwrap(); + } + + fn save_outbound_payments(&self, outbound: MutexGuard) { + self.fs_store + .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()) + .unwrap(); + } + + fn upsert_inbound_payment( + &self, + payment_hash: PaymentHash, + status: HTLCStatus, + preimage: Option, + secret: Option, + amt_msat: Option, + ) { + let mut inbound = self.get_inbound_payments(); + match inbound.payments.entry(payment_hash) { + Entry::Occupied(mut e) => { + let payment = e.get_mut(); + payment.status = status; + payment.preimage = preimage; + payment.secret = secret; + } + Entry::Vacant(e) => { + e.insert(PaymentInfo { + preimage, + secret, + status, + amt_msat, + }); + } + } + self.save_inbound_payments(inbound); + } + + pub(crate) fn update_outbound_payment( + &self, + payment_id: PaymentId, + status: HTLCStatus, + preimage: Option, + ) -> PaymentInfo { + let mut outbound = self.get_outbound_payments(); + let outbound_payment = outbound.payments.get_mut(&payment_id).unwrap(); + outbound_payment.status = status; + outbound_payment.preimage = preimage; + let payment = (*outbound_payment).clone(); + self.save_outbound_payments(outbound); + payment + } + + pub(crate) fn update_outbound_payment_status(&self, payment_id: PaymentId, status: HTLCStatus) { + let mut outbound = self.get_outbound_payments(); + outbound.payments.get_mut(&payment_id).unwrap().status = status; + self.save_outbound_payments(outbound); + } +} + type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc, @@ -216,26 +314,15 @@ async fn handle_ldk_events( }; let unlocked_state_copy = unlocked_state.clone(); - let online_copy = unlocked_state.rgb_online.clone(); let unsigned_psbt = tokio::task::spawn_blocking(move || { unlocked_state_copy - .get_rgb_wallet() - .send_begin( - online_copy, - recipient_map, - true, - FEE_RATE, - MIN_CHANNEL_CONFIRMATIONS, - ) + .rgb_send_begin(recipient_map, true, FEE_RATE, MIN_CHANNEL_CONFIRMATIONS) .unwrap() }) .await .unwrap(); - let signed_psbt = unlocked_state - .get_rgb_wallet() - .sign_psbt(unsigned_psbt, None) - .unwrap(); + let signed_psbt = unlocked_state.rgb_sign_psbt(unsigned_psbt).unwrap(); let psbt = BdkPsbt::from_str(&signed_psbt).unwrap(); @@ -246,8 +333,7 @@ async fn handle_ldk_events( fs::write(psbt_path, psbt.to_string()).unwrap(); let consignment_path = unlocked_state - .get_rgb_wallet() - .get_wallet_dir() + .rgb_get_wallet_dir() .join("transfers") .join(funding_txid.clone()) .join(asset_id) @@ -330,27 +416,13 @@ async fn handle_ldk_events( } => (payment_preimage, Some(payment_secret)), PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None), }; - let mut inbound = unlocked_state.get_inbound_payments(); - match inbound.payments.entry(payment_hash) { - Entry::Occupied(mut e) => { - let payment = e.get_mut(); - payment.status = HTLCStatus::Succeeded; - payment.preimage = payment_preimage; - payment.secret = payment_secret; - } - Entry::Vacant(e) => { - e.insert(PaymentInfo { - preimage: payment_preimage, - secret: payment_secret, - status: HTLCStatus::Succeeded, - amt_msat: Some(amount_msat), - }); - } - } - unlocked_state - .fs_store - .write("", "", INBOUND_PAYMENTS_FNAME, &inbound.encode()) - .unwrap(); + unlocked_state.upsert_inbound_payment( + payment_hash, + HTLCStatus::Succeeded, + payment_preimage, + payment_secret, + Some(amount_msat), + ); } Event::PaymentSent { payment_preimage, @@ -359,29 +431,23 @@ async fn handle_ldk_events( payment_id, .. } => { - let mut outbound = unlocked_state.get_outbound_payments(); - for (id, payment) in outbound.payments.iter_mut() { - if *id == payment_id.unwrap() { - payment.preimage = Some(payment_preimage); - payment.status = HTLCStatus::Succeeded; - tracing::info!( - "EVENT: successfully sent payment of {:?} millisatoshis{} from \ - payment hash {} with preimage {}", - payment.amt_msat, - if let Some(fee) = fee_paid_msat { - format!(" (fee {} msat)", fee) - } else { - "".to_string() - }, - payment_hash, - payment_preimage - ); - } - } - unlocked_state - .fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()) - .unwrap(); + let payment = unlocked_state.update_outbound_payment( + payment_id.unwrap(), + HTLCStatus::Succeeded, + Some(payment_preimage), + ); + tracing::info!( + "EVENT: successfully sent payment of {:?} millisatoshis{} from \ + payment hash {} with preimage {}", + payment.amt_msat, + if let Some(fee) = fee_paid_msat { + format!(" (fee {} msat)", fee) + } else { + "".to_string() + }, + payment_hash, + payment_preimage + ); } Event::OpenChannelRequest { ref temporary_channel_id, @@ -433,15 +499,7 @@ async fn handle_ldk_events( } ); - let mut outbound = unlocked_state.get_outbound_payments(); - if outbound.payments.contains_key(&payment_id) { - let payment = outbound.payments.get_mut(&payment_id).unwrap(); - payment.status = HTLCStatus::Failed; - } - unlocked_state - .fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()) - .unwrap(); + unlocked_state.update_outbound_payment_status(payment_id, HTLCStatus::Failed); } Event::InvoiceRequestFailed { payment_id } => { tracing::error!( @@ -449,15 +507,7 @@ async fn handle_ldk_events( payment_id, ); - let mut outbound = unlocked_state.get_outbound_payments(); - if outbound.payments.contains_key(&payment_id) { - let payment = outbound.payments.get_mut(&payment_id).unwrap(); - payment.status = HTLCStatus::Failed; - } - unlocked_state - .fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()) - .unwrap(); + unlocked_state.update_outbound_payment_status(payment_id, HTLCStatus::Failed); } Event::PaymentForwarded { prev_channel_id, @@ -584,13 +634,9 @@ async fn handle_ldk_events( let psbt_str = fs::read_to_string(psbt_path).unwrap(); let state_copy = unlocked_state.clone(); - let online_copy = unlocked_state.rgb_online.clone(); let psbt_str_copy = psbt_str.clone(); let _txid = tokio::task::spawn_blocking(move || { - state_copy - .get_rgb_wallet() - .send_end(online_copy, psbt_str_copy) - .unwrap() + state_copy.rgb_send_end(psbt_str_copy).unwrap() }) .await .unwrap(); @@ -605,11 +651,7 @@ async fn handle_ldk_events( let asset_schema = AssetSchema::from_schema_id(schema_id).unwrap(); let mut runtime = get_rgb_runtime(Path::new(&static_state.ldk_data_dir)); - match unlocked_state.get_rgb_wallet().save_new_asset( - &mut runtime, - &asset_schema, - contract_id, - ) { + match unlocked_state.rgb_save_new_asset(&mut runtime, &asset_schema, contract_id) { Ok(_) => {} Err(e) if e.to_string().contains("UNIQUE constraint failed") => {} Err(e) => panic!("Failed saving asset: {}", e), @@ -629,14 +671,8 @@ async fn handle_ldk_events( ); tokio::task::spawn_blocking(move || { - unlocked_state - .get_rgb_wallet() - .refresh(unlocked_state.rgb_online.clone(), None, vec![]) - .unwrap(); - unlocked_state - .get_rgb_wallet() - .refresh(unlocked_state.rgb_online.clone(), None, vec![]) - .unwrap() + unlocked_state.rgb_refresh().unwrap(); + unlocked_state.rgb_refresh().unwrap() }) .await .unwrap(); @@ -723,14 +759,7 @@ async fn _spend_outputs( let contract_id = transfer_info.contract_id; let receive_data = unlocked_state - .get_rgb_wallet() - .witness_receive( - None, - None, - None, - vec![static_state.proxy_endpoint.clone()], - 0, - ) + .rgb_witness_receive(vec![static_state.proxy_endpoint.clone()]) .unwrap(); let script_buf_str = receive_data.recipient_id; let script_buf = ScriptBuf::from_hex(&script_buf_str).unwrap(); @@ -958,7 +987,7 @@ async fn _spend_outputs( } if !vanilla_output_descriptors.is_empty() { - let address_str = unlocked_state.get_rgb_wallet().get_address().unwrap(); + let address_str = unlocked_state.rgb_get_address().unwrap(); let address = Address::from_str(&address_str).unwrap().assume_checked(); let script_buf = address.script_pubkey(); let bdk_script = BdkScript::from(script_buf.into_bytes()); @@ -981,14 +1010,8 @@ async fn _spend_outputs( if need_rgb_refresh { tokio::task::spawn_blocking(move || { - unlocked_state - .get_rgb_wallet() - .refresh(unlocked_state.rgb_online.clone(), None, vec![]) - .unwrap(); - unlocked_state - .get_rgb_wallet() - .refresh(unlocked_state.rgb_online.clone(), None, vec![]) - .unwrap() + unlocked_state.rgb_refresh().unwrap(); + unlocked_state.rgb_refresh().unwrap() }) .await .unwrap(); @@ -1421,35 +1444,6 @@ pub(crate) async fn start_ldk( let outbound_payments = Arc::new(Mutex::new(disk::read_outbound_payment_info(Path::new( &format!("{}/{}", ldk_data_dir, OUTBOUND_PAYMENTS_FNAME), )))); - let recent_payments_payment_ids = channel_manager - .list_recent_payments() - .into_iter() - .map(|p| match p { - RecentPaymentDetails::Pending { payment_id, .. } => payment_id, - RecentPaymentDetails::Fulfilled { payment_id, .. } => payment_id, - RecentPaymentDetails::Abandoned { payment_id, .. } => payment_id, - RecentPaymentDetails::AwaitingInvoice { payment_id } => payment_id, - }) - .collect::>(); - for (payment_id, payment_info) in outbound_payments - .lock() - .unwrap() - .payments - .iter_mut() - .filter(|(_, i)| matches!(i.status, HTLCStatus::Pending)) - { - if !recent_payments_payment_ids.contains(payment_id) { - payment_info.status = HTLCStatus::Failed; - } - } - fs_store - .write( - "", - "", - OUTBOUND_PAYMENTS_FNAME, - &outbound_payments.lock().unwrap().encode(), - ) - .unwrap(); let xkey: ExtendedKey = mnemonic .clone() @@ -1515,6 +1509,18 @@ pub(crate) async fn start_ldk( rgb_online, }); + let recent_payments_payment_ids = channel_manager + .list_recent_payments() + .into_iter() + .map(|p| match p { + RecentPaymentDetails::Pending { payment_id, .. } => payment_id, + RecentPaymentDetails::Fulfilled { payment_id, .. } => payment_id, + RecentPaymentDetails::Abandoned { payment_id, .. } => payment_id, + RecentPaymentDetails::AwaitingInvoice { payment_id } => payment_id, + }) + .collect::>(); + unlocked_state.fail_outbound_pending_payments(recent_payments_payment_ids); + // Handle LDK Events let unlocked_state_copy = Arc::clone(&unlocked_state); let static_state_copy = Arc::clone(static_state); @@ -1633,16 +1639,14 @@ pub(crate) async fn start_ldk( )) } -pub(crate) async fn stop_ldk(app_state: Arc) { - tracing::info!("Stopping LDK"); - - let join_handle = { - let mut ldk_background_services = app_state.ldk_background_services.lock().unwrap(); +impl AppState { + fn stop_ldk(&self) -> Option>> { + let mut ldk_background_services = self.get_ldk_background_services(); if ldk_background_services.is_none() { // node is locked tracing::info!("LDK is not running"); - return; + return None; } let ldk_background_services = ldk_background_services.as_mut().unwrap(); @@ -1661,9 +1665,13 @@ pub(crate) async fn stop_ldk(app_state: Arc) { } else { None } - }; + } +} + +pub(crate) async fn stop_ldk(app_state: Arc) { + tracing::info!("Stopping LDK"); - if let Some(join_handle) = join_handle { + if let Some(join_handle) = app_state.stop_ldk() { join_handle.await.unwrap().unwrap(); } diff --git a/src/main.rs b/src/main.rs index 527cda0..1aa7dde 100644 --- a/src/main.rs +++ b/src/main.rs @@ -127,6 +127,18 @@ pub(crate) async fn app(args: LdkUserInfo) -> Result<(Router, Arc), Ap Ok((router, app_state)) } +impl AppState { + fn wait_state_change(&self) -> bool { + let _unlocked_state = self.get_unlocked_app_state(); + let mut changing_state = self.get_changing_state(); + if !*changing_state { + *changing_state = true; + return true; + } + false + } +} + /// Tokio signal handler that will wait for a user to press CTRL+C. /// We use this in our hyper `Server` method `with_graceful_shutdown`. async fn shutdown_signal(app_state: Arc) { @@ -160,10 +172,7 @@ async fn shutdown_signal(app_state: Arc) { let app_state_copy = app_state.clone(); loop { { - let _unlocked_state = app_state_copy.get_unlocked_app_state(); - let mut changing_state = app_state_copy.get_changing_state(); - if !*changing_state { - *changing_state = true; + if app_state_copy.wait_state_change() { break; } } diff --git a/src/rgb.rs b/src/rgb.rs index 64af486..69c1408 100644 --- a/src/rgb.rs +++ b/src/rgb.rs @@ -17,46 +17,26 @@ use lightning::events::bump_transaction::{Utxo, WalletSource}; use lightning::rgb_utils::STATIC_BLINDING; use rgb_core::Operation; use rgb_lib::utils::RgbRuntime; -use rgb_lib::wallet::Online; -use rgb_lib::{BitcoinNetwork, SignOptions, Wallet as RgbLibWallet}; +use rgb_lib::wallet::{ + AssetNIA, Assets, Balance, BtcBalance, Online, ReceiveData, Recipient, + Transaction as RgbLibTransaction, Transfer, Unspent, +}; +use rgb_lib::{ + AssetSchema, BitcoinNetwork, Error as RgbLibError, SignOptions, Wallet as RgbLibWallet, +}; use rgbstd::containers::{Bindle, BuilderSeal, Transfer as RgbTransfer}; use rgbstd::contract::{ContractId, GraphSeal}; -use rgbstd::interface::{TransitionBuilder, TypedState}; +use rgbstd::interface::{ContractIface, TransitionBuilder, TypedState}; use rgbstd::persistence::Inventory; use rgbstd::Txid as RgbTxid; use rgbwallet::psbt::opret::OutputOpret; use rgbwallet::psbt::{PsbtDbc, RgbExt, RgbInExt}; use std::collections::HashMap; +use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex}; -use crate::error::APIError; - -pub(crate) fn match_rgb_lib_error(error: &rgb_lib::Error, default: APIError) -> APIError { - tracing::error!("ERR from rgb-lib: {error:?}"); - match error { - rgb_lib::Error::AllocationsAlreadyAvailable => APIError::AllocationsAlreadyAvailable, - rgb_lib::Error::AssetNotFound { .. } => APIError::UnknownContractId, - rgb_lib::Error::InsufficientAllocationSlots => APIError::NoAvailableUtxos, - rgb_lib::Error::InsufficientBitcoins { needed, available } => { - APIError::InsufficientFunds(needed - available) - } - rgb_lib::Error::InvalidAssetID { asset_id } => APIError::InvalidAssetID(asset_id.clone()), - rgb_lib::Error::InvalidBlindedUTXO { details } => { - APIError::InvalidBlindedUTXO(details.clone()) - } - rgb_lib::Error::InvalidFeeRate { details } => APIError::InvalidFeeRate(details.clone()), - rgb_lib::Error::InvalidName { details } => APIError::InvalidName(details.clone()), - rgb_lib::Error::InvalidPrecision { details } => APIError::InvalidPrecision(details.clone()), - rgb_lib::Error::InvalidTicker { details } => APIError::InvalidTicker(details.clone()), - rgb_lib::Error::InvalidTransportEndpoints { details } => { - APIError::InvalidTransportEndpoints(details.clone()) - } - rgb_lib::Error::RecipientIDAlreadyUsed => APIError::RecipientIDAlreadyUsed, - rgb_lib::Error::OutputBelowDustLimit => APIError::OutputBelowDustLimit, - _ => default, - } -} +use crate::utils::UnlockedAppState; pub(crate) fn update_transition_beneficiary( psbt: &PartiallySignedTransaction, @@ -93,6 +73,171 @@ pub(crate) fn get_bitcoin_network(network: &Network) -> BitcoinNetwork { BitcoinNetwork::from_str(&network.to_string()).unwrap() } +impl UnlockedAppState { + pub(crate) fn rgb_blind_receive( + &self, + asset_id: Option, + transport_endpoints: Vec, + min_confirmations: u8, + ) -> Result { + self.get_rgb_wallet().blind_receive( + asset_id, + None, + None, + transport_endpoints, + min_confirmations, + ) + } + + pub(crate) fn rgb_create_utxos( + &self, + up_to: bool, + num: u8, + size: u32, + fee_rate: f32, + ) -> Result { + self.get_rgb_wallet().create_utxos( + self.rgb_online.clone(), + up_to, + Some(num), + Some(size), + fee_rate, + ) + } + + pub(crate) fn rgb_get_address(&self) -> Result { + self.get_rgb_wallet().get_address() + } + + pub(crate) fn rgb_get_asset_balance( + &self, + contract_id: ContractId, + ) -> Result { + self.get_rgb_wallet() + .get_asset_balance(contract_id.to_string()) + } + + pub(crate) fn rgb_get_btc_balance(&self) -> Result { + self.get_rgb_wallet() + .get_btc_balance(self.rgb_online.clone()) + } + + pub(crate) fn rgb_get_wallet_dir(&self) -> PathBuf { + self.get_rgb_wallet().get_wallet_dir() + } + + pub(crate) fn rgb_issue_asset_nia( + &self, + ticker: String, + name: String, + precision: u8, + amounts: Vec, + ) -> Result { + self.get_rgb_wallet().issue_asset_nia( + self.rgb_online.clone(), + ticker, + name, + precision, + amounts, + ) + } + + pub(crate) fn rgb_list_assets(&self) -> Result { + self.get_rgb_wallet().list_assets(vec![]) + } + + pub(crate) fn rgb_list_transactions(&self) -> Result, RgbLibError> { + self.get_rgb_wallet() + .list_transactions(Some(self.rgb_online.clone())) + } + + pub(crate) fn rgb_list_transfers( + &self, + asset_id: String, + ) -> Result, RgbLibError> { + self.get_rgb_wallet().list_transfers(Some(asset_id)) + } + + pub(crate) fn rgb_list_unspents(&self) -> Result, RgbLibError> { + self.get_rgb_wallet() + .list_unspents(Some(self.rgb_online.clone()), false) + } + + pub(crate) fn rgb_refresh(&self) -> Result { + self.get_rgb_wallet() + .refresh(self.rgb_online.clone(), None, vec![]) + } + + pub(crate) fn rgb_save_new_asset( + &self, + runtime: &mut RgbRuntime, + asset_schema: &AssetSchema, + contract_id: ContractId, + ) -> Result { + self.get_rgb_wallet() + .save_new_asset(runtime, asset_schema, contract_id) + } + + pub(crate) fn rgb_send( + &self, + recipient_map: HashMap>, + donation: bool, + fee_rate: f32, + min_confirmations: u8, + ) -> Result { + self.get_rgb_wallet().send( + self.rgb_online.clone(), + recipient_map, + donation, + fee_rate, + min_confirmations, + ) + } + + pub(crate) fn rgb_send_begin( + &self, + recipient_map: HashMap>, + donation: bool, + fee_rate: f32, + min_confirmations: u8, + ) -> Result { + self.get_rgb_wallet().send_begin( + self.rgb_online.clone(), + recipient_map, + donation, + fee_rate, + min_confirmations, + ) + } + + pub(crate) fn rgb_send_btc( + &self, + address: String, + amount: u64, + fee_rate: f32, + ) -> Result { + self.get_rgb_wallet() + .send_btc(self.rgb_online.clone(), address, amount, fee_rate) + } + + pub(crate) fn rgb_send_end(&self, signed_psbt: String) -> Result { + self.get_rgb_wallet() + .send_end(self.rgb_online.clone(), signed_psbt) + } + + pub(crate) fn rgb_sign_psbt(&self, unsigned_psbt: String) -> Result { + self.get_rgb_wallet().sign_psbt(unsigned_psbt, None) + } + + pub(crate) fn rgb_witness_receive( + &self, + transport_endpoints: Vec, + ) -> Result { + self.get_rgb_wallet() + .witness_receive(None, None, None, transport_endpoints, 0) + } +} + pub(crate) trait RgbUtilities { fn send_rgb( &mut self, diff --git a/src/routes.rs b/src/routes.rs index 0fa79b1..4d704d9 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -11,8 +11,6 @@ use lightning::ln::ChannelId; use lightning::onion_message::{Destination, OnionMessagePath}; use lightning::rgb_utils::{get_rgb_payment_info_path, parse_rgb_payment_info}; use lightning::sign::EntropySource; -use lightning::util::persist::KVStore; -use lightning::util::ser::Writeable; use lightning::{ ln::{ channelmanager::{PaymentId, RecipientOnionFields, Retry}, @@ -31,7 +29,7 @@ use lightning_invoice::payment::pay_invoice; use lightning_invoice::Bolt11Invoice; use lightning_invoice::{utils::create_invoice_from_channelmanager, Currency}; use rgb_lib::wallet::{Invoice as RgbLibInvoice, Recipient, RecipientData}; -use rgb_lib::{generate_keys, BitcoinNetwork as RgbLibNetwork}; +use rgb_lib::{generate_keys, BitcoinNetwork as RgbLibNetwork, Error as RgbLibError}; use rgbstd::contract::{ContractId, SecretSeal}; use rgbwallet::RgbTransport; use serde::{Deserialize, Serialize}; @@ -41,15 +39,15 @@ use std::{ sync::Arc, time::{Duration, SystemTime}, }; +use tokio::sync::MutexGuard as TokioMutexGuard; use crate::backup::{do_backup, restore_backup}; -use crate::disk::{INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME}; -use crate::ldk::{start_ldk, stop_ldk, MIN_CHANNEL_CONFIRMATIONS}; -use crate::rgb::{get_bitcoin_network, match_rgb_lib_error}; +use crate::ldk::{start_ldk, stop_ldk, LdkBackgroundServices, MIN_CHANNEL_CONFIRMATIONS}; +use crate::rgb::get_bitcoin_network; use crate::utils::{ - check_already_initialized, check_locked, check_password_strength, check_password_validity, - check_unlocked, encrypt_and_save_mnemonic, get_mnemonic_path, hex_str, - hex_str_to_compressed_pubkey, hex_str_to_vec, UserOnionMessageContents, + check_already_initialized, check_password_strength, check_password_validity, + encrypt_and_save_mnemonic, get_mnemonic_path, hex_str, hex_str_to_compressed_pubkey, + hex_str_to_vec, UnlockedAppState, UserOnionMessageContents, }; use crate::{ disk, @@ -575,15 +573,92 @@ pub(crate) struct Utxo { pub(crate) colorable: bool, } +impl AppState { + fn check_changing_state(&self) -> Result<(), APIError> { + if *self.get_changing_state() { + return Err(APIError::ChangingState); + } + Ok(()) + } + + async fn check_locked( + &self, + ) -> Result>>, APIError> { + let unlocked_app_state = self.get_unlocked_app_state().await; + if unlocked_app_state.is_some() { + Err(APIError::UnlockedNode) + } else { + self.check_changing_state()?; + Ok(unlocked_app_state) + } + } + + async fn check_unlocked( + &self, + ) -> Result>>, APIError> { + let unlocked_app_state = self.get_unlocked_app_state().await; + if unlocked_app_state.is_none() { + Err(APIError::LockedNode) + } else { + self.check_changing_state()?; + Ok(unlocked_app_state) + } + } + + fn update_changing_state(&self, updated: bool) { + let mut changing_state = self.get_changing_state(); + *changing_state = updated; + } + + fn update_ldk_background_services(&self, updated: Option) { + let mut ldk_background_services = self.get_ldk_background_services(); + *ldk_background_services = updated; + } + + async fn update_unlocked_app_state(&self, updated: Option>) { + let mut unlocked_app_state = self.get_unlocked_app_state().await; + *unlocked_app_state = updated; + } +} + +impl From for APIError { + fn from(error: RgbLibError) -> Self { + match error { + RgbLibError::AllocationsAlreadyAvailable => APIError::AllocationsAlreadyAvailable, + RgbLibError::AssetNotFound { .. } => APIError::UnknownContractId, + RgbLibError::FailedIssuance { details } => { + APIError::FailedIssuingAsset(details.clone()) + } + RgbLibError::InsufficientAllocationSlots => APIError::NoAvailableUtxos, + RgbLibError::InsufficientBitcoins { needed, available } => { + APIError::InsufficientFunds(needed - available) + } + RgbLibError::InvalidAssetID { asset_id } => APIError::InvalidAssetID(asset_id.clone()), + RgbLibError::InvalidBlindedUTXO { details } => { + APIError::InvalidBlindedUTXO(details.clone()) + } + RgbLibError::InvalidFeeRate { details } => APIError::InvalidFeeRate(details.clone()), + RgbLibError::InvalidName { details } => APIError::InvalidName(details.clone()), + RgbLibError::InvalidPrecision { details } => { + APIError::InvalidPrecision(details.clone()) + } + RgbLibError::InvalidTicker { details } => APIError::InvalidTicker(details.clone()), + RgbLibError::InvalidTransportEndpoints { details } => { + APIError::InvalidTransportEndpoints(details.clone()) + } + RgbLibError::RecipientIDAlreadyUsed => APIError::RecipientIDAlreadyUsed, + RgbLibError::OutputBelowDustLimit => APIError::OutputBelowDustLimit, + _ => APIError::Unexpected, + } + } +} + pub(crate) async fn address( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); - let address = unlocked_state - .get_rgb_wallet() - .get_address() - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))?; + let address = unlocked_state.rgb_get_address()?; Ok(Json(AddressResponse { address })) } @@ -592,15 +667,12 @@ pub(crate) async fn asset_balance( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let contract_id = ContractId::from_str(&payload.asset_id) .map_err(|_| APIError::InvalidAssetID(payload.asset_id))?; - let balance = unlocked_state - .get_rgb_wallet() - .get_asset_balance(contract_id.to_string()) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))?; + let balance = unlocked_state.rgb_get_asset_balance(contract_id)?; let ldk_data_dir_path = PathBuf::from(state.static_state.ldk_data_dir.clone()); let mut offchain_outbound = 0; @@ -631,7 +703,7 @@ pub(crate) async fn backup( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let _unlocked_state = check_locked(&state)?; + let _unlocked_state = state.check_locked().await?; let _mnemonic = check_password_validity(&payload.password, &state.static_state.storage_dir_path)?; @@ -650,12 +722,9 @@ pub(crate) async fn backup( pub(crate) async fn btc_balance( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); - let btc_balance = unlocked_state - .get_rgb_wallet() - .get_btc_balance(unlocked_state.rgb_online.clone()) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))?; + let btc_balance = unlocked_state.rgb_get_btc_balance()?; let vanilla = BtcBalance { settled: btc_balance.vanilla.settled, @@ -677,7 +746,7 @@ pub(crate) async fn change_password( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let _unlocked_state = check_locked(&state)?; + let _unlocked_state = state.check_locked().await?; check_password_strength(payload.new_password.clone())?; @@ -700,7 +769,7 @@ pub(crate) async fn close_channel( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let channel_id_vec = hex_str_to_vec(&payload.channel_id); if channel_id_vec.is_none() || channel_id_vec.as_ref().unwrap().len() != 32 { @@ -746,7 +815,7 @@ pub(crate) async fn connect_peer( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let (peer_pubkey, peer_addr) = parse_peer_info(payload.peer_pubkey_and_addr.to_string())?; @@ -763,18 +832,14 @@ pub(crate) async fn create_utxos( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); - unlocked_state - .get_rgb_wallet() - .create_utxos( - unlocked_state.rgb_online.clone(), - payload.up_to, - Some(payload.num.unwrap_or(UTXO_NUM)), - Some(UTXO_SIZE_SAT), - FEE_RATE, - ) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))?; + unlocked_state.rgb_create_utxos( + payload.up_to, + payload.num.unwrap_or(UTXO_NUM), + UTXO_SIZE_SAT, + FEE_RATE, + )?; tracing::debug!("UTXO creation complete"); Ok(Json(EmptyResponse {})) @@ -840,7 +905,7 @@ pub(crate) async fn disconnect_peer( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let peer_pubkey = match bitcoin::secp256k1::PublicKey::from_str(&payload.peer_pubkey) { Ok(pubkey) => pubkey, @@ -879,7 +944,7 @@ pub(crate) async fn init( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let _unlocked_state = check_locked(&state)?; + let _unlocked_state = state.check_locked().await?; check_password_strength(payload.password.clone())?; @@ -901,17 +966,15 @@ pub(crate) async fn invoice_status( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let invoice = match Bolt11Invoice::from_str(&payload.invoice) { Err(e) => return Err(APIError::InvalidInvoice(e.to_string())), Ok(v) => v, }; - let inbound = unlocked_state.get_inbound_payments(); - let payment_hash = PaymentHash(invoice.payment_hash().into_inner()); - let status = match inbound.payments.get(&payment_hash) { + let status = match unlocked_state.inbound_payments().get(&payment_hash) { Some(v) => match v.status { HTLCStatus::Pending if invoice.is_expired() => InvoiceStatus::Expired, HTLCStatus::Pending => InvoiceStatus::Pending, @@ -929,18 +992,14 @@ pub(crate) async fn issue_asset( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); - - let asset = unlocked_state - .get_rgb_wallet() - .issue_asset_nia( - unlocked_state.rgb_online.clone(), - payload.ticker, - payload.name, - payload.precision, - payload.amounts, - ) - .map_err(|e| match_rgb_lib_error(&e, APIError::FailedIssuingAsset(e.to_string())))?; + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + + let asset = unlocked_state.rgb_issue_asset_nia( + payload.ticker, + payload.name, + payload.precision, + payload.amounts, + )?; Ok(Json(IssueAssetResponse { asset_id: asset.asset_id, @@ -954,7 +1013,7 @@ pub(crate) async fn keysend( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let dest_pubkey = match hex_str_to_compressed_pubkey(&payload.dest_pubkey) { Some(pk) => pk, @@ -989,8 +1048,7 @@ pub(crate) async fn keysend( PaymentParameters::for_keysend(dest_pubkey, 40, false), amt_msat, ); - let mut outbound = unlocked_state.get_outbound_payments(); - outbound.payments.insert( + unlocked_state.add_outbound_payment( payment_id, PaymentInfo { preimage: None, @@ -999,10 +1057,6 @@ pub(crate) async fn keysend( amt_msat: Some(amt_msat), }, ); - unlocked_state - .fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()) - .unwrap(); let status = match unlocked_state .channel_manager .send_spontaneous_payment_with_retry( @@ -1022,11 +1076,7 @@ pub(crate) async fn keysend( } Err(e) => { tracing::error!("ERROR: failed to send payment: {:?}", e); - outbound.payments.get_mut(&payment_id).unwrap().status = HTLCStatus::Failed; - unlocked_state - .fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()) - .unwrap(); + unlocked_state.update_outbound_payment_status(payment_id, HTLCStatus::Failed); HTLCStatus::Failed } }; @@ -1043,12 +1093,9 @@ pub(crate) async fn keysend( pub(crate) async fn list_assets( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); - let rgb_assets = unlocked_state - .get_rgb_wallet() - .list_assets(vec![]) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))?; + let rgb_assets = unlocked_state.rgb_list_assets()?; let mut assets = vec![]; for asset in rgb_assets.nia.unwrap() { @@ -1068,7 +1115,7 @@ pub(crate) async fn list_assets( pub(crate) async fn list_channels( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let mut channels = vec![]; for chan_info in unlocked_state.channel_manager.list_channels() { @@ -1125,14 +1172,14 @@ pub(crate) async fn list_channels( pub(crate) async fn list_payments( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); - let inbound = unlocked_state.get_inbound_payments(); - let outbound = unlocked_state.get_outbound_payments(); + let inbound_payments = unlocked_state.inbound_payments(); + let outbound_payments = unlocked_state.outbound_payments(); let mut payments = vec![]; let ldk_data_dir_path = Path::new(&state.static_state.ldk_data_dir); - for (payment_hash, payment_info) in &inbound.payments { + for (payment_hash, payment_info) in &inbound_payments { let rgb_payment_info_path = get_rgb_payment_info_path(payment_hash, ldk_data_dir_path); let (asset_amount, asset_id) = if rgb_payment_info_path.exists() { let rgb_payment_info = parse_rgb_payment_info(&rgb_payment_info_path); @@ -1153,7 +1200,7 @@ pub(crate) async fn list_payments( }) } - for (payment_id, payment_info) in &outbound.payments { + for (payment_id, payment_info) in &outbound_payments { let payment_hash = PaymentHash(payment_id.0); let rgb_payment_info_path = get_rgb_payment_info_path(&payment_hash, ldk_data_dir_path); let (asset_amount, asset_id) = if rgb_payment_info_path.exists() { @@ -1181,7 +1228,7 @@ pub(crate) async fn list_payments( pub(crate) async fn list_peers( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let mut peers = vec![]; for (pubkey, _) in unlocked_state.peer_manager.get_peer_node_ids() { @@ -1196,14 +1243,10 @@ pub(crate) async fn list_peers( pub(crate) async fn list_transactions( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let mut transactions = vec![]; - for tx in unlocked_state - .get_rgb_wallet() - .list_transactions(Some(unlocked_state.rgb_online.clone())) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))? - { + for tx in unlocked_state.rgb_list_transactions()? { transactions.push(Transaction { transaction_type: match tx.transaction_type { rgb_lib::TransactionType::RgbSend => TransactionType::RgbSend, @@ -1229,14 +1272,10 @@ pub(crate) async fn list_transfers( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let mut transfers = vec![]; - for transfer in unlocked_state - .get_rgb_wallet() - .list_transfers(Some(payload.asset_id)) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))? - { + for transfer in unlocked_state.rgb_list_transfers(payload.asset_id)? { transfers.push(Transfer { idx: transfer.idx, created_at: transfer.created_at, @@ -1280,14 +1319,10 @@ pub(crate) async fn list_transfers( pub(crate) async fn list_unspents( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let mut unspents = vec![]; - for unspent in unlocked_state - .get_rgb_wallet() - .list_unspents(Some(unlocked_state.rgb_online.clone()), false) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))? - { + for unspent in unlocked_state.rgb_list_unspents()? { unspents.push(Unspent { utxo: Utxo { outpoint: unspent.utxo.outpoint.to_string(), @@ -1313,7 +1348,7 @@ pub(crate) async fn ln_invoice( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let contract_id = if let Some(asset_id) = payload.asset_id { Some(ContractId::from_str(&asset_id).map_err(|_| APIError::InvalidAssetID(asset_id))?) @@ -1327,7 +1362,6 @@ pub(crate) async fn ln_invoice( ))); } - let mut inbound = unlocked_state.get_inbound_payments(); let currency = match state.static_state.network { Network::Bitcoin => Currency::Bitcoin, Network::Testnet => Currency::BitcoinTestnet, @@ -1351,7 +1385,7 @@ pub(crate) async fn ln_invoice( }; let payment_hash = PaymentHash((*invoice.payment_hash()).into_inner()); - inbound.payments.insert( + unlocked_state.add_inbound_payment( payment_hash, PaymentInfo { preimage: None, @@ -1360,10 +1394,6 @@ pub(crate) async fn ln_invoice( amt_msat: payload.amt_msat, }, ); - unlocked_state - .fs_store - .write("", "", INBOUND_PAYMENTS_FNAME, &inbound.encode()) - .unwrap(); Ok(Json(LNInvoiceResponse { invoice: invoice.to_string(), @@ -1377,13 +1407,13 @@ pub(crate) async fn lock( ) -> Result, APIError> { tracing::info!("Lock started"); no_cancel(async move { - match check_unlocked(&state) { + match state.check_unlocked().await { Ok(unlocked_state) => { - *state.get_changing_state() = true; + state.update_changing_state(true); drop(unlocked_state); } Err(e) => { - *state.get_changing_state() = false; + state.update_changing_state(false); return Err(e); } } @@ -1392,13 +1422,11 @@ pub(crate) async fn lock( stop_ldk(state.clone()).await; tracing::debug!("LDK stopped"); - let mut unlocked_app_state = state.get_unlocked_app_state(); - *unlocked_app_state = None; + state.update_unlocked_app_state(None).await; - let mut ldk_background_services = state.get_ldk_background_services(); - *ldk_background_services = None; + state.update_ldk_background_services(None); - *state.get_changing_state() = false; + state.update_changing_state(false); tracing::info!("Lock completed"); Ok(Json(EmptyResponse {})) @@ -1409,7 +1437,7 @@ pub(crate) async fn lock( pub(crate) async fn network_info( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let best_block = unlocked_state.channel_manager.current_best_block(); @@ -1422,7 +1450,7 @@ pub(crate) async fn network_info( pub(crate) async fn node_info( State(state): State>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let chans = unlocked_state.channel_manager.list_channels(); @@ -1440,7 +1468,7 @@ pub(crate) async fn open_channel( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let (peer_pubkey, peer_addr) = parse_peer_info(payload.peer_pubkey_and_addr.to_string())?; @@ -1477,10 +1505,7 @@ pub(crate) async fn open_channel( connect_peer_if_necessary(peer_pubkey, peer_addr, unlocked_state.peer_manager.clone()) .await?; - let balance = unlocked_state - .get_rgb_wallet() - .get_asset_balance(contract_id.to_string()) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))?; + let balance = unlocked_state.rgb_get_asset_balance(contract_id)?; let spendable_rgb_amount = balance.spendable; @@ -1550,19 +1575,13 @@ pub(crate) async fn refresh_transfers( State(state): State>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); - tokio::task::spawn_blocking(move || { - unlocked_state - .get_rgb_wallet() - .refresh(unlocked_state.rgb_online.clone(), None, vec![]) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected)) - }) - .await - .unwrap()?; + tokio::task::spawn_blocking(move || unlocked_state.rgb_refresh()) + .await + .unwrap()?; tracing::info!("Refresh complete"); - Ok(Json(EmptyResponse {})) }) .await @@ -1573,7 +1592,7 @@ pub(crate) async fn restore( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let _unlocked_state = check_locked(&state)?; + let _unlocked_state = state.check_locked().await?; let mnemonic_path = get_mnemonic_path(&state.static_state.storage_dir_path); check_already_initialized(&mnemonic_path)?; @@ -1597,18 +1616,13 @@ pub(crate) async fn rgb_invoice( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); - let receive_data = unlocked_state - .get_rgb_wallet() - .blind_receive( - payload.asset_id, - None, - None, - vec![state.static_state.proxy_endpoint.clone()], - payload.min_confirmations, - ) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))?; + let receive_data = unlocked_state.rgb_blind_receive( + payload.asset_id, + vec![state.static_state.proxy_endpoint.clone()], + payload.min_confirmations, + )?; Ok(Json(RgbInvoiceResponse { recipient_id: receive_data.recipient_id, @@ -1624,7 +1638,7 @@ pub(crate) async fn send_asset( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let secret_seal = SecretSeal::from_str(&payload.blinded_utxo) .map_err(|e| APIError::InvalidBlindedUTXO(e.to_string()))?; @@ -1637,16 +1651,12 @@ pub(crate) async fn send_asset( }; let txid = tokio::task::spawn_blocking(move || { - unlocked_state - .get_rgb_wallet() - .send( - unlocked_state.rgb_online.clone(), - recipient_map, - payload.donation, - FEE_RATE, - payload.min_confirmations, - ) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected)) + unlocked_state.rgb_send( + recipient_map, + payload.donation, + FEE_RATE, + payload.min_confirmations, + ) }) .await .unwrap()?; @@ -1661,17 +1671,10 @@ pub(crate) async fn send_btc( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); - - let txid = unlocked_state - .get_rgb_wallet() - .send_btc( - unlocked_state.rgb_online.clone(), - payload.address, - payload.amount, - payload.fee_rate, - ) - .map_err(|e| match_rgb_lib_error(&e, APIError::Unexpected))?; + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); + + let txid = + unlocked_state.rgb_send_btc(payload.address, payload.amount, payload.fee_rate)?; Ok(Json(SendBtcResponse { txid })) }) @@ -1683,7 +1686,7 @@ pub(crate) async fn send_onion_message( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); if payload.node_ids.is_empty() { return Err(APIError::InvalidNodeIds(s!( @@ -1753,7 +1756,7 @@ pub(crate) async fn send_payment( WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { no_cancel(async move { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let invoice = match Bolt11Invoice::from_str(&payload.invoice) { Err(e) => return Err(APIError::InvalidInvoice(e.to_string())), @@ -1796,8 +1799,7 @@ pub(crate) async fn send_payment( let payment_id = PaymentId((*invoice.payment_hash()).into_inner()); let payment_secret = *invoice.payment_secret(); - let mut outbound = unlocked_state.get_outbound_payments(); - outbound.payments.insert( + unlocked_state.add_outbound_payment( payment_id, PaymentInfo { preimage: None, @@ -1806,10 +1808,6 @@ pub(crate) async fn send_payment( amt_msat: invoice.amount_milli_satoshis(), }, ); - unlocked_state - .fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()) - .unwrap(); let status = match pay_invoice( &invoice, @@ -1828,11 +1826,7 @@ pub(crate) async fn send_payment( } Err(e) => { tracing::error!("ERROR: failed to send payment: {:?}", e); - outbound.payments.get_mut(&payment_id).unwrap().status = HTLCStatus::Failed; - unlocked_state - .fs_store - .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()) - .unwrap(); + unlocked_state.update_outbound_payment_status(payment_id, HTLCStatus::Failed); HTLCStatus::Failed } }; @@ -1851,9 +1845,7 @@ pub(crate) async fn shutdown( ) -> Result, APIError> { no_cancel(async move { let _unlocked_app_state = state.get_unlocked_app_state(); - if *state.get_changing_state() { - return Err(APIError::ChangingState); - } + state.check_changing_state()?; state.cancel_token.cancel(); Ok(Json(EmptyResponse {})) @@ -1865,7 +1857,7 @@ pub(crate) async fn sign_message( State(state): State>, WithRejection(Json(payload), _): WithRejection, APIError>, ) -> Result, APIError> { - let unlocked_state = check_unlocked(&state)?.clone().unwrap(); + let unlocked_state = state.check_unlocked().await?.clone().unwrap(); let message = payload.message.trim(); let signed_message = lightning::util::message_signing::sign( @@ -1883,9 +1875,9 @@ pub(crate) async fn unlock( ) -> Result, APIError> { tracing::info!("Unlock started"); no_cancel(async move { - match check_locked(&state) { + match state.check_locked().await { Ok(unlocked_state) => { - *state.get_changing_state() = true; + state.update_changing_state(true); drop(unlocked_state); } Err(e) => { @@ -1899,7 +1891,7 @@ pub(crate) async fn unlock( ) { Ok(mnemonic) => mnemonic, Err(e) => { - *state.get_changing_state() = false; + state.update_changing_state(false); return Err(e); } }; @@ -1909,19 +1901,19 @@ pub(crate) async fn unlock( match start_ldk(state.clone(), mnemonic).await { Ok((nlbs, nuap)) => (nlbs, nuap), Err(e) => { - *state.get_changing_state() = false; + state.update_changing_state(false); return Err(e); } }; tracing::debug!("LDK started"); - let mut unlocked_app_state = state.get_unlocked_app_state(); - *unlocked_app_state = Some(new_unlocked_app_state); + state + .update_unlocked_app_state(Some(new_unlocked_app_state)) + .await; - let mut ldk_background_services = state.get_ldk_background_services(); - *ldk_background_services = Some(new_ldk_background_services); + state.update_ldk_background_services(Some(new_ldk_background_services)); - *state.get_changing_state() = false; + state.update_changing_state(false); tracing::info!("Unlock completed"); Ok(Json(EmptyResponse {})) diff --git a/src/utils.rs b/src/utils.rs index 4898698..ff112ac 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -23,6 +23,7 @@ use std::{ sync::{Arc, Mutex, MutexGuard}, time::Duration, }; +use tokio::sync::{Mutex as TokioMutex, MutexGuard as TokioMutexGuard}; use tokio_util::sync::CancellationToken; use crate::{ @@ -50,7 +51,7 @@ const PASSWORD_MIN_LENGTH: u8 = 8; pub(crate) struct AppState { pub(crate) static_state: Arc, pub(crate) cancel_token: CancellationToken, - pub(crate) unlocked_app_state: Arc>>>, + pub(crate) unlocked_app_state: Arc>>>, pub(crate) ldk_background_services: Arc>>, pub(crate) changing_state: Mutex, } @@ -64,8 +65,10 @@ impl AppState { self.ldk_background_services.lock().unwrap() } - pub(crate) fn get_unlocked_app_state(&self) -> MutexGuard>> { - self.unlocked_app_state.lock().unwrap() + pub(crate) async fn get_unlocked_app_state( + &self, + ) -> TokioMutexGuard>> { + self.unlocked_app_state.lock().await } } @@ -137,32 +140,6 @@ pub(crate) fn check_already_initialized(mnemonic_path: &str) -> Result<(), APIEr Ok(()) } -pub(crate) fn check_locked( - state: &Arc, -) -> Result>>, APIError> { - let unlocked_app_state = state.unlocked_app_state.lock().unwrap(); - if unlocked_app_state.is_some() { - Err(APIError::UnlockedNode) - } else if *state.get_changing_state() { - Err(APIError::ChangingState) - } else { - Ok(unlocked_app_state) - } -} - -pub(crate) fn check_unlocked( - state: &Arc, -) -> Result>>, APIError> { - let unlocked_app_state = state.unlocked_app_state.lock().unwrap(); - if unlocked_app_state.is_none() { - Err(APIError::LockedNode) - } else if *state.get_changing_state() { - Err(APIError::ChangingState) - } else { - Ok(unlocked_app_state) - } -} - pub(crate) fn check_password_strength(password: String) -> Result<(), APIError> { if password.len() < PASSWORD_MIN_LENGTH as usize { return Err(APIError::InvalidPassword(format!( @@ -428,7 +405,7 @@ pub(crate) async fn start_daemon(args: LdkUserInfo) -> Result, App Ok(Arc::new(AppState { static_state, cancel_token, - unlocked_app_state: Arc::new(Mutex::new(None)), + unlocked_app_state: Arc::new(TokioMutex::new(None)), ldk_background_services: Arc::new(Mutex::new(None)), changing_state: Mutex::new(false), }))