Skip to content

Commit

Permalink
Address some FIXMEs for wen-restart
Browse files Browse the repository at this point in the history
  • Loading branch information
yhzhangjump committed Nov 27, 2024
1 parent c4f1048 commit 5ac9a72
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 106 deletions.
75 changes: 41 additions & 34 deletions src/app/fdctl/run/tiles/fd_gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,40 +293,7 @@ static void
gossip_deliver_fun( fd_crds_data_t * data, void * arg ) {
fd_gossip_tile_ctx_t * ctx = (fd_gossip_tile_ctx_t *)arg;

if( fd_crds_data_is_restart_last_voted_fork_slots( data ) ) {
ulong struct_len = sizeof( fd_gossip_restart_last_voted_fork_slots_t );
/* TODO: handle RunLengthEncoding for bitmap */
ulong bitmap_len = data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits_len;
if( FD_UNLIKELY( bitmap_len>LAST_VOTED_FORK_MAX_BITMAP_BYTES ) ) {
FD_LOG_WARNING(( "Ignore an invalid gossip message with bitmap length %lu, greater than %lu", bitmap_len, LAST_VOTED_FORK_MAX_BITMAP_BYTES ));
return;
}

uchar * last_vote_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, last_vote_msg_, fd_crds_data_enum_restart_last_voted_fork_slots );
fd_memcpy( last_vote_msg_+sizeof(uint), &data->inner.restart_last_voted_fork_slots, struct_len );
fd_memcpy( last_vote_msg_+sizeof(uint)+struct_len,
data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits,
bitmap_len );

ulong total_len = sizeof(uint) + struct_len + bitmap_len;
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
total_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, total_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
} else if( fd_crds_data_is_restart_heaviest_fork( data ) ) {
uchar * heaviest_fork_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, heaviest_fork_msg_, fd_crds_data_enum_restart_heaviest_fork );
fd_memcpy( heaviest_fork_msg_+sizeof(uint),
&data->inner.restart_heaviest_fork,
sizeof(fd_gossip_restart_heaviest_fork_t) );

ulong total_len = sizeof(uint) + sizeof(fd_gossip_restart_heaviest_fork_t);
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
total_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, total_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
} else if( fd_crds_data_is_vote( data ) ) {
if( fd_crds_data_is_vote( data ) ) {
fd_gossip_vote_t const * gossip_vote = &data->inner.vote;
if( verify_vote_txn( gossip_vote ) != 0 ) {
return;
Expand Down Expand Up @@ -382,6 +349,46 @@ gossip_deliver_fun( fd_crds_data_t * data, void * arg ) {
// duplicate_shred->chunk_len, 0UL, 0, 0 );
// ctx->duplicate_shred_out_seq = fd_seq_inc( ctx->duplicate_shred_out_seq, 1UL );
// ctx->duplicate_shred_out_chunk = fd_dcache_compact_next( ctx->duplicate_shred_out_chunk, duplicate_shred->chunk_len, ctx->duplicate_shred_out_chunk0, ctx->duplicate_shred_out_wmark );
} else if( fd_crds_data_is_restart_last_voted_fork_slots( data ) ) {
ulong struct_len = sizeof( fd_gossip_restart_last_voted_fork_slots_t );
uchar * last_vote_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, last_vote_msg_, fd_crds_data_enum_restart_last_voted_fork_slots );

ulong bitmap_len = 0;
uchar * bitmap_dst = last_vote_msg_+sizeof(uint)+struct_len;
if ( FD_LIKELY( data->inner.restart_last_voted_fork_slots.offsets.discriminant==fd_restart_slots_offsets_enum_raw_offsets ) ) {
uchar * bitmap_src = data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits;
bitmap_len = data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits_len;
memcpy( bitmap_dst, bitmap_src, bitmap_len );
} else {
uchar bitmap_src [ LAST_VOTED_FORK_MAX_BITMAP_BYTES ];
fd_restart_convert_runlength_to_raw_bitmap( &data->inner.restart_last_voted_fork_slots, bitmap_src, &bitmap_len );
if( FD_UNLIKELY( bitmap_len > LAST_VOTED_FORK_MAX_BITMAP_BYTES ) ) {
FD_LOG_WARNING(( "Ignore an invalid gossip message with bitmap length greater than %lu", LAST_VOTED_FORK_MAX_BITMAP_BYTES ));
return;
}
memcpy( bitmap_dst, bitmap_src, bitmap_len );
}
/* Copy the struct into the buffer only after the bitmap has been handled, because fd_restart_convert_runlength_to_raw_bitmap may modify the struct */
fd_memcpy( last_vote_msg_+sizeof(uint), &data->inner.restart_last_voted_fork_slots, struct_len );

ulong total_len = sizeof(uint) + struct_len + bitmap_len;
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
total_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, total_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
} else if( fd_crds_data_is_restart_heaviest_fork( data ) ) {
uchar * heaviest_fork_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, heaviest_fork_msg_, fd_crds_data_enum_restart_heaviest_fork );
fd_memcpy( heaviest_fork_msg_+sizeof(uint),
&data->inner.restart_heaviest_fork,
sizeof(fd_gossip_restart_heaviest_fork_t) );

ulong total_len = sizeof(uint) + sizeof(fd_gossip_restart_heaviest_fork_t);
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
total_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, total_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
}
}

