Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address some FIXMEs for wen-restart #3540

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 41 additions & 34 deletions src/app/fdctl/run/tiles/fd_gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,40 +293,7 @@ static void
gossip_deliver_fun( fd_crds_data_t * data, void * arg ) {
fd_gossip_tile_ctx_t * ctx = (fd_gossip_tile_ctx_t *)arg;

if( fd_crds_data_is_restart_last_voted_fork_slots( data ) ) {
ulong struct_len = sizeof( fd_gossip_restart_last_voted_fork_slots_t );
/* TODO: handle RunLengthEncoding for bitmap */
ulong bitmap_len = data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits_len;
if( FD_UNLIKELY( bitmap_len>LAST_VOTED_FORK_MAX_BITMAP_BYTES ) ) {
FD_LOG_WARNING(( "Ignore an invalid gossip message with bitmap length %lu, greater than %lu", bitmap_len, LAST_VOTED_FORK_MAX_BITMAP_BYTES ));
return;
}

uchar * last_vote_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, last_vote_msg_, fd_crds_data_enum_restart_last_voted_fork_slots );
fd_memcpy( last_vote_msg_+sizeof(uint), &data->inner.restart_last_voted_fork_slots, struct_len );
fd_memcpy( last_vote_msg_+sizeof(uint)+struct_len,
data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits,
bitmap_len );

ulong total_len = sizeof(uint) + struct_len + bitmap_len;
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
total_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, total_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
} else if( fd_crds_data_is_restart_heaviest_fork( data ) ) {
uchar * heaviest_fork_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, heaviest_fork_msg_, fd_crds_data_enum_restart_heaviest_fork );
fd_memcpy( heaviest_fork_msg_+sizeof(uint),
&data->inner.restart_heaviest_fork,
sizeof(fd_gossip_restart_heaviest_fork_t) );

ulong total_len = sizeof(uint) + sizeof(fd_gossip_restart_heaviest_fork_t);
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
total_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, total_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
} else if( fd_crds_data_is_vote( data ) ) {
if( fd_crds_data_is_vote( data ) ) {
fd_gossip_vote_t const * gossip_vote = &data->inner.vote;
if( verify_vote_txn( gossip_vote ) != 0 ) {
return;
Expand Down Expand Up @@ -382,6 +349,46 @@ gossip_deliver_fun( fd_crds_data_t * data, void * arg ) {
// duplicate_shred->chunk_len, 0UL, 0, 0 );
// ctx->duplicate_shred_out_seq = fd_seq_inc( ctx->duplicate_shred_out_seq, 1UL );
// ctx->duplicate_shred_out_chunk = fd_dcache_compact_next( ctx->duplicate_shred_out_chunk, duplicate_shred->chunk_len, ctx->duplicate_shred_out_chunk0, ctx->duplicate_shred_out_wmark );
} else if( fd_crds_data_is_restart_last_voted_fork_slots( data ) ) {
ulong struct_len = sizeof( fd_gossip_restart_last_voted_fork_slots_t );
uchar * last_vote_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, last_vote_msg_, fd_crds_data_enum_restart_last_voted_fork_slots );

ulong bitmap_len = 0;
uchar * bitmap_dst = last_vote_msg_+sizeof(uint)+struct_len;
if ( FD_LIKELY( data->inner.restart_last_voted_fork_slots.offsets.discriminant==fd_restart_slots_offsets_enum_raw_offsets ) ) {
uchar * bitmap_src = data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits;
bitmap_len = data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits_len;
memcpy( bitmap_dst, bitmap_src, bitmap_len );
} else {
uchar bitmap_src [ LAST_VOTED_FORK_MAX_BITMAP_BYTES ];
fd_restart_convert_runlength_to_raw_bitmap( &data->inner.restart_last_voted_fork_slots, bitmap_src, &bitmap_len );
if( FD_UNLIKELY( bitmap_len > LAST_VOTED_FORK_MAX_BITMAP_BYTES ) ) {
FD_LOG_WARNING(( "Ignore an invalid gossip message with bitmap length greater than %lu", LAST_VOTED_FORK_MAX_BITMAP_BYTES ));
return;
}
memcpy( bitmap_dst, bitmap_src, bitmap_len );
}
/* Copy the struct to the buffer now because it may be modified by fd_restart_convert_runlength_to_raw_bitmap */
fd_memcpy( last_vote_msg_+sizeof(uint), &data->inner.restart_last_voted_fork_slots, struct_len );

ulong total_len = sizeof(uint) + struct_len + bitmap_len;
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
total_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, total_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
} else if( fd_crds_data_is_restart_heaviest_fork( data ) ) {
uchar * heaviest_fork_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, heaviest_fork_msg_, fd_crds_data_enum_restart_heaviest_fork );
fd_memcpy( heaviest_fork_msg_+sizeof(uint),
&data->inner.restart_heaviest_fork,
sizeof(fd_gossip_restart_heaviest_fork_t) );

ulong total_len = sizeof(uint) + sizeof(fd_gossip_restart_heaviest_fork_t);
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
total_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, total_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
}
}

