Optimize update_api_cache #703

Open · wants to merge 2 commits into main
17 changes: 10 additions & 7 deletions src/components/abciapp/src/abci/server/callback/mod.rs
@@ -413,24 +413,27 @@ pub fn end_block(
 }
 
 pub fn commit(s: &mut ABCISubmissionServer, req: &RequestCommit) -> ResponseCommit {
-    let la = s.la.write();
-    let mut state = la.get_committed_state().write();
+    let state = s.la.read().borrowable_ledger_state();
 
     // will change `struct LedgerStatus`
     let td_height = TENDERMINT_BLOCK_HEIGHT.load(Ordering::Relaxed);
-    state.set_tendermint_height(td_height as u64);
+    state.write().set_tendermint_height(td_height as u64);
 
     // cache last block for QueryServer
-    pnk!(api_cache::update_api_cache(&mut state));
+    pnk!(api_cache::update_api_cache(state.clone()));
 
     // snapshot them finally
-    let path = format!("{}/{}", &CFG.ledger_dir, &state.get_status().snapshot_file);
-    pnk!(serde_json::to_vec(&state.get_status())
+    let path = format!(
+        "{}/{}",
+        &CFG.ledger_dir,
+        &state.read().get_status().snapshot_file
+    );
+    pnk!(serde_json::to_vec(&state.read().get_status())
         .c(d!())
        .and_then(|s| fs::write(&path, s).c(d!(path))));
 
     let mut r = ResponseCommit::new();
-    let la_hash = state.get_state_commitment().0.as_ref().to_vec();
+    let la_hash = state.read().get_state_commitment().0.as_ref().to_vec();
    let cs_hash = s.account_base_app.write().commit(req).data;
 
    if CFG.checkpoint.disable_evm_block_height < td_height
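Note on the change above: commit() no longer holds one write guard on the committed state for the whole call; it takes a cloneable Arc<RwLock<...>> handle and acquires short, per-statement read()/write() locks, so a clone of the same handle can be handed off to update_api_cache. A minimal standalone sketch of that pattern, assuming parking_lot locks and hypothetical Ledger/ServerState types standing in for LedgerState/ABCISubmissionServer:

use parking_lot::RwLock;
use std::sync::Arc;

struct Ledger {
    tendermint_height: u64,
}

impl Ledger {
    fn set_tendermint_height(&mut self, h: u64) {
        self.tendermint_height = h;
    }
}

// Hypothetical stand-in for ABCISubmissionServer: it exposes the ledger
// as a shared, cloneable handle instead of a &mut borrow.
struct ServerState {
    ledger: Arc<RwLock<Ledger>>,
}

impl ServerState {
    fn borrowable_ledger_state(&self) -> Arc<RwLock<Ledger>> {
        self.ledger.clone()
    }
}

fn update_api_cache(ledger: Arc<RwLock<Ledger>>) {
    // The cache updater owns its own handle and can outlive commit().
    let _ = ledger.read().tendermint_height;
}

fn commit(s: &ServerState, height: u64) {
    let state = s.borrowable_ledger_state();
    // Each lock is scoped to a single statement and released immediately,
    // rather than being held for the whole commit.
    state.write().set_tendermint_height(height);
    update_api_cache(state.clone());
}

fn main() {
    let s = ServerState {
        ledger: Arc::new(RwLock::new(Ledger { tendermint_height: 0 })),
    };
    commit(&s, 42);
}

The clone is about ownership, not data: Arc::clone only bumps a reference count, which is what lets the background cache work introduced in api_cache.rs below keep the ledger alive without blocking commit().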
1 change: 1 addition & 0 deletions src/components/abciapp/src/api/submission_server/mod.rs
@@ -222,6 +222,7 @@ where
                 .apply_transaction(&mut block, txn_effect)
                 .c(d!("Failed to apply transaction"))
         });
+
         match temp_sid {
             Ok(temp_sid) => {
                 self.pending_txns.push((temp_sid, handle.clone(), txn));
90 changes: 60 additions & 30 deletions src/ledger/src/store/api_cache.rs
@@ -2,6 +2,10 @@
 //! # Cached data for APIs
 //!
 
+use std::sync::{
+    atomic::{AtomicBool, AtomicU64, Ordering},
+    Arc,
+};
 use {
     crate::{
         data_model::{
@@ -16,12 +20,19 @@ use {
     },
     fbnc::{new_mapx, new_mapxnk, Mapx, Mapxnk},
     globutils::wallet,
+    lazy_static::lazy_static,
+    parking_lot::RwLock,
     ruc::*,
     serde::{Deserialize, Serialize},
     std::collections::HashSet,
     zei::xfr::{sig::XfrPublicKey, structs::OwnerMemo},
 };
 
+lazy_static! {
+    static ref CHECK_LOST_DATA: AtomicBool = AtomicBool::new(false);
+    static ref UPDATE_API_CACHE_COUNTER: AtomicU64 = AtomicU64::new(0);
+}
+
 type Issuances = Vec<(TxOutput, Option<OwnerMemo>)>;
 
 /// Used in APIs
@@ -314,12 +325,12 @@ pub fn get_transferred_nonconfidential_assets(
 }
 
 /// check the lost data
-pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
+pub fn check_lost_data(arc_ledger: Arc<RwLock<LedgerState>>) -> Result<()> {
     // check the lost txn sids
-    let cur_txn_sid = ledger.get_next_txn().0;
-    let api_cache_opt = ledger.api_cache.as_mut();
-
+    let cur_txn_sid = arc_ledger.read().get_next_txn().0;
     let mut last_txn_sid: usize = 0;
-    if let Some(api_cache) = api_cache_opt {
+    if let Some(api_cache) = arc_ledger.read().api_cache.as_ref() {
         if let Some(sid) = api_cache.last_sid.get(&"last_txn_sid".to_string()) {
            last_txn_sid = sid as usize;
        };
@@ -329,6 +340,8 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
 
     if last_txn_sid < cur_txn_sid {
         for index in last_txn_sid..cur_txn_sid {
+            let mut ledger = arc_ledger.write();
+
             if !ledger
                 .api_cache
                 .as_mut()
@@ -365,10 +378,11 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
     }
 
     // check the lost memos
-    let cur_txo_sid = ledger.get_next_txo().0;
-    let last_txo_sid_opt = ledger
+    let cur_txo_sid = arc_ledger.read().get_next_txo().0;
+    let last_txo_sid_opt = arc_ledger
+        .read()
         .api_cache
-        .as_mut()
+        .as_ref()
         .unwrap()
         .last_sid
         .get(&"last_txo_sid".to_string());
@@ -380,6 +394,7 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
 
     if last_txo_sid < cur_txo_sid {
         for index in last_txo_sid..cur_txo_sid {
+            let mut ledger = arc_ledger.write();
             if !ledger
                 .api_cache
                 .as_mut()
@@ -388,6 +403,7 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
                 .contains_key(&TxoSID(index))
             {
                 let utxo_opt = ledger.get_utxo(TxoSID(index));
+
                 if let Some(utxo) = utxo_opt {
                     let ftx = ledger
                         .get_transaction_light(
@@ -455,17 +471,28 @@ pub fn check_lost_data(ledger: &mut LedgerState) -> Result<()> {
                 .insert("last_txo_sid".to_string(), index as u64);
         }
     }
+
     Ok(())
 }
 
 /// update the data of QueryServer when we create a new block in ABCI
-pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
+pub fn update_api_cache(arc_ledger: Arc<RwLock<LedgerState>>) -> Result<()> {
     if !*KEEP_HIST {
         return Ok(());
     }
 
-    check_lost_data(ledger)?;
+    let c = UPDATE_API_CACHE_COUNTER.fetch_add(1, Ordering::Acquire);
+
+    if !CHECK_LOST_DATA.load(Ordering::Acquire) && c % 32 == 0 {
+        CHECK_LOST_DATA.store(true, Ordering::Release);
+        let ledger_cloned = arc_ledger.clone();
+        std::thread::spawn(move || {
+            pnk!(check_lost_data(ledger_cloned));
+            CHECK_LOST_DATA.store(false, Ordering::Release);
+        });
+    }
+
+    let mut ledger = arc_ledger.write();
     ledger.api_cache.as_mut().unwrap().cache_hist_data();
 
     let block = if let Some(b) = ledger.blocks.last() {
@@ -474,33 +501,36 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
         return Ok(());
     };
 
-    let prefix = ledger.api_cache.as_mut().unwrap().prefix.clone();
-
+    let prefix = ledger.api_cache.as_ref().unwrap().prefix.clone();
     // Update ownership status
     for (txn_sid, txo_sids) in block.txns.iter().map(|v| (v.tx_id, v.txo_ids.as_slice()))
     {
         let curr_txn = ledger.get_transaction_light(txn_sid).c(d!())?.txn;
         // get the transaction, ownership addresses, and memos associated with each transaction
         let (addresses, owner_memos) = {
-            let addresses: Vec<XfrAddress> = txo_sids
-                .iter()
-                .map(|sid| XfrAddress {
-                    key: ((ledger
-                        .get_utxo_light(*sid)
-                        .or_else(|| ledger.get_spent_utxo_light(*sid))
-                        .unwrap()
-                        .utxo)
-                        .0)
-                        .record
-                        .public_key,
-                })
-                .collect();
+            let mut addresses: Vec<XfrAddress> = vec![];
+
+            for sid in txo_sids.iter() {
+                let key_op = {
+                    if let Some(utxo) = ledger.get_utxo_light(*sid) {
+                        Some(utxo.utxo.0.record.public_key)
+                    } else {
+                        ledger
+                            .get_spent_utxo_light(*sid)
+                            .map(|u| u.utxo.0.record.public_key)
+                    }
+                };
+                if let Some(key) = key_op {
+                    addresses.push(XfrAddress { key });
+                }
+            }
             let owner_memos = curr_txn.get_owner_memos_ref();
 
             (addresses, owner_memos)
         };
 
         // Update related addresses
+        // Apply classify_op for each operation in curr_txn
         let classify_op = |op: &Operation| {
             match op {
                 Operation::Claim(i) => {
@@ -526,8 +556,8 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
                     let key = XfrAddress {
                         key: me.utxo.record.public_key,
                     };
-                    #[allow(unused_mut)]
-                    let mut hist = ledger
+
+                    ledger
                         .api_cache
                         .as_mut()
                         .unwrap()
@@ -539,8 +569,8 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
                             prefix,
                             key.to_base64()
                         ))
-                    });
-                    hist.insert(i.height, me.clone());
+                    })
+                    .insert(i.height, me.clone());
                 }),
             _ => { /* filter more operations before this line */ }
         };
@@ -586,7 +616,7 @@ pub fn update_api_cache(ledger: &mut LedgerState) -> Result<()> {
         }
 
         // Add created asset
-        for op in &curr_txn.body.operations {
+        for op in curr_txn.body.operations.iter() {
            match op {
                Operation::DefineAsset(define_asset) => {
                    ledger
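The heart of this PR is visible in update_api_cache above: check_lost_data used to run inline on every commit, and now a counter samples every 32nd call while the scan itself runs on a background thread, with an AtomicBool preventing overlapping scans. A minimal sketch of just that throttling pattern, with a hypothetical expensive_check() standing in for check_lost_data, and plain statics instead of lazy_static (const-initialized atomics no longer need it):

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

static CHECK_RUNNING: AtomicBool = AtomicBool::new(false);
static CALL_COUNTER: AtomicU64 = AtomicU64::new(0);

// Stand-in for check_lost_data: anything slow that should not
// block the caller.
fn expensive_check(data: Arc<Vec<u64>>) {
    let _sum: u64 = data.iter().sum();
}

fn on_commit(data: &Arc<Vec<u64>>) {
    let c = CALL_COUNTER.fetch_add(1, Ordering::AcqRel);

    // Run only on every 32nd call, and skip entirely if a previous
    // run is still in flight.
    if c % 32 == 0 && !CHECK_RUNNING.load(Ordering::Acquire) {
        CHECK_RUNNING.store(true, Ordering::Release);
        let cloned = data.clone();
        std::thread::spawn(move || {
            expensive_check(cloned);
            CHECK_RUNNING.store(false, Ordering::Release);
        });
    }
}

fn main() {
    let data = Arc::new(vec![1, 2, 3]);
    for _ in 0..64 {
        on_commit(&data);
    }
}

One thing worth flagging in review: the load/store pair on the flag is not a single atomic operation, so two concurrent callers could in principle both pass the check; a compare_exchange would close that window. Since ABCI drives commit() sequentially, that race should not occur here, but the assumption deserves a comment next to the code.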