Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize importer-offline (batch writing + more parallelism) #1168

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 93 additions & 35 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use std::cmp::min;
use std::fs;
use std::sync::mpsc;
use std::sync::Arc;

use anyhow::anyhow;
Expand All @@ -26,6 +27,7 @@ use stratus::eth::primitives::ExternalReceipt;
use stratus::eth::primitives::ExternalReceipts;
use stratus::eth::storage::ExternalRpcStorage;
use stratus::eth::storage::InMemoryPermanentStorage;
use stratus::eth::storage::INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS;
use stratus::ext::spawn_named;
use stratus::ext::spawn_thread;
use stratus::ext::to_json_string_pretty;
Expand All @@ -34,13 +36,26 @@ use stratus::utils::calculate_tps_and_bpm;
use stratus::utils::DropTimer;
use stratus::GlobalServices;
use stratus::GlobalState;
use tokio::sync::mpsc;
use tokio::sync::mpsc as async_mpsc;
use tokio::time::Instant;

/// Number of tasks in the backlog. Each task contains `--blocks-by-fetch` blocks and all receipts for them.
const BACKLOG_SIZE: usize = 50;
/// Number of loader tasks buffered. Each task contains `--blocks-by-fetch` blocks and all receipts for them.
const RPC_LOADER_CHANNEL_CAPACITY: usize = 25;

type BacklogTask = (Vec<ExternalBlock>, Vec<ExternalReceipt>);
/// Size of the block batches to save after execution, and capacity of the batch channel.
///
/// We are using half of the inmemory temporary storage and a channel with size 1, this way, we can parallize
/// execution and storage saving while staying below the inmemory storage limit.
///
/// By setting channel capacity to 1, we use backpressure to ensure that at most
/// `INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS` are executed at a time.
///
/// REMOVE THIS: Decrease by one to give room for the pending block.
/// TODO: why `max/2-1` doesn't work?
const STORAGE_WRITER_BATCHES_SIZE: usize = INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS / 3;
Copy link
Contributor Author

@marcospb19-cw marcospb19-cw Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-review requested changes: update these comments and quickly check if it's obvious why it behaves this way.

const STORAGE_WRITER_CHANNEL_CAPACITY: usize = 1;

type LoadedBatch = (Vec<ExternalBlock>, Vec<ExternalReceipt>);

fn main() -> anyhow::Result<()> {
let global_services = GlobalServices::<ImporterOfflineConfig>::init();
Expand Down Expand Up @@ -69,29 +84,39 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
None => block_number_to_stop(&rpc_storage).await?,
};

// init shared data between importer and external rpc storage loader
let (backlog_tx, backlog_rx) = mpsc::channel::<BacklogTask>(BACKLOG_SIZE);
// init channel that sends batches from external storage loader to block executor
let (loader_tx, loader_rx) = async_mpsc::channel::<LoadedBatch>(RPC_LOADER_CHANNEL_CAPACITY);

// init channel that sends batches from executor to the storage writer
let (writer_batch_tx, writer_batch_rx) = mpsc::sync_channel::<Vec<Block>>(STORAGE_WRITER_CHANNEL_CAPACITY);

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
storage.save_accounts(initial_accounts.clone())?;

