Skip to content

Commit

Permalink
ref: split run function (#37)
Browse files Browse the repository at this point in the history
* ref: tweak `L1Fetcher` metrics

* ref: split `L1Fetcher::run`

* ref: saturate metrics with values stored in snapshot

Allows for the metrics to be printed out at the very start

* Apply suggestions from code review

Co-authored-by: Tuomas Mäkinen <[email protected]>

---------

Co-authored-by: Tuomas Mäkinen <[email protected]>
  • Loading branch information
zeapoz and tuommaki authored Oct 25, 2023
1 parent 2296a66 commit 4b8fcd5
Showing 1 changed file with 148 additions and 111 deletions.
259 changes: 148 additions & 111 deletions src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,19 @@ struct L1Metrics {
latest_l1_block_nbr: u64,
latest_l2_block_nbr: u64,

first_l1_block: Option<u64>,
first_l1_block: u64,
last_l1_block: u64,
}

impl L1Metrics {
fn print(&mut self) {
if self.first_l1_block.is_none()
|| self.latest_l1_block_nbr == 0
|| self.latest_l2_block_nbr == 0
{
if self.latest_l1_block_nbr == 0 {
return;
}

let first_l1_block = self.first_l1_block.unwrap();
let progress = {
let total = self.last_l1_block - first_l1_block;
let cur = self.latest_l1_block_nbr - first_l1_block;
let total = self.last_l1_block - self.first_l1_block;
let cur = self.latest_l1_block_nbr - self.first_l1_block;
// If polling past `last_l1_block`, stop at 100%.
let perc = std::cmp::min((cur * 100) / total, 100);
format!("{perc:>2}%")
Expand All @@ -83,6 +79,7 @@ pub struct L1Fetcher {
contract: Contract,
config: L1FetcherOptions,
snapshot: Option<Arc<Mutex<StateSnapshot>>>,
metrics: Arc<Mutex<L1Metrics>>,
}

impl L1Fetcher {
Expand All @@ -96,15 +93,17 @@ impl L1Fetcher {
let abi_file = std::fs::File::open("./IZkSync.json")?;
let contract = Contract::load(abi_file)?;

let metrics = Arc::new(Mutex::new(L1Metrics::default()));

Ok(L1Fetcher {
provider,
contract,
config,
snapshot,
metrics,
})
}

#[allow(clippy::too_many_lines)]
pub async fn run(&self, sink: mpsc::Sender<CommitBlockInfoV1>) -> Result<()> {
// Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied,
// in which case, start from that instead. If no argument was supplied and a state snapshot
Expand All @@ -124,122 +123,108 @@ impl L1Fetcher {
};
}

let metrics = Arc::new(Mutex::new(L1Metrics {
first_l1_block: Some(current_l1_block_number.as_u64()),
..Default::default()
}));
let end_block = self
.config
.block_count
.map(|count| U64::from(self.config.start_block + count));

let end_block_number = end_block.unwrap_or(
self.provider
.get_block(BlockNumber::Latest)
.await
.unwrap()
.unwrap()
.number
.unwrap(),
);

// Initialize metrics with last state, if it exists.
{
let mut metrics = self.metrics.lock().await;
metrics.last_l1_block = end_block_number.as_u64();
metrics.first_l1_block = current_l1_block_number.as_u64();
metrics.latest_l1_block_nbr = current_l1_block_number.as_u64();
if let Some(snapshot) = &self.snapshot {
metrics.latest_l2_block_nbr = snapshot.lock().await.latest_l2_block_number;
}
}

tokio::spawn({
let metrics = metrics.clone();
let metrics = self.metrics.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(METRICS_PRINT_INTERVAL_S)).await;
metrics.lock().await.print();
tokio::time::sleep(Duration::from_secs(METRICS_PRINT_INTERVAL_S)).await;
}
}
});

let event = self.contract.events_by_name("BlockCommit")?[0].clone();
let function = self.contract.functions_by_name("commitBlocks")?[0].clone();
// Wait for shutdown signal in background.
let (shutdown_tx, shutdown_rx) = oneshot::channel();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
tracing::info!("Shutdown signal received, finishing up and shutting down...");
let _ = shutdown_tx.send("");
});

let (hash_tx, mut hash_rx) = mpsc::channel(5);
let (calldata_tx, mut calldata_rx) = mpsc::channel(5);
let (hash_tx, hash_rx) = mpsc::channel(5);
let (calldata_tx, calldata_rx) = mpsc::channel(5);

// Split L1 block processing into three async steps:
// - BlockCommit event filter.
// - Referred L1 block fetch.
// - Calldata parsing.
let tx_handle = tokio::spawn({
let mut last_block = current_l1_block_number.as_u64();
let metrics = metrics.clone();
let provider = self.provider.clone();
async move {
while let Some(hash) = hash_rx.recv().await {
let Ok(Some(tx)) = L1Fetcher::retry_call(
|| provider.get_transaction(hash),
L1FetchError::GetTx,
)
.await
else {
continue;
};
// If an `end_block` was supplied we shouldn't poll for newer blocks.
let mut disable_polling = self.config.disable_polling;
if end_block.is_some() {
disable_polling = false;
}

if let Some(current_block) = tx.block_number {
let current_block = current_block.as_u64();
if last_block < current_block {
let mut metrics = metrics.lock().await;
metrics.l1_blocks_processed += current_block - last_block;
last_block = current_block;
}
}
// Split L1 block processing into three async steps:
// - BlockCommit event filter (main).
// - Referred L1 block fetch (tx).
// - Calldata parsing (parse).
let tx_handle =
self.spawn_tx_handler(hash_rx, calldata_tx, current_l1_block_number.as_u64());
let parse_handle = self.spawn_parsing_handler(calldata_rx, sink)?;
let main_handle = self.spawn_main_handler(
hash_tx,
shutdown_rx,
current_l1_block_number,
end_block_number,
disable_polling,
)?;

calldata_tx.send(tx.input).await.unwrap();
}
}
});
tx_handle.await?;
parse_handle.await?;
main_handle.await?;

let parse_handle = tokio::spawn({
let metrics = metrics.clone();
async move {
while let Some(calldata) = calldata_rx.recv().await {
let blocks = match parse_calldata(&function, &calldata) {
Ok(blks) => blks,
Err(e) => {
tracing::error!("failed to parse calldata: {e}");
continue;
}
};
self.metrics.lock().await.print();

for blk in blocks {
// NOTE: Let's see if we want to increment this in batches, instead of each block individually.
let mut metrics = metrics.lock().await;
metrics.l2_blocks_processed += 1;
metrics.latest_l2_block_nbr = blk.block_number;
sink.send(blk).await.unwrap();
}
}
}
});
Ok(())
}

