diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs
index 4d49efe18..7da6f5890 100644
--- a/src/bin/importer_offline.rs
+++ b/src/bin/importer_offline.rs
@@ -32,6 +32,9 @@ const BACKLOG_SIZE: usize = 50;
 /// Number of blocks processed in memory before data is flushed to temporary storage and CSV files.
 const FLUSH_INTERVAL_IN_BLOCKS: u64 = 100;
 
+/// The maximum amount of blocks in each CSV chunk file.
+const CSV_CHUNKING_BLOCKS_INTERVAL: u64 = 250_000;
+
 type BacklogTask = (Vec<ExternalBlock>, Vec<ExternalReceipt>);
 
 #[tokio::main]
@@ -118,14 +121,25 @@ async fn execute_block_importer(
            Some(ref mut csv) => {
                let block = executor.import_external_to_temp(block, &mut receipts).await?;
                let number = *block.number();
+
                csv.add_block(block)?;
 
-                // flush when reached the specified interval or is the last block in the loop
-                let should_flush = (number.as_u64() % FLUSH_INTERVAL_IN_BLOCKS == 0) || (block_index == block_last_index);
+                let is_last_block = block_index == block_last_index;
+                let is_chunk_interval_end = number.as_u64() % CSV_CHUNKING_BLOCKS_INTERVAL == 0;
+                let is_flush_interval_end = number.as_u64() % FLUSH_INTERVAL_IN_BLOCKS == 0;
+
+                let should_chunk_csv_files = is_chunk_interval_end && !is_last_block;
+                let should_flush = is_flush_interval_end || is_last_block || should_chunk_csv_files;
+
                if should_flush {
                    csv.flush()?;
                    stratus_storage.flush_temp().await?;
                }
+
+                if should_chunk_csv_files {
+                    tracing::info!("Chunk ended at block number {number}, starting next CSV chunks for the next block");
+                    csv.finish_current_chunks(number)?;
+                }
            }
            // when not exporting to csv, persist the entire block to permanent immediately
            None => {
diff --git a/src/eth/storage/csv/csv_exporter.rs b/src/eth/storage/csv/csv_exporter.rs
index f3a7d6dcf..387e25dec 100644
--- a/src/eth/storage/csv/csv_exporter.rs
+++ b/src/eth/storage/csv/csv_exporter.rs
@@ -169,32 +169,43 @@ pub struct CsvExporter {
 impl CsvExporter {
     /// Creates a new [`CsvExporter`].
     pub fn new(number: BlockNumber) -> anyhow::Result<Self> {
+        let CsvFiles {
+            accounts_csv,
+            historical_slots_csv,
+            historical_balances_csv,
+            historical_nonces_csv,
+            transactions_csv,
+            blocks_csv,
+            logs_csv,
+            topics_csv,
+        } = CsvFiles::load(number)?;
+
         Ok(Self {
             staged_blocks: Vec::new(),
             staged_initial_accounts: Vec::new(),
 
-            accounts_csv: csv_writer(ACCOUNTS_FILE, BlockNumber::ZERO, &ACCOUNTS_HEADERS)?,
+            accounts_csv,
             accounts_id: LastId::new_zero(ACCOUNTS_FILE),
 
-            historical_slots_csv: csv_writer(HISTORICAL_SLOTS_FILE, number, &HISTORICAL_SLOTS_HEADERS)?,
+            historical_slots_csv,
             historical_slots_id: LastId::new(HISTORICAL_SLOTS_FILE)?,
 
-            historical_balances_csv: csv_writer(HISTORICAL_BALANCES_FILE, number, &HISTORICAL_BALANCES_HEADERS)?,
+            historical_balances_csv,
             historical_balances_id: LastId::new(HISTORICAL_BALANCES_FILE)?,
 
-            historical_nonces_csv: csv_writer(HISTORICAL_NONCES_FILE, number, &HISTORICAL_NONCES_HEADERS)?,
+            historical_nonces_csv,
             historical_nonces_id: LastId::new(HISTORICAL_NONCES_FILE)?,
 
-            transactions_csv: csv_writer(TRANSACTIONS_FILE, number, &TRANSACTIONS_HEADERS)?,
+            transactions_csv,
             transactions_id: LastId::new(TRANSACTIONS_FILE)?,
 
-            blocks_csv: csv_writer(BLOCKS_FILE, number, &BLOCKS_HEADERS)?,
+            blocks_csv,
             blocks_id: LastId::new(BLOCKS_FILE)?,
 
-            logs_csv: csv_writer(LOGS_FILE, number, &LOGS_HEADERS)?,
+            logs_csv,
             logs_id: LastId::new(LOGS_FILE)?,
 
-            topics_csv: csv_writer(TOPICS_FILE, number, &TOPICS_HEADERS)?,
+            topics_csv,
             topics_id: LastId::new(TOPICS_FILE)?,
         })
     }
 
@@ -259,6 +270,33 @@ impl CsvExporter {
         Ok(())
     }
 
+    /// Close current files and creates new CSV chunks to write on instead.
+    pub fn finish_current_chunks(&mut self, block_number: BlockNumber) -> anyhow::Result<()> {
+        self.flush()?;
+
+        let CsvFiles {
+            accounts_csv,
+            historical_slots_csv,
+            historical_balances_csv,
+            historical_nonces_csv,
+            transactions_csv,
+            blocks_csv,
+            logs_csv,
+            topics_csv,
+        } = CsvFiles::load(block_number)?;
+
+        self.accounts_csv = accounts_csv;
+        self.historical_slots_csv = historical_slots_csv;
+        self.historical_balances_csv = historical_balances_csv;
+        self.historical_nonces_csv = historical_nonces_csv;
+        self.transactions_csv = transactions_csv;
+        self.blocks_csv = blocks_csv;
+        self.logs_csv = logs_csv;
+        self.topics_csv = topics_csv;
+
+        Ok(())
+    }
+
     fn export_initial_accounts(&mut self, accounts: Vec<Account>) -> anyhow::Result<()> {
         for account in accounts {
             self.accounts_id.value += 1;
@@ -506,6 +544,32 @@ impl LastId {
     }
 }
 
+struct CsvFiles {
+    accounts_csv: csv::Writer<File>,
+    historical_slots_csv: csv::Writer<File>,
+    historical_balances_csv: csv::Writer<File>,
+    historical_nonces_csv: csv::Writer<File>,
+    transactions_csv: csv::Writer<File>,
+    blocks_csv: csv::Writer<File>,
+    logs_csv: csv::Writer<File>,
+    topics_csv: csv::Writer<File>,
+}
+
+impl CsvFiles {
+    fn load(block_number: BlockNumber) -> anyhow::Result<Self> {
+        Ok(Self {
+            accounts_csv: csv_writer(ACCOUNTS_FILE, block_number, &ACCOUNTS_HEADERS)?,
+            historical_slots_csv: csv_writer(HISTORICAL_SLOTS_FILE, block_number, &HISTORICAL_SLOTS_HEADERS)?,
+            historical_balances_csv: csv_writer(HISTORICAL_BALANCES_FILE, block_number, &HISTORICAL_BALANCES_HEADERS)?,
+            historical_nonces_csv: csv_writer(HISTORICAL_NONCES_FILE, block_number, &HISTORICAL_NONCES_HEADERS)?,
+            transactions_csv: csv_writer(TRANSACTIONS_FILE, block_number, &TRANSACTIONS_HEADERS)?,
+            blocks_csv: csv_writer(BLOCKS_FILE, block_number, &BLOCKS_HEADERS)?,
+            logs_csv: csv_writer(LOGS_FILE, block_number, &LOGS_HEADERS)?,
+            topics_csv: csv_writer(TOPICS_FILE, block_number, &TOPICS_HEADERS)?,
+        })
+    }
+}
+
 /// Creates a new CSV writer at the specified path. If the file exists, it will overwrite it.
 fn csv_writer(base_path: &'static str, number: BlockNumber, headers: &[&'static str]) -> anyhow::Result<csv::Writer<File>> {
     let path = format!("{}-{}.csv", base_path, number);