From e1a1c0c049c04ddd85726c0f204724bb4728513e Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Wed, 17 Jan 2024 10:06:18 +0100 Subject: [PATCH] Caplin: Fixed goroutine hell (#9246) --- cl/cltypes/blinded_beacon_block.go | 4 ++ .../format/snapshot_format/blocks.go | 29 +++++++++++ .../format/snapshot_format/blocks_test.go | 12 ++++- cl/phase1/forkchoice/forkchoice.go | 2 + cl/phase1/forkchoice/on_attestation.go | 50 +++++++++++++++---- cmd/caplin/caplin1/run.go | 2 + 6 files changed, 89 insertions(+), 10 deletions(-) diff --git a/cl/cltypes/blinded_beacon_block.go b/cl/cltypes/blinded_beacon_block.go index ca62b8580d0..f15a6d83c66 100644 --- a/cl/cltypes/blinded_beacon_block.go +++ b/cl/cltypes/blinded_beacon_block.go @@ -309,6 +309,10 @@ func (b *SignedBlindedBeaconBlock) Clone() clonable.Clonable { return NewSignedBlindedBeaconBlock(b.Block.Body.beaconCfg) } +func (b *BlindedBeaconBody) ExecutionPayloadMerkleProof() ([][32]byte, error) { + return merkle_tree.MerkleProof(4, 9, b.getSchema(false)...) +} + // make sure that the type implements the interface ssz2.ObjectSSZ var _ ssz2.ObjectSSZ = (*BlindedBeaconBody)(nil) var _ ssz2.ObjectSSZ = (*BlindedBeaconBlock)(nil) diff --git a/cl/persistence/format/snapshot_format/blocks.go b/cl/persistence/format/snapshot_format/blocks.go index 3692a6de30b..11f68d4b8e5 100644 --- a/cl/persistence/format/snapshot_format/blocks.go +++ b/cl/persistence/format/snapshot_format/blocks.go @@ -156,3 +156,32 @@ func ReadBlockHeaderFromSnapshotWithExecutionData(r io.Reader, cfg *clparams.Bea blockHash := blindedBlock.Block.Body.ExecutionPayload.BlockHash return blockHeader, blockNumber, blockHash, nil } + +func ReadBlindedBlockFromSnapshot(r io.Reader, cfg *clparams.BeaconChainConfig) (*cltypes.SignedBlindedBeaconBlock, error) { + buffer := buffersPool.Get().(*bytes.Buffer) + defer buffersPool.Put(buffer) + buffer.Reset() + + blindedBlock := cltypes.NewSignedBlindedBeaconBlock(cfg) + + // Read the metadata + metadataSlab := make([]byte, 33) + v, _, err := readMetadataForBlock(r, metadataSlab) + if err != nil { + return nil, err + } + // Read the length + length := make([]byte, 8) + if _, err := io.ReadFull(r, length); err != nil { + return nil, err + } + // Read the block + if _, err := io.CopyN(buffer, r, int64(binary.BigEndian.Uint64(length))); err != nil { + return nil, err + } + // Decode the block in blinded + if err := blindedBlock.DecodeSSZ(buffer.Bytes(), int(v)); err != nil { + return nil, err + } + return blindedBlock, nil +} diff --git a/cl/persistence/format/snapshot_format/blocks_test.go b/cl/persistence/format/snapshot_format/blocks_test.go index 8c357fd4b01..eae38978810 100644 --- a/cl/persistence/format/snapshot_format/blocks_test.go +++ b/cl/persistence/format/snapshot_format/blocks_test.go @@ -74,12 +74,22 @@ func TestBlockSnapshotEncoding(t *testing.T) { require.NoError(t, err) hash3, err := header.HashSSZ() require.NoError(t, err) - + // now do it with blinded require.Equal(t, hash1, hash2) require.Equal(t, header.Signature, blk.Signature) require.Equal(t, header.Header.Slot, blk.Block.Slot) + b.Reset() + _, err = snapshot_format.WriteBlockForSnapshot(&b, blk, nil) + require.NoError(t, err) + blk4, err := snapshot_format.ReadBlindedBlockFromSnapshot(&b, &clparams.MainnetBeaconConfig) + require.NoError(t, err) + + hash4, err := blk4.HashSSZ() + require.NoError(t, err) + require.Equal(t, hash1, hash4) + if blk.Version() >= clparams.BellatrixVersion { require.Equal(t, bn, blk.Block.Body.ExecutionPayload.BlockNumber) require.Equal(t, bHash, blk.Block.Body.ExecutionPayload.BlockHash) diff --git a/cl/phase1/forkchoice/forkchoice.go b/cl/phase1/forkchoice/forkchoice.go index 1e87939540b..689c8240d3a 100644 --- a/cl/phase1/forkchoice/forkchoice.go +++ b/cl/phase1/forkchoice/forkchoice.go @@ -80,6 +80,8 @@ type ForkChoiceStore struct { unrealizedJustifiedCheckpoint solid.Checkpoint unrealizedFinalizedCheckpoint solid.Checkpoint proposerBoostRoot libcommon.Hash + // attestations that are not yet processed + attestationSet sync.Map // head data headHash libcommon.Hash headSlot uint64 diff --git a/cl/phase1/forkchoice/on_attestation.go b/cl/phase1/forkchoice/on_attestation.go index 3e80efac4e8..3c59c4c2eb8 100644 --- a/cl/phase1/forkchoice/on_attestation.go +++ b/cl/phase1/forkchoice/on_attestation.go @@ -13,6 +13,8 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" ) +const maxAttestationJobLifetime = 30 * time.Minute + // OnAttestation processes incoming attestations. func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBlock bool, insert bool) error { if !f.synced.Load() { @@ -81,6 +83,10 @@ func (f *ForkChoiceStore) OnAggregateAndProof(aggregateAndProof *cltypes.SignedA committeeIndex := aggregateAndProof.Message.Aggregate.AttestantionData().ValidatorIndex() epoch := state.GetEpochAtSlot(f.beaconCfg, slot) + if err := f.validateOnAttestation(aggregateAndProof.Message.Aggregate, false); err != nil { + return err + } + target := aggregateAndProof.Message.Aggregate.AttestantionData().Target() targetState, err := f.getCheckpointState(target) if err != nil { @@ -101,22 +107,48 @@ func (f *ForkChoiceStore) OnAggregateAndProof(aggregateAndProof *cltypes.SignedA return f.OnAttestation(aggregateAndProof.Message.Aggregate, false, false) } +type attestationJob struct { + attestation *solid.Attestation + insert bool + when time.Time +} + // scheduleAttestationForLaterProcessing scheudules an attestation for later processing func (f *ForkChoiceStore) scheduleAttestationForLaterProcessing(attestation *solid.Attestation, insert bool) { + root, err := attestation.HashSSZ() + if err != nil { + log.Error("failed to hash attestation", "err", err) + return + } + f.attestationSet.Store(root, &attestationJob{ + attestation: attestation, + insert: insert, + when: time.Now(), + }) +} + +func (f *ForkChoiceStore) StartAttestationsRTT() { go func() { - logInterval := time.NewTicker(50 * time.Millisecond) + interval := time.NewTicker(500 * time.Millisecond) for { select { case <-f.ctx.Done(): return - case <-logInterval.C: - if f.Slot() < attestation.AttestantionData().Slot()+1 { - continue - } - if err := f.OnAttestation(attestation, false, insert); err != nil { - log.Debug("could not process scheduled attestation", "reason", err) - } - return + case <-interval.C: + f.attestationSet.Range(func(key, value interface{}) bool { + job := value.(*attestationJob) + if time.Since(job.when) > maxAttestationJobLifetime { + f.attestationSet.Delete(key) + return true + } + if f.Slot() >= job.attestation.AttestantionData().Slot()+1 { + if err := f.OnAttestation(job.attestation, false, job.insert); err != nil { + log.Warn("failed to process attestation", "err", err) + } + f.attestationSet.Delete(key) + } + return true + }) } } }() diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index 6edd4857287..a2b27138178 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -226,6 +226,8 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi log.Info("Beacon API started", "addr", cfg.Address) } + forkChoice.StartAttestationsRTT() + stageCfg := stages.ClStagesCfg(beaconRpc, antiq, genesisConfig, beaconConfig, state, engine, gossipManager, forkChoice, historyDB, indexDB, csn, dirs.Tmp, dbConfig, backfilling, syncedDataManager) sync := stages.ConsensusClStages(ctx, stageCfg)