Expand Down
63 changes: 34 additions & 29 deletions src/app/fdctl/run/tiles/fd_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,11 @@ during_frag( fd_replay_tile_ctx_t * ctx,
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->pack_in_chunk0, ctx->pack_in_wmark ));
}

fd_memcpy( ctx->restart_gossip_msg, fd_chunk_to_laddr( ctx->gossip_in_mem, chunk ), sz );
if( FD_LIKELY( ctx->in_wen_restart ) ) {
fd_memcpy( ctx->restart_gossip_msg, fd_chunk_to_laddr( ctx->gossip_in_mem, chunk ), sz );
} else {
FD_LOG_WARNING(( "Received a gossip message for wen-restart while FD is not in wen-restart mode" ));
}
return;
}

Expand Down Expand Up @@ -893,23 +897,21 @@ after_frag( fd_replay_tile_ctx_t * ctx,
(void)tsorig;

if( FD_UNLIKELY( in_idx==GOSSIP_IN_IDX ) ) {
if( FD_UNLIKELY( !ctx->in_wen_restart ) ) {
FD_LOG_WARNING(( "Received gossip messages for wen-restart while FD is not in wen-restart mode" ));
} else {
ulong heaviest_fork_found = 0;
fd_restart_recv_gossip_msg( ctx->restart, ctx->restart_gossip_msg, &heaviest_fork_found );
if( FD_UNLIKELY( heaviest_fork_found ) ) {
ulong need_repair = 0;
fd_restart_find_heaviest_fork_bank_hash( ctx->restart, ctx->funk, ctx->blockstore, &need_repair );
if( FD_LIKELY( need_repair ) ) {
/* Send the heaviest fork slot to the store tile for repair and replay */
uchar * buf = fd_chunk_to_laddr( ctx->store_out_mem, ctx->store_out_chunk );
FD_STORE( ulong, buf, ctx->restart->heaviest_fork_slot );
fd_mcache_publish( ctx->store_out_mcache, ctx->store_out_depth, ctx->store_out_seq, 1UL, ctx->store_out_chunk,
sizeof(ulong), 0UL, 0, 0 );
ctx->store_out_seq = fd_seq_inc( ctx->store_out_seq, 1UL );
ctx->store_out_chunk = fd_dcache_compact_next( ctx->store_out_chunk, sizeof(ulong), ctx->store_out_chunk0, ctx->store_out_wmark );
}
if( FD_UNLIKELY( !ctx->in_wen_restart ) ) return;

ulong heaviest_fork_found = 0;
fd_restart_recv_gossip_msg( ctx->restart, ctx->restart_gossip_msg, &heaviest_fork_found );
if( FD_UNLIKELY( heaviest_fork_found ) ) {
ulong need_repair = 0;
fd_restart_find_heaviest_fork_bank_hash( ctx->restart, ctx->funk, ctx->blockstore, &need_repair );
if( FD_LIKELY( need_repair ) ) {
/* Send the heaviest fork slot to the store tile for repair and replay */
uchar * buf = fd_chunk_to_laddr( ctx->store_out_mem, ctx->store_out_chunk );
FD_STORE( ulong, buf, ctx->restart->heaviest_fork_slot );
fd_mcache_publish( ctx->store_out_mcache, ctx->store_out_depth, ctx->store_out_seq, 1UL, ctx->store_out_chunk,
sizeof(ulong), 0UL, 0, 0 );
ctx->store_out_seq = fd_seq_inc( ctx->store_out_seq, 1UL );
ctx->store_out_chunk = fd_dcache_compact_next( ctx->store_out_chunk, sizeof(ulong), ctx->store_out_chunk0, ctx->store_out_wmark );
}
}
return;
Expand Down Expand Up @@ -1467,8 +1469,21 @@ after_credit( fd_replay_tile_ctx_t * ctx,
ulong buf_len = 0;
uchar * buf = fd_chunk_to_laddr( ctx->gossip_out_mem, ctx->gossip_out_chunk );
fd_sysvar_slot_history_read( ctx->slot_ctx, fd_scratch_virtual(), ctx->slot_ctx->slot_history );

fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
fd_stakes_t * stakes = &epoch_bank->stakes;
/* FIXME: Restoring a funk checkpoint does not yield the correct epoch number,
* i.e., the epoch number at the time the funk checkpoint file was produced.
* For now, stakes->epoch must be recomputed here in order to avoid a bank
* hash mismatch (BHM) in a local setup when repairing blocks. */
stakes->epoch = ctx->blockstore->smr / epoch_bank->epoch_schedule.slots_per_epoch;
FD_LOG_NOTICE(( "slots_per_epoch=%lu, blockstore smr=%lu", epoch_bank->epoch_schedule.slots_per_epoch, ctx->blockstore->smr ));
FD_LOG_WARNING(( "Reset stakes->epoch=%lu (blockstore root / slots per epoch)", stakes->epoch ));

fd_restart_init( ctx->restart,
stakes->epoch,
&ctx->slot_ctx->slot_bank.epoch_stakes,
&epoch_bank->next_epoch_stakes,
ctx->tower,
ctx->slot_ctx->slot_history,
ctx->funk,
Expand All @@ -1485,17 +1500,6 @@ after_credit( fd_replay_tile_ctx_t * ctx,
buf_len, 0UL, 0, 0 );
ctx->gossip_out_seq = fd_seq_inc( ctx->gossip_out_seq, 1UL );
ctx->gossip_out_chunk = fd_dcache_compact_next( ctx->gossip_out_chunk, buf_len, ctx->gossip_out_chunk0, ctx->gossip_out_wmark );

/* FIXME: Restoring funk checkpoint does not give the correct epoch number,
* i.e., the epoch number when the funk checkpoint file is produced.
* For now, we need to redo stakes->epoch in order to avoid a BHM in
* a local setup when repairing blocks. */
fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
fd_stakes_t * stakes = &epoch_bank->stakes;
stakes->epoch = ctx->blockstore->smr / epoch_bank->epoch_schedule.slots_per_epoch;
FD_LOG_NOTICE(( "slots_per_epoch=%lu, blockstore root=%lu", epoch_bank->epoch_schedule.slots_per_epoch, ctx->blockstore->smr ));
FD_LOG_WARNING(( "Reset stakes->epoch=%lu (blockstore root / slots per epoch)", stakes->epoch ));

} FD_SCRATCH_SCOPE_END;
}
}
Expand Down Expand Up @@ -1549,6 +1553,7 @@ during_housekeeping( void * _ctx ) {
fd_ghost_publish( ctx->ghost, smr );
}

/* TODO: generate snapshot file instead of checkpointing funk; dump the latest tower sent */
/* FIXME: Decide how to tell FD to checkpoint funk and then halt before restarting FD in wen-restart mode */
if( ctx->in_wen_restart && ctx->curr_slot>ctx->snapshot_slot+250 ) {
checkpt( ctx );
Expand Down
Loading

0 comments on commit 5ac9a72

Please sign in to comment.