Skip to content

Commit

Permalink
shred: allow multiple tiles
Browse files Browse the repository at this point in the history
  • Loading branch information
anwayde committed Apr 3, 2024
1 parent cfe64a7 commit 4ba08fd
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 59 deletions.
43 changes: 24 additions & 19 deletions src/app/fdctl/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ static int parse_key_value( config_t * config,
ENTRY_UINT ( ., layout, quic_tile_count );
ENTRY_UINT ( ., layout, verify_tile_count );
ENTRY_UINT ( ., layout, bank_tile_count );
ENTRY_UINT ( ., layout, shred_tile_count );

ENTRY_STR ( ., hugetlbfs, gigantic_page_mount_path );
ENTRY_STR ( ., hugetlbfs, huge_page_mount_path );
Expand Down Expand Up @@ -621,6 +622,7 @@ topo_initialize( config_t * config ) {
ulong quic_tile_cnt = config->layout.quic_tile_count;
ulong verify_tile_cnt = config->layout.verify_tile_count;
ulong bank_tile_cnt = config->layout.bank_tile_count;
ulong shred_tile_cnt = config->layout.shred_tile_count;

fd_topo_t topo[1] = { fd_topob_new( config->name ) };

Expand Down Expand Up @@ -661,7 +663,7 @@ topo_initialize( config_t * config ) {
FOR(net_tile_cnt) fd_topob_link( topo, "net_netmux", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL );
/**/ fd_topob_link( topo, "netmux_out", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL );
FOR(quic_tile_cnt) fd_topob_link( topo, "quic_netmux", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL );
/**/ fd_topob_link( topo, "shred_netmux", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL );
FOR(shred_tile_cnt) fd_topob_link( topo, "shred_netmux", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL );
FOR(quic_tile_cnt) fd_topob_link( topo, "quic_verify", "quic_verify", 1, config->tiles.verify.receive_buffer_size, 0UL, config->tiles.quic.txn_reassembly_count );
FOR(verify_tile_cnt) fd_topob_link( topo, "verify_dedup", "verify_dedup", 0, config->tiles.verify.receive_buffer_size, FD_TPU_DCACHE_MTU, 1UL );
/**/ fd_topob_link( topo, "dedup_pack", "dedup_pack", 0, config->tiles.verify.receive_buffer_size, FD_TPU_DCACHE_MTU, 1UL );
Expand All @@ -674,12 +676,12 @@ topo_initialize( config_t * config ) {
/**/ fd_topob_link( topo, "poh_shred", "poh_shred", 0, 128UL, USHORT_MAX, 1UL );
/**/ fd_topob_link( topo, "crds_shred", "poh_shred", 0, 128UL, 8UL + 40200UL * 38UL, 1UL );
/* See long comment in fd_shred.c for an explanation about the size of this dcache. */
/**/ fd_topob_link( topo, "shred_store", "shred_store", 0, 16384UL, 4UL*FD_SHRED_STORE_MTU, 4UL+config->tiles.shred.max_pending_shred_sets );
FOR(shred_tile_cnt) fd_topob_link( topo, "shred_store", "shred_store", 0, 16384UL, 4UL*FD_SHRED_STORE_MTU, 4UL+config->tiles.shred.max_pending_shred_sets );

FOR(quic_tile_cnt) fd_topob_link( topo, "quic_sign", "quic_sign", 0, 128UL, 130UL, 1UL );
FOR(quic_tile_cnt) fd_topob_link( topo, "sign_quic", "sign_quic", 0, 128UL, 64UL, 1UL );
/**/ fd_topob_link( topo, "shred_sign", "shred_sign", 0, 128UL, 32UL, 1UL );
/**/ fd_topob_link( topo, "sign_shred", "sign_shred", 0, 128UL, 64UL, 1UL );
FOR(shred_tile_cnt) fd_topob_link( topo, "shred_sign", "shred_sign", 0, 128UL, 32UL, 1UL );
FOR(shred_tile_cnt) fd_topob_link( topo, "sign_shred", "sign_shred", 0, 128UL, 64UL, 1UL );

ushort tile_to_cpu[ FD_TILE_MAX ];
for( ulong i=0UL; i<FD_TILE_MAX; i++ ) tile_to_cpu[ i ] = USHORT_MAX; /* Unassigned tiles will be floating. */
Expand All @@ -695,7 +697,7 @@ topo_initialize( config_t * config ) {
/**/ fd_topob_tile( topo, "pack", "pack", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, "pack_bank", 0UL );
FOR(bank_tile_cnt) fd_topob_tile( topo, "bank", "bank", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 1, "bank_poh", i );
/**/ fd_topob_tile( topo, "poh", "poh", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 1, "poh_shred", 0UL );
/**/ fd_topob_tile( topo, "shred", "shred", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, "shred_store", 0UL );
FOR(shred_tile_cnt) fd_topob_tile( topo, "shred", "shred", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, "shred_store", i );
/**/ fd_topob_tile( topo, "store", "store", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 1, NULL, 0UL );
/**/ fd_topob_tile( topo, "sign", "sign", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, NULL, 0UL );
/**/ fd_topob_tile( topo, "metric", "metric", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, NULL, 0UL );
Expand All @@ -715,7 +717,7 @@ topo_initialize( config_t * config ) {
FOR(net_tile_cnt) fd_topob_tile_in( topo, "net", i, "metric_in", "netmux_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
FOR(net_tile_cnt) fd_topob_tile_in( topo, "netmux", 0UL, "metric_in", "net_netmux", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
FOR(quic_tile_cnt) fd_topob_tile_in( topo, "netmux", 0UL, "metric_in", "quic_netmux", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
/**/ fd_topob_tile_in( topo, "netmux", 0UL, "metric_in", "shred_netmux", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
FOR(shred_tile_cnt) fd_topob_tile_in( topo, "netmux", 0UL, "metric_in", "shred_netmux", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
FOR(quic_tile_cnt) fd_topob_tile_in( topo, "quic", i, "metric_in", "netmux_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
FOR(quic_tile_cnt) fd_topob_tile_out( topo, "quic", i, "quic_netmux", i );
/* All verify tiles read from all QUIC tiles, packets are round robin. */
Expand All @@ -740,12 +742,12 @@ topo_initialize( config_t * config ) {
/**/ fd_topob_tile_in( topo, "poh", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "poh", 0UL, "metric_in", "pack_bank", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_out( topo, "poh", 0UL, "poh_pack", 0UL );
/**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "netmux_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
/**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "poh_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "crds_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_out( topo, "shred", 0UL, "shred_netmux", 0UL );
/**/ fd_topob_tile_in( topo, "store", 0UL, "metric_in", "shred_store", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(shred_tile_cnt) fd_topob_tile_in( topo, "shred", i, "metric_in", "netmux_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
FOR(shred_tile_cnt) fd_topob_tile_in( topo, "shred", i, "metric_in", "poh_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(shred_tile_cnt) fd_topob_tile_in( topo, "shred", i, "metric_in", "stake_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(shred_tile_cnt) fd_topob_tile_in( topo, "shred", i, "metric_in", "crds_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(shred_tile_cnt) fd_topob_tile_out( topo, "shred", i, "shred_netmux", i );
FOR(shred_tile_cnt) fd_topob_tile_in( topo, "store", 0UL, "metric_in", "shred_store", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );

/* Sign links don't need to be reliable because they are synchronous,
so there's at most one fragment in flight at a time anyway. The
Expand All @@ -758,11 +760,12 @@ topo_initialize( config_t * config ) {
/**/ fd_topob_tile_in( topo, "quic", i, "metric_in", "sign_quic", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED );
/**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_quic", i );
}

/**/ fd_topob_tile_in( topo, "sign", 0UL, "metric_in", "shred_sign", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_out( topo, "shred", 0UL, "shred_sign", 0UL );
/**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "sign_shred", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED );
/**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_shred", 0UL );
for( ulong i=0UL; i<shred_tile_cnt; i++ ) {
/**/ fd_topob_tile_in( topo, "sign", 0UL, "metric_in", "shred_sign", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_out( topo, "shred", i, "shred_sign", i );
/**/ fd_topob_tile_in( topo, "shred", i, "metric_in", "sign_shred", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED );
/**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_shred", i );
}

/* PoH tile represents the Solana Labs address space, so it's
responsible for publishing Solana Labs provided data to
Expand Down Expand Up @@ -792,9 +795,11 @@ topo_initialize( config_t * config ) {
version from the Solana Labs boot path to the shred tile. */
fd_topo_obj_t * poh_shred_obj = fd_topob_obj( topo, "fseq", "poh_shred" );
fd_topo_tile_t * poh_tile = &topo->tiles[ fd_topo_find_tile( topo, "poh", 0UL ) ];
fd_topo_tile_t * shred_tile = &topo->tiles[ fd_topo_find_tile( topo, "shred", 0UL ) ];
fd_topob_tile_uses( topo, poh_tile, poh_shred_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
fd_topob_tile_uses( topo, shred_tile, poh_shred_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
for( ulong i=0UL; i<shred_tile_cnt; i++ ) {
fd_topo_tile_t * shred_tile = &topo->tiles[ fd_topo_find_tile( topo, "shred", i ) ];
fd_topob_tile_uses( topo, shred_tile, poh_shred_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
}
FD_TEST( fd_pod_insertf_ulong( topo->props, poh_shred_obj->id, "poh_shred" ) );

for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
Expand Down
1 change: 1 addition & 0 deletions src/app/fdctl/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ typedef struct {
uint quic_tile_count;
uint verify_tile_count;
uint bank_tile_count;
uint shred_tile_count;
} layout;

struct {
Expand Down
4 changes: 4 additions & 0 deletions src/app/fdctl/config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,10 @@ dynamic_port_range = "8900-9000"
# if they are not writing to the same accounts at the same time.
bank_tile_count = 2

# How many shred tiles to run. Multiple shred tiles can run in
# parallel to create shreds from transaction entries.
shred_tile_count = 1

# All memory that will be used in Firedancer is pre-allocated in two
# kinds of pages: huge and gigantic. Huge pages are 2MB and gigantic
# pages are 1GB. This is done to prevent TLB misses which can have a
Expand Down
73 changes: 57 additions & 16 deletions src/app/fdctl/run/tiles/fd_shred.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ typedef struct {
fd_fec_resolver_t * resolver;
fd_pubkey_t identity_key[1]; /* Just the public key */

ulong round_robin_id;
ulong round_robin_cnt;
/* Number of batches shredded from PoH during the current slot.
This should be the same for all the shred tiles. */
ulong batch_cnt;
/* Slot of the most recent microblock we've seen from PoH,
or ULONG_MAX if we haven't seen one yet */
ulong slot;

fd_keyguard_client_t keyguard_client[1];

uint src_ip_addr;
Expand Down Expand Up @@ -285,7 +294,6 @@ during_frag( void * _ctx,
ulong sz,
int * opt_filter ) {
(void)seq;
(void)opt_filter;

fd_shred_ctx_t * ctx = (fd_shred_ctx_t *)_ctx;

Expand Down Expand Up @@ -351,34 +359,49 @@ during_frag( void * _ctx,
ctx->pending_batch.slot = 0UL;
ctx->pending_batch.pos = 0UL;
ctx->pending_batch.microblock_cnt = 0UL;
ctx->batch_cnt = 0UL;
}

ctx->pending_batch.slot = target_slot;
/* Ugh, yet another memcpy */
fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz );
if( FD_UNLIKELY( target_slot!=ctx->slot )) {
/* Reset batch count if we are in a new slot */
ctx->batch_cnt = 0UL;
ctx->slot = target_slot;
}
if( FD_UNLIKELY( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id ) ) {
/* Ugh, yet another memcpy */
fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz );
} else {
/* If we are not processing this batch, filter */
*opt_filter = 1;
}
ctx->pending_batch.pos += entry_sz;
ctx->pending_batch.microblock_cnt++;

int last_in_batch = entry_meta->block_complete | (ctx->pending_batch.pos > PENDING_BATCH_WMARK);

ctx->send_fec_set_idx = ULONG_MAX;
if( FD_UNLIKELY( last_in_batch )) {
fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, sizeof(ulong)+ctx->pending_batch.pos, target_slot, entry_meta );

/* We sized this so it fits in one FEC set */
FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out ) );
fd_shredder_fini_batch( ctx->shredder );
if( FD_UNLIKELY( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id ) ) {
/* If it's our turn, shred this batch */
fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, sizeof(ulong)+ctx->pending_batch.pos, target_slot, entry_meta );
/* We sized this so it fits in one FEC set */
FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out ) );
fd_shredder_fini_batch( ctx->shredder );

d_rcvd_join( d_rcvd_new( d_rcvd_delete( d_rcvd_leave( out->data_shred_rcvd ) ) ) );
p_rcvd_join( p_rcvd_new( p_rcvd_delete( p_rcvd_leave( out->parity_shred_rcvd ) ) ) );

ctx->send_fec_set_idx = ctx->shredder_fec_set_idx;
} else {
/* If it's not our turn, update the indices for this slot */
fd_shredder_skip_batch( ctx->shredder, sizeof(ulong)+ctx->pending_batch.pos, target_slot );
}

d_rcvd_join( d_rcvd_new( d_rcvd_delete( d_rcvd_leave( out->data_shred_rcvd ) ) ) );
p_rcvd_join( p_rcvd_new( p_rcvd_delete( p_rcvd_leave( out->parity_shred_rcvd ) ) ) );

ctx->send_fec_set_idx = ctx->shredder_fec_set_idx;

/* reset state */
ctx->pending_batch.slot = 0UL;
ctx->pending_batch.pos = 0UL;
ctx->pending_batch.microblock_cnt = 0UL;
} else {
ctx->send_fec_set_idx = ULONG_MAX;
ctx->batch_cnt++;
}
} else { /* the common case, from the netmux tile */
/* The FEC resolver API does not present a prepare/commit model. If we
Expand All @@ -392,6 +415,19 @@ during_frag( void * _ctx,
uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->net_in_mem, chunk );
ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
FD_TEST( hdr_sz < sz ); /* Should be ensured by the net tile */
fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
if( FD_UNLIKELY( !shred ) ) {
*opt_filter = 1;
return;
};
/* All shreds in the same FEC set have the same signature, so we
can round-robin shreds between the shred tiles based on just the
signature without splitting individual FEC sets. */
ulong sig = fd_ulong_load_8( shred->signature );
if( FD_LIKELY( sig%ctx->round_robin_cnt!=ctx->round_robin_id ) ) {
*opt_filter = 1;
return;
}
fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz );
ctx->shred_buffer_sz = sz-hdr_sz;
}
Expand Down Expand Up @@ -643,6 +679,11 @@ unprivileged_init( fd_topo_t * topo,
FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );

ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
ctx->round_robin_id = tile->kind_id;
ctx->batch_cnt = 0UL;
ctx->slot = ULONG_MAX;

ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, shred_store_mcache_depth,
128UL * tile->shred.fec_resolver_depth );
ulong fec_set_cnt = shred_store_mcache_depth + tile->shred.fec_resolver_depth + 4UL;
Expand Down
Loading

0 comments on commit 4ba08fd

Please sign in to comment.