From 98ecd07f8d4b68a003280e8643d530ff56928c9c Mon Sep 17 00:00:00 2001 From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com> Date: Tue, 5 Nov 2024 18:40:28 -0300 Subject: [PATCH] enha: alter the events file structure (#1848) * enha: alter the events file structure * remove test file --- src/bin/historic_events_processor.rs | 35 +++++++++++++++------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/bin/historic_events_processor.rs b/src/bin/historic_events_processor.rs index 78e58bba9..92f20e6b7 100644 --- a/src/bin/historic_events_processor.rs +++ b/src/bin/historic_events_processor.rs @@ -1,7 +1,9 @@ use std::time::Duration; use anyhow::Context; +use chrono::Datelike; use chrono::TimeZone; +use chrono::Timelike; use indicatif::ProgressBar; use rocksdb::properties::ESTIMATE_NUM_KEYS; use stratus::eth::storage::rocks::rocks_state::RocksStorageState; @@ -43,6 +45,7 @@ fn get_total_blocks_and_transactions(state: &RocksStorageState) -> (u64, u64) { /// Creates progress bars for tracking block and transaction processing fn create_progress_bar(state: &RocksStorageState) -> (ProgressBar, ProgressBar) { + tracing::info!("creating progress bar"); let (total_blocks, total_transactions) = get_total_blocks_and_transactions(state); tracing::info!(?total_transactions, "Estimated total transactions in db:"); @@ -75,18 +78,20 @@ fn process_block_events(block: BlockRocksdb) -> Vec { } /// Main function that processes blockchain data and generates events -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { +fn main() -> Result<(), anyhow::Error> { tracing_subscriber::fmt::init(); let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?; let (b_pb, tx_pb) = create_progress_bar(&state); // Load last processed block number from file + tracing::info!("loading last processed block"); let start_block = std::fs::read_to_string("last_processed_block") .map(|s| s.trim().parse::().unwrap()) .unwrap_or(0); + tracing::info!(?start_block); + tracing::info!("creating rocksdb iterator"); let iter = if start_block > 0 { b_pb.inc(start_block); state.blocks_by_number.iter_from(start_block.into(), rocksdb::Direction::Forward)? @@ -94,15 +99,15 @@ async fn main() -> Result<(), anyhow::Error> { state.blocks_by_number.iter_start() }; - let mut days_since_0 = 0; + let mut hours_since_0 = 0; let mut event_batch = vec![]; for result in iter { let (number, block) = result.context("failed to read block")?; let block = block.into_inner(); let timestamp = block.header.timestamp; - if days_since_0 == 0 { - days_since_0 = timestamp.0 / 86_400; + if hours_since_0 == 0 { + hours_since_0 = timestamp.0 / 3600; } let tx_count = block.transactions.len(); @@ -114,18 +119,16 @@ async fn main() -> Result<(), anyhow::Error> { tx_pb.inc(tx_count as u64); b_pb.inc(1); // Save current block number to file after processing - if days_since_0 != timestamp.0 / 86_400 { - let date = chrono::Utc - .timestamp_opt(0, 0) - .earliest() - .unwrap() - .checked_add_days(chrono::Days::new(days_since_0)) - .unwrap() - .date_naive(); - - days_since_0 = timestamp.0 / 86_400; + if hours_since_0 != timestamp.0 / 3600 { + let date = chrono::Utc.timestamp_opt((hours_since_0 * 3600) as i64, 0).earliest().unwrap(); + + hours_since_0 = timestamp.0 / 3600; if !event_batch.is_empty() { - std::fs::write(format!("events/{}", date), event_batch.join("\n"))?; + let folder_path = format!("events/{}/{}/{}", date.year(), date.month(), date.day()); + if !std::path::Path::new(&folder_path).exists() { + std::fs::create_dir_all(&folder_path)?; + } + std::fs::write(format!("{}/{}", folder_path, date.hour()), event_batch.join("\n"))?; } std::fs::write("last_processed_block", number.to_string())?; event_batch.clear();