Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Generate blocks with dynamic tx_num #236

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ RUN_MODE=development # development or staging
RUST_BACKTRACE=1
RUST_LOG=rollup_state_manager=debug,info

NTXS=2
TX_SLOT_NUMS="2, 16, 64, 512"
BALANCELEVELS=3
ORDERLEVELS=3
ACCOUNTLEVELS=5
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ethers = { git = "https://github.com/gakonst/ethers-rs" }
fluidex-common = { git = "https://github.com/fluidex/common-rs", branch = "master", features = [ "kafka", "l2-account", "non-blocking-tracing", "rollup-state-db" ] }
futures = "0.3.13"
hex = "0.4.3"
itertools = "0.10.0"
lazy_static = "1.4.0"
log = "0.4"
num = "0.4.0"
Expand Down
21 changes: 10 additions & 11 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ fn process_msgs(
None
};

let manager = ManagerWrapper::new(state, *params::NTXS, block_offset, *params::VERBOSE);
let manager = ManagerWrapper::new(state, (*params::TX_SLOT_NUMS).clone(), block_offset, *params::VERBOSE);
// TODO: change to to_hex_string, remove 'Fr(' and ')'
log::info!("genesis root {}", manager.root().to_string());

Expand Down Expand Up @@ -125,11 +125,12 @@ fn run_msg_processor(
let db_pool = PgPool::connect(Settings::db()).await.unwrap();
let mut old_block_check = true;
let mut old_block_num = 0;
let mut tx_num = 0;
loop {
// In the worst case we wait for about 119 seconds timeout until we try to
// generate a block, if there's any tx.
// TODO: dynamic timeout
match msg_receiver.recv_timeout(Duration::from_secs(120)) {
let need_to_flush = match msg_receiver.recv_timeout(Duration::from_secs(120)) {
Ok(msg) => {
log::debug!("recv new msg {:?}", msg);
match msg {
Expand All @@ -152,18 +153,15 @@ fn run_msg_processor(
processor.handle_withdraw_msg(&mut manager, withdraw);
}
}
false
}
Err(err) => match err {
RecvTimeoutError::Timeout => {
if manager.has_raw_tx() {
manager.flush_with_nop();
}
}
RecvTimeoutError::Timeout => true,
RecvTimeoutError::Disconnected => break,
},
};

for block in manager.pop_all_blocks() {
for block in manager.pop_all_blocks(need_to_flush) {
if old_block_check && is_present_block(&db_pool, &block).await.unwrap() {
// Skips this old block.
old_block_num += 1;
Expand All @@ -173,17 +171,18 @@ fn run_msg_processor(
// Once the block is a new one, no need to check if old.
old_block_check = false;

tx_num += block.detail.encoded_txs.len();
block_sender.try_send(block).unwrap();
}

let block_num = manager.get_block_generate_num() - old_block_num;
let secs = timing.elapsed().as_secs_f32();
log::info!(
"generate {} blocks with block_size {} in {}s: average TPS: {}",
"generate {} blocks with dynamic block_size {:?} in {}s: average TPS: {}",
block_num,
*params::NTXS,
*params::TX_SLOT_NUMS,
secs,
(*params::NTXS * block_num) as f32 / secs
tx_num as f32 / secs
);
}

Expand Down
44 changes: 35 additions & 9 deletions src/params.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,59 @@
use itertools::sorted;
use lazy_static::lazy_static;
use std::env;
use std::fmt::Debug;
use std::str::FromStr;

lazy_static! {
pub static ref NTXS: usize = std::env::var("NTXS")
.expect("NTXS not set in ENV")
.parse::<usize>()
.expect("parse NTXS");
pub static ref BALANCELEVELS: usize = std::env::var("BALANCELEVELS")
pub static ref TX_SLOT_NUMS: Vec<usize> = sorted(parse_env_to_collection::<Vec<usize>, usize>("TX_SLOT_NUMS")).collect();
pub static ref BALANCELEVELS: usize = env::var("BALANCELEVELS")
.expect("BALANCELEVELS not set in ENV")
.parse::<usize>()
.expect("parse BALANCELEVELS");
pub static ref ORDERLEVELS: usize = std::env::var("ORDERLEVELS")
pub static ref ORDERLEVELS: usize = env::var("ORDERLEVELS")
.expect("ORDERLEVELS not set in ENV")
.parse::<usize>()
.expect("parse ORDERLEVELS");
pub static ref ACCOUNTLEVELS: usize = std::env::var("ACCOUNTLEVELS")
pub static ref ACCOUNTLEVELS: usize = env::var("ACCOUNTLEVELS")
.expect("ACCOUNTLEVELS not set in ENV")
.parse::<usize>()
.expect("parse ACCOUNTLEVELS");
pub static ref MAXORDERNUM: usize = 2usize.pow(*ORDERLEVELS as u32);
pub static ref MAXACCOUNTNUM: usize = 2usize.pow(*ACCOUNTLEVELS as u32);
pub static ref MAXTOKENNUM: usize = 2usize.pow(*BALANCELEVELS as u32);
pub static ref VERBOSE: bool = std::env::var("VERBOSE")
pub static ref VERBOSE: bool = env::var("VERBOSE")
.unwrap_or_else(|_| false.to_string())
.parse::<bool>()
.unwrap_or(false);

// default overwrite for now
pub static ref OVERWRITE_SIGNATURE: bool = std::env::var("OVERWRITE_SIGNATURE")
pub static ref OVERWRITE_SIGNATURE: bool = env::var("OVERWRITE_SIGNATURE")
.unwrap_or_else(|_| true.to_string())
.parse::<bool>()
.unwrap_or(true);
}

fn parse_env_to_collection<F, I>(name: &str) -> F
where
I: FromStr,
I::Err: Debug,
F: FromIterator<I>,
{
env::var(name)
.unwrap_or_else(|_| panic!("{} not set in ENV", name))
.split(',')
.map(|i| i.trim().parse::<I>().unwrap())
.collect()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_env_to_collection() {
env::set_var("DUMMY_USIZE_ARRAY", "2, 16, 64,512");
let parse_result: Vec<usize> = parse_env_to_collection("DUMMY_USIZE_ARRAY");
assert_eq!(parse_result, [2, 16, 64, 512]);
}
}
83 changes: 55 additions & 28 deletions src/state/manager_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ use std::time::Instant;
// TODO: too many unwrap here
pub struct ManagerWrapper {
state: Arc<RwLock<GlobalState>>,
n_tx: usize,
// 0 <= len(buffered_txs) < n_tx
tx_slot_nums: Vec<usize>,
buffered_txs: Vec<RawTx>,
block_generate_num: usize,
//buffered_blocks: Vec<L2Block>,
tx_data_encoder: TxDataEncoder,
verbose: bool,
verify_sig: bool,
Expand Down Expand Up @@ -156,18 +154,17 @@ impl ManagerWrapper {
pub fn print_config() {
Tree::print_config();
}
pub fn new(state: Arc<RwLock<GlobalState>>, n_tx: usize, block_offset: Option<usize>, verbose: bool) -> Self {
pub fn new(state: Arc<RwLock<GlobalState>>, tx_slot_nums: Vec<usize>, block_offset: Option<usize>, verbose: bool) -> Self {
let tx_data_encoder = {
let st = state.read().unwrap();
TxDataEncoder::new(st.balance_bits() as u32, st.order_bits() as u32, st.account_bits() as u32)
};

Self {
state,
n_tx,
tx_slot_nums,
buffered_txs: Vec::new(),
block_generate_num: block_offset.unwrap_or(0),
//buffered_blocks: Vec::new(),
tx_data_encoder,
verbose,
verify_sig: true,
Expand Down Expand Up @@ -802,15 +799,6 @@ impl ManagerWrapper {
self.add_raw_tx(raw_tx);
}

pub fn flush_with_nop(&mut self) {
let mut cnt = 0;
while self.buffered_txs.len() % self.n_tx != 0 {
self.nop();
cnt += 1;
}
log::debug!("flush with {} nop", cnt);
}

pub fn check_sig(&self, account_id: u32, msg: &Fr, sig: &SignatureBJJ) -> anyhow::Result<()> {
let state = self.state();
if !state.has_account(account_id) {
Expand All @@ -826,39 +814,78 @@ impl ManagerWrapper {
Ok(())
}

pub fn pop_all_blocks(&mut self) -> Vec<L2Block> {
pub fn pop_all_blocks(&mut self, need_to_flush: bool) -> Vec<L2Block> {
let mut blocks = vec![];
self.generate_blocks(&mut blocks);
if need_to_flush {
self.flush_with_nop();
self.generate_blocks(&mut blocks);
}
blocks
}

fn flush_with_nop(&mut self) {
let mut i = 0;
let len = self.buffered_txs.len();
while i + self.n_tx <= len {
let tx_count = self.buffered_txs.len();
loop {
let tx_slot_num = self.calculate_tx_slot_num(tx_count - i);
if i + tx_slot_num > tx_count {
break;
}

i += tx_slot_num;
}

for _ in 0..tx_count - i {
self.nop();
}

log::debug!("flush with {} nop", i);
}

fn generate_blocks(&mut self, blocks: &mut Vec<L2Block>) {
let mut i = 0;
let tx_count = self.buffered_txs.len();
loop {
let tx_slot_num = self.calculate_tx_slot_num(tx_count - i);
if i + tx_slot_num > tx_count {
break;
}
let block = Self::forge_with_txs(
self.block_generate_num,
&self.buffered_txs[i..i + self.n_tx],
&self.buffered_txs[i..i + tx_slot_num],
&mut self.tx_data_encoder,
);
blocks.push(block);

self.block_generate_num += 1;

#[cfg(feature = "persist_sled")]
// TODO: fix unwrap
if self.block_generate_num % Settings::persist_every_n_block() == 0 {
self.persist(i)
self.persist(i, tx_slot_num);
}

i += self.n_tx;
i += tx_slot_num;
}
self.buffered_txs.drain(0..i);
blocks
}

fn calculate_tx_slot_num(&self, tx_count: usize) -> usize {
*self
.tx_slot_nums
.iter()
.rev()
.find(|&slot_num| tx_count > slot_num * 10)
.unwrap_or(&self.tx_slot_nums[0])
}

#[cfg(feature = "persist_sled")]
fn persist(&mut self, i: usize) {
fn persist(&mut self, i: usize, tx_slot_num: usize) {
log::info!("start to dump #{}", self.block_generate_num);
let start = Instant::now();
let last_offset = self.buffered_txs[i..i + self.n_tx].iter().rev().filter_map(|tx| tx.offset).next();
let last_offset = self.buffered_txs[i..i + tx_slot_num].iter().rev().filter_map(|tx| tx.offset).next();
if log::log_enabled!(log::Level::Debug) {
let offsets: Vec<Option<i64>> = self.buffered_txs[i..i + self.n_tx].iter().map(|tx| tx.offset).collect();
let offsets: Vec<Option<i64>> = self.buffered_txs[i..i + tx_slot_num].iter().map(|tx| tx.offset).collect();
log::debug!("block #{}, offsets: {:?}", self.block_generate_num, offsets);
}
if last_offset.is_none() {
Expand Down Expand Up @@ -911,7 +938,7 @@ mod test {
Settings::set(s);

let gs = GlobalState::new(2, 3, 2, false);
let mut wrapper = ManagerWrapper::new(Arc::new(RwLock::new(gs)), 2, None, false);
let mut wrapper = ManagerWrapper::new(Arc::new(RwLock::new(gs)), vec![2], None, false);

let key1 = L2Key {
eth_addr: Fr::zero(),
Expand Down Expand Up @@ -996,7 +1023,7 @@ mod test {
)
.unwrap();

let blks = wrapper.pop_all_blocks();
let blks = wrapper.pop_all_blocks(true);
assert_eq!(blks[0].detail.txdata_hash.low_u128(), 210768282952759810590552623169132871868u128);
assert_eq!(blks[1].detail.txdata_hash.low_u128(), 159409240260550832134647856072165320498u128);
assert_eq!(blks[2].detail.txdata_hash.low_u128(), 4036618609204034397054436922352855460u128);
Expand Down
17 changes: 8 additions & 9 deletions tests/circuit_tests/test_l2_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use rollup_state_manager::params;
use std::option::Option::None;
use std::sync::{Arc, RwLock};

const N_TXS: usize = 2;

pub struct Block {
n_txs: usize,
account_levels: usize,
Expand Down Expand Up @@ -62,7 +64,7 @@ impl Block {
self.verbose,
)));
let (sender, receiver) = crossbeam_channel::bounded(100);
let mut manager = ManagerWrapper::new(state, self.n_txs, None, self.verbose);
let mut manager = ManagerWrapper::new(state, vec![self.n_txs], None, self.verbose);

let token_id0 = 0;
let token_id1 = 1;
Expand Down Expand Up @@ -235,9 +237,7 @@ impl Block {
};
manager.full_spot_trade(full_trade, None);

manager.flush_with_nop();

for block in manager.pop_all_blocks() {
for block in manager.pop_all_blocks(true) {
sender.try_send(block).unwrap();
}

Expand All @@ -260,13 +260,12 @@ impl Block {
self.verbose,
)));
let (sender, receiver) = crossbeam_channel::bounded(100);
let mut manager = ManagerWrapper::new(state, self.n_txs, None, self.verbose);
let mut manager = ManagerWrapper::new(state, vec![self.n_txs], None, self.verbose);
// we need to have at least 1 account
manager.create_new_account(1).unwrap();
manager.nop();
manager.flush_with_nop();

for block in manager.pop_all_blocks() {
for block in manager.pop_all_blocks(true) {
sender.try_send(block).unwrap();
}

Expand All @@ -282,13 +281,13 @@ impl Block {
pub fn get_l2_block_test_case() -> CircuitTestCase {
let main = format!(
"Block({}, {}, {}, {})",
*params::NTXS,
N_TXS,
*params::BALANCELEVELS,
*params::ORDERLEVELS,
*params::ACCOUNTLEVELS
);
let test_data = Block::new(
*params::NTXS,
N_TXS,
*params::BALANCELEVELS,
*params::ORDERLEVELS,
*params::ACCOUNTLEVELS,
Expand Down
Loading