let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let main_handle = tokio::spawn({
let provider_clone = self.provider.clone();
let snapshot_clone = self.snapshot.clone();
let metrics = metrics.clone();
let mut disable_polling = self.config.disable_polling;
let end_block = self
.config
.block_count
.map(|count| U64::from(self.config.start_block + count));
fn spawn_main_handler(
&self,
hash_tx: mpsc::Sender<H256>,
mut shutdown_rx: oneshot::Receiver<&'static str>,
mut current_l1_block_number: U64,
end_block_number: U64,
disable_polling: bool,
) -> Result<tokio::task::JoinHandle<()>> {
let metrics = self.metrics.clone();
let event = self.contract.events_by_name("BlockCommit")?[0].clone();
let provider_clone = self.provider.clone();
let snapshot_clone = self.snapshot.clone();

Ok(tokio::spawn({
async move {
let mut latest_l2_block_number = U256::zero();

// If an `end_block` was supplied we shouldn't poll for newer blocks.
if end_block.is_some() {
disable_polling = true;
}

let end_block_number = end_block.unwrap_or(
provider_clone
.get_block(BlockNumber::Latest)
.await
.unwrap()
.unwrap()
.number
.unwrap(),
);

// Update last L1 block to metrics calculation.
metrics.clone().lock().await.last_l1_block = end_block_number.as_u64();

loop {
// Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal.
if (disable_polling && current_l1_block_number > end_block_number)
|| shutdown_rx.try_recv().is_ok()
{
// Store our current L1 block number so we can resume from where we left
// off.
// off, we also make sure to update the metrics before leaving the loop.
metrics.lock().await.latest_l1_block_nbr = current_l1_block_number.as_u64();
if let Some(snapshot) = &snapshot_clone {
snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
}
Expand Down Expand Up @@ -290,22 +275,74 @@ impl L1Fetcher {
current_l1_block_number += BLOCK_STEP.into();
}
}
});
}))
}

// Wait for shutdown signal in background.
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
tracing::info!("Shutdown signal received, finishing up and shutting down...");
let _ = shutdown_tx.send("");
});
fn spawn_tx_handler(
&self,
mut hash_rx: mpsc::Receiver<H256>,
calldata_tx: mpsc::Sender<Bytes>,
mut last_block: u64,
) -> tokio::task::JoinHandle<()> {
let metrics = self.metrics.clone();
let provider = self.provider.clone();

main_handle.await?;
tx_handle.await?;
parse_handle.await?;
tokio::spawn({
async move {
while let Some(hash) = hash_rx.recv().await {
let Ok(Some(tx)) = L1Fetcher::retry_call(
|| provider.get_transaction(hash),
L1FetchError::GetTx,
)
.await
else {
continue;
};

metrics.lock().await.print();
if let Some(current_block) = tx.block_number {
let current_block = current_block.as_u64();
if last_block < current_block {
let mut metrics = metrics.lock().await;
metrics.l1_blocks_processed += current_block - last_block;
last_block = current_block;
}
}

Ok(())
calldata_tx.send(tx.input).await.unwrap();
}
}
})
}

fn spawn_parsing_handler(
&self,
mut calldata_rx: mpsc::Receiver<Bytes>,
sink: mpsc::Sender<CommitBlockInfoV1>,
) -> Result<tokio::task::JoinHandle<()>> {
let metrics = self.metrics.clone();
let function = self.contract.functions_by_name("commitBlocks")?[0].clone();

Ok(tokio::spawn({
async move {
while let Some(calldata) = calldata_rx.recv().await {
let blocks = match parse_calldata(&function, &calldata) {
Ok(blks) => blks,
Err(e) => {
tracing::error!("failed to parse calldata: {e}");
continue;
}
};

for blk in blocks {
// NOTE: Let's see if we want to increment this in batches, instead of each block individually.
let mut metrics = metrics.lock().await;
metrics.l2_blocks_processed += 1;
metrics.latest_l2_block_nbr = blk.block_number;
sink.send(blk).await.unwrap();
}
}
}
}))
}

async fn retry_call<T, Fut>(callback: impl Fn() -> Fut, err: L1FetchError) -> Result<T>
Expand Down

0 comments on commit 4b8fcd5

Please sign in to comment.