let storage_loader = execute_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, backlog_tx);
spawn_named("storage-loader", async move {
if let Err(e) = storage_loader.await {
tracing::error!(reason = ?e, "'storage-loader' task failed");
let storage_loader_fut = run_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, loader_tx);
let _storage_loader = spawn_named("storage-loader", async move {
if let Err(err) = storage_loader_fut.await {
tracing::error!(?err, "'storage-loader' task failed");
}
});

let block_importer = spawn_thread("block-importer", || {
if let Err(e) = execute_block_importer(executor, miner, backlog_rx, block_snapshots) {
tracing::error!(reason = ?e, "'block-importer' task failed");
let _block_executor = spawn_thread("block-executor", {
let miner = Arc::clone(&miner);
|| {
if let Err(err) = run_external_block_executor(executor, miner, loader_rx, writer_batch_tx, block_snapshots) {
tracing::error!(?err, "'block-executor' task failed");
}
}
});

block_importer
.join()
.expect("'block-importer' thread panic'ed instead of properly returning an error");
let block_saver = spawn_thread("block-saver", || {
if let Err(err) = run_block_saver(miner, writer_batch_rx) {
tracing::error!(?err, "'block-saver' task failed");
}
});

block_saver.join().expect("'block-saver' thread panic'ed instead of returning an error");

// Explicitly block the `main` thread to drop the storage.
drop(storage);
Expand All @@ -100,27 +125,26 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
}

// -----------------------------------------------------------------------------
// Block importer
// Block executor
// -----------------------------------------------------------------------------
fn execute_block_importer(
// services
fn run_external_block_executor(
executor: Arc<Executor>,
miner: Arc<Miner>,
// data
mut backlog_rx: mpsc::Receiver<BacklogTask>,
mut loader_rx: async_mpsc::Receiver<LoadedBatch>,
writer_batch_tx: mpsc::SyncSender<Vec<Block>>,
blocks_to_export_snapshot: Vec<BlockNumber>,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-block-executor";
let _timer = DropTimer::start("importer-offline::execute_block_importer");
let _timer = DropTimer::start("importer-offline::run_external_block_executor");

// receives blocks and receipts from the backlog to reexecute and import
// receives blocks and receipts from the loader to reexecute and import
loop {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return Ok(());
};

// receive blocks to execute
let Some((blocks, receipts)) = backlog_rx.blocking_recv() else {
// receive new tasks to execute, or exit
let Some((blocks, receipts)) = loader_rx.blocking_recv() else {
tracing::info!("{} has no more blocks to reexecute", TASK_NAME);
return Ok(());
};
Expand Down Expand Up @@ -148,7 +172,8 @@ fn execute_block_importer(
let mut transaction_count = 0;
let instant_before_execution = Instant::now();

for block in blocks.into_iter() {
let mut block_batch = Vec::with_capacity(STORAGE_WRITER_BATCHES_SIZE);
for block in blocks {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return Ok(());
}
Expand All @@ -162,7 +187,12 @@ fn execute_block_importer(
if blocks_to_export_snapshot.contains(&mined_block.number()) {
export_snapshot(&block, &receipts, &mined_block)?;
}
miner.commit(mined_block.clone())?;
block_batch.push(mined_block);

if block_batch.len() == STORAGE_WRITER_BATCHES_SIZE {
writer_batch_tx.send(block_batch).unwrap();
block_batch = vec![];
}
}

let duration = instant_before_execution.elapsed();
Expand All @@ -177,21 +207,49 @@ fn execute_block_importer(
receipts = receipts.len(),
"reexecuted blocks batch",
);

// send the leftovers
if !block_batch.is_empty() {
writer_batch_tx.send(block_batch).unwrap();
}
}
}

// -----------------------------------------------------------------------------
// Block saver
// -----------------------------------------------------------------------------
fn run_block_saver(miner: Arc<Miner>, writer_batch_rx: mpsc::Receiver<Vec<Block>>) -> anyhow::Result<()> {
const TASK_NAME: &str = "block-saver";
let _timer = DropTimer::start("importer-offline::run_block_saver");

// receives blocks and receipts from the loader to reexecute and import
loop {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return Ok(());
};

// receive new tasks to execute, or exit
let Ok(blocks_batch) = writer_batch_rx.recv() else {
tracing::info!("{} has no more batches to save", TASK_NAME);
return Ok(());
};

for block in blocks_batch {
miner.commit(block)?;
}
}
}

// -----------------------------------------------------------------------------
// Block loader
// -----------------------------------------------------------------------------
async fn execute_external_rpc_storage_loader(
// services
async fn run_external_rpc_storage_loader(
rpc_storage: Arc<dyn ExternalRpcStorage>,
// data
blocks_by_fetch: usize,
paralellism: usize,
mut start: BlockNumber,
end: BlockNumber,
backlog: mpsc::Sender<BacklogTask>,
loader_tx: async_mpsc::Sender<LoadedBatch>,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-block-loader";
tracing::info!(%start, %end, "creating task {}", TASK_NAME);
Expand Down Expand Up @@ -236,14 +294,14 @@ async fn execute_external_rpc_storage_loader(
return log_and_err!(message);
}

// send to backlog
if backlog.send((blocks, receipts)).await.is_err() {
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer")));
// send to channel
if loader_tx.send((blocks, receipts)).await.is_err() {
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to executor")));
};
}
}

async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, block_start: BlockNumber, block_end: BlockNumber) -> anyhow::Result<BacklogTask> {
async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, block_start: BlockNumber, block_end: BlockNumber) -> anyhow::Result<LoadedBatch> {
tracing::info!(%block_start, %block_end, "loading blocks and receipts");
let blocks_task = rpc_storage.read_blocks_in_range(block_start, block_end);
let receipts_task = rpc_storage.read_receipts_in_range(block_start, block_end);
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::eth::storage::TemporaryStorage;
use crate::log_and_err;

/// Number of previous blocks to keep inmemory to detect conflicts between different blocks.
const MAX_BLOCKS: usize = 64;
pub const MAX_BLOCKS: usize = 64;

#[derive(Debug)]
pub struct InMemoryTemporaryStorage {
Expand Down
1 change: 1 addition & 0 deletions src/eth/storage/inmemory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub use inmemory_history::InMemoryHistory;
pub use inmemory_permanent::InMemoryPermanentStorage;
pub use inmemory_permanent::InMemoryPermanentStorageState;
pub use inmemory_temporary::InMemoryTemporaryStorage;
pub use inmemory_temporary::MAX_BLOCKS as INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS;
1 change: 1 addition & 0 deletions src/eth/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use external_rpc_storage::ExternalRpcStorageKind;
pub use inmemory::InMemoryPermanentStorage;
pub use inmemory::InMemoryPermanentStorageState;
pub use inmemory::InMemoryTemporaryStorage;
pub use inmemory::INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS;
pub use permanent_storage::PermanentStorage;
pub use permanent_storage::PermanentStorageConfig;
pub use permanent_storage::PermanentStorageKind;
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/stratus_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl StratusStorage {
})
.map_err(Into::into);

if let Ok(ref block) = result {
if let Ok(block) = &result {
Span::with(|s| s.rec_str("block_number", &block.number));
}

Expand Down