Skip to content

Commit

Permalink
enha: alter the events file structure (#1848)
Browse files Browse the repository at this point in the history
* enha: alter the events file structure

* remove test file
  • Loading branch information
carneiro-cw authored Nov 5, 2024
1 parent fb6fa01 commit 98ecd07
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions src/bin/historic_events_processor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::time::Duration;

use anyhow::Context;
use chrono::Datelike;
use chrono::TimeZone;
use chrono::Timelike;
use indicatif::ProgressBar;
use rocksdb::properties::ESTIMATE_NUM_KEYS;
use stratus::eth::storage::rocks::rocks_state::RocksStorageState;
Expand Down Expand Up @@ -43,6 +45,7 @@ fn get_total_blocks_and_transactions(state: &RocksStorageState) -> (u64, u64) {

/// Creates progress bars for tracking block and transaction processing
fn create_progress_bar(state: &RocksStorageState) -> (ProgressBar, ProgressBar) {
tracing::info!("creating progress bar");
let (total_blocks, total_transactions) = get_total_blocks_and_transactions(state);

tracing::info!(?total_transactions, "Estimated total transactions in db:");
Expand Down Expand Up @@ -75,34 +78,36 @@ fn process_block_events(block: BlockRocksdb) -> Vec<String> {
}

/// Main function that processes blockchain data and generates events
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?;

let (b_pb, tx_pb) = create_progress_bar(&state);

// Load last processed block number from file
tracing::info!("loading last processed block");
let start_block = std::fs::read_to_string("last_processed_block")
.map(|s| s.trim().parse::<u64>().unwrap())
.unwrap_or(0);
tracing::info!(?start_block);

tracing::info!("creating rocksdb iterator");
let iter = if start_block > 0 {
b_pb.inc(start_block);
state.blocks_by_number.iter_from(start_block.into(), rocksdb::Direction::Forward)?
} else {
state.blocks_by_number.iter_start()
};

let mut days_since_0 = 0;
let mut hours_since_0 = 0;
let mut event_batch = vec![];
for result in iter {
let (number, block) = result.context("failed to read block")?;
let block = block.into_inner();

let timestamp = block.header.timestamp;
if days_since_0 == 0 {
days_since_0 = timestamp.0 / 86_400;
if hours_since_0 == 0 {
hours_since_0 = timestamp.0 / 3600;
}

let tx_count = block.transactions.len();
Expand All @@ -114,18 +119,16 @@ async fn main() -> Result<(), anyhow::Error> {
tx_pb.inc(tx_count as u64);
b_pb.inc(1);
// Save current block number to file after processing
if days_since_0 != timestamp.0 / 86_400 {
let date = chrono::Utc
.timestamp_opt(0, 0)
.earliest()
.unwrap()
.checked_add_days(chrono::Days::new(days_since_0))
.unwrap()
.date_naive();

days_since_0 = timestamp.0 / 86_400;
if hours_since_0 != timestamp.0 / 3600 {
let date = chrono::Utc.timestamp_opt((hours_since_0 * 3600) as i64, 0).earliest().unwrap();

hours_since_0 = timestamp.0 / 3600;
if !event_batch.is_empty() {
std::fs::write(format!("events/{}", date), event_batch.join("\n"))?;
let folder_path = format!("events/{}/{}/{}", date.year(), date.month(), date.day());
if !std::path::Path::new(&folder_path).exists() {
std::fs::create_dir_all(&folder_path)?;
}
std::fs::write(format!("{}/{}", folder_path, date.hour()), event_batch.join("\n"))?;
}
std::fs::write("last_processed_block", number.to_string())?;
event_batch.clear();
Expand Down

0 comments on commit 98ecd07

Please sign in to comment.