Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dumb interval miner #842

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl ExecutorConfig {
/// TODO: remove BlockMiner after migration is completed.
pub async fn init(&self, storage: Arc<StratusStorage>, miner: Arc<BlockMiner>, relayer: Option<Arc<TransactionRelayer>>) -> Arc<Executor> {
let num_evms = max(self.num_evms, 1);
tracing::info!(config = ?self, "configuring executor");
tracing::info!(config = ?self, "starting executor");

// spawn evm in background using native threads
let (evm_tx, evm_rx) = crossbeam_channel::unbounded::<EvmTask>();
Expand Down Expand Up @@ -255,12 +255,22 @@ impl ExecutorConfig {
// Config: Miner
// -----------------------------------------------------------------------------
#[derive(Parser, DebugAsJson, Clone, serde::Serialize)]
pub struct MinerConfig {}
pub struct MinerConfig {
/// Target block time.
#[arg(long = "block-time", value_parser=parse_duration, env = "BLOCK_TIME")]
pub block_time: Option<Duration>,
}

impl MinerConfig {
pub fn init(&self, storage: Arc<StratusStorage>) -> Arc<BlockMiner> {
tracing::info!(config = ?self, "configuring block miner");
Arc::new(BlockMiner::new(storage))
tracing::info!(config = ?self, "starting block miner");
let miner = Arc::new(BlockMiner::new(storage));

if let Some(block_time) = self.block_time {
Arc::clone(&miner).spawn_interval_miner(block_time);
}

miner
}
}

Expand All @@ -276,7 +286,7 @@ pub struct RelayerConfig {

impl RelayerConfig {
pub async fn init(&self, storage: Arc<StratusStorage>) -> anyhow::Result<Option<Arc<TransactionRelayer>>> {
tracing::info!(config = ?self, "configuring transaction relayer");
tracing::info!(config = ?self, "starting transaction relayer");

match self.forward_to {
Some(ref url) => {
Expand Down Expand Up @@ -585,7 +595,7 @@ pub enum ExternalRpcStorageKind {
impl ExternalRpcStorageConfig {
/// Initializes external rpc storage implementation.
pub async fn init(&self) -> anyhow::Result<Arc<dyn ExternalRpcStorage>> {
tracing::info!(config = ?self, "configuring external rpc storage");
tracing::info!(config = ?self, "starting external rpc storage");

match self.external_rpc_storage_kind {
ExternalRpcStorageKind::Postgres { ref url } => {
Expand Down Expand Up @@ -631,7 +641,7 @@ pub enum TemporaryStorageKind {
impl TemporaryStorageConfig {
/// Initializes temporary storage implementation.
pub async fn init(&self) -> anyhow::Result<Arc<dyn TemporaryStorage>> {
tracing::info!(config = ?self, "configuring temporary storage");
tracing::info!(config = ?self, "starting temporary storage");

match self.temp_storage_kind {
TemporaryStorageKind::InMemory => Ok(Arc::new(InMemoryTemporaryStorage::default())),
Expand Down Expand Up @@ -683,7 +693,7 @@ pub enum PermanentStorageKind {
impl PermanentStorageConfig {
/// Initializes permanent storage implementation.
pub async fn init(&self) -> anyhow::Result<Arc<dyn PermanentStorage>> {
tracing::info!(config = ?self, "configuring permanent storage");
tracing::info!(config = ?self, "starting permanent storage");

let perm: Arc<dyn PermanentStorage> = match self.perm_storage_kind {
PermanentStorageKind::InMemory => Arc::new(InMemoryPermanentStorage::default()),
Expand Down
42 changes: 41 additions & 1 deletion src/eth/block_miner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use ethereum_types::BloomInput;
use keccak_hasher::KeccakHasher;
Expand Down Expand Up @@ -32,14 +34,22 @@ pub struct BlockMiner {
impl BlockMiner {
/// Creates a new [`BlockMiner`].
pub fn new(storage: Arc<StratusStorage>) -> Self {
tracing::info!("creating block miner");
tracing::info!("starting block miner");
Self {
storage,
notifier_blocks: broadcast::channel(u16::MAX as usize).0,
notifier_logs: broadcast::channel(u16::MAX as usize).0,
}
}

pub fn spawn_interval_miner(self: Arc<Self>, block_time: Duration) {
tracing::info!(block_time = %humantime::Duration::from(block_time), "spawning interval miner");

let t = thread::Builder::new().name("interval-miner".into());
t.spawn(move || interval_miner(self, block_time))
.expect("spawning interval miner should not fail");
}

/// Mines external block and external transactions.
///
/// Local transactions are not allowed to be part of the block.
Expand Down Expand Up @@ -237,3 +247,33 @@ pub fn block_from_local(number: BlockNumber, txs: NonEmpty<LocalTransactionExecu
// TODO: calculate size, state_root, receipts_root, parent_hash
Ok(block)
}

// -----------------------------------------------------------------------------
// Miner
// -----------------------------------------------------------------------------
fn interval_miner(miner: Arc<BlockMiner>, block_time: Duration) {
loop {
thread::sleep(block_time);
tracing::info!("mining block");

// mine
let block = match futures::executor::block_on(miner.mine_local()) {
Ok(block) => block,
Err(e) => {
tracing::error!(reason = ?e, "failed to mine block");
continue;
}
};

// commit
loop {
match futures::executor::block_on(miner.commit(block.clone())) {
Ok(_) => break,
Err(e) => {
tracing::error!(reason = ?e, "failed to commit block");
continue;
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/eth/evm/revm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Revm {
/// Creates a new instance of the Revm ready to be used.
#[allow(clippy::arc_with_non_send_sync)]
pub fn new(storage: Arc<StratusStorage>, config: EvmConfig) -> Self {
tracing::info!(?config, "creating revm");
tracing::info!(?config, "starting revm");

// configure handler
let mut handler = Handler::mainnet_with_spec(SpecId::LONDON);
Expand Down
2 changes: 1 addition & 1 deletion src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Executor {
evm_tx: crossbeam_channel::Sender<EvmTask>,
num_evms: usize,
) -> Self {
tracing::info!(%num_evms, "creating executor");
tracing::info!(%num_evms, "starting executor");
Self {
evm_tx,
num_evms,
Expand Down
2 changes: 2 additions & 0 deletions src/eth/rpc/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub async fn serve_rpc(
chain_id: ChainId,
cancellation: CancellationToken,
) -> anyhow::Result<()> {
tracing::info!("starting rpc server");

// configure subscriptions
let subs = Arc::new(RpcSubscriptions::default());
let _handle_subs_cleaner = Arc::clone(&subs).spawn_subscriptions_cleaner();
Expand Down
6 changes: 6 additions & 0 deletions src/eth/rpc/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct RpcSubscriptions {
impl RpcSubscriptions {
/// Spawns a new thread to clean up closed subscriptions from time to time.
pub fn spawn_subscriptions_cleaner(self: Arc<Self>) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc subscriptions cleaner");

tokio::spawn(async move {
loop {
let any_new_heads_closed = self.new_heads.read().await.iter().any(|(_, sub)| sub.is_closed());
Expand All @@ -60,6 +62,8 @@ impl RpcSubscriptions {

/// Spawns a new thread that notifies subscribers about new created blocks.
pub fn spawn_new_heads_notifier(self: Arc<Self>, mut rx: broadcast::Receiver<Block>) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc new heads notifier");

tokio::spawn(async move {
loop {
let Ok(block) = rx.recv().await else {
Expand All @@ -79,6 +83,8 @@ impl RpcSubscriptions {

/// Spawns a new thread that notifies subscribers about new transactions logs.
pub fn spawn_logs_notifier(self: Arc<Self>, mut rx: broadcast::Receiver<LogMined>) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc logs notifier");

tokio::spawn(async move {
loop {
let Ok(log) = rx.recv().await else {
Expand Down
4 changes: 2 additions & 2 deletions src/eth/storage/inmemory/inmemory_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl InMemoryPermanentStorage {

/// Creates a new InMemoryPermanentStorage from a snapshot dump.
pub fn from_snapshot(state: InMemoryPermanentStorageState) -> Self {
tracing::info!("creating inmemory permanent storage from snapshot");
tracing::info!("starting inmemory permanent storage from snapshot");
Self {
state: RwLock::new(state),
block_number: AtomicU64::new(0),
Expand All @@ -126,7 +126,7 @@ impl InMemoryPermanentStorage {

impl Default for InMemoryPermanentStorage {
fn default() -> Self {
tracing::info!("creating inmemory permanent storage");
tracing::info!("starting inmemory permanent storage");
Self {
state: RwLock::new(InMemoryPermanentStorageState::default()),
block_number: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl InMemoryTemporaryStorage {

impl Default for InMemoryTemporaryStorage {
fn default() -> Self {
tracing::info!("creating inmemory temporary storage");
tracing::info!("starting inmemory temporary storage");
Self {
states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState::default())),
}
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/postgres_permanent/postgres_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Drop for PostgresPermanentStorage {
impl PostgresPermanentStorage {
/// Creates a new [`PostgresPermanentStorage`].
pub async fn new(config: PostgresPermanentStorageConfig) -> anyhow::Result<Self> {
tracing::info!(?config, "creating postgres permanent storage");
tracing::info!(?config, "starting postgres permanent storage");

let result = PgPoolOptions::new()
.min_connections(config.connections)
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct RocksPermanentStorage {

impl RocksPermanentStorage {
pub async fn new() -> anyhow::Result<Self> {
tracing::info!("creating rocksdb storage");
tracing::info!("starting rocksdb storage");

let state = RocksStorageState::new();
state.sync_data().await?;
Expand Down
2 changes: 1 addition & 1 deletion src/eth/transaction_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct TransactionRelayer {
impl TransactionRelayer {
/// Creates a new [`TransactionRelayer`].
pub fn new(storage: Arc<StratusStorage>, chain: BlockchainClient) -> Self {
tracing::info!(?chain, "creating transaction relayer");
tracing::info!(?chain, "starting transaction relayer");
Self { storage, chain }
}

Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ where
println!("WARNING: env var PERM_STORAGE_CONNECTIONS is set to 1, if it cause connection problems, try increasing it");
}

// init services
// init metrics
#[cfg(feature = "metrics")]
infra::init_metrics(config.common().metrics_histogram_kind);

// init sentry
let _sentry_guard = config.common().sentry_url.as_ref().map(|sentry_url| infra::init_sentry(sentry_url));

// init tokio
let runtime = config.common().init_runtime();

// init tracing
runtime.block_on(async { infra::init_tracing(config.common().tracing_url.as_ref()) });

Self {
Expand Down
Loading