From b883c85f9d4e3afc6a46a6b00bbb68b805de1e35 Mon Sep 17 00:00:00 2001 From: Alexey Pashinov Date: Tue, 10 Dec 2024 17:59:12 +0100 Subject: [PATCH] fix(core): fix wrong archive id for shard blocks of key block --- core/src/block_strider/mod.rs | 9 ++++++++- core/src/block_strider/state_applier.rs | 3 ++- core/src/block_strider/subscriber/mod.rs | 17 +++++++++++++++++ storage/src/store/block/mod.rs | 17 +++++++++++------ 4 files changed, 38 insertions(+), 8 deletions(-) diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index 01a30c680..9f4857844 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -178,6 +178,7 @@ where let prepared_master = { let cx = Box::new(BlockSubscriberContext { mc_block_id, + mc_is_key_block: is_key_block, is_key_block, block: block.clone(), archive_data, @@ -204,7 +205,11 @@ where // Start processing shard blocks in parallel let mut process_futures = FuturesUnordered::new(); while let Some(blocks) = download_futures.next().await.transpose()? { - process_futures.push(Box::pin(self.process_shard_blocks(&mc_block_id, blocks))); + process_futures.push(Box::pin(self.process_shard_blocks( + &mc_block_id, + is_key_block, + blocks, + ))); } metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed()); @@ -280,11 +285,13 @@ where async fn process_shard_blocks( &self, mc_block_id: &BlockId, + mc_is_key_block: bool, mut blocks: Vec, ) -> Result<()> { let start_preparing_block = |block: BlockStuffAug| { let cx = Box::new(BlockSubscriberContext { mc_block_id: *mc_block_id, + mc_is_key_block, is_key_block: false, block: block.data, archive_data: block.archive_data, diff --git a/core/src/block_strider/state_applier.rs b/core/src/block_strider/state_applier.rs index 39a332cff..ae7d8bb96 100644 --- a/core/src/block_strider/state_applier.rs +++ b/core/src/block_strider/state_applier.rs @@ -184,6 +184,7 @@ where let started_at = std::time::Instant::now(); let cx = StateSubscriberContext { mc_block_id: cx.mc_block_id, + mc_is_key_block: cx.mc_is_key_block, is_key_block: cx.is_key_block, block: cx.block.clone(), // TODO: rewrite without clone archive_data: cx.archive_data.clone(), // TODO: rewrite without clone @@ -204,7 +205,7 @@ where self.inner .storage .block_storage() - .move_into_archive(&prepared.handle) + .move_into_archive(&prepared.handle, cx.mc_is_key_block) .await?; } diff --git a/core/src/block_strider/subscriber/mod.rs b/core/src/block_strider/subscriber/mod.rs index 2491e49f1..d8bbdbf21 100644 --- a/core/src/block_strider/subscriber/mod.rs +++ b/core/src/block_strider/subscriber/mod.rs @@ -21,9 +21,17 @@ mod ps_subscriber; // === trait BlockSubscriber === pub struct BlockSubscriberContext { + /// Related masterchain block id. + /// In case of context for mc block this id is the same as `block.id()`. pub mc_block_id: BlockId, + /// Related masterchain block flag. + /// In case of context for mc block this flag is the same as `is_key_block`. + pub mc_is_key_block: bool, + /// Whether the `block` from this context is a key block. pub is_key_block: bool, + /// Parsed block data. pub block: BlockStuff, + /// Serialized block data. pub archive_data: ArchiveData, } @@ -123,10 +131,19 @@ impl BlockSubscriberExt for B { // === trait StateSubscriber === pub struct StateSubscriberContext { + /// Related masterchain block id. + /// In case of context for mc block this id is the same as `block.id()`. pub mc_block_id: BlockId, + /// Related masterchain block flag. + /// In case of context for mc block this flag is the same as `is_key_block`. + pub mc_is_key_block: bool, + /// Whether the `block` from this context is a key block. pub is_key_block: bool, + /// Parsed block data. pub block: BlockStuff, + /// Serialized block data. pub archive_data: ArchiveData, + /// Applied shard state. pub state: ShardStateStuff, } diff --git a/storage/src/store/block/mod.rs b/storage/src/store/block/mod.rs index 9c4f59cf9..8ab3907ab 100644 --- a/storage/src/store/block/mod.rs +++ b/storage/src/store/block/mod.rs @@ -640,7 +640,11 @@ impl BlockStorage { // === Archive stuff === /// Loads data and proof for the block and appends them to the corresponding archive. - pub async fn move_into_archive(&self, handle: &BlockHandle) -> Result<()> { + pub async fn move_into_archive( + &self, + handle: &BlockHandle, + mc_is_key_block: bool, + ) -> Result<()> { let _histogram = HistogramGuard::begin("tycho_storage_move_into_archive_time"); // Prepare data @@ -651,7 +655,10 @@ impl BlockStorage { let chunks_cf = self.db.archives.cf(); // Prepare archive - let archive_id = self.prepare_archive_id(handle); + let archive_id = self.prepare_archive_id( + handle.ref_by_mc_seqno(), + mc_is_key_block || handle.is_key_block(), + ); let archive_id_bytes = archive_id.id.to_be_bytes(); // 0. Create transaction @@ -986,15 +993,13 @@ impl BlockStorage { } } - fn prepare_archive_id(&self, handle: &BlockHandle) -> PreparedArchiveId { - let mc_seqno = handle.ref_by_mc_seqno(); - + fn prepare_archive_id(&self, mc_seqno: u32, force_split_archive: bool) -> PreparedArchiveId { let mut archive_ids = self.archive_ids.write(); // Get the closest archive id let prev_id = archive_ids.range(..=mc_seqno).next_back().cloned(); - if handle.is_key_block() { + if force_split_archive { let is_new = archive_ids.insert(mc_seqno); return PreparedArchiveId { id: mc_seqno,