diff --git a/src/bin/historic_events_processor.rs b/src/bin/historic_events_processor.rs index ce776faa1..66d498a35 100644 --- a/src/bin/historic_events_processor.rs +++ b/src/bin/historic_events_processor.rs @@ -101,6 +101,7 @@ fn main() -> Result<(), anyhow::Error> { let mut hours_since_0 = 0; let mut event_batch = vec![]; + let mut offset = std::fs::read_to_string("last_offset").map(|s| s.trim().parse::().unwrap()).unwrap_or(0); for result in iter { let (number, block) = result.context("failed to read block")?; let block = block.into_inner(); @@ -124,13 +125,24 @@ fn main() -> Result<(), anyhow::Error> { hours_since_0 = timestamp.0 / 3600; if !event_batch.is_empty() { - let folder_path = format!("events/{}/{:02}/{:02}", date.year(), date.month(), date.day()); + let folder_path = format!( + "events/ledger_wallet_events/year={}/month={:02}/day={:02}/hour={:02}", + date.year(), + date.month(), + date.day(), + date.hour() + ); if !std::path::Path::new(&folder_path).exists() { std::fs::create_dir_all(&folder_path)?; } - std::fs::write(format!("{}/{:02}", folder_path, date.hour()), event_batch.join("\n"))?; + std::fs::write( + format!("{}/ledger_wallet_events+backfill+{:010}.json", folder_path, offset), + event_batch.join("\n"), + )?; + offset += event_batch.len(); } std::fs::write("last_processed_block", number.to_string())?; + std::fs::write("last_offset", offset.to_string())?; event_batch.clear(); } }