Skip to content

Commit

Permalink
feat: handling null fields in csv (#408)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored Mar 19, 2024
1 parent f2f79a7 commit 79c6344
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 58 deletions.
20 changes: 10 additions & 10 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@ db-load-csv:
echo "truncate blocks;" >> data/psql.txt
echo "truncate transactions;" >> data/psql.txt
echo "truncate logs;" >> data/psql.txt
echo "truncate topics" >> data/psql.txt

ls -tr1 data/accounts-*.csv | xargs -I{} printf "\\\\copy accounts from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt
ls -tr1 data/historical_nonces-*.csv | xargs -I{} printf "\\\\copy historical_nonces from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt
ls -tr1 data/historical_balances-*.csv | xargs -I{} printf "\\\\copy historical_balances from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt
ls -tr1 data/historical_slots-*.csv | xargs -I{} printf "\\\\copy historical_slots from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt
ls -tr1 data/blocks-*.csv | xargs -I{} printf "\\\\copy blocks from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt
ls -tr1 data/transactions-*.csv | xargs -I{} printf "\\\\copy transactions from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt
ls -tr1 data/topics-*.csv | xargs -I{} printf "\\\\copy topics from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt
ls -tr1 data/logs-*.csv | xargs -I{} printf "\\\\copy logs from '$(pwd)/%s' delimiter E'\\\\t' csv header null 'NULL';\n" "{}" >> data/psql.txt
echo "truncate topics;" >> data/psql.txt

ls -tr1 data/accounts-*.csv | xargs -I{} printf "\\\\copy accounts from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt
ls -tr1 data/historical_nonces-*.csv | xargs -I{} printf "\\\\copy historical_nonces from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt
ls -tr1 data/historical_balances-*.csv | xargs -I{} printf "\\\\copy historical_balances from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt
ls -tr1 data/historical_slots-*.csv | xargs -I{} printf "\\\\copy historical_slots from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt
ls -tr1 data/blocks-*.csv | xargs -I{} printf "\\\\copy blocks from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt
ls -tr1 data/transactions-*.csv | xargs -I{} printf "\\\\copy transactions from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt
ls -tr1 data/logs-*.csv | xargs -I{} printf "\\\\copy logs from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt
ls -tr1 data/topics-*.csv | xargs -I{} printf "\\\\copy topics from '$(pwd)/%s' delimiter E'\\\\t' csv header;\n" "{}" >> data/psql.txt

cat data/psql.txt | pgcli -h localhost -u postgres -d stratus --less-chatty

Expand Down
2 changes: 1 addition & 1 deletion src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
let accounts = rpc_storage.read_initial_accounts().await?;
if let Some(ref mut csv) = csv {
for account in accounts.iter() {
csv.add_account(account.clone())?;
csv.add_initial_account(account.clone())?;
}
csv.flush()?;
}
Expand Down
89 changes: 42 additions & 47 deletions src/eth/storage/csv/csv_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ const TOPICS_HEADERS: [&str; 10] = [

/// Export CSV files in the same format of the PostgreSQL tables.
pub struct CsvExporter {
staged_accounts: Vec<Account>,
staged_initial_accounts: Vec<Account>,
staged_blocks: Vec<Block>,

accounts_csv: csv::Writer<File>,
Expand Down Expand Up @@ -168,7 +168,7 @@ impl CsvExporter {
pub fn new(number: BlockNumber) -> anyhow::Result<Self> {
Ok(Self {
staged_blocks: Vec::new(),
staged_accounts: Vec::new(),
staged_initial_accounts: Vec::new(),

accounts_csv: csv_writer(ACCOUNTS_FILE, BlockNumber::ZERO, &ACCOUNTS_HEADERS)?,
accounts_id: LastId::new_zero(ACCOUNTS_FILE),
Expand Down Expand Up @@ -200,9 +200,9 @@ impl CsvExporter {
// Stagers
// -------------------------------------------------------------------------

/// Add an account to be exported.
pub fn add_account(&mut self, account: Account) -> anyhow::Result<()> {
self.staged_accounts.push(account);
/// Add an initial account to be exported.
pub fn add_initial_account(&mut self, account: Account) -> anyhow::Result<()> {
self.staged_initial_accounts.push(account);
Ok(())
}

Expand All @@ -217,15 +217,15 @@ impl CsvExporter {
// -------------------------------------------------------------------------
pub fn flush(&mut self) -> anyhow::Result<()> {
// export accounts
let accounts = self.staged_accounts.drain(..).collect_vec();
self.export_accounts(accounts)?;
let initial_accounts = self.staged_initial_accounts.drain(..).collect_vec();
self.export_initial_accounts(initial_accounts)?;

// export blocks
let blocks = self.staged_blocks.drain(..).collect_vec();
for block in blocks {
self.export_account_changes(block.compact_account_changes(), block.number())?;
self.export_account_changes(*block.number(), block.compact_account_changes(), block.number())?;
self.export_block(block.header)?;
self.export_transactions(block.transactions)?;
self.export_blocks(block.header)?;
}

// flush pending data
Expand Down Expand Up @@ -256,20 +256,21 @@ impl CsvExporter {
Ok(())
}

fn export_accounts(&mut self, accounts: Vec<Account>) -> anyhow::Result<()> {
fn export_initial_accounts(&mut self, accounts: Vec<Account>) -> anyhow::Result<()> {
for account in accounts {
self.accounts_id.value += 1;
let now = now();
let row = [
self.accounts_id.value.to_string(), // id
account.address.to_string(), // address
account.bytecode.map(|x| x.to_string()).unwrap_or(NULL.to_string()), // bytecode
account.balance.to_string(), // latest_balance
account.nonce.to_string(), // latest_nonce
"0".to_owned(), // creation_block
"0".to_owned(), // previous_balance
"0".to_owned(), // previous_nonce
now(), // created_at
now(), // updated_at
NULL.to_owned(), // previous_balance
NULL.to_owned(), // previous_nonce
now.clone(), // created_at
now, // updated_at
];
self.accounts_csv.write_record(row).context("failed to write csv transaction")?;
}
Expand All @@ -286,7 +287,7 @@ impl CsvExporter {

// export data
let now = now();
let record = [
let row = [
self.transactions_id.value.to_string(), // id
tx.input.hash.to_string(), // hash
tx.input.from.to_string(), // signer_address
Expand All @@ -308,16 +309,15 @@ impl CsvExporter {
now.clone(), // created_at
now, // updated_at
];
self.transactions_csv.write_record(record).context("failed to write csv transaction")?;
self.transactions_csv.write_record(row).context("failed to write csv transaction")?;
}
Ok(())
}

fn export_blocks(&mut self, block: BlockHeader) -> anyhow::Result<()> {
fn export_block(&mut self, block: BlockHeader) -> anyhow::Result<()> {
self.blocks_id.value += 1;

let now = now();
let record = [
let row = [
self.blocks_id.value.to_string(), // id
block.number.to_string(), // number
block.hash.to_string(), // hash
Expand All @@ -341,63 +341,61 @@ impl CsvExporter {
now, // updated_at
];

self.blocks_csv.write_record(record).context("failed to write csv block")?;
self.blocks_csv.write_record(row).context("failed to write csv block")?;

Ok(())
}

fn export_account_changes(&mut self, changes: Vec<ExecutionAccountChanges>, block_number: &BlockNumber) -> anyhow::Result<()> {
fn export_account_changes(&mut self, number: BlockNumber, changes: Vec<ExecutionAccountChanges>, block_number: &BlockNumber) -> anyhow::Result<()> {
for change in changes {
let now = now();

// accounts
if change.is_account_creation() {
self.accounts_id.value += 1;
let now = now();
let change_bytecode = change
.bytecode
.take_ref()
.and_then(|x| x.clone().map(|bytes| bytes.to_string()))
.unwrap_or(NULL.to_string());
let row = [
self.accounts_id.value.to_string(), // id
change.address.to_string(), // address
change_bytecode, // bytecode
change.balance.take_ref().map(|x| x.to_string()).unwrap_or_default(), // latest_balance
change.nonce.take_ref().map(|x| x.to_string()).unwrap_or_default(), // latest_nonce
"0".to_owned(), // creation_block
"0".to_owned(), // previous_balance
"0".to_owned(), // previous_nonce
now.clone(), // created_at
now, // updated_at
self.accounts_id.value.to_string(), // id
change.address.to_string(), // address
change_bytecode, // bytecode
change.balance.take_ref().map(|x| x.to_string()).unwrap_or_default(), // latest_balance
change.nonce.take_ref().map(|x| x.to_string()).unwrap_or_default(), // latest_nonce
number.to_string().to_owned(), // creation_block
change.balance.take_original_ref().map(|x| x.to_string()).unwrap_or_else(|| NULL.to_owned()), // previous_balance
change.nonce.take_original_ref().map(|x| x.to_string()).unwrap_or_else(|| NULL.to_owned()), // previous_nonce
now.clone(), // created_at
now.clone(), // updated_at
];
self.accounts_csv.write_record(row).context("failed to write csv transaction")?;
}

// historical_nonces
if let Some(nonce) = change.nonce.take_modified() {
self.historical_nonces_id.value += 1;

let now = now();
let row = [
self.historical_balances_id.value.to_string(), // id
change.address.to_string(), // address
nonce.to_string(), // nonce
block_number.to_string(), // block_number
now.clone(), // updated_at
now, // created_at
now.clone(), // created_at
];
self.historical_nonces_csv.write_record(row).context("failed to write csv historical nonces")?;
}
// historical_balances
if let Some(balance) = change.balance.take_modified() {
self.historical_balances_id.value += 1;
let now = now();
let row = [
self.historical_balances_id.value.to_string(), // id
change.address.to_string(), // address
balance.to_string(), // balance
block_number.to_string(), // block_number
now.clone(), // updated_at
now, // created_at
now.clone(), // created_at
];
self.historical_balances_csv
.write_record(row)
Expand All @@ -408,15 +406,14 @@ impl CsvExporter {
for slot in change.slots.into_values() {
if let Some(slot) = slot.take_modified() {
self.historical_slots_id.value += 1;
let now = now();
let row = [
self.historical_slots_id.value.to_string(), // id
slot.index.to_string(), // idx
slot.value.to_string(), // value
block_number.to_string(), // block_number
change.address.to_string(), // account_address
now.clone(), // updated_at
now, // created_at
now.clone(), // created_at
];
self.historical_slots_csv.write_record(row).context("failed to write csv historical slots")?;
}
Expand All @@ -429,7 +426,7 @@ impl CsvExporter {
for log in logs {
self.logs_id.value += 1;
let now = now();
let record = [
let row = [
self.logs_id.value.to_string(), // id
log.address().to_string(), // address
log.log.data.to_string(), // data
Expand All @@ -441,7 +438,7 @@ impl CsvExporter {
now.clone(), // created_at
now, // updated_at
];
self.logs_csv.write_record(record).context("failed to write csv transaction log")?;
self.logs_csv.write_record(row).context("failed to write csv transaction log")?;

self.export_topics(log)?;
}
Expand All @@ -452,9 +449,8 @@ impl CsvExporter {
let topics = log.log.topics;
for (idx, topic) in topics.into_iter().enumerate() {
self.topics_id.value += 1;

let now = now();
let record = [
let row = [
self.topics_id.value.to_string(), // id
topic.to_string(), // topic
log.transaction_hash.to_string(), // transaction_hash
Expand All @@ -466,7 +462,7 @@ impl CsvExporter {
now.clone(), // created_at
now, // updated_at
];
self.topics_csv.write_record(record).context("failed to write csv transaction topic")?;
self.topics_csv.write_record(row).context("failed to write csv transaction topic")?;
}
Ok(())
}
Expand Down Expand Up @@ -515,7 +511,7 @@ fn csv_writer(base_path: &'static str, number: BlockNumber, headers: &[&'static
let mut writer = csv::WriterBuilder::new()
.has_headers(true)
.delimiter(b'\t')
.quote_style(csv::QuoteStyle::Always)
.quote_style(csv::QuoteStyle::Necessary)
.buffer_capacity(buffer_size.as_u64() as usize)
.from_path(path)
.context("failed to create csv writer")?;
Expand All @@ -527,6 +523,5 @@ fn csv_writer(base_path: &'static str, number: BlockNumber, headers: &[&'static

/// Returns the current date formatted for the CSV file.
fn now() -> String {
let now = chrono::Utc::now();
now.format("%Y-%m-%d %H:%M:%S%.6f").to_string()
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.6f").to_string()
}

0 comments on commit 79c6344

Please sign in to comment.