Commit
enha: save historic events to files instead of sending them to kafka (#…
carneiro-cw authored Nov 4, 2024
1 parent 0e30414 commit ab3f10b
Showing 2 changed files with 63 additions and 48 deletions.
109 changes: 62 additions & 47 deletions src/bin/historic_events_processor.rs
@@ -1,55 +1,27 @@
 use std::time::Duration;
 
 use anyhow::Context;
-use clap::CommandFactory;
-use futures::StreamExt;
+use chrono::TimeZone;
+use indicatif::ProgressBar;
 use rocksdb::properties::ESTIMATE_NUM_KEYS;
 use stratus::eth::storage::rocks::rocks_state::RocksStorageState;
+use stratus::eth::storage::rocks::types::BlockRocksdb;
 use stratus::eth::storage::rocks::types::TransactionMinedRocksdb;
 use stratus::eth::storage::rocks::types::UnixTimeRocksdb;
-use stratus::infra::kafka::KafkaConfig;
-use stratus::infra::kafka::KafkaSecurityProtocol;
 use stratus::ledger::events::transaction_to_events;
 use stratus::ledger::events::AccountTransfers;
-use stratus::log_and_err;
+use stratus::ledger::events::Event;
 
 /// Database timeout duration in seconds
 const TIMEOUT: Duration = Duration::from_secs(5);
 
-fn parse_config(args: &clap::ArgMatches) -> KafkaConfig {
-    KafkaConfig {
-        bootstrap_servers: args.get_one::<String>("bootstrap_servers").unwrap().clone(),
-        topic: args.get_one::<String>("topic").unwrap().clone(),
-        client_id: args.get_one::<String>("client_id").unwrap().clone(),
-        group_id: args.get_one::<String>("group_id").cloned(),
-        security_protocol: args.get_one::<KafkaSecurityProtocol>("security_protocol").copied().unwrap_or_default(),
-        sasl_mechanisms: args.get_one::<String>("sasl_mechanisms").cloned(),
-        sasl_username: args.get_one::<String>("sasl_username").cloned(),
-        sasl_password: args.get_one::<String>("sasl_password").cloned(),
-        ssl_ca_location: args.get_one::<String>("ssl_ca_location").cloned(),
-        ssl_certificate_location: args.get_one::<String>("ssl_certificate_location").cloned(),
-        ssl_key_location: args.get_one::<String>("ssl_key_location").cloned(),
-    }
-}
-
 /// Converts a mined transaction from RocksDB to account transfer events
 fn transaction_mined_rocks_db_to_events(block_timestamp: UnixTimeRocksdb, tx: TransactionMinedRocksdb) -> Vec<AccountTransfers> {
     transaction_to_events(block_timestamp.into(), std::borrow::Cow::Owned(tx.into()))
 }
 
-#[tokio::main]
-async fn main() -> Result<(), anyhow::Error> {
-    tracing_subscriber::fmt::init();
-
-    // trick clap to ignore the ImporterConfig requirement
-    let config = parse_config(
-        &KafkaConfig::command()
-            .mut_group("KafkaConfig", |group| group.id("ImporterConfig"))
-            .get_matches(),
-    );
-
-    let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?;
-    let connector = config.init()?;
-
-    // Count total number of blocks first
+/// Returns total count of blocks and transactions from RocksDB state
+fn get_total_blocks_and_transactions(state: &RocksStorageState) -> (u64, u64) {
     let total_blocks = state
         .db
         .property_value_cf(&state.blocks_by_number.handle(), ESTIMATE_NUM_KEYS)
@@ -66,6 +38,13 @@ async fn main() -> Result<(), anyhow::Error> {
         .parse::<u64>()
         .unwrap();
 
+    (total_blocks, total_transactions)
+}
+
+/// Creates progress bars for tracking block and transaction processing
+fn create_progress_bar(state: &RocksStorageState) -> (ProgressBar, ProgressBar) {
+    let (total_blocks, total_transactions) = get_total_blocks_and_transactions(state);
+
     tracing::info!(?total_transactions, "Estimated total transactions in db:");
     let mb = indicatif::MultiProgress::new();
     let tx_pb = mb.add(indicatif::ProgressBar::new(total_transactions));
@@ -81,6 +60,28 @@ async fn main() -> Result<(), anyhow::Error> {
     b_pb.set_style(style);
     b_pb.set_message("Blocks");
 
+    (b_pb, tx_pb)
+}
+
+/// Processes all transactions in a block and returns their event strings
+fn process_block_events(block: BlockRocksdb) -> Vec<String> {
+    let timestamp = block.header.timestamp;
+    block
+        .transactions
+        .into_iter()
+        .flat_map(|tx| transaction_mined_rocks_db_to_events(timestamp, tx))
+        .map(|event| event.event_payload().unwrap())
+        .collect()
+}
+
+/// Main function that processes blockchain data and generates events
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+    tracing_subscriber::fmt::init();
+    let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?;
+
+    let (b_pb, tx_pb) = create_progress_bar(&state);
+
     // Load last processed block number from file
     let start_block = std::fs::read_to_string("last_processed_block")
         .map(|s| s.trim().parse::<u64>().unwrap())
@@ -93,28 +94,42 @@ async fn main() -> Result<(), anyhow::Error> {
         state.blocks_by_number.iter_start()
     };
 
+    let mut days_since_0 = 0;
+    let mut event_batch = vec![];
     for result in iter {
         let (number, block) = result.context("failed to read block")?;
         let block = block.into_inner();
 
         let timestamp = block.header.timestamp;
+        if days_since_0 == 0 {
+            days_since_0 = timestamp.0 / 86_400;
+        }
+
         let tx_count = block.transactions.len();
 
-        let block_events = block
-            .transactions
-            .into_iter()
-            .flat_map(|tx| transaction_mined_rocks_db_to_events(timestamp, tx));
+        let mut block_events = process_block_events(block);
+
+        event_batch.append(&mut block_events);
 
-        let mut buffer = connector.send_buffered(block_events.collect(), 30)?;
-        while let Some(res) = buffer.next().await {
-            if let Err(e) = res {
-                return log_and_err!(reason = e, "failed to send events");
-            }
-        }
         tx_pb.inc(tx_count as u64);
         b_pb.inc(1);
-        // Save current block number to file after processing
-        std::fs::write("last_processed_block", number.to_string())?;
+        if days_since_0 != timestamp.0 / 86_400 {
+            let date = chrono::Utc
+                .timestamp_opt(0, 0)
+                .earliest()
+                .unwrap()
+                .checked_add_days(chrono::Days::new(days_since_0))
+                .unwrap()
+                .date_naive();
+
+            days_since_0 = timestamp.0 / 86_400;
+            if !event_batch.is_empty() {
+                std::fs::write(format!("events/{}", date), event_batch.join("\n"))?;
+            }
+            std::fs::write("last_processed_block", number.to_string())?;
+            event_batch.clear();
+        }
     }
 
     tx_pb.finish_with_message("Done!");
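
The progress reporting above stacks two indicatif bars, one for blocks and one for the transactions inside them, with totals taken from RocksDB's ESTIMATE_NUM_KEYS property (an estimate, not an exact count). A standalone sketch of the two-bar pattern follows; the totals, template string, and sleep are placeholders, not values from this repository.

use indicatif::{MultiProgress, ProgressBar, ProgressStyle};

fn main() {
    // Two stacked bars managed by one MultiProgress.
    let mb = MultiProgress::new();
    let style = ProgressStyle::with_template("{msg:>12} [{bar:40}] {pos}/{len}").unwrap();

    let b_pb = mb.add(ProgressBar::new(10));
    b_pb.set_style(style.clone());
    b_pb.set_message("Blocks");

    let tx_pb = mb.add(ProgressBar::new(50));
    tx_pb.set_style(style);
    tx_pb.set_message("Transactions");

    for _block in 0..10 {
        // Pretend each block carries five transactions.
        tx_pb.inc(5);
        b_pb.inc(1);
        std::thread::sleep(std::time::Duration::from_millis(50));
    }
    tx_pb.finish_with_message("Done!");
    b_pb.finish_with_message("Done!");
}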
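The new main loop batches each block's event payloads in memory and flushes the batch to one file per UTC day, named after the calendar date derived from days-since-epoch. Below is a minimal, self-contained sketch of that bucketing logic, assuming newline-joined payload strings and an events/ output directory as in the diff; the input tuples and the day_file_name helper are made up for illustration.

use chrono::{Days, TimeZone, Utc};
use std::fs;

/// Maps a day index (Unix seconds / 86_400) to an events/YYYY-MM-DD path.
fn day_file_name(days_since_epoch: u64) -> String {
    let date = Utc
        .timestamp_opt(0, 0)
        .earliest()
        .unwrap()
        .checked_add_days(Days::new(days_since_epoch))
        .unwrap()
        .date_naive();
    format!("events/{date}")
}

fn main() -> std::io::Result<()> {
    fs::create_dir_all("events")?;

    // (unix_timestamp, payload) pairs standing in for mined-block events.
    let events = [
        (1_700_000_000u64, r#"{"transfer":1}"#),
        (1_700_000_100, r#"{"transfer":2}"#),
        (1_700_050_000, r#"{"transfer":3}"#), // falls on the next day
    ];

    let mut current_day = events[0].0 / 86_400;
    let mut batch: Vec<&str> = vec![];

    for (ts, payload) in events {
        let day = ts / 86_400;
        if day != current_day {
            // Day rolled over: write the finished day's batch to its own file.
            fs::write(day_file_name(current_day), batch.join("\n"))?;
            batch.clear();
            current_day = day;
        }
        batch.push(payload);
    }
    // Flush whatever is left for the final (possibly partial) day.
    if !batch.is_empty() {
        fs::write(day_file_name(current_day), batch.join("\n"))?;
    }
    Ok(())
}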
2 changes: 1 addition & 1 deletion src/eth/storage/rocks/types/unix_time.rs
@@ -4,7 +4,7 @@ use crate::eth::primitives::UnixTime;
 use crate::gen_newtype_from;
 
 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize, fake::Dummy)]
-pub struct UnixTimeRocksdb(u64);
+pub struct UnixTimeRocksdb(pub u64);
 
 gen_newtype_from!(self = UnixTimeRocksdb, other = u64);
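Making the newtype's inner u64 public lets code outside the module read the raw seconds directly; the processor above relies on this for its timestamp.0 / 86_400 day bucketing. A toy illustration of the pattern, using a stand-in type rather than the real UnixTimeRocksdb:

/// Stand-in for UnixTimeRocksdb; the real type lives in stratus.
#[derive(Debug, Clone, Copy)]
pub struct UnixSeconds(pub u64); // `pub` field: callers can read .0 without an accessor

fn main() {
    let timestamp = UnixSeconds(1_700_050_000);
    let days_since_epoch = timestamp.0 / 86_400;
    println!("{days_since_epoch}");
}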
