diff --git a/justfile b/justfile index 309052792..bdada2bb2 100644 --- a/justfile +++ b/justfile @@ -98,16 +98,16 @@ db-load-csv: echo "truncate blocks;" >> data/psql.txt echo "truncate transactions;" >> data/psql.txt echo "truncate logs;" >> data/psql.txt - echo "truncate topics" >> data/psql.txt - - ls -tr1 data/accounts-*.csv | xargs -I{} printf "\\\\copy accounts from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt - ls -tr1 data/historical_nonces-*.csv | xargs -I{} printf "\\\\copy historical_nonces from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt - ls -tr1 data/historical_balances-*.csv | xargs -I{} printf "\\\\copy historical_balances from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt - ls -tr1 data/historical_slots-*.csv | xargs -I{} printf "\\\\copy historical_slots from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt - ls -tr1 data/blocks-*.csv | xargs -I{} printf "\\\\copy blocks from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt - ls -tr1 data/transactions-*.csv | xargs -I{} printf "\\\\copy transactions from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt - ls -tr1 data/topics-*.csv | xargs -I{} printf "\\\\copy topics from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt - ls -tr1 data/logs-*.csv | xargs -I{} printf "\\\\copy logs from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt + echo "truncate topics;" >> data/psql.txt + + ls -tr1 data/accounts-*.csv | xargs -I{} printf "\\\\copy accounts from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt + ls -tr1 data/historical_nonces-*.csv | xargs -I{} printf "\\\\copy historical_nonces from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt + ls -tr1 data/historical_balances-*.csv | xargs -I{} printf "\\\\copy historical_balances from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt + ls -tr1 data/historical_slots-*.csv | xargs -I{} printf "\\\\copy historical_slots from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt + ls -tr1 data/blocks-*.csv | xargs -I{} printf "\\\\copy blocks from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt + ls -tr1 data/transactions-*.csv | xargs -I{} printf "\\\\copy transactions from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt + ls -tr1 data/logs-*.csv | xargs -I{} printf "\\\\copy logs from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt + ls -tr1 data/topics-*.csv | xargs -I{} printf "\\\\copy topics from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt cat data/psql.txt | pgcli -h localhost -u postgres -d stratus --less-chatty diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 95adc82f9..27b455a49 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> { let accounts = rpc_storage.read_initial_accounts().await?; if let Some(ref mut csv) = csv { for account in accounts.iter() { - csv.add_account(account.clone())?; + csv.add_initial_account(account.clone())?; } csv.flush()?; } diff --git a/src/eth/storage/csv/csv_exporter.rs b/src/eth/storage/csv/csv_exporter.rs index 75063da48..f77ecf3da 100644 --- a/src/eth/storage/csv/csv_exporter.rs +++ b/src/eth/storage/csv/csv_exporter.rs @@ -135,7 +135,7 @@ const TOPICS_HEADERS: [&str; 10] = [ /// Export CSV files in the same format of the PostgreSQL tables. pub struct CsvExporter { - staged_accounts: Vec, + staged_initial_accounts: Vec, staged_blocks: Vec, accounts_csv: csv::Writer, @@ -168,7 +168,7 @@ impl CsvExporter { pub fn new(number: BlockNumber) -> anyhow::Result { Ok(Self { staged_blocks: Vec::new(), - staged_accounts: Vec::new(), + staged_initial_accounts: Vec::new(), accounts_csv: csv_writer(ACCOUNTS_FILE, BlockNumber::ZERO, &ACCOUNTS_HEADERS)?, accounts_id: LastId::new_zero(ACCOUNTS_FILE), @@ -200,9 +200,9 @@ impl CsvExporter { // Stagers // ------------------------------------------------------------------------- - /// Add an account to be exported. - pub fn add_account(&mut self, account: Account) -> anyhow::Result<()> { - self.staged_accounts.push(account); + /// Add an initial account to be exported. + pub fn add_initial_account(&mut self, account: Account) -> anyhow::Result<()> { + self.staged_initial_accounts.push(account); Ok(()) } @@ -217,15 +217,15 @@ impl CsvExporter { // ------------------------------------------------------------------------- pub fn flush(&mut self) -> anyhow::Result<()> { // export accounts - let accounts = self.staged_accounts.drain(..).collect_vec(); - self.export_accounts(accounts)?; + let initial_accounts = self.staged_initial_accounts.drain(..).collect_vec(); + self.export_initial_accounts(initial_accounts)?; // export blocks let blocks = self.staged_blocks.drain(..).collect_vec(); for block in blocks { - self.export_account_changes(block.compact_account_changes(), block.number())?; + self.export_account_changes(*block.number(), block.compact_account_changes(), block.number())?; + self.export_block(block.header)?; self.export_transactions(block.transactions)?; - self.export_blocks(block.header)?; } // flush pending data @@ -256,9 +256,10 @@ impl CsvExporter { Ok(()) } - fn export_accounts(&mut self, accounts: Vec) -> anyhow::Result<()> { + fn export_initial_accounts(&mut self, accounts: Vec) -> anyhow::Result<()> { for account in accounts { self.accounts_id.value += 1; + let now = now(); let row = [ self.accounts_id.value.to_string(), // id account.address.to_string(), // address @@ -266,10 +267,10 @@ impl CsvExporter { account.balance.to_string(), // latest_balance account.nonce.to_string(), // latest_nonce "0".to_owned(), // creation_block - "0".to_owned(), // previous_balance - "0".to_owned(), // previous_nonce - now(), // created_at - now(), // updated_at + NULL.to_owned(), // previous_balance + NULL.to_owned(), // previous_nonce + now.clone(), // created_at + now, // updated_at ]; self.accounts_csv.write_record(row).context("failed to write csv transaction")?; } @@ -286,7 +287,7 @@ impl CsvExporter { // export data let now = now(); - let record = [ + let row = [ self.transactions_id.value.to_string(), // id tx.input.hash.to_string(), // hash tx.input.from.to_string(), // signer_address @@ -308,16 +309,15 @@ impl CsvExporter { now.clone(), // created_at now, // updated_at ]; - self.transactions_csv.write_record(record).context("failed to write csv transaction")?; + self.transactions_csv.write_record(row).context("failed to write csv transaction")?; } Ok(()) } - fn export_blocks(&mut self, block: BlockHeader) -> anyhow::Result<()> { + fn export_block(&mut self, block: BlockHeader) -> anyhow::Result<()> { self.blocks_id.value += 1; - let now = now(); - let record = [ + let row = [ self.blocks_id.value.to_string(), // id block.number.to_string(), // number block.hash.to_string(), // hash @@ -341,33 +341,34 @@ impl CsvExporter { now, // updated_at ]; - self.blocks_csv.write_record(record).context("failed to write csv block")?; + self.blocks_csv.write_record(row).context("failed to write csv block")?; Ok(()) } - fn export_account_changes(&mut self, changes: Vec, block_number: &BlockNumber) -> anyhow::Result<()> { + fn export_account_changes(&mut self, number: BlockNumber, changes: Vec, block_number: &BlockNumber) -> anyhow::Result<()> { for change in changes { + let now = now(); + // accounts if change.is_account_creation() { self.accounts_id.value += 1; - let now = now(); let change_bytecode = change .bytecode .take_ref() .and_then(|x| x.clone().map(|bytes| bytes.to_string())) .unwrap_or(NULL.to_string()); let row = [ - self.accounts_id.value.to_string(), // id - change.address.to_string(), // address - change_bytecode, // bytecode - change.balance.take_ref().map(|x| x.to_string()).unwrap_or_default(), // latest_balance - change.nonce.take_ref().map(|x| x.to_string()).unwrap_or_default(), // latest_nonce - "0".to_owned(), // creation_block - "0".to_owned(), // previous_balance - "0".to_owned(), // previous_nonce - now.clone(), // created_at - now, // updated_at + self.accounts_id.value.to_string(), // id + change.address.to_string(), // address + change_bytecode, // bytecode + change.balance.take_ref().map(|x| x.to_string()).unwrap_or_default(), // latest_balance + change.nonce.take_ref().map(|x| x.to_string()).unwrap_or_default(), // latest_nonce + number.to_string().to_owned(), // creation_block + change.balance.take_original_ref().map(|x| x.to_string()).unwrap_or_else(|| NULL.to_owned()), // previous_balance + change.nonce.take_original_ref().map(|x| x.to_string()).unwrap_or_else(|| NULL.to_owned()), // previous_nonce + now.clone(), // created_at + now.clone(), // updated_at ]; self.accounts_csv.write_record(row).context("failed to write csv transaction")?; } @@ -375,29 +376,26 @@ impl CsvExporter { // historical_nonces if let Some(nonce) = change.nonce.take_modified() { self.historical_nonces_id.value += 1; - - let now = now(); let row = [ self.historical_balances_id.value.to_string(), // id change.address.to_string(), // address nonce.to_string(), // nonce block_number.to_string(), // block_number now.clone(), // updated_at - now, // created_at + now.clone(), // created_at ]; self.historical_nonces_csv.write_record(row).context("failed to write csv historical nonces")?; } // historical_balances if let Some(balance) = change.balance.take_modified() { self.historical_balances_id.value += 1; - let now = now(); let row = [ self.historical_balances_id.value.to_string(), // id change.address.to_string(), // address balance.to_string(), // balance block_number.to_string(), // block_number now.clone(), // updated_at - now, // created_at + now.clone(), // created_at ]; self.historical_balances_csv .write_record(row) @@ -408,7 +406,6 @@ impl CsvExporter { for slot in change.slots.into_values() { if let Some(slot) = slot.take_modified() { self.historical_slots_id.value += 1; - let now = now(); let row = [ self.historical_slots_id.value.to_string(), // id slot.index.to_string(), // idx @@ -416,7 +413,7 @@ impl CsvExporter { block_number.to_string(), // block_number change.address.to_string(), // account_address now.clone(), // updated_at - now, // created_at + now.clone(), // created_at ]; self.historical_slots_csv.write_record(row).context("failed to write csv historical slots")?; } @@ -429,7 +426,7 @@ impl CsvExporter { for log in logs { self.logs_id.value += 1; let now = now(); - let record = [ + let row = [ self.logs_id.value.to_string(), // id log.address().to_string(), // address log.log.data.to_string(), // data @@ -441,7 +438,7 @@ impl CsvExporter { now.clone(), // created_at now, // updated_at ]; - self.logs_csv.write_record(record).context("failed to write csv transaction log")?; + self.logs_csv.write_record(row).context("failed to write csv transaction log")?; self.export_topics(log)?; } @@ -452,9 +449,8 @@ impl CsvExporter { let topics = log.log.topics; for (idx, topic) in topics.into_iter().enumerate() { self.topics_id.value += 1; - let now = now(); - let record = [ + let row = [ self.topics_id.value.to_string(), // id topic.to_string(), // topic log.transaction_hash.to_string(), // transaction_hash @@ -466,7 +462,7 @@ impl CsvExporter { now.clone(), // created_at now, // updated_at ]; - self.topics_csv.write_record(record).context("failed to write csv transaction topic")?; + self.topics_csv.write_record(row).context("failed to write csv transaction topic")?; } Ok(()) } @@ -515,7 +511,7 @@ fn csv_writer(base_path: &'static str, number: BlockNumber, headers: &[&'static let mut writer = csv::WriterBuilder::new() .has_headers(true) .delimiter(b'\t') - .quote_style(csv::QuoteStyle::Always) + .quote_style(csv::QuoteStyle::Necessary) .buffer_capacity(buffer_size.as_u64() as usize) .from_path(path) .context("failed to create csv writer")?; @@ -527,6 +523,5 @@ fn csv_writer(base_path: &'static str, number: BlockNumber, headers: &[&'static /// Returns the current date formatted for the CSV file. fn now() -> String { - let now = chrono::Utc::now(); - now.format("%Y-%m-%d %H:%M:%S%.6f").to_string() + chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.6f").to_string() }