Skip to content

Commit

Permalink
Add some code for wen-restart
Browse files Browse the repository at this point in the history
  • Loading branch information
yhzhangjump committed Oct 28, 2024
1 parent 7ed0381 commit 5fe027a
Show file tree
Hide file tree
Showing 16 changed files with 1,739 additions and 47 deletions.
1 change: 1 addition & 0 deletions src/app/fdctl/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ typedef struct {
char status_cache[ PATH_MAX ];
ulong tpool_thread_count;
char cluster_version[ 32 ];
int in_wen_restart;
} replay;

struct {
Expand Down
1 change: 1 addition & 0 deletions src/app/fdctl/config_parse.c
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ fdctl_pod_to_cfg( config_t * config,
CFG_POP ( cstr, tiles.replay.status_cache );
CFG_POP ( ulong, tiles.replay.tpool_thread_count );
CFG_POP ( cstr, tiles.replay.cluster_version );
CFG_POP ( bool, tiles.replay.in_wen_restart );

CFG_POP ( cstr, tiles.store_int.blockstore_restore );
CFG_POP ( cstr, tiles.store_int.slots_pending );
Expand Down
171 changes: 133 additions & 38 deletions src/app/fdctl/run/tiles/fd_gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "../../../../disco/store/util.h"
#include "../../../../flamenco/gossip/fd_gossip.h"
#include "../../../../flamenco/runtime/fd_system_ids.h"
#include "../../../../disco/restart/fd_restart.h"
#include "../../../../util/fd_util.h"
#include "../../../../util/net/fd_eth.h"
#include "../../../../util/net/fd_ip4.h"
Expand All @@ -28,15 +29,17 @@

/* Indices into tile->in_link_id for the gossip tile's input links.
   unprivileged_init checks the link name found at each index. */
#define NET_IN_IDX 0
#define VOTER_IN_IDX 1
/* NOTE(review): SIGN_IN_IDX is listed twice in this view (2, then 3)
   and the value 2 collides with REPLAY_IN_IDX.  This looks like the
   pre- and post-change lines of a diff shown together; the newer
   unprivileged_init check expects in_cnt==4, which fits SIGN_IN_IDX 3
   -- confirm against the actual file. */
#define SIGN_IN_IDX 2
#define REPLAY_IN_IDX 2
#define SIGN_IN_IDX 3

/* Indices into tile->out_link_id for the gossip tile's output links.
   unprivileged_init checks the link name found at indices
   NET_OUT_IDX through REPLAY_OUT_IDX. */
#define NET_OUT_IDX 0
#define SHRED_OUT_IDX 1
#define REPAIR_OUT_IDX 2
#define DEDUP_OUT_IDX 3
#define SIGN_OUT_IDX 4
#define VOTER_OUT_IDX 5
/* NOTE(review): EQVOC_OUT_IDX is listed twice in this view (6, then 7)
   and the value 6 collides with REPLAY_OUT_IDX; presumably the same
   pre-/post-change artifact as above.  The visible out_cnt check
   expects 7 links (indices 0..6) and does not name-check
   EQVOC_OUT_IDX -- TODO confirm how index 7 is validated. */
#define EQVOC_OUT_IDX 6
#define REPLAY_OUT_IDX 6
#define EQVOC_OUT_IDX 7

/* Interval in nanoseconds (5e9 ns == 5 s).  Presumably the period
   between contact info publications; its use is not visible in this
   view. */
#define CONTACT_INFO_PUBLISH_TIME_NS ((long)5e9)

Expand All @@ -49,21 +52,6 @@

static volatile ulong * fd_shred_version;

/* fd_pubkey_eq compares the key bytes of key1 and key2 over
   sizeof(fd_pubkey_t) bytes.  Returns 1 if they are identical and 0
   otherwise. */
static int
fd_pubkey_eq( fd_pubkey_t const * key1, fd_pubkey_t const * key2 ) {
  int differ = memcmp( key1->key, key2->key, sizeof(fd_pubkey_t) );
  return !differ;
}

/* fd_pubkey_hash returns fd_hash of the sizeof(fd_pubkey_t) key bytes
   of key, using seed as the hash seed. */
static ulong
fd_pubkey_hash( fd_pubkey_t const * key, ulong seed ) {
  ulong hash = fd_hash( seed, key->key, sizeof(fd_pubkey_t) );
  return hash;
}

/* fd_pubkey_copy copies sizeof(fd_pubkey_t) key bytes from keys (the
   source) into keyd (the destination).  Uses memcpy, so the two keys
   must not overlap. */
static void
fd_pubkey_copy( fd_pubkey_t * keyd, fd_pubkey_t const * keys ) {
  void       * dst = keyd->key;
  void const * src = keys->key;
  memcpy( dst, src, sizeof(fd_pubkey_t) );
}

/* Contact info table */
#define MAP_NAME fd_contact_info_table
#define MAP_KEY_T fd_pubkey_t
Expand Down Expand Up @@ -132,10 +120,24 @@ struct fd_gossip_tile_ctx {
ulong duplicate_shred_out_wmark;
ulong duplicate_shred_out_chunk;

fd_wksp_t * voter_in_mem;
ulong voter_in_chunk0;
ulong voter_in_wmark;

fd_wksp_t * replay_in_mem;
ulong replay_in_chunk0;
ulong replay_in_wmark;

fd_frag_meta_t * replay_out_mcache;
ulong * replay_out_sync;
ulong replay_out_depth;
ulong replay_out_seq;

fd_wksp_t * replay_out_mem;
ulong replay_out_chunk0;
ulong replay_out_wmark;
ulong replay_out_chunk;

fd_wksp_t * wksp;
fd_gossip_peer_addr_t gossip_my_addr;
fd_gossip_peer_addr_t tvu_my_addr;
Expand Down Expand Up @@ -176,6 +178,10 @@ struct fd_gossip_tile_ctx {

ulong replay_vote_txn_sz;
uchar replay_vote_txn [ FD_TXN_MTU ];

ulong restart_msg_sz;
long restart_last_voted_fork_push_time;
uchar restart_msg [ LAST_VOTED_FORK_MAX_MSG_BYTES ];
};
typedef struct fd_gossip_tile_ctx fd_gossip_tile_ctx_t;

Expand Down Expand Up @@ -287,7 +293,43 @@ static void
gossip_deliver_fun( fd_crds_data_t * data, void * arg ) {
fd_gossip_tile_ctx_t * ctx = (fd_gossip_tile_ctx_t *)arg;

if( fd_crds_data_is_vote( data ) ) {
if( fd_crds_data_is_restart_last_voted_fork_slots( data ) ) {
if( data->inner.restart_last_voted_fork_slots.offsets.discriminant!=fd_restart_slots_offsets_enum_raw_offsets ) {
FD_LOG_WARNING(( "Decoding RunLengthEncoding offsets is not implemented yet" ));
}

ulong struct_len = sizeof( fd_gossip_restart_last_voted_fork_slots_t );
ulong bitmap_len = data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits_len;
if( FD_UNLIKELY( bitmap_len>LAST_VOTED_FORK_MAX_BITMAP_BYTES ) ) {
FD_LOG_WARNING(( "Ignore an invalid gossip message with bitmap length %lu, greater than %lu", bitmap_len, LAST_VOTED_FORK_MAX_BITMAP_BYTES ));
return;
}

uchar * last_vote_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, last_vote_msg_, fd_crds_data_enum_restart_last_voted_fork_slots );
last_vote_msg_ += sizeof(uint);

fd_memcpy( last_vote_msg_, &data->inner.restart_last_voted_fork_slots, struct_len );
fd_memcpy( last_vote_msg_ + struct_len,
data->inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits,
bitmap_len );
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
sizeof(uint) + struct_len + bitmap_len, 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, struct_len + bitmap_len, ctx->replay_out_chunk0, ctx->replay_out_wmark );
} else if( fd_crds_data_is_restart_heaviest_fork( data ) ) {
uchar * heaviest_fork_msg_ = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
FD_STORE( uint, heaviest_fork_msg_, fd_crds_data_enum_restart_heaviest_fork );
heaviest_fork_msg_ += sizeof(uint);

fd_memcpy( heaviest_fork_msg_,
&data->inner.restart_heaviest_fork,
sizeof(fd_gossip_restart_heaviest_fork_t) );
fd_mcache_publish( ctx->replay_out_mcache, ctx->replay_out_depth, ctx->replay_out_seq, 1UL, ctx->replay_out_chunk,
sizeof(uint) + sizeof(fd_gossip_restart_heaviest_fork_t), 0UL, 0, 0 );
ctx->replay_out_seq = fd_seq_inc( ctx->replay_out_seq, 1UL );
ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sizeof(uint) + sizeof(fd_gossip_restart_heaviest_fork_t), ctx->replay_out_chunk0, ctx->replay_out_wmark );
} else if( fd_crds_data_is_vote( data ) ) {
fd_gossip_vote_t const * gossip_vote = &data->inner.vote;
if( verify_vote_txn( gossip_vote ) != 0 ) {
return;
Expand Down Expand Up @@ -369,7 +411,7 @@ before_frag( fd_gossip_tile_ctx_t * ctx,
(void)ctx;
(void)seq;

return in_idx != VOTER_IN_IDX && fd_disco_netmux_sig_proto( sig ) != DST_PROTO_GOSSIP;
return in_idx != VOTER_IN_IDX && in_idx != REPLAY_IN_IDX && fd_disco_netmux_sig_proto( sig ) != DST_PROTO_GOSSIP;
}

static inline void
Expand All @@ -382,13 +424,22 @@ during_frag( fd_gossip_tile_ctx_t * ctx,
(void)seq;
(void)sig;

if ( in_idx == VOTER_IN_IDX ) {
if( FD_UNLIKELY( chunk<ctx->replay_in_chunk0 || chunk>ctx->replay_in_wmark || sz>USHORT_MAX ) ) {
if( in_idx == REPLAY_IN_IDX ) {
if( FD_UNLIKELY( chunk<ctx->replay_in_chunk0 || chunk>ctx->replay_in_wmark || sz>LAST_VOTED_FORK_MAX_MSG_BYTES ) ) {
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->replay_in_chunk0, ctx->replay_in_wmark ));
}
fd_memcpy( ctx->restart_msg, fd_chunk_to_laddr( ctx->replay_in_mem, chunk ), sz );
ctx->restart_msg_sz = sz;
return;
}

if ( in_idx == VOTER_IN_IDX ) {
if( FD_UNLIKELY( chunk<ctx->voter_in_chunk0 || chunk>ctx->voter_in_wmark || sz>USHORT_MAX ) ) {
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->voter_in_chunk0, ctx->voter_in_wmark ));
}

ctx->replay_vote_txn_sz = sz;
memcpy( ctx->replay_vote_txn, fd_chunk_to_laddr( ctx->replay_in_mem, chunk ), sz );
memcpy( ctx->replay_vote_txn, fd_chunk_to_laddr( ctx->voter_in_mem, chunk ), sz );
return;
}

Expand Down Expand Up @@ -421,6 +472,11 @@ after_frag( fd_gossip_tile_ctx_t * ctx,
/* TODO: This doesn't seem right... */
if( in_idx!=NET_IN_IDX ) return;

if( in_idx == REPLAY_IN_IDX ) {
/* Messages from the replay tile for wen-restart are handled by after_credit periodically */
return;
}

if ( in_idx == VOTER_IN_IDX ) {
fd_crds_data_t vote_txn_crds;
vote_txn_crds.discriminant = fd_crds_data_enum_vote;
Expand Down Expand Up @@ -571,6 +627,25 @@ after_credit( fd_gossip_tile_ctx_t * ctx,
}
}

if( FD_UNLIKELY (( ctx->restart_msg_sz != 0 )&&( ctx->restart_last_voted_fork_push_time + LAST_VOTED_FORK_PUBLISH_PERIOD_NS < now )) ) {
ctx->restart_last_voted_fork_push_time = now;
fd_crds_data_t restart_msg;
restart_msg.discriminant = fd_crds_data_enum_restart_last_voted_fork_slots;
fd_memcpy( &restart_msg.inner.restart_last_voted_fork_slots, ctx->restart_msg, sizeof(fd_gossip_restart_last_voted_fork_slots_t) );

restart_msg.inner.restart_last_voted_fork_slots.shred_version = fd_gossip_get_shred_version( ctx->gossip );
restart_msg.inner.restart_last_voted_fork_slots.offsets.inner.raw_offsets.offsets.bits.bits = ctx->restart_msg + sizeof(fd_gossip_restart_last_voted_fork_slots_t);

FD_TEST( fd_gossip_push_value( ctx->gossip, &restart_msg, NULL ) == 0 );

FD_LOG_NOTICE(( "gossip sends the wen-restart message: struct=%luB, bitmap=%luB, shred_version=%u, vote_hash=%s",
sizeof(fd_gossip_restart_last_voted_fork_slots_t),
ctx->restart_msg_sz - sizeof(fd_gossip_restart_last_voted_fork_slots_t),
restart_msg.inner.restart_last_voted_fork_slots.shred_version,
FD_BASE58_ENC_32_ALLOCA( &restart_msg.inner.restart_last_voted_fork_slots.last_voted_hash ) ));
return;
}

ushort shred_version = fd_gossip_get_shred_version( ctx->gossip );
if( shred_version!=0U ) {
*fd_shred_version = shred_version;
Expand All @@ -597,21 +672,23 @@ unprivileged_init( fd_topo_t * topo,
fd_topo_tile_t * tile ) {
void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

if( FD_UNLIKELY( tile->in_cnt != 3UL ||
strcmp( topo->links[ tile->in_link_id[ NET_IN_IDX ] ].name, "net_gossip" ) ||
strcmp( topo->links[ tile->in_link_id[ VOTER_IN_IDX ] ].name, "voter_gossip" ) ||
strcmp( topo->links[ tile->in_link_id[ SIGN_IN_IDX ] ].name, "sign_gossip" ) ) ) {
if( FD_UNLIKELY( tile->in_cnt != 4UL ||
strcmp( topo->links[ tile->in_link_id[ NET_IN_IDX ] ].name, "net_gossip" ) ||
strcmp( topo->links[ tile->in_link_id[ VOTER_IN_IDX ] ].name, "voter_gossip" ) ||
strcmp( topo->links[ tile->in_link_id[ REPLAY_IN_IDX ] ].name, "replay_gossi" ) ||
strcmp( topo->links[ tile->in_link_id[ SIGN_IN_IDX ] ].name, "sign_gossip" ) ) ) {
FD_LOG_ERR(( "gossip tile has none or unexpected input links %lu %s %s",
tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name ));
}

if( FD_UNLIKELY( tile->out_cnt != 6 ||
strcmp( topo->links[ tile->out_link_id[ NET_OUT_IDX ] ].name, "gossip_net" ) ||
strcmp( topo->links[ tile->out_link_id[ SHRED_OUT_IDX ] ].name, "crds_shred" ) ||
strcmp( topo->links[ tile->out_link_id[ REPAIR_OUT_IDX ] ].name, "gossip_repai" ) ||
strcmp( topo->links[ tile->out_link_id[ DEDUP_OUT_IDX ] ].name, "gossip_dedup" ) ||
strcmp( topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ].name, "gossip_sign" ) ||
strcmp( topo->links[ tile->out_link_id[ VOTER_OUT_IDX ] ].name, "gossip_voter" ) ) ) {
if( FD_UNLIKELY( tile->out_cnt != 7UL ||
strcmp( topo->links[ tile->out_link_id[ NET_OUT_IDX ] ].name, "gossip_net" ) ||
strcmp( topo->links[ tile->out_link_id[ SHRED_OUT_IDX ] ].name, "crds_shred" ) ||
strcmp( topo->links[ tile->out_link_id[ REPAIR_OUT_IDX ] ].name, "gossip_repai" ) ||
strcmp( topo->links[ tile->out_link_id[ DEDUP_OUT_IDX ] ].name, "gossip_dedup" ) ||
strcmp( topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ].name, "gossip_sign" ) ||
strcmp( topo->links[ tile->out_link_id[ VOTER_OUT_IDX ] ].name, "gossip_voter" ) ||
strcmp( topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name, "gossip_repla" ) ) ) {
FD_LOG_ERR(( "gossip tile has none or unexpected output links %lu %s %s",
tile->out_cnt, topo->links[ tile->out_link_id[ 0 ] ].name, topo->links[ tile->out_link_id[ 1 ] ].name ));
}
Expand Down Expand Up @@ -655,7 +732,9 @@ unprivileged_init( fd_topo_t * topo,

fd_net_create_packet_header_template( ctx->hdr, FD_NET_MTU, ctx->gossip_my_addr.addr, ctx->src_mac_addr, ctx->gossip_listen_port );

ctx->last_shred_dest_push_time = 0;
ctx->last_shred_dest_push_time = 0;
ctx->restart_msg_sz = 0;
ctx->restart_last_voted_fork_push_time = 0;

fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ SIGN_IN_IDX ] ];
fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ];
Expand Down Expand Up @@ -726,10 +805,10 @@ unprivileged_init( fd_topo_t * topo,
ctx->net_in_chunk = fd_disco_compact_chunk0( ctx->net_in_mem );
ctx->net_in_wmark = fd_disco_compact_wmark( ctx->net_in_mem, netmux_link->mtu );

fd_topo_link_t * replay_in = &topo->links[ tile->in_link_id[ VOTER_IN_IDX ] ];
ctx->replay_in_mem = topo->workspaces[ topo->objs[ replay_in->dcache_obj_id ].wksp_id ].wksp;
ctx->replay_in_chunk0 = fd_dcache_compact_chunk0( ctx->replay_in_mem, replay_in->dcache );
ctx->replay_in_wmark = fd_dcache_compact_wmark( ctx->replay_in_mem, replay_in->dcache, replay_in->mtu );
fd_topo_link_t * voter_in = &topo->links[ tile->in_link_id[ VOTER_IN_IDX ] ];
ctx->voter_in_mem = topo->workspaces[ topo->objs[ voter_in->dcache_obj_id ].wksp_id ].wksp;
ctx->voter_in_chunk0 = fd_dcache_compact_chunk0( ctx->voter_in_mem, voter_in->dcache );
ctx->voter_in_wmark = fd_dcache_compact_wmark( ctx->voter_in_mem, voter_in->dcache, voter_in->mtu );

/* Set up shred contact info tile output */
fd_topo_link_t * shred_contact_out = &topo->links[ tile->out_link_id[ SHRED_OUT_IDX ] ];
Expand Down Expand Up @@ -775,6 +854,22 @@ unprivileged_init( fd_topo_t * topo,
ctx->voter_contact_out_wmark = fd_dcache_compact_wmark ( ctx->voter_contact_out_mem, voter_out->dcache, voter_out->mtu );
ctx->voter_contact_out_chunk = ctx->voter_contact_out_chunk0;

/* Set up crds restart messages input/output with the replay tile */
fd_topo_link_t * replay_in = &topo->links[ tile->in_link_id[ REPLAY_IN_IDX ] ];
ctx->replay_in_mem = topo->workspaces[ topo->objs[ replay_in->dcache_obj_id ].wksp_id ].wksp;
ctx->replay_in_chunk0 = fd_dcache_compact_chunk0( ctx->replay_in_mem, replay_in->dcache );
ctx->replay_in_wmark = fd_dcache_compact_wmark( ctx->replay_in_mem, replay_in->dcache, replay_in->mtu );

fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ];
ctx->replay_out_mcache = replay_out->mcache;
ctx->replay_out_sync = fd_mcache_seq_laddr( ctx->replay_out_mcache );
ctx->replay_out_depth = fd_mcache_depth( ctx->replay_out_mcache );
ctx->replay_out_seq = fd_mcache_seq_query( ctx->replay_out_sync );
ctx->replay_out_mem = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp;
ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache );
ctx->replay_out_wmark = fd_dcache_compact_wmark ( ctx->replay_out_mem, replay_out->dcache, replay_out->mtu );
ctx->replay_out_chunk = ctx->replay_out_chunk0;

ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
if( FD_UNLIKELY( scratch_top>( (ulong)scratch + scratch_footprint( tile ) ) ) )
FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
Expand Down
Loading

0 comments on commit 5fe027a

Please sign in to comment.