Expand Down
63 changes: 34 additions & 29 deletions src/app/fdctl/run/tiles/fd_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,11 @@ during_frag( fd_replay_tile_ctx_t * ctx,
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->pack_in_chunk0, ctx->pack_in_wmark ));
}

fd_memcpy( ctx->restart_gossip_msg, fd_chunk_to_laddr( ctx->gossip_in_mem, chunk ), sz );
if( FD_LIKELY( ctx->in_wen_restart ) ) {
fd_memcpy( ctx->restart_gossip_msg, fd_chunk_to_laddr( ctx->gossip_in_mem, chunk ), sz );
} else {
FD_LOG_WARNING(( "Received a gossip message for wen-restart while FD is not in wen-restart mode" ));
}
return;
}

Expand Down Expand Up @@ -893,23 +897,21 @@ after_frag( fd_replay_tile_ctx_t * ctx,
(void)tsorig;

if( FD_UNLIKELY( in_idx==GOSSIP_IN_IDX ) ) {
if( FD_UNLIKELY( !ctx->in_wen_restart ) ) {
FD_LOG_WARNING(( "Received gossip messages for wen-restart while FD is not in wen-restart mode" ));
} else {
ulong heaviest_fork_found = 0;
fd_restart_recv_gossip_msg( ctx->restart, ctx->restart_gossip_msg, &heaviest_fork_found );
if( FD_UNLIKELY( heaviest_fork_found ) ) {
ulong need_repair = 0;
fd_restart_find_heaviest_fork_bank_hash( ctx->restart, ctx->funk, ctx->blockstore, &need_repair );
if( FD_LIKELY( need_repair ) ) {
/* Send the heaviest fork slot to the store tile for repair and replay */
uchar * buf = fd_chunk_to_laddr( ctx->store_out_mem, ctx->store_out_chunk );
FD_STORE( ulong, buf, ctx->restart->heaviest_fork_slot );
fd_mcache_publish( ctx->store_out_mcache, ctx->store_out_depth, ctx->store_out_seq, 1UL, ctx->store_out_chunk,
sizeof(ulong), 0UL, 0, 0 );
ctx->store_out_seq = fd_seq_inc( ctx->store_out_seq, 1UL );
ctx->store_out_chunk = fd_dcache_compact_next( ctx->store_out_chunk, sizeof(ulong), ctx->store_out_chunk0, ctx->store_out_wmark );
}
if( FD_UNLIKELY( !ctx->in_wen_restart ) ) return;

ulong heaviest_fork_found = 0;
fd_restart_recv_gossip_msg( ctx->restart, ctx->restart_gossip_msg, &heaviest_fork_found );
if( FD_UNLIKELY( heaviest_fork_found ) ) {
ulong need_repair = 0;
fd_restart_find_heaviest_fork_bank_hash( ctx->restart, ctx->funk, ctx->blockstore, &need_repair );
if( FD_LIKELY( need_repair ) ) {
/* Send the heaviest fork slot to the store tile for repair and replay */
uchar * buf = fd_chunk_to_laddr( ctx->store_out_mem, ctx->store_out_chunk );
FD_STORE( ulong, buf, ctx->restart->heaviest_fork_slot );
fd_mcache_publish( ctx->store_out_mcache, ctx->store_out_depth, ctx->store_out_seq, 1UL, ctx->store_out_chunk,
sizeof(ulong), 0UL, 0, 0 );
ctx->store_out_seq = fd_seq_inc( ctx->store_out_seq, 1UL );
ctx->store_out_chunk = fd_dcache_compact_next( ctx->store_out_chunk, sizeof(ulong), ctx->store_out_chunk0, ctx->store_out_wmark );
}
}
return;
Expand Down Expand Up @@ -1467,8 +1469,21 @@ after_credit( fd_replay_tile_ctx_t * ctx,
ulong buf_len = 0;
uchar * buf = fd_chunk_to_laddr( ctx->gossip_out_mem, ctx->gossip_out_chunk );
fd_sysvar_slot_history_read( ctx->slot_ctx, fd_scratch_virtual(), ctx->slot_ctx->slot_history );

fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
fd_stakes_t * stakes = &epoch_bank->stakes;
/* FIXME: Restoring funk checkpoint does not give the correct epoch number,
* i.e., the epoch number when the funk checkpoint file is produced.
* For now, we need to redo stakes->epoch in order to avoid a BHM in
* a local setup when repairing blocks. */
stakes->epoch = ctx->blockstore->smr / epoch_bank->epoch_schedule.slots_per_epoch;
FD_LOG_NOTICE(( "slots_per_epoch=%lu, blockstore smr=%lu", epoch_bank->epoch_schedule.slots_per_epoch, ctx->blockstore->smr ));
FD_LOG_WARNING(( "Reset stakes->epoch=%lu (blockstore root / slots per epoch)", stakes->epoch ));

fd_restart_init( ctx->restart,
stakes->epoch,
&ctx->slot_ctx->slot_bank.epoch_stakes,
&epoch_bank->next_epoch_stakes,
ctx->tower,
ctx->slot_ctx->slot_history,
ctx->funk,
Expand All @@ -1485,17 +1500,6 @@ after_credit( fd_replay_tile_ctx_t * ctx,
buf_len, 0UL, 0, 0 );
ctx->gossip_out_seq = fd_seq_inc( ctx->gossip_out_seq, 1UL );
ctx->gossip_out_chunk = fd_dcache_compact_next( ctx->gossip_out_chunk, buf_len, ctx->gossip_out_chunk0, ctx->gossip_out_wmark );

/* FIXME: Restoring funk checkpoint does not give the correct epoch number,
* i.e., the epoch number when the funk checkpoint file is produced.
* For now, we need to redo stakes->epoch in order to avoid a BHM in
* a local setup when repairing blocks. */
fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
fd_stakes_t * stakes = &epoch_bank->stakes;
stakes->epoch = ctx->blockstore->smr / epoch_bank->epoch_schedule.slots_per_epoch;
FD_LOG_NOTICE(( "slots_per_epoch=%lu, blockstore root=%lu", epoch_bank->epoch_schedule.slots_per_epoch, ctx->blockstore->smr ));
FD_LOG_WARNING(( "Reset stakes->epoch=%lu (blockstore root / slots per epoch)", stakes->epoch ));

} FD_SCRATCH_SCOPE_END;
}
}
Expand Down Expand Up @@ -1549,6 +1553,7 @@ during_housekeeping( void * _ctx ) {
fd_ghost_publish( ctx->ghost, smr );
}

/* TODO: generate snapshot file instead of checkpointing funk; dump the latest tower sent */
/* FIXME: Decide how to tell FD to checkpoint funk and then halt before restarting FD in wen-restart mode */
if( ctx->in_wen_restart && ctx->curr_slot>ctx->snapshot_slot+250 ) {
checkpt( ctx );
Expand Down
Loading