Skip to content

Commit

Permalink
TEMPORARY COMMIT
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19-cw committed Jul 2, 2024
1 parent 7a55af7 commit b7cf84f
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 38 deletions.
126 changes: 92 additions & 34 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::cmp::min;
use std::fs;
use std::sync::mpsc;
use std::sync::Arc;

use anyhow::anyhow;
Expand All @@ -24,6 +25,7 @@ use stratus::eth::primitives::ExternalReceipt;
use stratus::eth::primitives::ExternalReceipts;
use stratus::eth::storage::ExternalRpcStorage;
use stratus::eth::storage::InMemoryPermanentStorage;
use stratus::eth::storage::INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS;
use stratus::eth::BlockMiner;
use stratus::eth::Executor;
use stratus::ext::spawn_named;
Expand All @@ -34,13 +36,26 @@ use stratus::utils::calculate_tps_and_bpm;
use stratus::utils::DropTimer;
use stratus::GlobalServices;
use stratus::GlobalState;
use tokio::sync::mpsc;
use tokio::sync::mpsc as async_mpsc;
use tokio::time::Instant;

/// Number of tasks in the backlog. Each task contains `--blocks-by-fetch` blocks and all receipts for them.
const BACKLOG_SIZE: usize = 50;
/// Number of loader tasks buffered. Each task contains `--blocks-by-fetch` blocks and all receipts for them.
const RPC_LOADER_CHANNEL_CAPACITY: usize = 25;

type BacklogTask = (Vec<ExternalBlock>, Vec<ExternalReceipt>);
/// Size of the block batches to save after execution, and capacity of the batch channel.
///
/// We are using a third of the inmemory temporary storage and a channel with size 1; this way, we can parallelize
/// execution and storage saving while staying below the inmemory storage limit.
///
/// By setting channel capacity to 1, we use backpressure to ensure that at most
/// `INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS` are executed at a time.
///
/// REMOVE THIS: Decrease by one to give room for the pending block.
/// TODO: why `max/2-1` doesn't work?
const STORAGE_WRITER_BATCHES_SIZE: usize = INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS / 3;
const STORAGE_WRITER_CHANNEL_CAPACITY: usize = 1;

type LoadedBatch = (Vec<ExternalBlock>, Vec<ExternalReceipt>);

fn main() -> anyhow::Result<()> {
let global_services = GlobalServices::<ImporterOfflineConfig>::init();
Expand Down Expand Up @@ -69,55 +84,64 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
None => block_number_to_stop(&rpc_storage).await?,
};

// init shared data between importer and external rpc storage loader
let (backlog_tx, backlog_rx) = mpsc::channel::<BacklogTask>(BACKLOG_SIZE);
// init channel that sends batches from external storage loader to block executor
let (loader_tx, loader_rx) = async_mpsc::channel::<LoadedBatch>(RPC_LOADER_CHANNEL_CAPACITY);

// init channel that sends batches from executor to the storage writer
let (writer_batch_tx, writer_batch_rx) = mpsc::sync_channel::<Vec<Block>>(STORAGE_WRITER_CHANNEL_CAPACITY);

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
storage.save_accounts(initial_accounts.clone())?;

