From ab3f10b4d50822d8b35773fc1d2f61bdae646326 Mon Sep 17 00:00:00 2001
From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com>
Date: Mon, 4 Nov 2024 11:56:06 -0300
Subject: [PATCH] enha: save historic events to files instead of sending them
 to kafka (#1844)

---
 src/bin/historic_events_processor.rs     | 109 +++++++++++++----------
 src/eth/storage/rocks/types/unix_time.rs |   2 +-
 2 files changed, 63 insertions(+), 48 deletions(-)

diff --git a/src/bin/historic_events_processor.rs b/src/bin/historic_events_processor.rs
index 35f8cb520..78e58bba9 100644
--- a/src/bin/historic_events_processor.rs
+++ b/src/bin/historic_events_processor.rs
@@ -1,55 +1,27 @@
 use std::time::Duration;
 
 use anyhow::Context;
-use clap::CommandFactory;
-use futures::StreamExt;
+use chrono::TimeZone;
+use indicatif::ProgressBar;
 use rocksdb::properties::ESTIMATE_NUM_KEYS;
 use stratus::eth::storage::rocks::rocks_state::RocksStorageState;
+use stratus::eth::storage::rocks::types::BlockRocksdb;
 use stratus::eth::storage::rocks::types::TransactionMinedRocksdb;
 use stratus::eth::storage::rocks::types::UnixTimeRocksdb;
-use stratus::infra::kafka::KafkaConfig;
-use stratus::infra::kafka::KafkaSecurityProtocol;
 use stratus::ledger::events::transaction_to_events;
 use stratus::ledger::events::AccountTransfers;
-use stratus::log_and_err;
+use stratus::ledger::events::Event;
 
+/// Database timeout duration in seconds
 const TIMEOUT: Duration = Duration::from_secs(5);
 
-fn parse_config(args: &clap::ArgMatches) -> KafkaConfig {
-    KafkaConfig {
-        bootstrap_servers: args.get_one::<String>("bootstrap_servers").unwrap().clone(),
-        topic: args.get_one::<String>("topic").unwrap().clone(),
-        client_id: args.get_one::<String>("client_id").unwrap().clone(),
-        group_id: args.get_one::<String>("group_id").cloned(),
-        security_protocol: args.get_one::<KafkaSecurityProtocol>("security_protocol").copied().unwrap_or_default(),
-        sasl_mechanisms: args.get_one::<String>("sasl_mechanisms").cloned(),
-        sasl_username: args.get_one::<String>("sasl_username").cloned(),
-        sasl_password: args.get_one::<String>("sasl_password").cloned(),
-        ssl_ca_location: args.get_one::<String>("ssl_ca_location").cloned(),
-        ssl_certificate_location: args.get_one::<String>("ssl_certificate_location").cloned(),
-        ssl_key_location: args.get_one::<String>("ssl_key_location").cloned(),
-    }
-}
-
+/// Converts a mined transaction from RocksDB to account transfer events
 fn transaction_mined_rocks_db_to_events(block_timestamp: UnixTimeRocksdb, tx: TransactionMinedRocksdb) -> Vec<AccountTransfers> {
     transaction_to_events(block_timestamp.into(), std::borrow::Cow::Owned(tx.into()))
 }
 
-#[tokio::main]
-async fn main() -> Result<(), anyhow::Error> {
-    tracing_subscriber::fmt::init();
-
-    // trick clap to ignore the ImporterConfig requirement
-    let config = parse_config(
-        &KafkaConfig::command()
-            .mut_group("KafkaConfig", |group| group.id("ImporterConfig"))
-            .get_matches(),
-    );
-
-    let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?;
-    let connector = config.init()?;
-
-    // Count total number of blocks first
+/// Returns total count of blocks and transactions from RocksDB state
+fn get_total_blocks_and_transactions(state: &RocksStorageState) -> (u64, u64) {
     let total_blocks = state
         .db
         .property_value_cf(&state.blocks_by_number.handle(), ESTIMATE_NUM_KEYS)
@@ -66,6 +38,13 @@ async fn main() -> Result<(), anyhow::Error> {
         .parse::<u64>()
         .unwrap();
 
+    (total_blocks, total_transactions)
+}
+
+/// Creates progress bars for tracking block and transaction processing
+fn create_progress_bar(state: &RocksStorageState) -> (ProgressBar, ProgressBar) {
+    let (total_blocks, total_transactions) = get_total_blocks_and_transactions(state);
+
     tracing::info!(?total_transactions, "Estimated total transactions in db:");
     let mb = indicatif::MultiProgress::new();
     let tx_pb = mb.add(indicatif::ProgressBar::new(total_transactions));
@@ -81,6 +60,28 @@ async fn main() -> Result<(), anyhow::Error> {
     b_pb.set_style(style);
     b_pb.set_message("Blocks");
 
+    (b_pb, tx_pb)
+}
+
+/// Processes all transactions in a block and returns their event strings
+fn process_block_events(block: BlockRocksdb) -> Vec<String> {
+    let timestamp = block.header.timestamp;
+    block
+        .transactions
+        .into_iter()
+        .flat_map(|tx| transaction_mined_rocks_db_to_events(timestamp, tx))
+        .map(|event| event.event_payload().unwrap())
+        .collect()
+}
+
+/// Main function that processes blockchain data and generates events
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+    tracing_subscriber::fmt::init();
+    let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?;
+
+    let (b_pb, tx_pb) = create_progress_bar(&state);
+
     // Load last processed block number from file
     let start_block = std::fs::read_to_string("last_processed_block")
         .map(|s| s.trim().parse::<u64>().unwrap())
@@ -93,28 +94,42 @@ async fn main() -> Result<(), anyhow::Error> {
         state.blocks_by_number.iter_start()
     };
 
+    let mut days_since_0 = 0;
+    let mut event_batch = vec![];
     for result in iter {
         let (number, block) = result.context("failed to read block")?;
         let block = block.into_inner();
         let timestamp = block.header.timestamp;
+        if days_since_0 == 0 {
+            days_since_0 = timestamp.0 / 86_400;
+        }
+
         let tx_count = block.transactions.len();
 
-        let block_events = block
-            .transactions
-            .into_iter()
-            .flat_map(|tx| transaction_mined_rocks_db_to_events(timestamp, tx));
+        let mut block_events = process_block_events(block);
+
+        event_batch.append(&mut block_events);
 
-        let mut buffer = connector.send_buffered(block_events.collect(), 30)?;
-        while let Some(res) = buffer.next().await {
-            if let Err(e) = res {
-                return log_and_err!(reason = e, "failed to send events");
-            }
-        }
         tx_pb.inc(tx_count as u64);
         b_pb.inc(1);
 
         // Save current block number to file after processing
-        std::fs::write("last_processed_block", number.to_string())?;
+        if days_since_0 != timestamp.0 / 86_400 {
+            let date = chrono::Utc
+                .timestamp_opt(0, 0)
+                .earliest()
+                .unwrap()
+                .checked_add_days(chrono::Days::new(days_since_0))
+                .unwrap()
+                .date_naive();
+
+            days_since_0 = timestamp.0 / 86_400;
+            if !event_batch.is_empty() {
+                std::fs::write(format!("events/{}", date), event_batch.join("\n"))?;
+            }
+            std::fs::write("last_processed_block", number.to_string())?;
+            event_batch.clear();
+        }
     }
 
     tx_pb.finish_with_message("Done!");
diff --git a/src/eth/storage/rocks/types/unix_time.rs b/src/eth/storage/rocks/types/unix_time.rs
index 90cc42c5b..7f0152263 100644
--- a/src/eth/storage/rocks/types/unix_time.rs
+++ b/src/eth/storage/rocks/types/unix_time.rs
@@ -4,7 +4,7 @@ use crate::eth::primitives::UnixTime;
 use crate::gen_newtype_from;
 
 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize, fake::Dummy)]
-pub struct UnixTimeRocksdb(u64);
+pub struct UnixTimeRocksdb(pub u64);
 
 gen_newtype_from!(self = UnixTimeRocksdb, other = u64);
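
Note on the day-bucketing in the new main loop: the per-day output file name is derived by turning a day index (`timestamp.0 / 86_400`, which is why `UnixTimeRocksdb`'s field becomes `pub u64` in this patch) back into a calendar date with chrono. Below is a minimal standalone sketch of that conversion, assuming chrono 0.4.23 or newer (for `chrono::Days`); the helper name `day_file_name` is illustrative and not part of the patch:

    use chrono::TimeZone;

    /// Maps a Unix timestamp (seconds) to a per-day output path such as
    /// "events/2024-11-04", mirroring the conversion in the loop above.
    fn day_file_name(unix_seconds: u64) -> String {
        // Integer division buckets every timestamp of the same UTC day together.
        let days_since_epoch = unix_seconds / 86_400;
        // Start from the epoch as DateTime<Utc>, advance by the day index,
        // then drop the time-of-day component.
        let date = chrono::Utc
            .timestamp_opt(0, 0)
            .earliest()
            .unwrap()
            .checked_add_days(chrono::Days::new(days_since_epoch))
            .unwrap()
            .date_naive();
        format!("events/{}", date)
    }

    fn main() {
        // 2024-11-04T14:56:06Z is day 20031 since the epoch.
        assert_eq!(day_file_name(1_730_732_166), "events/2024-11-04");
    }

One consequence of flushing only when the day index changes: events accumulated after the last completed day remain in `event_batch` when the block iterator is exhausted, so as written the loop does not write out a trailing partial day.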