diff --git a/src/components/abciapp/src/abci/server/callback/mod.rs b/src/components/abciapp/src/abci/server/callback/mod.rs index 88145a901..229302d8d 100644 --- a/src/components/abciapp/src/abci/server/callback/mod.rs +++ b/src/components/abciapp/src/abci/server/callback/mod.rs @@ -413,24 +413,27 @@ pub fn end_block( } pub fn commit(s: &mut ABCISubmissionServer, req: &RequestCommit) -> ResponseCommit { - let la = s.la.write(); - let mut state = la.get_committed_state().write(); + let state = s.la.read().borrowable_ledger_state(); // will change `struct LedgerStatus` let td_height = TENDERMINT_BLOCK_HEIGHT.load(Ordering::Relaxed); - state.set_tendermint_height(td_height as u64); + state.write().set_tendermint_height(td_height as u64); // cache last block for QueryServer - pnk!(api_cache::update_api_cache(&mut state)); + pnk!(api_cache::update_api_cache(state.clone())); // snapshot them finally - let path = format!("{}/{}", &CFG.ledger_dir, &state.get_status().snapshot_file); - pnk!(serde_json::to_vec(&state.get_status()) + let path = format!( + "{}/{}", + &CFG.ledger_dir, + &state.read().get_status().snapshot_file + ); + pnk!(serde_json::to_vec(&state.read().get_status()) .c(d!()) .and_then(|s| fs::write(&path, s).c(d!(path)))); let mut r = ResponseCommit::new(); - let la_hash = state.get_state_commitment().0.as_ref().to_vec(); + let la_hash = state.read().get_state_commitment().0.as_ref().to_vec(); let cs_hash = s.account_base_app.write().commit(req).data; if CFG.checkpoint.disable_evm_block_height < td_height diff --git a/src/components/abciapp/src/api/submission_server/mod.rs b/src/components/abciapp/src/api/submission_server/mod.rs index 7aed3660f..a6c338156 100644 --- a/src/components/abciapp/src/api/submission_server/mod.rs +++ b/src/components/abciapp/src/api/submission_server/mod.rs @@ -222,6 +222,7 @@ where .apply_transaction(&mut block, txn_effect) .c(d!("Failed to apply transaction")) }); + match temp_sid { Ok(temp_sid) => { self.pending_txns.push((temp_sid, handle.clone(), txn)); diff --git a/src/ledger/src/store/api_cache.rs b/src/ledger/src/store/api_cache.rs index edd82a237..ad2c53a65 100644 --- a/src/ledger/src/store/api_cache.rs +++ b/src/ledger/src/store/api_cache.rs @@ -2,6 +2,10 @@ //! # Cached data for APIs //! +use std::sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, +}; use { crate::{ data_model::{ @@ -16,12 +20,19 @@ use { }, fbnc::{new_mapx, new_mapxnk, Mapx, Mapxnk}, globutils::wallet, + lazy_static::lazy_static, + parking_lot::RwLock, ruc::*, serde::{Deserialize, Serialize}, std::collections::HashSet, zei::xfr::{sig::XfrPublicKey, structs::OwnerMemo}, }; +lazy_static! { + static ref CHECK_LOST_DATA: AtomicBool = AtomicBool::new(false); + static ref UPDATE_API_CACHE_COUNTER: AtomicU64 = AtomicU64::new(0); +} + type Issuances = Vec<(TxOutput, Option)>; /// Used in APIs @@ -314,12 +325,12 @@ pub fn get_transferred_nonconfidential_assets( } /// check the lost data -pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> { +pub fn check_lost_data(arc_ledger: Arc>) -> Result<()> { // check the lost txn sids - let cur_txn_sid = ledger.get_next_txn().0; - let api_cache_opt = ledger.api_cache.as_mut(); + + let cur_txn_sid = arc_ledger.read().get_next_txn().0; let mut last_txn_sid: usize = 0; - if let Some(api_cache) = api_cache_opt { + if let Some(api_cache) = arc_ledger.read().api_cache.as_ref() { if let Some(sid) = api_cache.last_sid.get(&"last_txn_sid".to_string()) { last_txn_sid = sid as usize; }; @@ -329,6 +340,8 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> { if last_txn_sid < cur_txn_sid { for index in last_txn_sid..cur_txn_sid { + let mut ledger = arc_ledger.write(); + if !ledger .api_cache .as_mut() @@ -365,10 +378,11 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> { } // check the lost memos - let cur_txo_sid = ledger.get_next_txo().0; - let last_txo_sid_opt = ledger + let cur_txo_sid = arc_ledger.read().get_next_txo().0; + let last_txo_sid_opt = arc_ledger + .read() .api_cache - .as_mut() + .as_ref() .unwrap() .last_sid .get(&"last_txo_sid".to_string()); @@ -380,6 +394,7 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> { if last_txo_sid < cur_txo_sid { for index in last_txo_sid..cur_txo_sid { + let mut ledger = arc_ledger.write(); if !ledger .api_cache .as_mut() @@ -388,6 +403,7 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> { .contains_key(&TxoSID(index)) { let utxo_opt = ledger.get_utxo(TxoSID(index)); + if let Some(utxo) = utxo_opt { let ftx = ledger .get_transaction_light( @@ -455,17 +471,28 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> { .insert("last_txo_sid".to_string(), index as u64); } } + Ok(()) } /// update the data of QueryServer when we create a new block in ABCI -pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> { +pub fn update_api_cache(arc_ledger: Arc>) -> Result<()> { if !*KEEP_HIST { return Ok(()); } - check_lost_data(ledger)?; + let c = UPDATE_API_CACHE_COUNTER.fetch_add(1, Ordering::Acquire); + if !CHECK_LOST_DATA.load(Ordering::Acquire) && c % 32 == 0 { + CHECK_LOST_DATA.store(true, Ordering::Release); + let ledger_cloned = arc_ledger.clone(); + std::thread::spawn(move || { + pnk!(check_lost_data(ledger_cloned)); + CHECK_LOST_DATA.store(false, Ordering::Release); + }); + } + + let mut ledger = arc_ledger.write(); ledger.api_cache.as_mut().unwrap().cache_hist_data(); let block = if let Some(b) = ledger.blocks.last() { @@ -474,33 +501,36 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> { return Ok(()); }; - let prefix = ledger.api_cache.as_mut().unwrap().prefix.clone(); - + let prefix = ledger.api_cache.as_ref().unwrap().prefix.clone(); // Update ownership status for (txn_sid, txo_sids) in block.txns.iter().map(|v| (v.tx_id, v.txo_ids.as_slice())) { let curr_txn = ledger.get_transaction_light(txn_sid).c(d!())?.txn; // get the transaction, ownership addresses, and memos associated with each transaction let (addresses, owner_memos) = { - let addresses: Vec = txo_sids - .iter() - .map(|sid| XfrAddress { - key: ((ledger - .get_utxo_light(*sid) - .or_else(|| ledger.get_spent_utxo_light(*sid)) - .unwrap() - .utxo) - .0) - .record - .public_key, - }) - .collect(); - + let mut addresses: Vec = vec![]; + + for sid in txo_sids.iter() { + let key_op = { + if let Some(utxo) = ledger.get_utxo_light(*sid) { + Some(utxo.utxo.0.record.public_key) + } else { + ledger + .get_spent_utxo_light(*sid) + .map(|u| u.utxo.0.record.public_key) + } + }; + if let Some(key) = key_op { + addresses.push(XfrAddress { key }); + } + } let owner_memos = curr_txn.get_owner_memos_ref(); (addresses, owner_memos) }; + // Update related addresses + // Apply classify_op for each operation in curr_txn let classify_op = |op: &Operation| { match op { Operation::Claim(i) => { @@ -526,8 +556,8 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> { let key = XfrAddress { key: me.utxo.record.public_key, }; - #[allow(unused_mut)] - let mut hist = ledger + + ledger .api_cache .as_mut() .unwrap() @@ -539,8 +569,8 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> { prefix, key.to_base64() )) - }); - hist.insert(i.height, me.clone()); + }) + .insert(i.height, me.clone()); }), _ => { /* filter more operations before this line */ } }; @@ -586,7 +616,7 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> { } // Add created asset - for op in &curr_txn.body.operations { + for op in curr_txn.body.operations.iter() { match op { Operation::DefineAsset(define_asset) => { ledger