let storage_loader = execute_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, backlog_tx);
spawn_named("storage-loader", async move {
if let Err(e) = storage_loader.await {
tracing::error!(reason = ?e, "'storage-loader' task failed");
let storage_loader_fut = run_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, loader_tx);
let _storage_loader = spawn_named("storage-loader", async move {
if let Err(err) = storage_loader_fut.await {
tracing::error!(?err, "'storage-loader' task failed");
}
});

let block_importer = spawn_thread("block-importer", || {
if let Err(e) = execute_block_importer(executor, miner, backlog_rx, block_snapshots) {
tracing::error!(reason = ?e, "'block-importer' task failed");
let _block_executor = spawn_thread("block-executor", {
let miner = Arc::clone(&miner);
|| {
if let Err(err) = run_external_block_executor(executor, miner, loader_rx, writer_batch_tx, block_snapshots) {
tracing::error!(?err, "'block-executor' task failed");
}
}
});

block_importer
.join()
.expect("'block-importer' thread panic'ed instead of properly returning an error");
let block_saver = spawn_thread("block-saver", || {
if let Err(err) = run_block_saver(miner, writer_batch_rx) {
tracing::error!(?err, "'block-saver' task failed");
}
});

block_saver.join().expect("'block-saver' thread panic'ed instead of returning an error");

Ok(())
}

// -----------------------------------------------------------------------------
// Block importer
// Block executor
// -----------------------------------------------------------------------------
fn execute_block_importer(
// services
fn run_external_block_executor(
executor: Arc<Executor>,
miner: Arc<BlockMiner>,
// data
mut backlog_rx: mpsc::Receiver<BacklogTask>,
mut loader_rx: async_mpsc::Receiver<LoadedBatch>,
writer_batch_tx: mpsc::SyncSender<Vec<Block>>,
blocks_to_export_snapshot: Vec<BlockNumber>,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-block-executor";
let _timer = DropTimer::start("importer-offline::execute_block_importer");
let _timer = DropTimer::start("importer-offline::run_external_block_executor");

// receives blocks and receipts from the backlog to reexecute and import
// receives blocks and receipts from the loader to reexecute and import
loop {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return Ok(());
};

// receive new tasks to execute, or exit
let Some((blocks, receipts)) = backlog_rx.blocking_recv() else {
let Some((blocks, receipts)) = loader_rx.blocking_recv() else {
tracing::info!("{} has no more blocks to process", TASK_NAME);
return Ok(());
};
Expand All @@ -132,7 +156,8 @@ fn execute_block_importer(
let mut transaction_count = 0;
let instant_before_execution = Instant::now();

for block in blocks.into_iter() {
let mut block_batch = Vec::with_capacity(STORAGE_WRITER_BATCHES_SIZE);
for block in blocks {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return Ok(());
}
Expand All @@ -146,7 +171,12 @@ fn execute_block_importer(
if blocks_to_export_snapshot.contains(&mined_block.number()) {
export_snapshot(&block, &receipts, &mined_block)?;
}
miner.commit(mined_block.clone())?;
block_batch.push(mined_block);

if block_batch.len() == STORAGE_WRITER_BATCHES_SIZE {
writer_batch_tx.send(block_batch).unwrap();
block_batch = vec![];
}
}

let duration = instant_before_execution.elapsed();
Expand All @@ -161,21 +191,49 @@ fn execute_block_importer(
receipts = receipts.len(),
"reexecuted blocks batch",
);

// send the leftovers
if !block_batch.is_empty() {
writer_batch_tx.send(block_batch).unwrap();
}
}
}

// -----------------------------------------------------------------------------
// Block saver
// -----------------------------------------------------------------------------
/// Drains batches of blocks from `writer_batch_rx` and commits every block through `miner`.
///
/// The loop ends with `Ok(())` when either:
/// - `GlobalState::warn_if_shutdown` reports a shutdown before the next batch is awaited, or
/// - `recv` fails, which for a std `mpsc::Receiver` means every sender was dropped and the
///   channel is empty.
///
/// # Errors
///
/// Returns the first error produced by `miner.commit`; the remaining blocks of that batch are
/// not committed.
fn run_block_saver(miner: Arc<BlockMiner>, writer_batch_rx: mpsc::Receiver<Vec<Block>>) -> anyhow::Result<()> {
    const TASK_NAME: &str = "block-saver";
    let _timer = DropTimer::start("importer-offline::run_block_saver");

    // the shutdown flag is checked once per batch, right before blocking on the channel
    while !GlobalState::warn_if_shutdown(TASK_NAME) {
        match writer_batch_rx.recv() {
            // commit the blocks in the order they were placed in the batch
            Ok(batch) =>
                for block in batch {
                    miner.commit(block)?;
                },
            // the sending side is gone, so there is nothing left to save
            Err(_) => {
                tracing::info!("{} has no more batches to save", TASK_NAME);
                break;
            }
        }
    }

    Ok(())
}

// -----------------------------------------------------------------------------
// Block loader
// -----------------------------------------------------------------------------
async fn execute_external_rpc_storage_loader(
// services
async fn run_external_rpc_storage_loader(
rpc_storage: Arc<dyn ExternalRpcStorage>,
// data
blocks_by_fetch: usize,
paralellism: usize,
mut start: BlockNumber,
end: BlockNumber,
backlog: mpsc::Sender<BacklogTask>,
loader_tx: async_mpsc::Sender<LoadedBatch>,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-block-loader";
tracing::info!(%start, %end, "creating task {}", TASK_NAME);
Expand Down Expand Up @@ -220,14 +278,14 @@ async fn execute_external_rpc_storage_loader(
return log_and_err!(message);
}

// send to backlog
if backlog.send((blocks, receipts)).await.is_err() {
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer")));
// send to channel
if loader_tx.send((blocks, receipts)).await.is_err() {
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to executor")));
};
}
}

async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, start: BlockNumber, end: BlockNumber) -> anyhow::Result<BacklogTask> {
async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, start: BlockNumber, end: BlockNumber) -> anyhow::Result<LoadedBatch> {
tracing::info!(%start, %end, "loading blocks and receipts");
let blocks_task = rpc_storage.read_blocks_in_range(start, end);
let receipts_task = rpc_storage.read_receipts_in_range(start, end);
Expand Down
4 changes: 2 additions & 2 deletions src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::eth::storage::TemporaryStorage;
use crate::log_and_err;

/// Number of previous blocks to keep inmemory to detect conflicts between different blocks.
const MAX_BLOCKS: usize = 64;
pub const MAX_BLOCKS: usize = 64;

#[derive(Debug)]
pub struct InMemoryTemporaryStorage {
Expand Down Expand Up @@ -74,7 +74,7 @@ pub struct InMemoryTemporaryStorageState {

impl InMemoryTemporaryStorageState {
/// Validates there is an active pending block being mined and returns a reference to it.
fn require_active_block(&mut self) -> anyhow::Result<&PendingBlock> {
fn require_active_block(&self) -> anyhow::Result<&PendingBlock> {
match &self.block {
Some(block) => Ok(block),
None => log_and_err!("no pending block being mined"), // try calling set_active_block_number_as_next_if_not_set or any other method to create a new block on temp storage
Expand Down
1 change: 1 addition & 0 deletions src/eth/storage/inmemory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub use inmemory_history::InMemoryHistory;
pub use inmemory_permanent::InMemoryPermanentStorage;
pub use inmemory_permanent::InMemoryPermanentStorageState;
pub use inmemory_temporary::InMemoryTemporaryStorage;
pub use inmemory_temporary::MAX_BLOCKS as INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS;
1 change: 1 addition & 0 deletions src/eth/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use external_rpc_storage::ExternalRpcStorage;
pub use inmemory::InMemoryPermanentStorage;
pub use inmemory::InMemoryPermanentStorageState;
pub use inmemory::InMemoryTemporaryStorage;
pub use inmemory::INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS;
pub use permanent_storage::PermanentStorage;
pub use postgres_external_rpc::PostgresExternalRpcStorage;
pub use postgres_external_rpc::PostgresExternalRpcStorageConfig;
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/stratus_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ impl StratusStorage {
metrics::inc_storage_finish_block(m.elapsed, label::TEMP, m.result.is_ok());
});

if let Ok(ref block) = result {
if let Ok(block) = &result {
Span::with(|s| s.rec_str("block_number", &block.number));
}

Expand Down
2 changes: 1 addition & 1 deletion src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ macro_rules! channel_read {
$crate::channel_read_impl!($rx, timeout_ms: 2000)
};
($rx: ident, $timeout_ms:expr) => {
$crate::channel_read_impl!($rx, timeout_ms: $timeout_ms),
$crate::channel_read_impl!($rx, timeout_ms: $timeout_ms)
};
}

Expand Down

0 comments on commit b7cf84f

Please sign in to comment.