Skip to content

Commit

Permalink
feat: dumb interval miner
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed May 14, 2024
1 parent fb01706 commit 0443681
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 18 deletions.
26 changes: 18 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl ExecutorConfig {
/// TODO: remove BlockMiner after migration is completed.
pub async fn init(&self, storage: Arc<StratusStorage>, miner: Arc<BlockMiner>, relayer: Option<Arc<TransactionRelayer>>) -> Arc<Executor> {
let num_evms = max(self.num_evms, 1);
tracing::info!(config = ?self, "configuring executor");
tracing::info!(config = ?self, "starting executor");

// spawn evm in background using native threads
let (evm_tx, evm_rx) = crossbeam_channel::unbounded::<EvmTask>();
Expand Down Expand Up @@ -255,12 +255,22 @@ impl ExecutorConfig {
// Config: Miner
// -----------------------------------------------------------------------------
#[derive(Parser, DebugAsJson, Clone, serde::Serialize)]
pub struct MinerConfig {}
pub struct MinerConfig {
/// Target block time.
#[arg(long = "block-time", value_parser=parse_duration, env = "BLOCK_TIME")]
pub block_time: Option<Duration>,
}

impl MinerConfig {
pub fn init(&self, storage: Arc<StratusStorage>) -> Arc<BlockMiner> {
tracing::info!(config = ?self, "configuring block miner");
Arc::new(BlockMiner::new(storage))
tracing::info!(config = ?self, "starting block miner");
let miner = Arc::new(BlockMiner::new(storage));

if let Some(block_time) = self.block_time {
Arc::clone(&miner).spawn_interval_miner(block_time);
}

miner
}
}

Expand All @@ -276,7 +286,7 @@ pub struct RelayerConfig {

impl RelayerConfig {
pub async fn init(&self, storage: Arc<StratusStorage>) -> anyhow::Result<Option<Arc<TransactionRelayer>>> {
tracing::info!(config = ?self, "configuring transaction relayer");
tracing::info!(config = ?self, "starting transaction relayer");

match self.forward_to {
Some(ref url) => {
Expand Down Expand Up @@ -585,7 +595,7 @@ pub enum ExternalRpcStorageKind {
impl ExternalRpcStorageConfig {
/// Initializes external rpc storage implementation.
pub async fn init(&self) -> anyhow::Result<Arc<dyn ExternalRpcStorage>> {
tracing::info!(config = ?self, "configuring external rpc storage");
tracing::info!(config = ?self, "starting external rpc storage");

match self.external_rpc_storage_kind {
ExternalRpcStorageKind::Postgres { ref url } => {
Expand Down Expand Up @@ -631,7 +641,7 @@ pub enum TemporaryStorageKind {
impl TemporaryStorageConfig {
/// Initializes temporary storage implementation.
pub async fn init(&self) -> anyhow::Result<Arc<dyn TemporaryStorage>> {
tracing::info!(config = ?self, "configuring temporary storage");
tracing::info!(config = ?self, "starting temporary storage");

match self.temp_storage_kind {
TemporaryStorageKind::InMemory => Ok(Arc::new(InMemoryTemporaryStorage::default())),
Expand Down Expand Up @@ -683,7 +693,7 @@ pub enum PermanentStorageKind {
impl PermanentStorageConfig {
/// Initializes permanent storage implementation.
pub async fn init(&self) -> anyhow::Result<Arc<dyn PermanentStorage>> {
tracing::info!(config = ?self, "configuring permanent storage");
tracing::info!(config = ?self, "starting permanent storage");

let perm: Arc<dyn PermanentStorage> = match self.perm_storage_kind {
PermanentStorageKind::InMemory => Arc::new(InMemoryPermanentStorage::default()),
Expand Down
42 changes: 41 additions & 1 deletion src/eth/block_miner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use ethereum_types::BloomInput;
use keccak_hasher::KeccakHasher;
Expand Down Expand Up @@ -32,14 +34,22 @@ pub struct BlockMiner {
impl BlockMiner {
/// Creates a new [`BlockMiner`].
pub fn new(storage: Arc<StratusStorage>) -> Self {
tracing::info!("creating block miner");
tracing::info!("starting block miner");
Self {
storage,
notifier_blocks: broadcast::channel(u16::MAX as usize).0,
notifier_logs: broadcast::channel(u16::MAX as usize).0,
}
}

pub fn spawn_interval_miner(self: Arc<Self>, block_time: Duration) {
tracing::info!(block_time = %humantime::Duration::from(block_time), "spawning interval miner");

let t = thread::Builder::new().name("interval-miner".into());
t.spawn(move || interval_miner(self, block_time))
.expect("spawning interval miner should not fail");
}

/// Mines external block and external transactions.
///
/// Local transactions are not allowed to be part of the block.
Expand Down Expand Up @@ -237,3 +247,33 @@ pub fn block_from_local(number: BlockNumber, txs: NonEmpty<LocalTransactionExecu
// TODO: calculate size, state_root, receipts_root, parent_hash
Ok(block)
}

// -----------------------------------------------------------------------------
// Miner
// -----------------------------------------------------------------------------
fn interval_miner(miner: Arc<BlockMiner>, block_time: Duration) {
loop {
thread::sleep(block_time);
tracing::info!("mining block");

// mine
let block = match futures::executor::block_on(miner.mine_local()) {
Ok(block) => block,
Err(e) => {
tracing::error!(reason = ?e, "failed to mine block");
continue;
}
};

// commit
loop {
match futures::executor::block_on(miner.commit(block.clone())) {
Ok(_) => break,
Err(e) => {
tracing::error!(reason = ?e, "failed to commit block");
continue;
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/eth/evm/revm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Revm {
/// Creates a new instance of the Revm ready to be used.
#[allow(clippy::arc_with_non_send_sync)]
pub fn new(storage: Arc<StratusStorage>, config: EvmConfig) -> Self {
tracing::info!(?config, "creating revm");
tracing::info!(?config, "starting revm");

// configure handler
let mut handler = Handler::mainnet_with_spec(SpecId::LONDON);
Expand Down
2 changes: 1 addition & 1 deletion src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Executor {
evm_tx: crossbeam_channel::Sender<EvmTask>,
num_evms: usize,
) -> Self {
tracing::info!(%num_evms, "creating executor");
tracing::info!(%num_evms, "starting executor");
Self {
evm_tx,
num_evms,
Expand Down
2 changes: 2 additions & 0 deletions src/eth/rpc/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub async fn serve_rpc(
chain_id: ChainId,
cancellation: CancellationToken,
) -> anyhow::Result<()> {
tracing::info!("starting rpc server");

// configure subscriptions
let subs = Arc::new(RpcSubscriptions::default());
let _handle_subs_cleaner = Arc::clone(&subs).spawn_subscriptions_cleaner();
Expand Down
6 changes: 6 additions & 0 deletions src/eth/rpc/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct RpcSubscriptions {
impl RpcSubscriptions {
/// Spawns a new thread to clean up closed subscriptions from time to time.
pub fn spawn_subscriptions_cleaner(self: Arc<Self>) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc subscriptions cleaner");

tokio::spawn(async move {
loop {
let any_new_heads_closed = self.new_heads.read().await.iter().any(|(_, sub)| sub.is_closed());
Expand All @@ -60,6 +62,8 @@ impl RpcSubscriptions {

/// Spawns a new thread that notifies subscribers about new created blocks.
pub fn spawn_new_heads_notifier(self: Arc<Self>, mut rx: broadcast::Receiver<Block>) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc new heads notifier");

tokio::spawn(async move {
loop {
let Ok(block) = rx.recv().await else {
Expand All @@ -79,6 +83,8 @@ impl RpcSubscriptions {

/// Spawns a new thread that notifies subscribers about new transactions logs.
pub fn spawn_logs_notifier(self: Arc<Self>, mut rx: broadcast::Receiver<LogMined>) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc logs notifier");

tokio::spawn(async move {
loop {
let Ok(log) = rx.recv().await else {
Expand Down
4 changes: 2 additions & 2 deletions src/eth/storage/inmemory/inmemory_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl InMemoryPermanentStorage {

/// Creates a new InMemoryPermanentStorage from a snapshot dump.
pub fn from_snapshot(state: InMemoryPermanentStorageState) -> Self {
tracing::info!("creating inmemory permanent storage from snapshot");
tracing::info!("starting inmemory permanent storage from snapshot");
Self {
state: RwLock::new(state),
block_number: AtomicU64::new(0),
Expand All @@ -126,7 +126,7 @@ impl InMemoryPermanentStorage {

impl Default for InMemoryPermanentStorage {
fn default() -> Self {
tracing::info!("creating inmemory permanent storage");
tracing::info!("starting inmemory permanent storage");
Self {
state: RwLock::new(InMemoryPermanentStorageState::default()),
block_number: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl InMemoryTemporaryStorage {

impl Default for InMemoryTemporaryStorage {
fn default() -> Self {
tracing::info!("creating inmemory temporary storage");
tracing::info!("starting inmemory temporary storage");
Self {
states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState::default())),
}
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/postgres_permanent/postgres_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Drop for PostgresPermanentStorage {
impl PostgresPermanentStorage {
/// Creates a new [`PostgresPermanentStorage`].
pub async fn new(config: PostgresPermanentStorageConfig) -> anyhow::Result<Self> {
tracing::info!(?config, "creating postgres permanent storage");
tracing::info!(?config, "starting postgres permanent storage");

let result = PgPoolOptions::new()
.min_connections(config.connections)
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct RocksPermanentStorage {

impl RocksPermanentStorage {
pub async fn new() -> anyhow::Result<Self> {
tracing::info!("creating rocksdb storage");
tracing::info!("starting rocksdb storage");

let state = RocksStorageState::new();
state.sync_data().await?;
Expand Down
2 changes: 1 addition & 1 deletion src/eth/transaction_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct TransactionRelayer {
impl TransactionRelayer {
/// Creates a new [`TransactionRelayer`].
pub fn new(storage: Arc<StratusStorage>, chain: BlockchainClient) -> Self {
tracing::info!(?chain, "creating transaction relayer");
tracing::info!(?chain, "starting transaction relayer");
Self { storage, chain }
}

Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ where
println!("WARNING: env var PERM_STORAGE_CONNECTIONS is set to 1, if it cause connection problems, try increasing it");
}

// init services
// init metrics
#[cfg(feature = "metrics")]
infra::init_metrics(config.common().metrics_histogram_kind);

// init sentry
let _sentry_guard = config.common().sentry_url.as_ref().map(|sentry_url| infra::init_sentry(sentry_url));

// init tokio
let runtime = config.common().init_runtime();

// init tracing
runtime.block_on(async { infra::init_tracing(config.common().tracing_url.as_ref()) });

Self {
Expand Down

0 comments on commit 0443681

Please sign in to comment.