diff --git a/.env b/.env index 7a4d3c29..082123b4 100644 --- a/.env +++ b/.env @@ -2,7 +2,7 @@ RUN_MODE=development # development or staging RUST_BACKTRACE=1 RUST_LOG=rollup_state_manager=debug,info -NTXS=2 +TX_SLOT_NUMS="2, 16, 64, 512" BALANCELEVELS=3 ORDERLEVELS=3 ACCOUNTLEVELS=5 diff --git a/Cargo.lock b/Cargo.lock index bf9a376b..74b553fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2881,6 +2881,7 @@ dependencies = [ "fluidex-common", "futures", "hex", + "itertools 0.10.1", "lazy_static", "log", "num", diff --git a/Cargo.toml b/Cargo.toml index 43a74075..e8269a1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ ethers = { git = "https://github.com/gakonst/ethers-rs" } fluidex-common = { git = "https://github.com/fluidex/common-rs", branch = "master", features = [ "kafka", "l2-account", "non-blocking-tracing", "rollup-state-db" ] } futures = "0.3.13" hex = "0.4.3" +itertools = "0.10.0" lazy_static = "1.4.0" log = "0.4" num = "0.4.0" diff --git a/src/bin/main.rs b/src/bin/main.rs index 84c05066..2c729549 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -72,7 +72,7 @@ fn process_msgs( None }; - let manager = ManagerWrapper::new(state, *params::NTXS, block_offset, *params::VERBOSE); + let manager = ManagerWrapper::new(state, (*params::TX_SLOT_NUMS).clone(), block_offset, *params::VERBOSE); // TODO: change to to_hex_string, remove 'Fr(' and ')' log::info!("genesis root {}", manager.root().to_string()); @@ -125,11 +125,12 @@ fn run_msg_processor( let db_pool = PgPool::connect(Settings::db()).await.unwrap(); let mut old_block_check = true; let mut old_block_num = 0; + let mut tx_num = 0; loop { // In the worst case we wait for about 119 seconds timeout until we try to // generate a block, if there's any tx. // TODO: dynamic timeout - match msg_receiver.recv_timeout(Duration::from_secs(120)) { + let need_to_flush = match msg_receiver.recv_timeout(Duration::from_secs(120)) { Ok(msg) => { log::debug!("recv new msg {:?}", msg); match msg { @@ -152,18 +153,15 @@ fn run_msg_processor( processor.handle_withdraw_msg(&mut manager, withdraw); } } + false } Err(err) => match err { - RecvTimeoutError::Timeout => { - if manager.has_raw_tx() { - manager.flush_with_nop(); - } - } + RecvTimeoutError::Timeout => true, RecvTimeoutError::Disconnected => break, }, }; - for block in manager.pop_all_blocks() { + for block in manager.pop_all_blocks(need_to_flush) { if old_block_check && is_present_block(&db_pool, &block).await.unwrap() { // Skips this old block. old_block_num += 1; @@ -173,17 +171,18 @@ fn run_msg_processor( // Once the block is a new one, no need to check if old. old_block_check = false; + tx_num += block.detail.encoded_txs.len(); block_sender.try_send(block).unwrap(); } let block_num = manager.get_block_generate_num() - old_block_num; let secs = timing.elapsed().as_secs_f32(); log::info!( - "generate {} blocks with block_size {} in {}s: average TPS: {}", + "generate {} blocks with dynamic block_size {:?} in {}s: average TPS: {}", block_num, - *params::NTXS, + *params::TX_SLOT_NUMS, secs, - (*params::NTXS * block_num) as f32 / secs + tx_num as f32 / secs ); } diff --git a/src/params.rs b/src/params.rs index 3e905edf..0416415a 100644 --- a/src/params.rs +++ b/src/params.rs @@ -1,33 +1,59 @@ +use itertools::sorted; use lazy_static::lazy_static; +use std::env; +use std::fmt::Debug; +use std::str::FromStr; lazy_static! { - pub static ref NTXS: usize = std::env::var("NTXS") - .expect("NTXS not set in ENV") - .parse::() - .expect("parse NTXS"); - pub static ref BALANCELEVELS: usize = std::env::var("BALANCELEVELS") + pub static ref TX_SLOT_NUMS: Vec = sorted(parse_env_to_collection::, usize>("TX_SLOT_NUMS")).collect(); + pub static ref BALANCELEVELS: usize = env::var("BALANCELEVELS") .expect("BALANCELEVELS not set in ENV") .parse::() .expect("parse BALANCELEVELS"); - pub static ref ORDERLEVELS: usize = std::env::var("ORDERLEVELS") + pub static ref ORDERLEVELS: usize = env::var("ORDERLEVELS") .expect("ORDERLEVELS not set in ENV") .parse::() .expect("parse ORDERLEVELS"); - pub static ref ACCOUNTLEVELS: usize = std::env::var("ACCOUNTLEVELS") + pub static ref ACCOUNTLEVELS: usize = env::var("ACCOUNTLEVELS") .expect("ACCOUNTLEVELS not set in ENV") .parse::() .expect("parse ACCOUNTLEVELS"); pub static ref MAXORDERNUM: usize = 2usize.pow(*ORDERLEVELS as u32); pub static ref MAXACCOUNTNUM: usize = 2usize.pow(*ACCOUNTLEVELS as u32); pub static ref MAXTOKENNUM: usize = 2usize.pow(*BALANCELEVELS as u32); - pub static ref VERBOSE: bool = std::env::var("VERBOSE") + pub static ref VERBOSE: bool = env::var("VERBOSE") .unwrap_or_else(|_| false.to_string()) .parse::() .unwrap_or(false); // default overwrite for now - pub static ref OVERWRITE_SIGNATURE: bool = std::env::var("OVERWRITE_SIGNATURE") + pub static ref OVERWRITE_SIGNATURE: bool = env::var("OVERWRITE_SIGNATURE") .unwrap_or_else(|_| true.to_string()) .parse::() .unwrap_or(true); } + +fn parse_env_to_collection(name: &str) -> F +where + I: FromStr, + I::Err: Debug, + F: FromIterator, +{ + env::var(name) + .unwrap_or_else(|_| panic!("{} not set in ENV", name)) + .split(',') + .map(|i| i.trim().parse::().unwrap()) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_env_to_collection() { + env::set_var("DUMMY_USIZE_ARRAY", "2, 16, 64,512"); + let parse_result: Vec = parse_env_to_collection("DUMMY_USIZE_ARRAY"); + assert_eq!(parse_result, [2, 16, 64, 512]); + } +} diff --git a/src/state/manager_wrapper.rs b/src/state/manager_wrapper.rs index 4cd1b444..24699577 100644 --- a/src/state/manager_wrapper.rs +++ b/src/state/manager_wrapper.rs @@ -24,11 +24,9 @@ use std::time::Instant; // TODO: too many unwrap here pub struct ManagerWrapper { state: Arc>, - n_tx: usize, - // 0 <= len(buffered_txs) < n_tx + tx_slot_nums: Vec, buffered_txs: Vec, block_generate_num: usize, - //buffered_blocks: Vec, tx_data_encoder: TxDataEncoder, verbose: bool, verify_sig: bool, @@ -156,7 +154,7 @@ impl ManagerWrapper { pub fn print_config() { Tree::print_config(); } - pub fn new(state: Arc>, n_tx: usize, block_offset: Option, verbose: bool) -> Self { + pub fn new(state: Arc>, tx_slot_nums: Vec, block_offset: Option, verbose: bool) -> Self { let tx_data_encoder = { let st = state.read().unwrap(); TxDataEncoder::new(st.balance_bits() as u32, st.order_bits() as u32, st.account_bits() as u32) @@ -164,10 +162,9 @@ impl ManagerWrapper { Self { state, - n_tx, + tx_slot_nums, buffered_txs: Vec::new(), block_generate_num: block_offset.unwrap_or(0), - //buffered_blocks: Vec::new(), tx_data_encoder, verbose, verify_sig: true, @@ -802,15 +799,6 @@ impl ManagerWrapper { self.add_raw_tx(raw_tx); } - pub fn flush_with_nop(&mut self) { - let mut cnt = 0; - while self.buffered_txs.len() % self.n_tx != 0 { - self.nop(); - cnt += 1; - } - log::debug!("flush with {} nop", cnt); - } - pub fn check_sig(&self, account_id: u32, msg: &Fr, sig: &SignatureBJJ) -> anyhow::Result<()> { let state = self.state(); if !state.has_account(account_id) { @@ -826,14 +814,46 @@ impl ManagerWrapper { Ok(()) } - pub fn pop_all_blocks(&mut self) -> Vec { + pub fn pop_all_blocks(&mut self, need_to_flush: bool) -> Vec { let mut blocks = vec![]; + self.generate_blocks(&mut blocks); + if need_to_flush { + self.flush_with_nop(); + self.generate_blocks(&mut blocks); + } + blocks + } + + fn flush_with_nop(&mut self) { let mut i = 0; - let len = self.buffered_txs.len(); - while i + self.n_tx <= len { + let tx_count = self.buffered_txs.len(); + loop { + let tx_slot_num = self.calculate_tx_slot_num(tx_count - i); + if i + tx_slot_num > tx_count { + break; + } + + i += tx_slot_num; + } + + for _ in 0..tx_count - i { + self.nop(); + } + + log::debug!("flush with {} nop", i); + } + + fn generate_blocks(&mut self, blocks: &mut Vec) { + let mut i = 0; + let tx_count = self.buffered_txs.len(); + loop { + let tx_slot_num = self.calculate_tx_slot_num(tx_count - i); + if i + tx_slot_num > tx_count { + break; + } let block = Self::forge_with_txs( self.block_generate_num, - &self.buffered_txs[i..i + self.n_tx], + &self.buffered_txs[i..i + tx_slot_num], &mut self.tx_data_encoder, ); blocks.push(block); @@ -841,24 +861,31 @@ impl ManagerWrapper { self.block_generate_num += 1; #[cfg(feature = "persist_sled")] - // TODO: fix unwrap if self.block_generate_num % Settings::persist_every_n_block() == 0 { - self.persist(i) + self.persist(i, tx_slot_num); } - i += self.n_tx; + i += tx_slot_num; } self.buffered_txs.drain(0..i); - blocks + } + + fn calculate_tx_slot_num(&self, tx_count: usize) -> usize { + *self + .tx_slot_nums + .iter() + .rev() + .find(|&slot_num| tx_count > slot_num * 10) + .unwrap_or(&self.tx_slot_nums[0]) } #[cfg(feature = "persist_sled")] - fn persist(&mut self, i: usize) { + fn persist(&mut self, i: usize, tx_slot_num: usize) { log::info!("start to dump #{}", self.block_generate_num); let start = Instant::now(); - let last_offset = self.buffered_txs[i..i + self.n_tx].iter().rev().filter_map(|tx| tx.offset).next(); + let last_offset = self.buffered_txs[i..i + tx_slot_num].iter().rev().filter_map(|tx| tx.offset).next(); if log::log_enabled!(log::Level::Debug) { - let offsets: Vec> = self.buffered_txs[i..i + self.n_tx].iter().map(|tx| tx.offset).collect(); + let offsets: Vec> = self.buffered_txs[i..i + tx_slot_num].iter().map(|tx| tx.offset).collect(); log::debug!("block #{}, offsets: {:?}", self.block_generate_num, offsets); } if last_offset.is_none() { @@ -911,7 +938,7 @@ mod test { Settings::set(s); let gs = GlobalState::new(2, 3, 2, false); - let mut wrapper = ManagerWrapper::new(Arc::new(RwLock::new(gs)), 2, None, false); + let mut wrapper = ManagerWrapper::new(Arc::new(RwLock::new(gs)), vec![2], None, false); let key1 = L2Key { eth_addr: Fr::zero(), @@ -996,7 +1023,7 @@ mod test { ) .unwrap(); - let blks = wrapper.pop_all_blocks(); + let blks = wrapper.pop_all_blocks(true); assert_eq!(blks[0].detail.txdata_hash.low_u128(), 210768282952759810590552623169132871868u128); assert_eq!(blks[1].detail.txdata_hash.low_u128(), 159409240260550832134647856072165320498u128); assert_eq!(blks[2].detail.txdata_hash.low_u128(), 4036618609204034397054436922352855460u128); diff --git a/tests/circuit_tests/test_l2_block.rs b/tests/circuit_tests/test_l2_block.rs index 9ba7aa28..ef5614c8 100644 --- a/tests/circuit_tests/test_l2_block.rs +++ b/tests/circuit_tests/test_l2_block.rs @@ -14,6 +14,8 @@ use rollup_state_manager::params; use std::option::Option::None; use std::sync::{Arc, RwLock}; +const N_TXS: usize = 2; + pub struct Block { n_txs: usize, account_levels: usize, @@ -62,7 +64,7 @@ impl Block { self.verbose, ))); let (sender, receiver) = crossbeam_channel::bounded(100); - let mut manager = ManagerWrapper::new(state, self.n_txs, None, self.verbose); + let mut manager = ManagerWrapper::new(state, vec![self.n_txs], None, self.verbose); let token_id0 = 0; let token_id1 = 1; @@ -235,9 +237,7 @@ impl Block { }; manager.full_spot_trade(full_trade, None); - manager.flush_with_nop(); - - for block in manager.pop_all_blocks() { + for block in manager.pop_all_blocks(true) { sender.try_send(block).unwrap(); } @@ -260,13 +260,12 @@ impl Block { self.verbose, ))); let (sender, receiver) = crossbeam_channel::bounded(100); - let mut manager = ManagerWrapper::new(state, self.n_txs, None, self.verbose); + let mut manager = ManagerWrapper::new(state, vec![self.n_txs], None, self.verbose); // we need to have at least 1 account manager.create_new_account(1).unwrap(); manager.nop(); - manager.flush_with_nop(); - for block in manager.pop_all_blocks() { + for block in manager.pop_all_blocks(true) { sender.try_send(block).unwrap(); } @@ -282,13 +281,13 @@ impl Block { pub fn get_l2_block_test_case() -> CircuitTestCase { let main = format!( "Block({}, {}, {}, {})", - *params::NTXS, + N_TXS, *params::BALANCELEVELS, *params::ORDERLEVELS, *params::ACCOUNTLEVELS ); let test_data = Block::new( - *params::NTXS, + N_TXS, *params::BALANCELEVELS, *params::ORDERLEVELS, *params::ACCOUNTLEVELS, diff --git a/tests/global_state/bench.rs b/tests/global_state/bench.rs index 097d9872..a11937ce 100644 --- a/tests/global_state/bench.rs +++ b/tests/global_state/bench.rs @@ -21,6 +21,8 @@ use std::time::Instant; #[cfg(not(feature = "windows_build"))] use {pprof::protos::Message, std::io::Write}; +const N_TXS: usize = 2; + fn bench_with_dummy_transfers() -> Result<()> { GlobalState::print_config(); let state = Arc::new(RwLock::new(GlobalState::new( @@ -35,7 +37,7 @@ fn bench_with_dummy_transfers() -> Result<()> { ..Default::default() }; - let mut manager = ManagerWrapper::new(state, *params::NTXS, None, *params::VERBOSE); + let mut manager = ManagerWrapper::new(state, vec![N_TXS], None, *params::VERBOSE); // step1: create users let user1 = Account::from_mnemonic(1, &get_mnemonic_by_account_id(1)).unwrap(); @@ -124,7 +126,7 @@ fn bench_with_real_trades(_circuit_repo: &Path) -> Result> { // by clone accounts with same trades let loop_num = 50; - let mut manager = ManagerWrapper::new(state, *params::NTXS, None, *params::VERBOSE); + let mut manager = ManagerWrapper::new(state, vec![N_TXS], None, *params::VERBOSE); let timing = Instant::now(); let mut inner_timing = Instant::now(); @@ -193,11 +195,12 @@ fn bench_with_real_trades(_circuit_repo: &Path) -> Result> { //println!("\nepoch {} done", i); } - let blocks: Vec<_> = manager.pop_all_blocks(); + let blocks: Vec<_> = manager.pop_all_blocks(true); + let tx_num = blocks.iter().fold(0, |acc, b| acc + b.detail.encoded_txs.len()); println!( "bench for {} blocks (TPS: {})", blocks.len(), - (*params::NTXS * blocks.len()) as f32 / timing.elapsed().as_secs_f32() + tx_num as f32 / timing.elapsed().as_secs_f32() ); Ok(blocks) } diff --git a/tests/global_state/gen_testcase.rs b/tests/global_state/gen_testcase.rs index b47e686d..512f4ddb 100644 --- a/tests/global_state/gen_testcase.rs +++ b/tests/global_state/gen_testcase.rs @@ -20,6 +20,8 @@ use rollup_state_manager::config::Settings; use rollup_state_manager::msg::{msg_loader, msg_processor}; use std::option::Option::None; +const N_TXS: usize = 2; + fn replay_msgs( msg_receiver: crossbeam_channel::Receiver, block_sender: crossbeam_channel::Sender, @@ -31,7 +33,7 @@ fn replay_msgs( *params::ACCOUNTLEVELS, *params::VERBOSE, ))); - let mut manager = ManagerWrapper::new(state, *params::NTXS, None, *params::VERBOSE); + let mut manager = ManagerWrapper::new(state, vec![N_TXS], None, *params::VERBOSE); println!("genesis root {}", manager.root()); @@ -65,9 +67,10 @@ fn replay_msgs( } } } - manager.flush_with_nop(); - for block in manager.pop_all_blocks() { + let mut tx_num = 0; + for block in manager.pop_all_blocks(true) { + tx_num += block.detail.encoded_txs.len(); block_sender.try_send(block).unwrap(); } @@ -83,7 +86,7 @@ fn replay_msgs( println!( "genesis {} blocks (TPS: {})", block_num, - (*params::NTXS * block_num) as f32 / timing.elapsed().as_secs_f32() + tx_num as f32 / timing.elapsed().as_secs_f32() ); Ok(()) })) @@ -114,7 +117,7 @@ pub fn export_circuit_and_testdata(circuit_repo: &Path, blocks: Vec) -> src: String::from("src/block.circom"), main: format!( "Block({}, {}, {}, {})", - *params::NTXS, + N_TXS, *params::BALANCELEVELS, *params::ORDERLEVELS, *params::ACCOUNTLEVELS