From afa53e9ebad4ac17f8c9324fe02d3f241262176f Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 10:40:58 +0100 Subject: [PATCH 01/27] save --- cl/phase1/forkchoice/forkchoice.go | 77 +++++++++++---------- cl/phase1/forkchoice/interface.go | 1 + cl/phase1/forkchoice/on_block.go | 59 +++++++++++++++- cl/phase1/network/services/block_service.go | 5 ++ 4 files changed, 105 insertions(+), 37 deletions(-) diff --git a/cl/phase1/forkchoice/forkchoice.go b/cl/phase1/forkchoice/forkchoice.go index 41cc13b9088..fd2b27b9fcc 100644 --- a/cl/phase1/forkchoice/forkchoice.go +++ b/cl/phase1/forkchoice/forkchoice.go @@ -87,14 +87,15 @@ type ForkChoiceStore struct { unrealizedJustifiedCheckpoint atomic.Value unrealizedFinalizedCheckpoint atomic.Value - proposerBoostRoot atomic.Value - headHash libcommon.Hash - headSlot uint64 - genesisTime uint64 - genesisValidatorsRoot libcommon.Hash - weights map[libcommon.Hash]uint64 - headSet map[libcommon.Hash]struct{} - hotSidecars map[libcommon.Hash][]*cltypes.BlobSidecar // Set of sidecars that are not yet processed. + proposerBoostRoot atomic.Value + headHash libcommon.Hash + headSlot uint64 + genesisTime uint64 + genesisValidatorsRoot libcommon.Hash + weights map[libcommon.Hash]uint64 + headSet map[libcommon.Hash]struct{} + hotSidecars map[libcommon.Hash][]*cltypes.BlobSidecar // Set of sidecars that are not yet processed. + verifiedExecutionPayload *lru.Cache[libcommon.Hash, struct{}] // childrens childrens sync.Map @@ -173,6 +174,11 @@ func NewForkChoiceStore( Epoch: state2.Epoch(anchorState.BeaconState), } + verifiedExecutionPayload, err := lru.New[libcommon.Hash, struct{}](1024) + if err != nil { + return nil, err + } + eth2Roots, err := lru.New[libcommon.Hash, libcommon.Hash](checkpointsPerCache) if err != nil { return nil, err @@ -227,33 +233,34 @@ func NewForkChoiceStore( headSet := make(map[libcommon.Hash]struct{}) headSet[anchorRoot] = struct{}{} f := &ForkChoiceStore{ - forkGraph: forkGraph, - equivocatingIndicies: make([]byte, anchorState.ValidatorLength(), anchorState.ValidatorLength()*2), - latestMessages: newLatestMessagesStore(anchorState.ValidatorLength()), - eth2Roots: eth2Roots, - engine: engine, - operationsPool: operationsPool, - beaconCfg: anchorState.BeaconConfig(), - preverifiedSizes: preverifiedSizes, - finalityCheckpoints: finalityCheckpoints, - totalActiveBalances: totalActiveBalances, - randaoMixesLists: randaoMixesLists, - randaoDeltas: randaoDeltas, - headSet: headSet, - weights: make(map[libcommon.Hash]uint64), - participation: participation, - emitters: emitters, - genesisTime: anchorState.GenesisTime(), - syncedDataManager: syncedDataManager, - nextBlockProposers: nextBlockProposers, - genesisValidatorsRoot: anchorState.GenesisValidatorsRoot(), - hotSidecars: make(map[libcommon.Hash][]*cltypes.BlobSidecar), - blobStorage: blobStorage, - ethClock: ethClock, - optimisticStore: optimistic.NewOptimisticStore(), - validatorMonitor: validatorMonitor, - probabilisticHeadGetter: probabilisticHeadGetter, - publicKeysRegistry: publicKeysRegistry, + forkGraph: forkGraph, + equivocatingIndicies: make([]byte, anchorState.ValidatorLength(), anchorState.ValidatorLength()*2), + latestMessages: newLatestMessagesStore(anchorState.ValidatorLength()), + eth2Roots: eth2Roots, + engine: engine, + operationsPool: operationsPool, + beaconCfg: anchorState.BeaconConfig(), + preverifiedSizes: preverifiedSizes, + finalityCheckpoints: finalityCheckpoints, + totalActiveBalances: totalActiveBalances, + randaoMixesLists: randaoMixesLists, + randaoDeltas: randaoDeltas, + headSet: headSet, + weights: make(map[libcommon.Hash]uint64), + participation: participation, + emitters: emitters, + genesisTime: anchorState.GenesisTime(), + syncedDataManager: syncedDataManager, + nextBlockProposers: nextBlockProposers, + genesisValidatorsRoot: anchorState.GenesisValidatorsRoot(), + hotSidecars: make(map[libcommon.Hash][]*cltypes.BlobSidecar), + blobStorage: blobStorage, + ethClock: ethClock, + optimisticStore: optimistic.NewOptimisticStore(), + validatorMonitor: validatorMonitor, + probabilisticHeadGetter: probabilisticHeadGetter, + publicKeysRegistry: publicKeysRegistry, + verifiedExecutionPayload: verifiedExecutionPayload, } f.justifiedCheckpoint.Store(anchorCheckpoint) f.finalizedCheckpoint.Store(anchorCheckpoint) diff --git a/cl/phase1/forkchoice/interface.go b/cl/phase1/forkchoice/interface.go index fcee6a3a83d..c5d0bfe2d62 100644 --- a/cl/phase1/forkchoice/interface.go +++ b/cl/phase1/forkchoice/interface.go @@ -89,6 +89,7 @@ type ForkChoiceStorageWriter interface { fullValidation bool, checkDataAvaibility bool, ) error + AddPreverifiedBlobSidecar(blobSidecar *cltypes.BlobSidecar) error OnTick(time uint64) SetSynced(synced bool) diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index f205ed9e61b..36123e05f83 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -78,6 +78,61 @@ func collectOnBlockLatencyToUnixTime(ethClock eth_clock.EthereumClock, slot uint monitor.ObserveBlockImportingLatency(initialSlotTime) } +func (f *ForkChoiceStore) ProcessBlockExecution(ctx context.Context, block *cltypes.SignedBeaconBlock) error { + blockRoot, err := block.Block.HashSSZ() + if err != nil { + return err + } + + if f.engine == nil || f.verifiedExecutionPayload.Contains(blockRoot) { + return nil + } + + var versionedHashes []libcommon.Hash + if f.engine != nil && block.Version() >= clparams.DenebVersion { + versionedHashes = []libcommon.Hash{} + solid.RangeErr[*cltypes.KZGCommitment](block.Block.Body.BlobKzgCommitments, func(i1 int, k *cltypes.KZGCommitment, i2 int) error { + versionedHash, err := utils.KzgCommitmentToVersionedHash(libcommon.Bytes48(*k)) + if err != nil { + return err + } + versionedHashes = append(versionedHashes, versionedHash) + return nil + }) + } + + if block.Version() >= clparams.DenebVersion { + if err := verifyKzgCommitmentsAgainstTransactions(f.beaconCfg, block.Block.Body.ExecutionPayload, block.Block.Body.BlobKzgCommitments); err != nil { + return fmt.Errorf("OnBlock: failed to process kzg commitments: %v", err) + } + } + timeStartExec := time.Now() + + payloadStatus, err := f.engine.NewPayload(ctx, block.Block.Body.ExecutionPayload, &block.Block.ParentRoot, versionedHashes) + switch payloadStatus { + case execution_client.PayloadStatusInvalidated: + log.Warn("OnBlock: block is invalid", "block", libcommon.Hash(blockRoot), "err", err) + f.forkGraph.MarkHeaderAsInvalid(blockRoot) + // remove from optimistic candidate + if err := f.optimisticStore.InvalidateBlock(block.Block); err != nil { + return fmt.Errorf("failed to remove block from optimistic store: %v", err) + } + return errors.New("block is invalid") + case execution_client.PayloadStatusValidated: + log.Trace("OnBlock: block is validated", "block", libcommon.Hash(blockRoot)) + // remove from optimistic candidate + if err := f.optimisticStore.ValidateBlock(block.Block); err != nil { + return fmt.Errorf("failed to validate block in optimistic store: %v", err) + } + f.verifiedExecutionPayload.Add(block.Block.Body.ExecutionPayload.BlockHash, struct{}{}) + } + if err != nil { + return fmt.Errorf("newPayload failed: %v", err) + } + monitor.ObserveExecutionTime(timeStartExec) + return nil +} + func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeaconBlock, newPayload, fullValidation, checkDataAvaiability bool) error { f.mu.Lock() defer f.mu.Unlock() @@ -124,7 +179,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac } startEngine := time.Now() - if newPayload && f.engine != nil { + if newPayload && f.engine != nil && !f.verifiedExecutionPayload.Contains(blockRoot) { if block.Version() >= clparams.DenebVersion { if err := verifyKzgCommitmentsAgainstTransactions(f.beaconCfg, block.Block.Body.ExecutionPayload, block.Block.Body.BlobKzgCommitments); err != nil { return fmt.Errorf("OnBlock: failed to process kzg commitments: %v", err) @@ -141,7 +196,6 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac } case execution_client.PayloadStatusInvalidated: log.Warn("OnBlock: block is invalid", "block", libcommon.Hash(blockRoot), "err", err) - log.Debug("OnBlock: block is invalid", "block", libcommon.Hash(blockRoot)) f.forkGraph.MarkHeaderAsInvalid(blockRoot) // remove from optimistic candidate if err := f.optimisticStore.InvalidateBlock(block.Block); err != nil { @@ -154,6 +208,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac if err := f.optimisticStore.ValidateBlock(block.Block); err != nil { return fmt.Errorf("failed to validate block in optimistic store: %v", err) } + f.verifiedExecutionPayload.Add(block.Block.Body.ExecutionPayload.BlockHash, struct{}{}) } if err != nil { return fmt.Errorf("newPayload failed: %v", err) diff --git a/cl/phase1/network/services/block_service.go b/cl/phase1/network/services/block_service.go index ea1fe2d6597..2763133c432 100644 --- a/cl/phase1/network/services/block_service.go +++ b/cl/phase1/network/services/block_service.go @@ -19,6 +19,7 @@ package services import ( "context" "errors" + "fmt" "sync" "time" @@ -121,6 +122,10 @@ func (b *blockService) ProcessMessage(ctx context.Context, _ *uint64, msg *cltyp return ErrIgnore } + if err := b.forkchoiceStore.ProcessBlockExecution(ctx, msg); err != nil { + return fmt.Errorf("failed to pre-process block execution: %w", err) + } + if err := b.syncedData.ViewHeadState(func(headState *state.CachingBeaconState) error { // [IGNORE] The block is from a slot greater than the latest finalized slot -- i.e. validate that signed_beacon_block.message.slot > compute_start_slot_at_epoch(store.finalized_checkpoint.epoch) // (a client MAY choose to validate and store such blocks for additional purposes -- e.g. slashing detection, archive nodes, etc). From d235453ca0324990a3d98e19abc1713428f72ea0 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 10:41:54 +0100 Subject: [PATCH 02/27] save --- cl/phase1/forkchoice/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/forkchoice/interface.go b/cl/phase1/forkchoice/interface.go index c5d0bfe2d62..967fd7bce67 100644 --- a/cl/phase1/forkchoice/interface.go +++ b/cl/phase1/forkchoice/interface.go @@ -89,7 +89,7 @@ type ForkChoiceStorageWriter interface { fullValidation bool, checkDataAvaibility bool, ) error - + ProcessBlockExecution(ctx context.Context, block *cltypes.SignedBeaconBlock) error AddPreverifiedBlobSidecar(blobSidecar *cltypes.BlobSidecar) error OnTick(time uint64) SetSynced(synced bool) From beffa79fd12307dfe30fe79e35a8c5aac3cd9147 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 10:48:29 +0100 Subject: [PATCH 03/27] save --- cl/phase1/forkchoice/on_block.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index 36123e05f83..d96e8cb7a20 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -179,6 +179,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac } startEngine := time.Now() + fmt.Println(!f.verifiedExecutionPayload.Contains(blockRoot)) if newPayload && f.engine != nil && !f.verifiedExecutionPayload.Contains(blockRoot) { if block.Version() >= clparams.DenebVersion { if err := verifyKzgCommitmentsAgainstTransactions(f.beaconCfg, block.Block.Body.ExecutionPayload, block.Block.Body.BlobKzgCommitments); err != nil { From 7cd41c31c04ace729adc1ef8a1d93471f2d61e63 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 10:51:22 +0100 Subject: [PATCH 04/27] save --- cl/phase1/forkchoice/on_block.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index d96e8cb7a20..ebec6dc2824 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -209,7 +209,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac if err := f.optimisticStore.ValidateBlock(block.Block); err != nil { return fmt.Errorf("failed to validate block in optimistic store: %v", err) } - f.verifiedExecutionPayload.Add(block.Block.Body.ExecutionPayload.BlockHash, struct{}{}) + f.verifiedExecutionPayload.Add(blockRoot, struct{}{}) } if err != nil { return fmt.Errorf("newPayload failed: %v", err) From f3c868849f5f0b1262615d9c805a95c530a5cd1e Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 10:55:20 +0100 Subject: [PATCH 05/27] save --- cl/phase1/forkchoice/on_block.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index ebec6dc2824..cc603574efb 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -124,7 +124,7 @@ func (f *ForkChoiceStore) ProcessBlockExecution(ctx context.Context, block *clty if err := f.optimisticStore.ValidateBlock(block.Block); err != nil { return fmt.Errorf("failed to validate block in optimistic store: %v", err) } - f.verifiedExecutionPayload.Add(block.Block.Body.ExecutionPayload.BlockHash, struct{}{}) + f.verifiedExecutionPayload.Add(blockRoot, struct{}{}) } if err != nil { return fmt.Errorf("newPayload failed: %v", err) From 811dfd95d9da355a5d79cecd45a5bfb6837893d8 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 15:37:57 +0100 Subject: [PATCH 06/27] save --- cl/beacon/handler/pool.go | 17 ++++++++++--- .../network/services/attestation_service.go | 24 +++++++++++++++---- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/cl/beacon/handler/pool.go b/cl/beacon/handler/pool.go index 264446e9947..ceb506b0c9c 100644 --- a/cl/beacon/handler/pool.go +++ b/cl/beacon/handler/pool.go @@ -95,6 +95,8 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h } failures := []poolingFailure{} + + attestationsForGossip := make([]*services.AttestationWithGossipData, 0, len(req)) for i, attestation := range req { if a.syncedData.Syncing() { beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("head state not available")).WriteTo(w) @@ -126,7 +128,7 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w) return } - attestationWithGossipData := &services.AttestationWithGossipData{ + attestationsForGossip = append(attestationsForGossip, &services.AttestationWithGossipData{ Attestation: attestation, GossipData: &sentinel.GossipData{ Data: encodedSSZ, @@ -134,17 +136,26 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h SubnetId: &subnet, }, ImmediateProcess: true, // we want to process attestation immediately + NoPublish: true, // we don't want to publish attestation to gossip within the processer + }) + } + + for _, attestationWithGossipData := range attestationsForGossip { + if _, err := a.sentinel.PublishGossip(r.Context(), attestationWithGossipData.GossipData); err != nil { + log.Warn("[Beacon REST] failed to publish attestation to gossip", "err", err) } + } - if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestationWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) { + for i, attestationWithGossipData := range attestationsForGossip { + if err := a.attestationService.ProcessMessage(r.Context(), attestationWithGossipData.GossipData.SubnetId, attestationWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) { log.Warn("[Beacon REST] failed to process attestation in attestation service", "err", err) failures = append(failures, poolingFailure{ Index: i, Message: err.Error(), }) - continue } } + if len(failures) > 0 { errResp := poolingError{ Code: http.StatusBadRequest, diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index 48d8a98e50f..149350633a6 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/Giulio2002/bls" "github.com/erigontech/erigon-lib/common" sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon-lib/log/v3" @@ -70,6 +71,7 @@ type AttestationWithGossipData struct { GossipData *sentinel.GossipData // ImmediateProcess indicates whether the attestation should be processed immediately or able to be scheduled for later processing. ImmediateProcess bool + NoPublish bool } func NewAttestationService( @@ -268,13 +270,27 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, }, } + // For this object it is 60% faster to verify the signature in a single call than batch verify it. if att.ImmediateProcess { - return s.batchSignatureVerifier.ImmediateVerification(aggregateVerificationData) - + valid, err := bls.Verify(signature[:], signingRoot[:], pubKey[:]) + if err != nil { + log.Crit("[AttestationService] signature verification failed with the error: " + err.Error()) + return err + } + if !valid { + log.Debug("[AttestationService] received invalid signature on the gossip", "topic", att.GossipData.Name) + return fmt.Errorf("invalid signature") + } + if !att.NoPublish { + if _, err = s.batchSignatureVerifier.sentinel.PublishGossip(ctx, att.GossipData); err != nil { + log.Debug("failed to publish gossip", "err", err) + } + } + aggregateVerificationData.F() + return nil } - // push the signatures to verify asynchronously and run final functions after that. - s.batchSignatureVerifier.AsyncVerifyAttestation(aggregateVerificationData) + s.batchSignatureVerifier.AsyncVerifyAggregateProof(aggregateVerificationData) // As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing // gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves From 24e0cc53e2ffe995c6cce2de882de5c643c11c7e Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 18:12:58 +0100 Subject: [PATCH 07/27] save --- .../forkchoice/fork_graph/fork_graph_disk.go | 48 +++++++++++++------ cl/phase1/forkchoice/fork_graph/interface.go | 2 +- cl/phase1/forkchoice/interface.go | 4 +- cl/phase1/forkchoice/on_block.go | 9 +++- cl/phase1/network/services/block_service.go | 25 +++++++++- 5 files changed, 69 insertions(+), 19 deletions(-) diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index c8847f7097e..6ff946bc870 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -169,8 +169,13 @@ func (f *forkGraphDisk) AnchorSlot() uint64 { return f.anchorSlot } +func (f *forkGraphDisk) isBlockRootTheCurrentState(blockRoot libcommon.Hash) bool { + blockRootState, _ := f.currentState.BlockRoot() + return blockRoot == blockRootState +} + // Add a new node and edge to the graph -func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, fullValidation bool) (*state.CachingBeaconState, ChainSegmentInsertionResult, error) { +func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, fullValidation, shallowImport bool) (*state.CachingBeaconState, ChainSegmentInsertionResult, error) { block := signedBlock.Block blockRoot, err := block.HashSSZ() if err != nil { @@ -187,10 +192,17 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, return nil, BelowAnchor, nil } - newState, err := f.getState(block.ParentRoot, false, true) - if err != nil { - return nil, LogisticError, fmt.Errorf("AddChainSegment: %w, parentRoot; %x", err, block.ParentRoot) + isBlockRootTheCurrentState := f.isBlockRootTheCurrentState(blockRoot) + var newState *state.CachingBeaconState + if isBlockRootTheCurrentState { + newState = f.currentState + } else { + newState, err = f.getState(block.ParentRoot, false, true) + if err != nil { + return nil, LogisticError, fmt.Errorf("AddChainSegment: %w, parentRoot; %x", err, block.ParentRoot) + } } + if newState == nil { log.Trace("AddChainSegment: missing segment", "block", libcommon.Hash(blockRoot)) return nil, MissingSegment, nil @@ -199,7 +211,7 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, parentBlock, hasParentBlock := f.getBlock(block.ParentRoot) // Before processing the state: update the newest lightclient update. - if block.Version() >= clparams.AltairVersion && hasParentBlock && fullValidation && hasFinalized && f.rcfg.Beacon { + if block.Version() >= clparams.AltairVersion && hasParentBlock && fullValidation && hasFinalized && f.rcfg.Beacon && !shallowImport { nextSyncCommitteeBranch, err := newState.NextSyncCommitteeBranch() if err != nil { return nil, LogisticError, err @@ -244,14 +256,22 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, blockRewardsCollector := ð2.BlockRewardsCollector{} - // Execute the state - if invalidBlockErr := transition.TransitionState(newState, signedBlock, blockRewardsCollector, fullValidation); invalidBlockErr != nil { - // Add block to list of invalid blocks - log.Warn("Invalid beacon block", "slot", block.Slot, "blockRoot", blockRoot, "reason", invalidBlockErr) - f.badBlocks.Store(libcommon.Hash(blockRoot), struct{}{}) - //f.currentState = nil + if !isBlockRootTheCurrentState { + // Execute the state + if invalidBlockErr := transition.TransitionState(newState, signedBlock, blockRewardsCollector, fullValidation); invalidBlockErr != nil { + // Add block to list of invalid blocks + log.Warn("Invalid beacon block", "slot", block.Slot, "blockRoot", blockRoot, "reason", invalidBlockErr) + f.badBlocks.Store(libcommon.Hash(blockRoot), struct{}{}) + //f.currentState = nil + + return nil, InvalidBlock, invalidBlockErr + } + f.blockRewards.Store(libcommon.Hash(blockRoot), blockRewardsCollector) + } - return nil, InvalidBlock, invalidBlockErr + f.currentState = newState + if shallowImport { + return newState, Success, nil } // update diff storages. @@ -261,7 +281,6 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, f.currentIndicies.add(epoch, newState.RawCurrentEpochParticipation()) f.previousIndicies.add(epoch, newState.RawPreviousEpochParticipation()) } - f.blockRewards.Store(libcommon.Hash(blockRoot), blockRewardsCollector) period := f.beaconCfg.SyncCommitteePeriod(newState.Slot()) f.syncCommittees.Store(period, syncCommittees{ @@ -277,6 +296,7 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, f.lightclientBootstraps.Store(libcommon.Hash(blockRoot), lightclientBootstrap) } } + f.blocks.Store(libcommon.Hash(blockRoot), signedBlock) bodyRoot, err := signedBlock.Block.Body.HashSSZ() if err != nil { @@ -297,7 +317,7 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, if newState.Slot() > f.highestSeen { f.highestSeen = newState.Slot() } - f.currentState = newState + return newState, Success, nil } diff --git a/cl/phase1/forkchoice/fork_graph/interface.go b/cl/phase1/forkchoice/fork_graph/interface.go index 173cc7a9cd9..dc96f6de320 100644 --- a/cl/phase1/forkchoice/fork_graph/interface.go +++ b/cl/phase1/forkchoice/fork_graph/interface.go @@ -35,7 +35,7 @@ import ( * to analyze and manipulate the state of the blockchain. */ type ForkGraph interface { - AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, fullValidation bool) (*state.CachingBeaconState, ChainSegmentInsertionResult, error) + AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, fullValidation, shallowImport bool) (*state.CachingBeaconState, ChainSegmentInsertionResult, error) GetHeader(blockRoot libcommon.Hash) (*cltypes.BeaconBlockHeader, bool) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) GetCurrentJustifiedCheckpoint(blockRoot libcommon.Hash) (solid.Checkpoint, bool) diff --git a/cl/phase1/forkchoice/interface.go b/cl/phase1/forkchoice/interface.go index 967fd7bce67..22ebb9a398e 100644 --- a/cl/phase1/forkchoice/interface.go +++ b/cl/phase1/forkchoice/interface.go @@ -89,9 +89,11 @@ type ForkChoiceStorageWriter interface { fullValidation bool, checkDataAvaibility bool, ) error - ProcessBlockExecution(ctx context.Context, block *cltypes.SignedBeaconBlock) error AddPreverifiedBlobSidecar(blobSidecar *cltypes.BlobSidecar) error OnTick(time uint64) SetSynced(synced bool) ProcessAttestingIndicies(attestation *solid.Attestation, attestionIndicies []uint64) + + ProcessBlockExecution(ctx context.Context, block *cltypes.SignedBeaconBlock) error + ProcessBlockConsensus(ctx context.Context, block *cltypes.SignedBeaconBlock) error } diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index cc603574efb..cd188347378 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -133,6 +133,13 @@ func (f *ForkChoiceStore) ProcessBlockExecution(ctx context.Context, block *clty return nil } +func (f *ForkChoiceStore) ProcessBlockConsensus(ctx context.Context, block *cltypes.SignedBeaconBlock) error { + f.mu.Lock() + defer f.mu.Unlock() + _, _, err := f.forkGraph.AddChainSegment(block, true, true) + return fmt.Errorf("ProcessBlockConsensus: replay block, status %+v", err) +} + func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeaconBlock, newPayload, fullValidation, checkDataAvaiability bool) error { f.mu.Lock() defer f.mu.Unlock() @@ -218,7 +225,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac } log.Trace("OnBlock: engine", "elapsed", time.Since(startEngine)) startStateProcess := time.Now() - lastProcessedState, status, err := f.forkGraph.AddChainSegment(block, fullValidation) + lastProcessedState, status, err := f.forkGraph.AddChainSegment(block, fullValidation, false) if err != nil { return err } diff --git a/cl/phase1/network/services/block_service.go b/cl/phase1/network/services/block_service.go index 2763133c432..5f47c4fbf04 100644 --- a/cl/phase1/network/services/block_service.go +++ b/cl/phase1/network/services/block_service.go @@ -122,8 +122,29 @@ func (b *blockService) ProcessMessage(ctx context.Context, _ *uint64, msg *cltyp return ErrIgnore } - if err := b.forkchoiceStore.ProcessBlockExecution(ctx, msg); err != nil { - return fmt.Errorf("failed to pre-process block execution: %w", err) + var wg sync.WaitGroup + + var ( + errExec error + errConsensus error + ) + + wg.Add(2) + go func() { + defer wg.Done() + errExec = b.forkchoiceStore.ProcessBlockExecution(ctx, msg) + }() + go func() { + defer wg.Done() + errConsensus = b.forkchoiceStore.ProcessBlockConsensus(ctx, msg) + }() + wg.Wait() + + if errExec != nil { + return fmt.Errorf("failed to pre-process block execution: %w", errExec) + } + if errConsensus != nil { + return fmt.Errorf("failed to pre-process block consensus: %w", errConsensus) } if err := b.syncedData.ViewHeadState(func(headState *state.CachingBeaconState) error { From 5c9a95aeed4b9c7275dc2ec8c517faa1d58c271f Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 18:21:05 +0100 Subject: [PATCH 08/27] save --- cl/phase1/forkchoice/on_block.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index cd188347378..e6643bd7fa3 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -136,7 +136,9 @@ func (f *ForkChoiceStore) ProcessBlockExecution(ctx context.Context, block *clty func (f *ForkChoiceStore) ProcessBlockConsensus(ctx context.Context, block *cltypes.SignedBeaconBlock) error { f.mu.Lock() defer f.mu.Unlock() + start := time.Now() _, _, err := f.forkGraph.AddChainSegment(block, true, true) + fmt.Println("consensus time", time.Since(start)) return fmt.Errorf("ProcessBlockConsensus: replay block, status %+v", err) } From 424f0268a4c98aff31a79e007a880af4f2b1bbba Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 18:23:10 +0100 Subject: [PATCH 09/27] save --- cl/phase1/network/services/block_service.go | 22 ++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/cl/phase1/network/services/block_service.go b/cl/phase1/network/services/block_service.go index 5f47c4fbf04..b4e88725d63 100644 --- a/cl/phase1/network/services/block_service.go +++ b/cl/phase1/network/services/block_service.go @@ -129,17 +129,6 @@ func (b *blockService) ProcessMessage(ctx context.Context, _ *uint64, msg *cltyp errConsensus error ) - wg.Add(2) - go func() { - defer wg.Done() - errExec = b.forkchoiceStore.ProcessBlockExecution(ctx, msg) - }() - go func() { - defer wg.Done() - errConsensus = b.forkchoiceStore.ProcessBlockConsensus(ctx, msg) - }() - wg.Wait() - if errExec != nil { return fmt.Errorf("failed to pre-process block execution: %w", errExec) } @@ -182,6 +171,17 @@ func (b *blockService) ProcessMessage(ctx context.Context, _ *uint64, msg *cltyp return ErrInvalidCommitmentsCount } + wg.Add(2) + go func() { + defer wg.Done() + errExec = b.forkchoiceStore.ProcessBlockExecution(ctx, msg) + }() + go func() { + defer wg.Done() + errConsensus = b.forkchoiceStore.ProcessBlockConsensus(ctx, msg) + }() + wg.Wait() + b.publishBlockGossipEvent(msg) // the rest of the validation is done in the forkchoice store if err := b.processAndStoreBlock(ctx, msg); err != nil { From 9100f2aab87353112224f665b16b66c15c71ac3c Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 18:23:57 +0100 Subject: [PATCH 10/27] save --- cl/phase1/network/services/block_service.go | 50 ++++++++++----------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/cl/phase1/network/services/block_service.go b/cl/phase1/network/services/block_service.go index b4e88725d63..7cd89b6f97a 100644 --- a/cl/phase1/network/services/block_service.go +++ b/cl/phase1/network/services/block_service.go @@ -122,20 +122,6 @@ func (b *blockService) ProcessMessage(ctx context.Context, _ *uint64, msg *cltyp return ErrIgnore } - var wg sync.WaitGroup - - var ( - errExec error - errConsensus error - ) - - if errExec != nil { - return fmt.Errorf("failed to pre-process block execution: %w", errExec) - } - if errConsensus != nil { - return fmt.Errorf("failed to pre-process block consensus: %w", errConsensus) - } - if err := b.syncedData.ViewHeadState(func(headState *state.CachingBeaconState) error { // [IGNORE] The block is from a slot greater than the latest finalized slot -- i.e. validate that signed_beacon_block.message.slot > compute_start_slot_at_epoch(store.finalized_checkpoint.epoch) // (a client MAY choose to validate and store such blocks for additional purposes -- e.g. slashing detection, archive nodes, etc). @@ -171,17 +157,6 @@ func (b *blockService) ProcessMessage(ctx context.Context, _ *uint64, msg *cltyp return ErrInvalidCommitmentsCount } - wg.Add(2) - go func() { - defer wg.Done() - errExec = b.forkchoiceStore.ProcessBlockExecution(ctx, msg) - }() - go func() { - defer wg.Done() - errConsensus = b.forkchoiceStore.ProcessBlockConsensus(ctx, msg) - }() - wg.Wait() - b.publishBlockGossipEvent(msg) // the rest of the validation is done in the forkchoice store if err := b.processAndStoreBlock(ctx, msg); err != nil { @@ -228,6 +203,31 @@ func (b *blockService) scheduleBlockForLaterProcessing(block *cltypes.SignedBeac // processAndStoreBlock processes and stores a block func (b *blockService) processAndStoreBlock(ctx context.Context, block *cltypes.SignedBeaconBlock) error { + var wg sync.WaitGroup + + var ( + errExec error + errConsensus error + ) + + wg.Add(2) + go func() { + defer wg.Done() + errExec = b.forkchoiceStore.ProcessBlockExecution(ctx, block) + }() + go func() { + defer wg.Done() + errConsensus = b.forkchoiceStore.ProcessBlockConsensus(ctx, block) + }() + wg.Wait() + + if errExec != nil { + return fmt.Errorf("failed to pre-process block execution: %w", errExec) + } + if errConsensus != nil { + return fmt.Errorf("failed to pre-process block consensus: %w", errConsensus) + } + if err := b.db.Update(ctx, func(tx kv.RwTx) error { return beacon_indicies.WriteBeaconBlockAndIndicies(ctx, tx, block, false) }); err != nil { From 453577fb436fac32d135bedfd23888f3a8e2d3af Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 18:25:37 +0100 Subject: [PATCH 11/27] save --- cl/phase1/forkchoice/on_block.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index e6643bd7fa3..8ef0fb79300 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -139,7 +139,10 @@ func (f *ForkChoiceStore) ProcessBlockConsensus(ctx context.Context, block *clty start := time.Now() _, _, err := f.forkGraph.AddChainSegment(block, true, true) fmt.Println("consensus time", time.Since(start)) - return fmt.Errorf("ProcessBlockConsensus: replay block, status %+v", err) + if err != nil { + return fmt.Errorf("ProcessBlockConsensus: replay block, status %+v", err) + } + return nil } func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeaconBlock, newPayload, fullValidation, checkDataAvaiability bool) error { From 1f1895b847b03b7f8cd0b95cdd57882d57bdcbb9 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 18:36:15 +0100 Subject: [PATCH 12/27] save --- cl/phase1/forkchoice/fork_graph/fork_graph_disk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index 6ff946bc870..9e3ac140a33 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -211,7 +211,7 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, parentBlock, hasParentBlock := f.getBlock(block.ParentRoot) // Before processing the state: update the newest lightclient update. - if block.Version() >= clparams.AltairVersion && hasParentBlock && fullValidation && hasFinalized && f.rcfg.Beacon && !shallowImport { + if block.Version() >= clparams.AltairVersion && hasParentBlock && fullValidation && hasFinalized && f.rcfg.Beacon { nextSyncCommitteeBranch, err := newState.NextSyncCommitteeBranch() if err != nil { return nil, LogisticError, err From 1711496ca117804ee8b5934249d6c27b72866230 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:17:59 +0100 Subject: [PATCH 13/27] save --- cl/phase1/forkchoice/fork_graph/fork_graph_disk.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index 9e3ac140a33..26c00eec41a 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -113,8 +113,8 @@ type forkGraphDisk struct { beaconCfg *clparams.BeaconChainConfig genesisTime uint64 // highest block seen - highestSeen, anchorSlot uint64 - lowestAvailableBlock atomic.Uint64 + anchorSlot uint64 + lowestAvailableBlock atomic.Uint64 newestLightClientUpdate atomic.Value // the lightclientUpdates leaks memory, but it's not a big deal since new data is added every 27 hours. @@ -314,9 +314,6 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, // Lastly add checkpoints to caches as well. f.currentJustifiedCheckpoints.Store(libcommon.Hash(blockRoot), newState.CurrentJustifiedCheckpoint()) f.finalizedCheckpoints.Store(libcommon.Hash(blockRoot), newState.FinalizedCheckpoint()) - if newState.Slot() > f.highestSeen { - f.highestSeen = newState.Slot() - } return newState, Success, nil } From 848c2fbe4cb11683d3eaba110b82247d44bc216b Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:33:05 +0100 Subject: [PATCH 14/27] save --- cl/beacon/handler/block_production.go | 5 ++++ cl/beacon/handler/pool.go | 27 +++++++++---------- .../forkchoice/fork_graph/fork_graph_test.go | 8 +++--- .../mock_services/forkchoice_mock.go | 8 ++++++ .../network/services/attestation_service.go | 10 +++---- .../attestation_producer.go | 1 - 6 files changed, 33 insertions(+), 26 deletions(-) diff --git a/cl/beacon/handler/block_production.go b/cl/beacon/handler/block_production.go index b1632ddc115..3785a9400f5 100644 --- a/cl/beacon/handler/block_production.go +++ b/cl/beacon/handler/block_production.go @@ -148,6 +148,8 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData( if err != nil { return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err) } + start := time.Now() + // wait until the head state is at the target slot or later err = a.waitUntilHeadStateAtEpochIsReadyOrCountAsMissed(r.Context(), a.syncedData, *slot/a.beaconChainCfg.SlotsPerEpoch) if err != nil { @@ -182,6 +184,9 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData( if ok { return newBeaconResponse(attestationData), nil } + defer func() { + a.logger.Debug("[Hot-Path] Produced Attestation", "slot", *slot, "duration", time.Since(start)) + }() clversion := a.beaconChainCfg.GetCurrentStateVersion(*slot / a.beaconChainCfg.SlotsPerEpoch) if clversion.BeforeOrEqual(clparams.DenebVersion) && committeeIndex == nil { diff --git a/cl/beacon/handler/pool.go b/cl/beacon/handler/pool.go index ceb506b0c9c..f57366f9357 100644 --- a/cl/beacon/handler/pool.go +++ b/cl/beacon/handler/pool.go @@ -95,7 +95,6 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h } failures := []poolingFailure{} - attestationsForGossip := make([]*services.AttestationWithGossipData, 0, len(req)) for i, attestation := range req { if a.syncedData.Syncing() { @@ -121,28 +120,26 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h } cIndex = index } - subnet := subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount) encodedSSZ, err := attestation.EncodeSSZ(nil) if err != nil { beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w) return } + gossipData := &sentinel.GossipData{ + Data: encodedSSZ, + Name: gossip.TopicNamePrefixBeaconAttestation, + SubnetId: &subnet, + } attestationsForGossip = append(attestationsForGossip, &services.AttestationWithGossipData{ - Attestation: attestation, - GossipData: &sentinel.GossipData{ - Data: encodedSSZ, - Name: gossip.TopicNamePrefixBeaconAttestation, - SubnetId: &subnet, - }, + Attestation: attestation, + GossipData: gossipData, ImmediateProcess: true, // we want to process attestation immediately - NoPublish: true, // we don't want to publish attestation to gossip within the processer }) - } - - for _, attestationWithGossipData := range attestationsForGossip { - if _, err := a.sentinel.PublishGossip(r.Context(), attestationWithGossipData.GossipData); err != nil { - log.Warn("[Beacon REST] failed to publish attestation to gossip", "err", err) + // preemption: we publish gossip data before processing it + if _, err := a.sentinel.PublishGossip(r.Context(), gossipData); err != nil { + log.Warn("[Beacon REST] failed to publish attestation gossip", "err", err) + continue } } @@ -153,9 +150,9 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h Index: i, Message: err.Error(), }) + continue } } - if len(failures) > 0 { errResp := poolingError{ Code: http.StatusBadRequest, diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_test.go b/cl/phase1/forkchoice/fork_graph/fork_graph_test.go index 71258930173..ba507d7dba2 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_test.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_test.go @@ -51,20 +51,20 @@ func TestForkGraphInDisk(t *testing.T) { require.NoError(t, utils.DecodeSSZSnappy(anchorState, anchor, int(clparams.Phase0Version))) emitter := beaconevents.NewEventEmitter() graph := NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitter) - _, status, err := graph.AddChainSegment(blockA, true) + _, status, err := graph.AddChainSegment(blockA, true, false) require.NoError(t, err) require.Equal(t, status, Success) // Now make blockC a bad block blockC.Block.ProposerIndex = 81214459 // some invalid thing - _, status, err = graph.AddChainSegment(blockC, true) + _, status, err = graph.AddChainSegment(blockC, true, false) require.Error(t, err) require.Equal(t, status, InvalidBlock) // Save current state hash - _, status, err = graph.AddChainSegment(blockB, true) + _, status, err = graph.AddChainSegment(blockB, true, false) require.NoError(t, err) require.Equal(t, status, Success) // Try again with same should yield success - _, status, err = graph.AddChainSegment(blockB, true) + _, status, err = graph.AddChainSegment(blockB, true, false) require.NoError(t, err) require.Equal(t, status, PreValidated) } diff --git a/cl/phase1/forkchoice/mock_services/forkchoice_mock.go b/cl/phase1/forkchoice/mock_services/forkchoice_mock.go index a6bae30313f..877bcb13378 100644 --- a/cl/phase1/forkchoice/mock_services/forkchoice_mock.go +++ b/cl/phase1/forkchoice/mock_services/forkchoice_mock.go @@ -367,3 +367,11 @@ func (f *ForkChoiceStorageMock) IsRootOptimistic(root common.Hash) bool { func (f *ForkChoiceStorageMock) IsHeadOptimistic() bool { return false } + +func (f *ForkChoiceStorageMock) ProcessBlockConsensus(ctx context.Context, block *cltypes.SignedBeaconBlock) error { + return nil +} + +func (f *ForkChoiceStorageMock) ProcessBlockExecution(ctx context.Context, block *cltypes.SignedBeaconBlock) error { + return nil +} diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index 149350633a6..b8391a3a41e 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -71,7 +71,6 @@ type AttestationWithGossipData struct { GossipData *sentinel.GossipData // ImmediateProcess indicates whether the attestation should be processed immediately or able to be scheduled for later processing. ImmediateProcess bool - NoPublish bool } func NewAttestationService( @@ -270,7 +269,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, }, } - // For this object it is 60% faster to verify the signature in a single call than batch verify it. + // For this object it is 60% faster to verify the signature in a single call than to verify it in a loop. if att.ImmediateProcess { valid, err := bls.Verify(signature[:], signingRoot[:], pubKey[:]) if err != nil { @@ -281,11 +280,10 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, log.Debug("[AttestationService] received invalid signature on the gossip", "topic", att.GossipData.Name) return fmt.Errorf("invalid signature") } - if !att.NoPublish { - if _, err = s.batchSignatureVerifier.sentinel.PublishGossip(ctx, att.GossipData); err != nil { - log.Debug("failed to publish gossip", "err", err) - } + if _, err = s.batchSignatureVerifier.sentinel.PublishGossip(ctx, att.GossipData); err != nil { + log.Debug("failed to publish gossip", "err", err) } + aggregateVerificationData.F() return nil } diff --git a/cl/validator/attestation_producer/attestation_producer.go b/cl/validator/attestation_producer/attestation_producer.go index 4ee667ac65c..4f16a6db3fa 100644 --- a/cl/validator/attestation_producer/attestation_producer.go +++ b/cl/validator/attestation_producer/attestation_producer.go @@ -200,7 +200,6 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(tx kv.Tx, baseStat Source: baseState.CurrentJustifiedCheckpoint(), Target: targetCheckpoint, } - fmt.Println("baseAttestationData", baseAttestationData, "epoch", epoch, "baseStateBlockRoot", baseStateBlockRoot) ap.attestationsCache.Add(epoch, baseAttestationData) ap.blockRootsUsedForSlotCache.Add(slot, baseStateBlockRoot) From 3355b1c1a5478cd4340658df43d61983c9855a7f Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:34:07 +0100 Subject: [PATCH 15/27] save --- cl/beacon/handler/block_production.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cl/beacon/handler/block_production.go b/cl/beacon/handler/block_production.go index 3785a9400f5..531961cac04 100644 --- a/cl/beacon/handler/block_production.go +++ b/cl/beacon/handler/block_production.go @@ -181,12 +181,14 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData( if err != nil { log.Warn("Failed to get attestation data", "err", err) } + + defer func() { + a.logger.Debug("[Hot-Path] Produced Attestation", "slot", *slot, "committee_index", *committeeIndex, "cached", "beacon_block_root", attestationData.BeaconBlockRoot, ok, "duration", time.Since(start)) + }() + if ok { return newBeaconResponse(attestationData), nil } - defer func() { - a.logger.Debug("[Hot-Path] Produced Attestation", "slot", *slot, "duration", time.Since(start)) - }() clversion := a.beaconChainCfg.GetCurrentStateVersion(*slot / a.beaconChainCfg.SlotsPerEpoch) if clversion.BeforeOrEqual(clparams.DenebVersion) && committeeIndex == nil { From 3433826c55af33ffcac3bb87cb8c97b94e1a968c Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:34:25 +0100 Subject: [PATCH 16/27] save --- cl/beacon/handler/block_production.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cl/beacon/handler/block_production.go b/cl/beacon/handler/block_production.go index 531961cac04..1e950f52797 100644 --- a/cl/beacon/handler/block_production.go +++ b/cl/beacon/handler/block_production.go @@ -183,7 +183,9 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData( } defer func() { - a.logger.Debug("[Hot-Path] Produced Attestation", "slot", *slot, "committee_index", *committeeIndex, "cached", "beacon_block_root", attestationData.BeaconBlockRoot, ok, "duration", time.Since(start)) + a.logger.Debug("Produced Attestation", "slot", *slot, + "committee_index", *committeeIndex, "cached", "beacon_block_root", + attestationData.BeaconBlockRoot, ok, "duration", time.Since(start)) }() if ok { From 1b9cb661ec64aa93987b5d5a89ade72912d2d555 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:40:21 +0100 Subject: [PATCH 17/27] save --- cl/phase1/forkchoice/fork_graph/fork_graph_disk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index 26c00eec41a..92418688d19 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -211,7 +211,7 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, parentBlock, hasParentBlock := f.getBlock(block.ParentRoot) // Before processing the state: update the newest lightclient update. - if block.Version() >= clparams.AltairVersion && hasParentBlock && fullValidation && hasFinalized && f.rcfg.Beacon { + if block.Version() >= clparams.AltairVersion && hasParentBlock && fullValidation && hasFinalized && f.rcfg.Beacon && !isBlockRootTheCurrentState { nextSyncCommitteeBranch, err := newState.NextSyncCommitteeBranch() if err != nil { return nil, LogisticError, err From 3fa6b845fffe1ac47b5a34351cadf9293d87b249 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:41:23 +0100 Subject: [PATCH 18/27] save --- cl/phase1/network/services/attestation_service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index b8391a3a41e..1b1c6ceeb16 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -288,7 +288,8 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, return nil } - s.batchSignatureVerifier.AsyncVerifyAggregateProof(aggregateVerificationData) + // push the signatures to verify asynchronously and run final functions after that. + s.batchSignatureVerifier.AsyncVerifyAttestation(aggregateVerificationData) // As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing // gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves From 82d1a087bb1686dd949cdf6439fa7229cab3d4f3 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:42:56 +0100 Subject: [PATCH 19/27] save --- cl/beacon/handler/pool.go | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/cl/beacon/handler/pool.go b/cl/beacon/handler/pool.go index f57366f9357..264446e9947 100644 --- a/cl/beacon/handler/pool.go +++ b/cl/beacon/handler/pool.go @@ -95,7 +95,6 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h } failures := []poolingFailure{} - attestationsForGossip := make([]*services.AttestationWithGossipData, 0, len(req)) for i, attestation := range req { if a.syncedData.Syncing() { beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("head state not available")).WriteTo(w) @@ -120,31 +119,24 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h } cIndex = index } + subnet := subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount) encodedSSZ, err := attestation.EncodeSSZ(nil) if err != nil { beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w) return } - gossipData := &sentinel.GossipData{ - Data: encodedSSZ, - Name: gossip.TopicNamePrefixBeaconAttestation, - SubnetId: &subnet, - } - attestationsForGossip = append(attestationsForGossip, &services.AttestationWithGossipData{ - Attestation: attestation, - GossipData: gossipData, + attestationWithGossipData := &services.AttestationWithGossipData{ + Attestation: attestation, + GossipData: &sentinel.GossipData{ + Data: encodedSSZ, + Name: gossip.TopicNamePrefixBeaconAttestation, + SubnetId: &subnet, + }, ImmediateProcess: true, // we want to process attestation immediately - }) - // preemption: we publish gossip data before processing it - if _, err := a.sentinel.PublishGossip(r.Context(), gossipData); err != nil { - log.Warn("[Beacon REST] failed to publish attestation gossip", "err", err) - continue } - } - for i, attestationWithGossipData := range attestationsForGossip { - if err := a.attestationService.ProcessMessage(r.Context(), attestationWithGossipData.GossipData.SubnetId, attestationWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) { + if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestationWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) { log.Warn("[Beacon REST] failed to process attestation in attestation service", "err", err) failures = append(failures, poolingFailure{ Index: i, From 2e3b665d36b4f8e97f94f6d19a0b4a33970756c7 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:45:36 +0100 Subject: [PATCH 20/27] save --- cl/phase1/forkchoice/on_block.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index 8ef0fb79300..8bfec99c7ca 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -138,10 +138,12 @@ func (f *ForkChoiceStore) ProcessBlockConsensus(ctx context.Context, block *clty defer f.mu.Unlock() start := time.Now() _, _, err := f.forkGraph.AddChainSegment(block, true, true) - fmt.Println("consensus time", time.Since(start)) if err != nil { return fmt.Errorf("ProcessBlockConsensus: replay block, status %+v", err) } + if time.Since(start) > 1*time.Millisecond { + log.Debug("OnBlock", "elapsed", time.Since(start)) + } return nil } @@ -190,9 +192,9 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac } } + isVerifiedExecutionPayload := f.verifiedExecutionPayload.Contains(blockRoot) startEngine := time.Now() - fmt.Println(!f.verifiedExecutionPayload.Contains(blockRoot)) - if newPayload && f.engine != nil && !f.verifiedExecutionPayload.Contains(blockRoot) { + if newPayload && f.engine != nil && !isVerifiedExecutionPayload { if block.Version() >= clparams.DenebVersion { if err := verifyKzgCommitmentsAgainstTransactions(f.beaconCfg, block.Block.Body.ExecutionPayload, block.Block.Body.BlobKzgCommitments); err != nil { return fmt.Errorf("OnBlock: failed to process kzg commitments: %v", err) @@ -331,8 +333,9 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac if f.validatorMonitor != nil { f.validatorMonitor.OnNewBlock(lastProcessedState, block.Block) } - - log.Debug("OnBlock", "elapsed", time.Since(start)) + if !isVerifiedExecutionPayload { + log.Debug("OnBlock", "elapsed", time.Since(start)) + } return nil } From 6245e769fdd1428e2e4adf857ad7a3e85db7a183 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:49:05 +0100 Subject: [PATCH 21/27] save --- cl/phase1/network/services/attestation_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index 1b1c6ceeb16..f14a1e84f19 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -99,7 +99,7 @@ func NewAttestationService( attestationProcessed: lru.NewWithTTL[[32]byte, struct{}]("attestation_processed", validatorAttestationCacheSize, epochDuration), } - //go a.loop(ctx) + //go a.loop(ct) return a } From c27f1ed27db9b00b9ee8d55b66e9f69839b88f54 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:49:13 +0100 Subject: [PATCH 22/27] save --- cl/phase1/network/services/attestation_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index f14a1e84f19..1b1c6ceeb16 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -99,7 +99,7 @@ func NewAttestationService( attestationProcessed: lru.NewWithTTL[[32]byte, struct{}]("attestation_processed", validatorAttestationCacheSize, epochDuration), } - //go a.loop(ct) + //go a.loop(ctx) return a } From 8d444e2c618c694fb56391f4153265d8808376c6 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 21:51:37 +0100 Subject: [PATCH 23/27] save --- .../network/services/attestation_service.go | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index 1b1c6ceeb16..48d8a98e50f 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "github.com/Giulio2002/bls" "github.com/erigontech/erigon-lib/common" sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon-lib/log/v3" @@ -269,23 +268,9 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, }, } - // For this object it is 60% faster to verify the signature in a single call than to verify it in a loop. if att.ImmediateProcess { - valid, err := bls.Verify(signature[:], signingRoot[:], pubKey[:]) - if err != nil { - log.Crit("[AttestationService] signature verification failed with the error: " + err.Error()) - return err - } - if !valid { - log.Debug("[AttestationService] received invalid signature on the gossip", "topic", att.GossipData.Name) - return fmt.Errorf("invalid signature") - } - if _, err = s.batchSignatureVerifier.sentinel.PublishGossip(ctx, att.GossipData); err != nil { - log.Debug("failed to publish gossip", "err", err) - } + return s.batchSignatureVerifier.ImmediateVerification(aggregateVerificationData) - aggregateVerificationData.F() - return nil } // push the signatures to verify asynchronously and run final functions after that. From cbba4e89bf762b392afb58b14c61005ba9b37279 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 22:03:32 +0100 Subject: [PATCH 24/27] save --- cl/beacon/handler/block_production.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/beacon/handler/block_production.go b/cl/beacon/handler/block_production.go index 1e950f52797..9878a290b09 100644 --- a/cl/beacon/handler/block_production.go +++ b/cl/beacon/handler/block_production.go @@ -184,7 +184,7 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData( defer func() { a.logger.Debug("Produced Attestation", "slot", *slot, - "committee_index", *committeeIndex, "cached", "beacon_block_root", + "committee_index", *committeeIndex, "cached", ok, "beacon_block_root", attestationData.BeaconBlockRoot, ok, "duration", time.Since(start)) }() From 0e199f5d2c8bbf5537c61cd021a4f767b9822081 Mon Sep 17 00:00:00 2001 From: Giulio Date: Fri, 29 Nov 2024 22:08:13 +0100 Subject: [PATCH 25/27] save --- cl/beacon/handler/block_production.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/beacon/handler/block_production.go b/cl/beacon/handler/block_production.go index 9878a290b09..bf96b718e58 100644 --- a/cl/beacon/handler/block_production.go +++ b/cl/beacon/handler/block_production.go @@ -185,7 +185,7 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData( defer func() { a.logger.Debug("Produced Attestation", "slot", *slot, "committee_index", *committeeIndex, "cached", ok, "beacon_block_root", - attestationData.BeaconBlockRoot, ok, "duration", time.Since(start)) + attestationData.BeaconBlockRoot, "duration", time.Since(start)) }() if ok { From eb54268e7a5ef661128da4a8b6339620e746d470 Mon Sep 17 00:00:00 2001 From: Giulio Date: Sat, 30 Nov 2024 11:51:26 +0100 Subject: [PATCH 26/27] save --- cl/phase1/network/gossip_manager.go | 2 +- cl/phase1/network/services/block_service.go | 33 +++++++-------------- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index 3f4540d18e6..09942967a00 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -104,7 +104,7 @@ func NewGossipReceiver( voluntaryExitService: voluntaryExitService, blsToExecutionChangeService: blsToExecutionChangeService, proposerSlashingService: proposerSlashingService, - attestationsLimiter: newTimeBasedRateLimiter(6*time.Second, 250), + attestationsLimiter: newTimeBasedRateLimiter(6*time.Second, 800), } } diff --git a/cl/phase1/network/services/block_service.go b/cl/phase1/network/services/block_service.go index 7cd89b6f97a..56ed8be96e8 100644 --- a/cl/phase1/network/services/block_service.go +++ b/cl/phase1/network/services/block_service.go @@ -19,7 +19,6 @@ package services import ( "context" "errors" - "fmt" "sync" "time" @@ -37,6 +36,7 @@ import ( "github.com/erigontech/erigon/cl/phase1/forkchoice" "github.com/erigontech/erigon/cl/transition/impl/eth2" "github.com/erigontech/erigon/cl/utils/eth_clock" + "golang.org/x/sync/errgroup" ) var ( @@ -203,29 +203,18 @@ func (b *blockService) scheduleBlockForLaterProcessing(block *cltypes.SignedBeac // processAndStoreBlock processes and stores a block func (b *blockService) processAndStoreBlock(ctx context.Context, block *cltypes.SignedBeaconBlock) error { - var wg sync.WaitGroup + group, _ := errgroup.WithContext(ctx) - var ( - errExec error - errConsensus error - ) - - wg.Add(2) - go func() { - defer wg.Done() - errExec = b.forkchoiceStore.ProcessBlockExecution(ctx, block) - }() - go func() { - defer wg.Done() - errConsensus = b.forkchoiceStore.ProcessBlockConsensus(ctx, block) - }() - wg.Wait() + group.Go(func() error { + return b.forkchoiceStore.ProcessBlockExecution(ctx, block) + }) + group.Go(func() error { + return b.forkchoiceStore.ProcessBlockConsensus(ctx, block) + }) - if errExec != nil { - return fmt.Errorf("failed to pre-process block execution: %w", errExec) - } - if errConsensus != nil { - return fmt.Errorf("failed to pre-process block consensus: %w", errConsensus) + err := group.Wait() + if err != nil { + return err } if err := b.db.Update(ctx, func(tx kv.RwTx) error { From b05b964793774e69fe03cf8bc170fa641f92d9c0 Mon Sep 17 00:00:00 2001 From: Giulio Date: Sat, 30 Nov 2024 12:22:26 +0100 Subject: [PATCH 27/27] save --- cl/phase1/network/gossip_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index 09942967a00..3f4540d18e6 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -104,7 +104,7 @@ func NewGossipReceiver( voluntaryExitService: voluntaryExitService, blsToExecutionChangeService: blsToExecutionChangeService, proposerSlashingService: proposerSlashingService, - attestationsLimiter: newTimeBasedRateLimiter(6*time.Second, 800), + attestationsLimiter: newTimeBasedRateLimiter(6*time.Second, 250), } }