diff --git a/src/app/fdctl/config.c b/src/app/fdctl/config.c index 4b5fd6e134..0e8aa5384c 100644 --- a/src/app/fdctl/config.c +++ b/src/app/fdctl/config.c @@ -232,6 +232,7 @@ static int parse_key_value( config_t * config, ENTRY_UINT ( ., layout, quic_tile_count ); ENTRY_UINT ( ., layout, verify_tile_count ); ENTRY_UINT ( ., layout, bank_tile_count ); + ENTRY_UINT ( ., layout, shred_tile_count ); ENTRY_STR ( ., hugetlbfs, gigantic_page_mount_path ); ENTRY_STR ( ., hugetlbfs, huge_page_mount_path ); @@ -621,6 +622,7 @@ topo_initialize( config_t * config ) { ulong quic_tile_cnt = config->layout.quic_tile_count; ulong verify_tile_cnt = config->layout.verify_tile_count; ulong bank_tile_cnt = config->layout.bank_tile_count; + ulong shred_tile_cnt = config->layout.shred_tile_count; fd_topo_t topo[1] = { fd_topob_new( config->name ) }; @@ -661,7 +663,7 @@ topo_initialize( config_t * config ) { FOR(net_tile_cnt) fd_topob_link( topo, "net_netmux", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL ); /**/ fd_topob_link( topo, "netmux_out", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL ); FOR(quic_tile_cnt) fd_topob_link( topo, "quic_netmux", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL ); - /**/ fd_topob_link( topo, "shred_netmux", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL ); + FOR(shred_tile_cnt) fd_topob_link( topo, "shred_netmux", "netmux_inout", 0, config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL ); FOR(quic_tile_cnt) fd_topob_link( topo, "quic_verify", "quic_verify", 1, config->tiles.verify.receive_buffer_size, 0UL, config->tiles.quic.txn_reassembly_count ); FOR(verify_tile_cnt) fd_topob_link( topo, "verify_dedup", "verify_dedup", 0, config->tiles.verify.receive_buffer_size, FD_TPU_DCACHE_MTU, 1UL ); /**/ fd_topob_link( topo, "dedup_pack", "dedup_pack", 0, config->tiles.verify.receive_buffer_size, FD_TPU_DCACHE_MTU, 1UL ); @@ -674,12 +676,12 @@ topo_initialize( config_t * config ) { /**/ fd_topob_link( topo, "poh_shred", "poh_shred", 0, 128UL, USHORT_MAX, 1UL ); /**/ fd_topob_link( topo, "crds_shred", "poh_shred", 0, 128UL, 8UL + 40200UL * 38UL, 1UL ); /* See long comment in fd_shred.c for an explanation about the size of this dcache. */ - /**/ fd_topob_link( topo, "shred_store", "shred_store", 0, 16384UL, 4UL*FD_SHRED_STORE_MTU, 4UL+config->tiles.shred.max_pending_shred_sets ); + FOR(shred_tile_cnt) fd_topob_link( topo, "shred_store", "shred_store", 0, 16384UL, 4UL*FD_SHRED_STORE_MTU, 4UL+config->tiles.shred.max_pending_shred_sets ); FOR(quic_tile_cnt) fd_topob_link( topo, "quic_sign", "quic_sign", 0, 128UL, 130UL, 1UL ); FOR(quic_tile_cnt) fd_topob_link( topo, "sign_quic", "sign_quic", 0, 128UL, 64UL, 1UL ); - /**/ fd_topob_link( topo, "shred_sign", "shred_sign", 0, 128UL, 32UL, 1UL ); - /**/ fd_topob_link( topo, "sign_shred", "sign_shred", 0, 128UL, 64UL, 1UL ); + FOR(shred_tile_cnt) fd_topob_link( topo, "shred_sign", "shred_sign", 0, 128UL, 32UL, 1UL ); + FOR(shred_tile_cnt) fd_topob_link( topo, "sign_shred", "sign_shred", 0, 128UL, 64UL, 1UL ); ushort tile_to_cpu[ FD_TILE_MAX ]; for( ulong i=0UL; itile_cnt], 0, "pack_bank", 0UL ); FOR(bank_tile_cnt) fd_topob_tile( topo, "bank", "bank", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 1, "bank_poh", i ); /**/ fd_topob_tile( topo, "poh", "poh", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 1, "poh_shred", 0UL ); - /**/ fd_topob_tile( topo, "shred", "shred", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, "shred_store", 0UL ); + FOR(shred_tile_cnt) fd_topob_tile( topo, "shred", "shred", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, "shred_store", i ); /**/ fd_topob_tile( topo, "store", "store", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 1, NULL, 0UL ); /**/ fd_topob_tile( topo, "sign", "sign", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, NULL, 0UL ); /**/ fd_topob_tile( topo, "metric", "metric", "metric_in", "metric_in", tile_to_cpu[topo->tile_cnt], 0, NULL, 0UL ); @@ -715,7 +717,7 @@ topo_initialize( config_t * config ) { FOR(net_tile_cnt) fd_topob_tile_in( topo, "net", i, "metric_in", "netmux_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ FOR(net_tile_cnt) fd_topob_tile_in( topo, "netmux", 0UL, "metric_in", "net_netmux", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ FOR(quic_tile_cnt) fd_topob_tile_in( topo, "netmux", 0UL, "metric_in", "quic_netmux", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ - /**/ fd_topob_tile_in( topo, "netmux", 0UL, "metric_in", "shred_netmux", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ + FOR(shred_tile_cnt) fd_topob_tile_in( topo, "netmux", 0UL, "metric_in", "shred_netmux", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ FOR(quic_tile_cnt) fd_topob_tile_in( topo, "quic", i, "metric_in", "netmux_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ FOR(quic_tile_cnt) fd_topob_tile_out( topo, "quic", i, "quic_netmux", i ); /* All verify tiles read from all QUIC tiles, packets are round robin. */ @@ -740,12 +742,12 @@ topo_initialize( config_t * config ) { /**/ fd_topob_tile_in( topo, "poh", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); /**/ fd_topob_tile_in( topo, "poh", 0UL, "metric_in", "pack_bank", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); fd_topob_tile_out( topo, "poh", 0UL, "poh_pack", 0UL ); - /**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "netmux_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ - /**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "poh_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); - /**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); - /**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "crds_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); - /**/ fd_topob_tile_out( topo, "shred", 0UL, "shred_netmux", 0UL ); - /**/ fd_topob_tile_in( topo, "store", 0UL, "metric_in", "shred_store", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(shred_tile_cnt) fd_topob_tile_in( topo, "shred", i, "metric_in", "netmux_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ + FOR(shred_tile_cnt) fd_topob_tile_in( topo, "shred", i, "metric_in", "poh_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(shred_tile_cnt) fd_topob_tile_in( topo, "shred", i, "metric_in", "stake_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(shred_tile_cnt) fd_topob_tile_in( topo, "shred", i, "metric_in", "crds_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(shred_tile_cnt) fd_topob_tile_out( topo, "shred", i, "shred_netmux", i ); + FOR(shred_tile_cnt) fd_topob_tile_in( topo, "store", 0UL, "metric_in", "shred_store", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); /* Sign links don't need to be reliable because they are synchronous, so there's at most one fragment in flight at a time anyway. The @@ -758,11 +760,12 @@ topo_initialize( config_t * config ) { /**/ fd_topob_tile_in( topo, "quic", i, "metric_in", "sign_quic", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED ); /**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_quic", i ); } - - /**/ fd_topob_tile_in( topo, "sign", 0UL, "metric_in", "shred_sign", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); - /**/ fd_topob_tile_out( topo, "shred", 0UL, "shred_sign", 0UL ); - /**/ fd_topob_tile_in( topo, "shred", 0UL, "metric_in", "sign_shred", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED ); - /**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_shred", 0UL ); + for( ulong i=0UL; itiles[ fd_topo_find_tile( topo, "poh", 0UL ) ]; - fd_topo_tile_t * shred_tile = &topo->tiles[ fd_topo_find_tile( topo, "shred", 0UL ) ]; fd_topob_tile_uses( topo, poh_tile, poh_shred_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); - fd_topob_tile_uses( topo, shred_tile, poh_shred_obj, FD_SHMEM_JOIN_MODE_READ_ONLY ); + for( ulong i=0UL; itiles[ fd_topo_find_tile( topo, "shred", i ) ]; + fd_topob_tile_uses( topo, shred_tile, poh_shred_obj, FD_SHMEM_JOIN_MODE_READ_ONLY ); + } FD_TEST( fd_pod_insertf_ulong( topo->props, poh_shred_obj->id, "poh_shred" ) ); for( ulong i=0UL; itile_cnt; i++ ) { diff --git a/src/app/fdctl/config.h b/src/app/fdctl/config.h index 78a2c81ba3..30dcee6cd0 100644 --- a/src/app/fdctl/config.h +++ b/src/app/fdctl/config.h @@ -122,6 +122,7 @@ typedef struct { uint quic_tile_count; uint verify_tile_count; uint bank_tile_count; + uint shred_tile_count; } layout; struct { diff --git a/src/app/fdctl/config/default.toml b/src/app/fdctl/config/default.toml index c339412d6f..060a53aa61 100644 --- a/src/app/fdctl/config/default.toml +++ b/src/app/fdctl/config/default.toml @@ -540,6 +540,10 @@ dynamic_port_range = "8900-9000" # if they are not writing to the same accounts at the same time. bank_tile_count = 2 + # How many shred tiles to run. Multiple shred tiles can run in + # parallel to create shreds from transaction entries. + shred_tile_count = 1 + # All memory that will be used in Firedancer is pre-allocated in two # kinds of pages: huge and gigantic. Huge pages are 2MB and gigantic # pages are 1GB. This is done to prevent TLB misses which can have a diff --git a/src/app/fdctl/run/tiles/fd_shred.c b/src/app/fdctl/run/tiles/fd_shred.c index 1868b8cc0c..2c70a54fca 100644 --- a/src/app/fdctl/run/tiles/fd_shred.c +++ b/src/app/fdctl/run/tiles/fd_shred.c @@ -124,6 +124,15 @@ typedef struct { fd_fec_resolver_t * resolver; fd_pubkey_t identity_key[1]; /* Just the public key */ + ulong round_robin_id; + ulong round_robin_cnt; + /* Number of batches shredded from PoH during the current slot. + This should be the same for all the shred tiles. */ + ulong batch_cnt; + /* Slot of the most recent microblock we've seen from PoH, + or 0 if we haven't seen one yet */ + ulong slot; + fd_keyguard_client_t keyguard_client[1]; uint src_ip_addr; @@ -285,7 +294,6 @@ during_frag( void * _ctx, ulong sz, int * opt_filter ) { (void)seq; - (void)opt_filter; fd_shred_ctx_t * ctx = (fd_shred_ctx_t *)_ctx; @@ -351,34 +359,49 @@ during_frag( void * _ctx, ctx->pending_batch.slot = 0UL; ctx->pending_batch.pos = 0UL; ctx->pending_batch.microblock_cnt = 0UL; + ctx->batch_cnt = 0UL; } ctx->pending_batch.slot = target_slot; - /* Ugh, yet another memcpy */ - fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz ); + if( FD_UNLIKELY( target_slot!=ctx->slot )) { + /* Reset batch count if we are in a new slot */ + ctx->batch_cnt = 0UL; + ctx->slot = target_slot; + } + if( FD_UNLIKELY( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id ) ) { + /* Ugh, yet another memcpy */ + fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz ); + } else { + /* If we are not processing this batch, filter */ + *opt_filter = 1; + } ctx->pending_batch.pos += entry_sz; ctx->pending_batch.microblock_cnt++; int last_in_batch = entry_meta->block_complete | (ctx->pending_batch.pos > PENDING_BATCH_WMARK); + ctx->send_fec_set_idx = ULONG_MAX; if( FD_UNLIKELY( last_in_batch )) { - fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, sizeof(ulong)+ctx->pending_batch.pos, target_slot, entry_meta ); - - /* We sized this so it fits in one FEC set */ - FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out ) ); - fd_shredder_fini_batch( ctx->shredder ); + if( FD_UNLIKELY( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id ) ) { + /* If it's our turn, shred this batch */ + fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, sizeof(ulong)+ctx->pending_batch.pos, target_slot, entry_meta ); + /* We sized this so it fits in one FEC set */ + FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out ) ); + fd_shredder_fini_batch( ctx->shredder ); + + d_rcvd_join( d_rcvd_new( d_rcvd_delete( d_rcvd_leave( out->data_shred_rcvd ) ) ) ); + p_rcvd_join( p_rcvd_new( p_rcvd_delete( p_rcvd_leave( out->parity_shred_rcvd ) ) ) ); + + ctx->send_fec_set_idx = ctx->shredder_fec_set_idx; + } else { + /* If it's not our turn, update the indices for this slot */ + fd_shredder_skip_batch( ctx->shredder, sizeof(ulong)+ctx->pending_batch.pos, target_slot ); + } - d_rcvd_join( d_rcvd_new( d_rcvd_delete( d_rcvd_leave( out->data_shred_rcvd ) ) ) ); - p_rcvd_join( p_rcvd_new( p_rcvd_delete( p_rcvd_leave( out->parity_shred_rcvd ) ) ) ); - - ctx->send_fec_set_idx = ctx->shredder_fec_set_idx; - - /* reset state */ ctx->pending_batch.slot = 0UL; ctx->pending_batch.pos = 0UL; ctx->pending_batch.microblock_cnt = 0UL; - } else { - ctx->send_fec_set_idx = ULONG_MAX; + ctx->batch_cnt++; } } else { /* the common case, from the netmux tile */ /* The FEC resolver API does not present a prepare/commit model. If we @@ -392,6 +415,19 @@ during_frag( void * _ctx, uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->net_in_mem, chunk ); ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig ); FD_TEST( hdr_sz < sz ); /* Should be ensured by the net tile */ + fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz ); + if( FD_UNLIKELY( !shred ) ) { + *opt_filter = 1; + return; + }; + /* all shreds in the same FEC set will have the same signature + so we can round-robin shreds between the shred tiles based on + just the signature without splitting individual FEC sets. */ + ulong sig = fd_ulong_load_8( shred->signature ); + if( FD_LIKELY( sig%ctx->round_robin_cnt!=ctx->round_robin_id ) ) { + *opt_filter = 1; + return; + } fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz ); ctx->shred_buffer_sz = sz-hdr_sz; } @@ -643,6 +679,11 @@ unprivileged_init( fd_topo_t * topo, FD_SCRATCH_ALLOC_INIT( l, scratch ); fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) ); + ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name ); + ctx->round_robin_id = tile->kind_id; + ctx->batch_cnt = 0UL; + ctx->slot = ULONG_MAX; + ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, shred_store_mcache_depth, 128UL * tile->shred.fec_resolver_depth ); ulong fec_set_cnt = shred_store_mcache_depth + tile->shred.fec_resolver_depth + 4UL; diff --git a/src/app/fdctl/run/tiles/fd_store.c b/src/app/fdctl/run/tiles/fd_store.c index 5bfa70ef0a..2b82c8de8a 100644 --- a/src/app/fdctl/run/tiles/fd_store.c +++ b/src/app/fdctl/run/tiles/fd_store.c @@ -1,11 +1,15 @@ #include "tiles.h" +typedef struct { + fd_wksp_t * mem; + ulong chunk0; + ulong wmark; +} fd_store_in_ctx_t; + typedef struct { uchar __attribute__((aligned(32UL))) mem[ FD_SHRED_STORE_MTU ]; - fd_wksp_t * in_mem; - ulong in_chunk0; - ulong in_wmark; + fd_store_in_ctx_t in[ 32 ]; } fd_store_ctx_t; FD_FN_CONST static inline ulong @@ -56,10 +60,10 @@ during_frag( void * _ctx, fd_store_ctx_t * ctx = (fd_store_ctx_t *)_ctx; - if( FD_UNLIKELY( chunkin_chunk0 || chunk>ctx->in_wmark || sz>FD_SHRED_STORE_MTU || sz<32UL ) ) - FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in_chunk0, ctx->in_wmark )); + if( FD_UNLIKELY( chunkin[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_SHRED_STORE_MTU || sz<32UL ) ) + FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark )); - uchar * src = (uchar *)fd_chunk_to_laddr( ctx->in_mem, chunk ); + uchar * src = (uchar *)fd_chunk_to_laddr( ctx->in[in_idx].mem, chunk ); fd_memcpy( ctx->mem, src, sz ); } @@ -115,9 +119,14 @@ unprivileged_init( fd_topo_t * topo, FD_COMPILER_MFENCE(); FD_LOG_NOTICE(( "Got blockstore" )); - ctx->in_mem = topo->workspaces[ topo->objs[ topo->links[ tile->in_link_id[ 0UL ] ].dcache_obj_id ].wksp_id ].wksp; - ctx->in_chunk0 = fd_dcache_compact_chunk0( ctx->in_mem, topo->links[ tile->in_link_id[ 0UL ] ].dcache ); - ctx->in_wmark = fd_dcache_compact_wmark ( ctx->in_mem, topo->links[ tile->in_link_id[ 0UL ] ].dcache, topo->links[ tile->in_link_id[ 0UL ] ].mtu ); + for( ulong i=0; iin_cnt; i++ ) { + fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ]; + fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ]; + + ctx->in[i].mem = link_wksp->wksp; + ctx->in[i].chunk0 = fd_dcache_compact_chunk0( ctx->in[i].mem, link->dcache ); + ctx->in[i].wmark = fd_dcache_compact_wmark ( ctx->in[i].mem, link->dcache, link->mtu ); + } ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL ); if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) ) diff --git a/src/app/fddev/dev.c b/src/app/fddev/dev.c index 20c71b69a0..5244418c4d 100644 --- a/src/app/fddev/dev.c +++ b/src/app/fddev/dev.c @@ -132,13 +132,16 @@ update_config_for_dev( config_t * const config ) { /* Automatically compute the shred version from genesis if it exists and we don't know it. If it doesn't exist, we'll keep it set to zero and get from gossip. */ - ulong shred_id = fd_topo_find_tile( &config->topo, "shred", 0UL ); - if( FD_UNLIKELY( shred_id==ULONG_MAX ) ) FD_LOG_ERR(( "could not find shred tile" )); - fd_topo_tile_t * shred = &config->topo.tiles[ shred_id ]; - if( FD_LIKELY( shred->shred.expected_shred_version==(ushort)0 ) ) { - char genesis_path[ PATH_MAX ]; - FD_TEST( fd_cstr_printf_check( genesis_path, PATH_MAX, NULL, "%s/genesis.bin", config->ledger.path ) ); - shred->shred.expected_shred_version = compute_shred_version( genesis_path ); + char genesis_path[ PATH_MAX ]; + FD_TEST( fd_cstr_printf_check( genesis_path, PATH_MAX, NULL, "%s/genesis.bin", config->ledger.path ) ); + ushort shred_version = compute_shred_version( genesis_path ); + for( ulong i=0UL; ilayout.shred_tile_count; i++ ) { + ulong shred_id = fd_topo_find_tile( &config->topo, "shred", i ); + if( FD_UNLIKELY( shred_id==ULONG_MAX ) ) FD_LOG_ERR(( "could not find shred tile %lu", i )); + fd_topo_tile_t * shred = &config->topo.tiles[ shred_id ]; + if( FD_LIKELY( shred->shred.expected_shred_version==(ushort)0 ) ) { + shred->shred.expected_shred_version = shred_version; + } } if( FD_LIKELY( !strcmp( config->consensus.vote_account_path, "" ) ) ) diff --git a/src/disco/shred/fd_shredder.c b/src/disco/shred/fd_shredder.c index 340cdd10d7..578a053aad 100644 --- a/src/disco/shred/fd_shredder.c +++ b/src/disco/shred/fd_shredder.c @@ -82,6 +82,29 @@ fd_shredder_delete( void * mem ) { } +fd_shredder_t * +fd_shredder_skip_batch( fd_shredder_t * shredder, + ulong entry_batch_sz, + ulong slot ) { + + if( FD_UNLIKELY( entry_batch_sz==0UL ) ) return NULL; + + if( FD_UNLIKELY( slot != shredder->slot ) ) { + shredder->data_idx_offset = 0UL; + shredder->parity_idx_offset = 0UL; + } + + ulong data_shred_cnt = fd_shredder_count_data_shreds( entry_batch_sz ); + ulong parity_shred_cnt = fd_shredder_count_parity_shreds( entry_batch_sz ); + + shredder->data_idx_offset += data_shred_cnt; + shredder->parity_idx_offset += parity_shred_cnt; + shredder->slot = slot; + + return shredder; +} + + fd_shredder_t * fd_shredder_init_batch( fd_shredder_t * shredder, void const * entry_batch, diff --git a/src/disco/shred/fd_shredder.h b/src/disco/shred/fd_shredder.h index 5bc7e5b901..67d0f97726 100644 --- a/src/disco/shred/fd_shredder.h +++ b/src/disco/shred/fd_shredder.h @@ -163,6 +163,18 @@ fd_shredder_t * fd_shredder_init_batch( fd_shredder_t * shredder, ulong slot, fd_entry_batch_meta_t const * meta ); +/* fd_shredder_skip_batch updates the shredder state as necessary + to skip processing this current batch. shredder must be a valid + local join. entry_batch_sz must be strictly positive. + + Returns shredder, which will have data and parity shred indices + updated as if the caller had called fd_shredder_init_batch with + a batch of the specified size, followed by fd_shredder_next_fec_set + exactly fd_shredder_count_fec_sets( entry_batch_sz ) times. */ +fd_shredder_t * fd_shredder_skip_batch( fd_shredder_t * shredder, + ulong entry_batch_sz, + ulong slot ); + /* fd_shredder_next_fec_set extracts the next FEC set from the in progress batch. Computes the entirety of both data and parity shreds, including the parity information, Merkle proofs, and diff --git a/src/disco/shred/test_shredder.c b/src/disco/shred/test_shredder.c index a66f9b3e2d..4fd3986863 100644 --- a/src/disco/shred/test_shredder.c +++ b/src/disco/shred/test_shredder.c @@ -9,6 +9,10 @@ #define PERF_TEST2_SZ (1UL*1024UL*1024UL) uchar perf_test_entry_batch[ PERF_TEST_SZ ]; +/* Data used in test_skip_batch */ +#define SKIP_TEST_SZ (1024UL*1024UL) +uchar skip_test_data[ SKIP_TEST_SZ ]; + uchar fec_set_memory_1[ 2048UL * FD_REEDSOL_DATA_SHREDS_MAX ]; uchar fec_set_memory_2[ 2048UL * FD_REEDSOL_PARITY_SHREDS_MAX ]; @@ -16,13 +20,6 @@ uchar fec_set_memory_2[ 2048UL * FD_REEDSOL_PARITY_SHREDS_MAX ]; private key, second 32B are what we call the public key. */ FD_IMPORT_BINARY( test_private_key, "src/disco/shred/fixtures/demo-shreds.key" ); -#if FD_HAS_HOSTED -#include "../../util/net/fd_pcap.h" -#include - -FD_IMPORT_BINARY( test_pcap, "src/disco/shred/fixtures/demo-shreds.pcap" ); -FD_IMPORT_BINARY( test_bin, "src/disco/shred/fixtures/demo-shreds.bin" ); - fd_shredder_t _shredder[ 1 ]; struct signer_ctx { @@ -50,6 +47,13 @@ test_signer( void * _ctx, fd_ed25519_sign( signature, merkle_root, 32UL, ctx->public_key, ctx->private_key, ctx->sha512 ); } +#if FD_HAS_HOSTED +#include "../../util/net/fd_pcap.h" +#include + +FD_IMPORT_BINARY( test_pcap, "src/disco/shred/fixtures/demo-shreds.pcap" ); +FD_IMPORT_BINARY( test_bin, "src/disco/shred/fixtures/demo-shreds.bin" ); + static void test_shredder_pcap( void ) { signer_ctx_t signer_ctx[ 1 ]; @@ -141,6 +145,107 @@ test_shredder_pcap( void ) { #endif /* FD_HAS_HOSTED */ +static void +test_skip_batch( void ) { + fd_rng_t _rng[ 1 ]; fd_rng_t * r = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) ); + + signer_ctx_t signer_ctx[ 1 ]; + signer_ctx_init( signer_ctx, test_private_key ); + + #define SHREDDERS 4 + + FD_TEST( SHREDDERS>0 ); + + fd_shredder_t _shredders[ SHREDDERS ]; + fd_shredder_t * shredders[ SHREDDERS ]; + /* Initialize all the shredders */ + for( ulong i=0; idata_shreds[ j ] = data_shreds + 2048UL*j; + for( ulong j=0UL; jparity_shreds[ j ] = parity_shreds + 2048UL*j; + + ulong data_shred_cnt = 0; + ulong parity_shred_cnt = 0; + ulong idx = 0UL; + ulong slot = 1UL; + + for( ulong i=0; iSKIP_TEST_SZ, SKIP_TEST_SZ-idx, sz ); + ulong shredder = fd_rng_ulong_roll( r, SHREDDERS ); + for( ulong i=0; idata_shred_cnt; + parity_shred_cnt += _set->parity_shred_cnt; + } + FD_TEST( fd_shredder_fini_batch( shredders[ i ] ) ); + } else { + FD_TEST( fd_shredder_skip_batch( shredders[ i ], batch_sz, slot )); + } + } + for( ulong i=0; idata_idx_offset==data_shred_cnt ); + FD_TEST( shredders[ i ]->parity_idx_offset==parity_shred_cnt ); + } + idx += batch_sz; + /* Increment the slot every 200_000 bytes. */ + if( FD_UNLIKELY( idx/200000UL>=slot ) ) { + slot++; + data_shred_cnt = 0; + parity_shred_cnt = 0; + } + } + + /* Process a set with the first shredder. */ + memset( data_shreds, 0, FD_REEDSOL_DATA_SHREDS_MAX*2048UL ); + memset( parity_shreds, 0, FD_REEDSOL_PARITY_SHREDS_MAX*2048UL ); + FD_TEST( fd_shredder_init_batch( shredders[ 0 ], skip_test_data, idx, slot, meta ) ); + FD_TEST( fd_shredder_next_fec_set( shredders[ 0 ], _set ) ); + FD_TEST( fd_shredder_fini_batch( shredders[ 0 ] ) ); + + /* Make all the other shredders process the same data and compare the outputs. */ + fd_fec_set_t _temp_set[ 1 ]; + uchar temp_data_shreds [ 2048UL*FD_REEDSOL_DATA_SHREDS_MAX ] = { 0 }; + uchar temp_parity_shreds[ 2048UL*FD_REEDSOL_PARITY_SHREDS_MAX ] = { 0 }; + for( ulong j=0UL; jdata_shreds[ j ] = temp_data_shreds + 2048UL*j; + for( ulong j=0UL; jparity_shreds[ j ] = temp_parity_shreds + 2048UL*j; + for( ulong i=1UL; idata_shred_cnt==_temp_set->data_shred_cnt ); + FD_TEST( _set->parity_shred_cnt==_temp_set->parity_shred_cnt ); + + for( ulong j=0UL; j<_temp_set->data_shred_cnt; j++ ) + FD_TEST( !memcmp( _set->data_shreds[ j ], _temp_set->data_shreds[ j ], 2048UL ) ); + for( ulong j=0UL; j<_temp_set->parity_shred_cnt; j++ ) + FD_TEST( !memcmp( _set->parity_shreds[ j ], _temp_set->parity_shreds[ j ], 2048UL ) ); + } + + #undef SHREDDERS +} + static void test_shredder_count( void ) { @@ -218,6 +323,7 @@ test_shredder_count( void ) { } } + static void perf_test( void ) { for( ulong i=0UL; i