csv-exporter: feat: chunk CSV files (#460)
Co-authored-by: Bruno Antonieto <[email protected]>
marcospb19-cw and brunoantonieto-cw authored Mar 27, 2024
1 parent 7681444 commit 46c6821
Showing 2 changed files with 88 additions and 10 deletions.
src/bin/importer_offline.rs (18 changes: 16 additions & 2 deletions)

@@ -32,6 +32,9 @@ const BACKLOG_SIZE: usize = 50;
 /// Number of blocks processed in memory before data is flushed to temporary storage and CSV files.
 const FLUSH_INTERVAL_IN_BLOCKS: u64 = 100;
 
+/// The maximum number of blocks in each CSV chunk file.
+const CSV_CHUNKING_BLOCKS_INTERVAL: u64 = 250_000;
+
 type BacklogTask = (Vec<ExternalBlock>, Vec<ExternalReceipt>);
 
 #[tokio::main]
@@ -118,14 +121,25 @@ async fn execute_block_importer(
            Some(ref mut csv) => {
                let block = executor.import_external_to_temp(block, &mut receipts).await?;
                let number = *block.number();
+
                csv.add_block(block)?;
 
-               // flush when reached the specified interval or is the last block in the loop
-               let should_flush = (number.as_u64() % FLUSH_INTERVAL_IN_BLOCKS == 0) || (block_index == block_last_index);
+               // flush at the flush interval, at a chunk boundary, or at the last block of the loop
+               let is_last_block = block_index == block_last_index;
+               let is_chunk_interval_end = number.as_u64() % CSV_CHUNKING_BLOCKS_INTERVAL == 0;
+               let is_flush_interval_end = number.as_u64() % FLUSH_INTERVAL_IN_BLOCKS == 0;
+
+               let should_chunk_csv_files = is_chunk_interval_end && !is_last_block;
+               let should_flush = is_flush_interval_end || is_last_block || should_chunk_csv_files;
 
                if should_flush {
                    csv.flush()?;
                    stratus_storage.flush_temp().await?;
                }
+
+               if should_chunk_csv_files {
+                   tracing::info!("Chunk ended at block number {number}, starting next CSV chunks for the next block");
+                   csv.finish_current_chunks(number)?;
+               }
            }
            // when not exporting to csv, persist the entire block to permanent immediately
            None => {
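To see how the new flags interact, here is a minimal standalone sketch (not part of the commit) that samples the decision at a few block numbers. The two constants match the ones defined above; the sampled block numbers and the pretend last block are illustrative only:

    // Sketch of the flush/chunk decision logic introduced in this commit.
    // Constants match the commit; the sampled block numbers are arbitrary.
    fn main() {
        const FLUSH_INTERVAL_IN_BLOCKS: u64 = 100;
        const CSV_CHUNKING_BLOCKS_INTERVAL: u64 = 250_000;
        let block_last_number = 250_100u64; // pretend the import ends here

        for number in [100u64, 249_950, 250_000, 250_100] {
            let is_last_block = number == block_last_number;
            let is_chunk_interval_end = number % CSV_CHUNKING_BLOCKS_INTERVAL == 0;
            let is_flush_interval_end = number % FLUSH_INTERVAL_IN_BLOCKS == 0;

            let should_chunk_csv_files = is_chunk_interval_end && !is_last_block;
            let should_flush = is_flush_interval_end || is_last_block || should_chunk_csv_files;

            // e.g. block 250_000 both flushes and rotates; block 249_950 does neither
            println!("block {number}: flush={should_flush} chunk={should_chunk_csv_files}");
        }
    }

Because `should_flush` includes `should_chunk_csv_files`, a chunk boundary always flushes the CSV writers and temporary storage before `finish_current_chunks` rotates to new files.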
src/eth/storage/csv/csv_exporter.rs (80 changes: 72 additions & 8 deletions)

@@ -169,32 +169,43 @@ pub struct CsvExporter {
 impl CsvExporter {
     /// Creates a new [`CsvExporter`].
     pub fn new(number: BlockNumber) -> anyhow::Result<Self> {
+        let CsvFiles {
+            accounts_csv,
+            historical_slots_csv,
+            historical_balances_csv,
+            historical_nonces_csv,
+            transactions_csv,
+            blocks_csv,
+            logs_csv,
+            topics_csv,
+        } = CsvFiles::load(number)?;
+
         Ok(Self {
             staged_blocks: Vec::new(),
             staged_initial_accounts: Vec::new(),
 
-            accounts_csv: csv_writer(ACCOUNTS_FILE, BlockNumber::ZERO, &ACCOUNTS_HEADERS)?,
+            accounts_csv,
             accounts_id: LastId::new_zero(ACCOUNTS_FILE),
 
-            historical_slots_csv: csv_writer(HISTORICAL_SLOTS_FILE, number, &HISTORICAL_SLOTS_HEADERS)?,
+            historical_slots_csv,
             historical_slots_id: LastId::new(HISTORICAL_SLOTS_FILE)?,
 
-            historical_balances_csv: csv_writer(HISTORICAL_BALANCES_FILE, number, &HISTORICAL_BALANCES_HEADERS)?,
+            historical_balances_csv,
             historical_balances_id: LastId::new(HISTORICAL_BALANCES_FILE)?,
 
-            historical_nonces_csv: csv_writer(HISTORICAL_NONCES_FILE, number, &HISTORICAL_NONCES_HEADERS)?,
+            historical_nonces_csv,
             historical_nonces_id: LastId::new(HISTORICAL_NONCES_FILE)?,
 
-            transactions_csv: csv_writer(TRANSACTIONS_FILE, number, &TRANSACTIONS_HEADERS)?,
+            transactions_csv,
             transactions_id: LastId::new(TRANSACTIONS_FILE)?,
 
-            blocks_csv: csv_writer(BLOCKS_FILE, number, &BLOCKS_HEADERS)?,
+            blocks_csv,
             blocks_id: LastId::new(BLOCKS_FILE)?,
 
-            logs_csv: csv_writer(LOGS_FILE, number, &LOGS_HEADERS)?,
+            logs_csv,
             logs_id: LastId::new(LOGS_FILE)?,
 
-            topics_csv: csv_writer(TOPICS_FILE, number, &TOPICS_HEADERS)?,
+            topics_csv,
             topics_id: LastId::new(TOPICS_FILE)?,
         })
     }
@@ -259,6 +270,33 @@ impl CsvExporter {
         Ok(())
     }
 
+    /// Closes the current files and creates new CSV chunks to write to instead.
+    pub fn finish_current_chunks(&mut self, block_number: BlockNumber) -> anyhow::Result<()> {
+        self.flush()?;
+
+        let CsvFiles {
+            accounts_csv,
+            historical_slots_csv,
+            historical_balances_csv,
+            historical_nonces_csv,
+            transactions_csv,
+            blocks_csv,
+            logs_csv,
+            topics_csv,
+        } = CsvFiles::load(block_number)?;
+
+        self.accounts_csv = accounts_csv;
+        self.historical_slots_csv = historical_slots_csv;
+        self.historical_balances_csv = historical_balances_csv;
+        self.historical_nonces_csv = historical_nonces_csv;
+        self.transactions_csv = transactions_csv;
+        self.blocks_csv = blocks_csv;
+        self.logs_csv = logs_csv;
+        self.topics_csv = topics_csv;
+
+        Ok(())
+    }
+
     fn export_initial_accounts(&mut self, accounts: Vec<Account>) -> anyhow::Result<()> {
         for account in accounts {
             self.accounts_id.value += 1;
     }
 }
 
+struct CsvFiles {
+    accounts_csv: csv::Writer<File>,
+    historical_slots_csv: csv::Writer<File>,
+    historical_balances_csv: csv::Writer<File>,
+    historical_nonces_csv: csv::Writer<File>,
+    transactions_csv: csv::Writer<File>,
+    blocks_csv: csv::Writer<File>,
+    logs_csv: csv::Writer<File>,
+    topics_csv: csv::Writer<File>,
+}
+
+impl CsvFiles {
+    fn load(block_number: BlockNumber) -> anyhow::Result<Self> {
+        Ok(Self {
+            accounts_csv: csv_writer(ACCOUNTS_FILE, block_number, &ACCOUNTS_HEADERS)?,
+            historical_slots_csv: csv_writer(HISTORICAL_SLOTS_FILE, block_number, &HISTORICAL_SLOTS_HEADERS)?,
+            historical_balances_csv: csv_writer(HISTORICAL_BALANCES_FILE, block_number, &HISTORICAL_BALANCES_HEADERS)?,
+            historical_nonces_csv: csv_writer(HISTORICAL_NONCES_FILE, block_number, &HISTORICAL_NONCES_HEADERS)?,
+            transactions_csv: csv_writer(TRANSACTIONS_FILE, block_number, &TRANSACTIONS_HEADERS)?,
+            blocks_csv: csv_writer(BLOCKS_FILE, block_number, &BLOCKS_HEADERS)?,
+            logs_csv: csv_writer(LOGS_FILE, block_number, &LOGS_HEADERS)?,
+            topics_csv: csv_writer(TOPICS_FILE, block_number, &TOPICS_HEADERS)?,
+        })
+    }
+}
+
 /// Creates a new CSV writer at the specified path. If the file exists, it will be overwritten.
 fn csv_writer(base_path: &'static str, number: BlockNumber, headers: &[&'static str]) -> anyhow::Result<csv::Writer<File>> {
     let path = format!("{}-{}.csv", base_path, number);
