Skip to content

Commit

Permalink
optimize update_api_cache, move check_lost_data to backend thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
shaitao committed Nov 7, 2022
1 parent 783a3e6 commit 57327de
Show file tree
Hide file tree
Showing 4 changed files with 2,459 additions and 37 deletions.
17 changes: 10 additions & 7 deletions src/components/abciapp/src/abci/server/callback/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,24 +413,27 @@ pub fn end_block(
}

pub fn commit(s: &mut ABCISubmissionServer, req: &RequestCommit) -> ResponseCommit {
let la = s.la.write();
let mut state = la.get_committed_state().write();
let state = s.la.read().borrowable_ledger_state();

// will change `struct LedgerStatus`
let td_height = TENDERMINT_BLOCK_HEIGHT.load(Ordering::Relaxed);
state.set_tendermint_height(td_height as u64);
state.write().set_tendermint_height(td_height as u64);

// cache last block for QueryServer
pnk!(api_cache::update_api_cache(&mut state));
pnk!(api_cache::update_api_cache(state.clone()));

// snapshot them finally
let path = format!("{}/{}", &CFG.ledger_dir, &state.get_status().snapshot_file);
pnk!(serde_json::to_vec(&state.get_status())
let path = format!(
"{}/{}",
&CFG.ledger_dir,
&state.read().get_status().snapshot_file
);
pnk!(serde_json::to_vec(&state.read().get_status())
.c(d!())
.and_then(|s| fs::write(&path, s).c(d!(path))));

let mut r = ResponseCommit::new();
let la_hash = state.get_state_commitment().0.as_ref().to_vec();
let la_hash = state.read().get_state_commitment().0.as_ref().to_vec();
let cs_hash = s.account_base_app.write().commit(req).data;

if CFG.checkpoint.disable_evm_block_height < td_height
Expand Down
1 change: 1 addition & 0 deletions src/components/abciapp/src/api/submission_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ where
.apply_transaction(&mut block, txn_effect)
.c(d!("Failed to apply transaction"))
});

