Skip to content

Commit

Permalink
fix(core): fix wrong archive id for shard blocks of key block
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov authored and Rexagon committed Dec 10, 2024
1 parent 6af627b commit b883c85
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 8 deletions.
9 changes: 8 additions & 1 deletion core/src/block_strider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ where
let prepared_master = {
let cx = Box::new(BlockSubscriberContext {
mc_block_id,
mc_is_key_block: is_key_block,
is_key_block,
block: block.clone(),
archive_data,
Expand All @@ -204,7 +205,11 @@ where
// Start processing shard blocks in parallel
let mut process_futures = FuturesUnordered::new();
while let Some(blocks) = download_futures.next().await.transpose()? {
process_futures.push(Box::pin(self.process_shard_blocks(&mc_block_id, blocks)));
process_futures.push(Box::pin(self.process_shard_blocks(
&mc_block_id,
is_key_block,
blocks,
)));
}
metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());

Expand Down Expand Up @@ -280,11 +285,13 @@ where
async fn process_shard_blocks(
&self,
mc_block_id: &BlockId,
mc_is_key_block: bool,
mut blocks: Vec<BlockStuffAug>,
) -> Result<()> {
let start_preparing_block = |block: BlockStuffAug| {
let cx = Box::new(BlockSubscriberContext {
mc_block_id: *mc_block_id,
mc_is_key_block,
is_key_block: false,
block: block.data,
archive_data: block.archive_data,
Expand Down
3 changes: 2 additions & 1 deletion core/src/block_strider/state_applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ where
let started_at = std::time::Instant::now();
let cx = StateSubscriberContext {
mc_block_id: cx.mc_block_id,
mc_is_key_block: cx.mc_is_key_block,
is_key_block: cx.is_key_block,
block: cx.block.clone(), // TODO: rewrite without clone
archive_data: cx.archive_data.clone(), // TODO: rewrite without clone
Expand All @@ -204,7 +205,7 @@ where
self.inner
.storage
.block_storage()
.move_into_archive(&prepared.handle)
.move_into_archive(&prepared.handle, cx.mc_is_key_block)
.await?;
}

Expand Down
17 changes: 17 additions & 0 deletions core/src/block_strider/subscriber/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,17 @@ mod ps_subscriber;
// === trait BlockSubscriber ===

pub struct BlockSubscriberContext {
/// Related masterchain block id.
/// In case of context for mc block this id is the same as `block.id()`.
pub mc_block_id: BlockId,
/// Related masterchain block flag.
/// In case of context for mc block this flag is the same as `is_key_block`.
pub mc_is_key_block: bool,
/// Whether the `block` from this context is a key block.
pub is_key_block: bool,
/// Parsed block data.
pub block: BlockStuff,
/// Serialized block data.
pub archive_data: ArchiveData,
}

Expand Down Expand Up @@ -123,10 +131,19 @@ impl<B: BlockSubscriber> BlockSubscriberExt for B {
// === trait StateSubscriber ===

pub struct StateSubscriberContext {
/// Related masterchain block id.
/// In case of context for mc block this id is the same as `block.id()`.
pub mc_block_id: BlockId,
/// Related masterchain block flag.
/// In case of context for mc block this flag is the same as `is_key_block`.
pub mc_is_key_block: bool,
/// Whether the `block` from this context is a key block.
pub is_key_block: bool,
/// Parsed block data.
pub block: BlockStuff,
/// Serialized block data.
pub archive_data: ArchiveData,
/// Applied shard state.
pub state: ShardStateStuff,
}

Expand Down
17 changes: 11 additions & 6 deletions storage/src/store/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,11 @@ impl BlockStorage {
// === Archive stuff ===

/// Loads data and proof for the block and appends them to the corresponding archive.
pub async fn move_into_archive(&self, handle: &BlockHandle) -> Result<()> {
pub async fn move_into_archive(
&self,
handle: &BlockHandle,
mc_is_key_block: bool,
) -> Result<()> {
let _histogram = HistogramGuard::begin("tycho_storage_move_into_archive_time");

// Prepare data
Expand All @@ -651,7 +655,10 @@ impl BlockStorage {
let chunks_cf = self.db.archives.cf();

// Prepare archive
let archive_id = self.prepare_archive_id(handle);
let archive_id = self.prepare_archive_id(
handle.ref_by_mc_seqno(),
mc_is_key_block || handle.is_key_block(),
);
let archive_id_bytes = archive_id.id.to_be_bytes();

// 0. Create transaction
Expand Down Expand Up @@ -986,15 +993,13 @@ impl BlockStorage {
}
}

fn prepare_archive_id(&self, handle: &BlockHandle) -> PreparedArchiveId {
let mc_seqno = handle.ref_by_mc_seqno();

fn prepare_archive_id(&self, mc_seqno: u32, force_split_archive: bool) -> PreparedArchiveId {
let mut archive_ids = self.archive_ids.write();

// Get the closest archive id
let prev_id = archive_ids.range(..=mc_seqno).next_back().cloned();

if handle.is_key_block() {
if force_split_archive {
let is_new = archive_ids.insert(mc_seqno);
return PreparedArchiveId {
id: mc_seqno,
Expand Down

0 comments on commit b883c85

Please sign in to comment.