diff --git a/src/eth/follower/importer/importer.rs b/src/eth/follower/importer/importer.rs index 9711afc38..2149e4daa 100644 --- a/src/eth/follower/importer/importer.rs +++ b/src/eth/follower/importer/importer.rs @@ -301,31 +301,25 @@ impl Importer { // in case of failure, re-subscribe because current subscription may have been closed in the server. if let Some(sub) = &mut sub_new_heads { tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME); - let resubscribe_ws = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await { + match timeout(TIMEOUT_NEW_HEADS, sub.next()).await { Ok(Some(Ok(block))) => { tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME); set_external_rpc_current_block(block.number()); continue; } - Ok(None) => { + Ok(None) => if !Self::should_shutdown(TASK_NAME) { tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME); - } - true - } - Ok(Some(Err(e))) => { + }, + Ok(Some(Err(e))) => if !Self::should_shutdown(TASK_NAME) { tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME); - } - true - } - Err(_) => { + }, + Err(_) => if !Self::should_shutdown(TASK_NAME) { tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME); - } - true - } - }; + }, + } if Self::should_shutdown(TASK_NAME) { return Ok(()); @@ -333,7 +327,7 @@ impl Importer { // resubscribe if necessary. // only update the existing subscription if succedeed, otherwise we will try again in the next iteration. - if chain.supports_ws() && resubscribe_ws { + if chain.supports_ws() { tracing::info!("{} resubscribing to newHeads event", TASK_NAME); match chain.subscribe_new_heads().await { Ok(sub) => {