match temp_sid {
Ok(temp_sid) => {
self.pending_txns.push((temp_sid, handle.clone(), txn));
Expand Down
89 changes: 59 additions & 30 deletions src/ledger/src/store/api_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
//! # Cached data for APIs
//!
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use {
crate::{
data_model::{
Expand All @@ -16,12 +20,18 @@ use {
},
fbnc::{new_mapx, new_mapxnk, Mapx, Mapxnk},
globutils::wallet,
lazy_static::lazy_static,
parking_lot::RwLock,
ruc::*,
serde::{Deserialize, Serialize},
std::collections::HashSet,
zei::xfr::{sig::XfrPublicKey, structs::OwnerMemo},
};

lazy_static! {
static ref CHECK_LOST_DATA: AtomicBool = AtomicBool::new(false);
}

type Issuances = Vec<(TxOutput, Option<OwnerMemo>)>;

/// Used in APIs
Expand Down Expand Up @@ -314,12 +324,12 @@ pub fn get_transferred_nonconfidential_assets(
}

/// check the lost data
pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
pub fn check_lost_data(arc_ledger: Arc<RwLock<LedgerState>>) -> Result<()> {
// check the lost txn sids
let cur_txn_sid = ledger.get_next_txn().0;
let api_cache_opt = ledger.api_cache.as_mut();

let cur_txn_sid = arc_ledger.read().get_next_txn().0;
let mut last_txn_sid: usize = 0;
if let Some(api_cache) = api_cache_opt {
if let Some(api_cache) = arc_ledger.read().api_cache.as_ref() {
if let Some(sid) = api_cache.last_sid.get(&"last_txn_sid".to_string()) {
last_txn_sid = sid as usize;
};
Expand All @@ -329,6 +339,8 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {

if last_txn_sid < cur_txn_sid {
for index in last_txn_sid..cur_txn_sid {
let mut ledger = arc_ledger.write();

if !ledger
.api_cache
.as_mut()
Expand Down Expand Up @@ -365,10 +377,11 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
}

// check the lost memos
let cur_txo_sid = ledger.get_next_txo().0;
let last_txo_sid_opt = ledger
let cur_txo_sid = arc_ledger.read().get_next_txo().0;
let last_txo_sid_opt = arc_ledger
.read()
.api_cache
.as_mut()
.as_ref()
.unwrap()
.last_sid
.get(&"last_txo_sid".to_string());
Expand All @@ -380,6 +393,7 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {

if last_txo_sid < cur_txo_sid {
for index in last_txo_sid..cur_txo_sid {
let mut ledger = arc_ledger.write();
if !ledger
.api_cache
.as_mut()
Expand All @@ -388,6 +402,7 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
.contains_key(&TxoSID(index))
{
let utxo_opt = ledger.get_utxo(TxoSID(index));

if let Some(utxo) = utxo_opt {
let ftx = ledger
.get_transaction_light(
Expand Down Expand Up @@ -455,17 +470,28 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
.insert("last_txo_sid".to_string(), index as u64);
}
}

Ok(())
}

/// update the data of QueryServer when we create a new block in ABCI
pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
pub fn update_api_cache(arc_ledger: Arc<RwLock<LedgerState>>) -> Result<()> {
if !*KEEP_HIST {
return Ok(());
}

check_lost_data(ledger)?;
let is_checking = CHECK_LOST_DATA.load(Ordering::Acquire);

if !is_checking {
CHECK_LOST_DATA.store(true, Ordering::Release);
let ledger_cloned = arc_ledger.clone();
std::thread::spawn(move || {
pnk!(check_lost_data(ledger_cloned));
CHECK_LOST_DATA.store(false, Ordering::Release);
});
}

let mut ledger = arc_ledger.write();
ledger.api_cache.as_mut().unwrap().cache_hist_data();

let block = if let Some(b) = ledger.blocks.last() {
Expand All @@ -474,33 +500,36 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
return Ok(());
};

let prefix = ledger.api_cache.as_mut().unwrap().prefix.clone();

let prefix = ledger.api_cache.as_ref().unwrap().prefix.clone();
// Update ownership status
for (txn_sid, txo_sids) in block.txns.iter().map(|v| (v.tx_id, v.txo_ids.as_slice()))
{
let curr_txn = ledger.get_transaction_light(txn_sid).c(d!())?.txn;
// get the transaction, ownership addresses, and memos associated with each transaction
let (addresses, owner_memos) = {
let addresses: Vec<XfrAddress> = txo_sids
.iter()
.map(|sid| XfrAddress {
key: ((ledger
.get_utxo_light(*sid)
.or_else(|| ledger.get_spent_utxo_light(*sid))
.unwrap()
.utxo)
.0)
.record
.public_key,
})
.collect();

let mut addresses: Vec<XfrAddress> = vec![];

for sid in txo_sids.iter() {
let key_op = {
if let Some(utxo) = ledger.get_utxo_light(*sid) {
Some(utxo.utxo.0.record.public_key)
} else {
ledger
.get_spent_utxo_light(*sid)
.map(|u| u.utxo.0.record.public_key)
}
};
if let Some(key) = key_op {
addresses.push(XfrAddress { key });
}
}
let owner_memos = curr_txn.get_owner_memos_ref();

(addresses, owner_memos)
};

// Update related addresses
// Apply classify_op for each operation in curr_txn
let classify_op = |op: &Operation| {
match op {
Operation::Claim(i) => {
Expand All @@ -526,8 +555,8 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
let key = XfrAddress {
key: me.utxo.record.public_key,
};
#[allow(unused_mut)]
let mut hist = ledger

ledger
.api_cache
.as_mut()
.unwrap()
Expand All @@ -539,8 +568,8 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
prefix,
key.to_base64()
))
});
hist.insert(i.height, me.clone());
})
.insert(i.height, me.clone());
}),
_ => { /* filter more operations before this line */ }
};
Expand Down Expand Up @@ -586,7 +615,7 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
}

// Add created asset
for op in &curr_txn.body.operations {
for op in curr_txn.body.operations.iter() {
match op {
Operation::DefineAsset(define_asset) => {
ledger
Expand Down
Loading

0 comments on commit 57327de

Please sign in to comment.