Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): download shard blocks in archive provider #487

Merged
merged 7 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions block-util/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ impl Archive {
Ok(res)
}

pub fn check_mc_blocks_range(&self) -> Result<()> {
match (
self.mc_block_ids.first_key_value(),
self.mc_block_ids.last_key_value(),
) {
(Some((first_seqno, _)), Some((last_seqno, _))) => {
if (last_seqno - first_seqno + 1) != self.mc_block_ids.len() as u32 {
anyhow::bail!("archive does not contain some mc blocks")
}

Ok(())
}
_ => {
anyhow::bail!("archive is empty")
}
}
}

// TODO: Make async
pub fn get_entry_by_id(
&self,
Expand Down
11 changes: 11 additions & 0 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::{Context, Result};
use bytes::Bytes;
use everscale_crypto::ed25519;
use everscale_types::models::*;
use futures_util::future;
use futures_util::future::BoxFuture;
use tycho_block_util::block::BlockIdRelation;
use tycho_block_util::state::MinRefMcStateTracker;
Expand Down Expand Up @@ -491,6 +492,7 @@ struct SetSyncContext {
impl BlockProvider for SetSyncContext {
type GetNextBlockFut<'a> = futures_util::future::Ready<OptionalBlockStuff>;
type GetBlockFut<'a> = futures_util::future::Ready<OptionalBlockStuff>;
type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

fn get_next_block<'a>(&'a self, _: &'a BlockId) -> Self::GetNextBlockFut<'a> {
self.adapter.set_sync_context(self.ctx);
Expand All @@ -500,6 +502,10 @@ impl BlockProvider for SetSyncContext {
fn get_block<'a>(&'a self, _: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
futures_util::future::ready(None)
}

fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
futures_util::future::ready(Ok(()))
}
}

struct CollatorStateSubscriber {
Expand Down Expand Up @@ -530,6 +536,7 @@ struct CollatorBlockProvider {
impl BlockProvider for CollatorBlockProvider {
type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
type CleanupFut<'a> = future::Ready<Result<()>>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
self.adapter.wait_for_block_next(prev_block_id)
Expand All @@ -538,6 +545,10 @@ impl BlockProvider for CollatorBlockProvider {
fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
self.adapter.wait_for_block(&block_id_relation.block_id)
}

fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
futures_util::future::ready(Ok(()))
}
}

struct CollatorControl {
Expand Down
5 changes: 5 additions & 0 deletions collator/tests/collation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct StrangeBlockProvider {
impl BlockProvider for StrangeBlockProvider {
type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
tracing::info!("Get next block: {:?}", prev_block_id);
Expand All @@ -42,6 +43,10 @@ impl BlockProvider for StrangeBlockProvider {
tracing::info!("Get block: {:?}", block_id);
self.adapter.wait_for_block(&block_id.block_id)
}

fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
futures_util::future::ready(Ok(()))
}
}

impl StateSubscriber for StrangeBlockProvider {
Expand Down
22 changes: 19 additions & 3 deletions core/src/block_strider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,17 @@ where
// NOTE: Start fetching the next master block in parallel to the processing of the current one
next_master_fut = JoinTask::new(self.fetch_next_master_block(next.id()));

let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
self.process_mc_block(next.data, next.archive_data).await?;
let mc_seqno = next.id().seqno;

{
let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
self.process_mc_block(next.data, next.archive_data).await?;
}

{
let _histogram = HistogramGuard::begin("tycho_core_provider_cleanup_time");
self.provider.cleanup_until(mc_seqno).await?;
}
}

tracing::info!("block strider loop finished");
Expand All @@ -169,6 +178,7 @@ where
let prepared_master = {
let cx = Box::new(BlockSubscriberContext {
mc_block_id,
mc_is_key_block: is_key_block,
is_key_block,
block: block.clone(),
archive_data,
Expand All @@ -195,7 +205,11 @@ where
// Start processing shard blocks in parallel
let mut process_futures = FuturesUnordered::new();
while let Some(blocks) = download_futures.next().await.transpose()? {
process_futures.push(Box::pin(self.process_shard_blocks(&mc_block_id, blocks)));
process_futures.push(Box::pin(self.process_shard_blocks(
&mc_block_id,
is_key_block,
blocks,
)));
}
metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());

Expand Down Expand Up @@ -271,11 +285,13 @@ where
async fn process_shard_blocks(
&self,
mc_block_id: &BlockId,
mc_is_key_block: bool,
mut blocks: Vec<BlockStuffAug>,
) -> Result<()> {
let start_preparing_block = |block: BlockStuffAug| {
let cx = Box::new(BlockSubscriberContext {
mc_block_id: *mc_block_id,
mc_is_key_block,
is_key_block: false,
block: block.data,
archive_data: block.archive_data,
Expand Down
Loading
Loading