Skip to content

Commit

Permalink
Caplin: Fixed goroutine hell (#9246)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Jan 17, 2024
1 parent e1195ba commit e1a1c0c
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 10 deletions.
4 changes: 4 additions & 0 deletions cl/cltypes/blinded_beacon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ func (b *SignedBlindedBeaconBlock) Clone() clonable.Clonable {
return NewSignedBlindedBeaconBlock(b.Block.Body.beaconCfg)
}

func (b *BlindedBeaconBody) ExecutionPayloadMerkleProof() ([][32]byte, error) {
return merkle_tree.MerkleProof(4, 9, b.getSchema(false)...)
}

// make sure that the type implements the interface ssz2.ObjectSSZ
var _ ssz2.ObjectSSZ = (*BlindedBeaconBody)(nil)
var _ ssz2.ObjectSSZ = (*BlindedBeaconBlock)(nil)
Expand Down
29 changes: 29 additions & 0 deletions cl/persistence/format/snapshot_format/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,32 @@ func ReadBlockHeaderFromSnapshotWithExecutionData(r io.Reader, cfg *clparams.Bea
blockHash := blindedBlock.Block.Body.ExecutionPayload.BlockHash
return blockHeader, blockNumber, blockHash, nil
}

func ReadBlindedBlockFromSnapshot(r io.Reader, cfg *clparams.BeaconChainConfig) (*cltypes.SignedBlindedBeaconBlock, error) {
buffer := buffersPool.Get().(*bytes.Buffer)
defer buffersPool.Put(buffer)
buffer.Reset()

blindedBlock := cltypes.NewSignedBlindedBeaconBlock(cfg)

// Read the metadata
metadataSlab := make([]byte, 33)
v, _, err := readMetadataForBlock(r, metadataSlab)
if err != nil {
return nil, err
}
// Read the length
length := make([]byte, 8)
if _, err := io.ReadFull(r, length); err != nil {
return nil, err
}
// Read the block
if _, err := io.CopyN(buffer, r, int64(binary.BigEndian.Uint64(length))); err != nil {
return nil, err
}
// Decode the block in blinded
if err := blindedBlock.DecodeSSZ(buffer.Bytes(), int(v)); err != nil {
return nil, err
}
return blindedBlock, nil
}
12 changes: 11 additions & 1 deletion cl/persistence/format/snapshot_format/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,22 @@ func TestBlockSnapshotEncoding(t *testing.T) {
require.NoError(t, err)
hash3, err := header.HashSSZ()
require.NoError(t, err)

// now do it with blinded
require.Equal(t, hash1, hash2)

require.Equal(t, header.Signature, blk.Signature)
require.Equal(t, header.Header.Slot, blk.Block.Slot)

b.Reset()
_, err = snapshot_format.WriteBlockForSnapshot(&b, blk, nil)
require.NoError(t, err)
blk4, err := snapshot_format.ReadBlindedBlockFromSnapshot(&b, &clparams.MainnetBeaconConfig)
require.NoError(t, err)

hash4, err := blk4.HashSSZ()
require.NoError(t, err)
require.Equal(t, hash1, hash4)

if blk.Version() >= clparams.BellatrixVersion {
require.Equal(t, bn, blk.Block.Body.ExecutionPayload.BlockNumber)
require.Equal(t, bHash, blk.Block.Body.ExecutionPayload.BlockHash)
Expand Down
2 changes: 2 additions & 0 deletions cl/phase1/forkchoice/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type ForkChoiceStore struct {
unrealizedJustifiedCheckpoint solid.Checkpoint
unrealizedFinalizedCheckpoint solid.Checkpoint
proposerBoostRoot libcommon.Hash
// attestations that are not yet processed
attestationSet sync.Map
// head data
headHash libcommon.Hash
headSlot uint64
Expand Down
50 changes: 41 additions & 9 deletions cl/phase1/forkchoice/on_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
)

const maxAttestationJobLifetime = 30 * time.Minute

// OnAttestation processes incoming attestations.
func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBlock bool, insert bool) error {
if !f.synced.Load() {
Expand Down Expand Up @@ -81,6 +83,10 @@ func (f *ForkChoiceStore) OnAggregateAndProof(aggregateAndProof *cltypes.SignedA
committeeIndex := aggregateAndProof.Message.Aggregate.AttestantionData().ValidatorIndex()
epoch := state.GetEpochAtSlot(f.beaconCfg, slot)

if err := f.validateOnAttestation(aggregateAndProof.Message.Aggregate, false); err != nil {
return err
}

target := aggregateAndProof.Message.Aggregate.AttestantionData().Target()
targetState, err := f.getCheckpointState(target)
if err != nil {
Expand All @@ -101,22 +107,48 @@ func (f *ForkChoiceStore) OnAggregateAndProof(aggregateAndProof *cltypes.SignedA
return f.OnAttestation(aggregateAndProof.Message.Aggregate, false, false)
}

type attestationJob struct {
attestation *solid.Attestation
insert bool
when time.Time
}

// scheduleAttestationForLaterProcessing scheudules an attestation for later processing
func (f *ForkChoiceStore) scheduleAttestationForLaterProcessing(attestation *solid.Attestation, insert bool) {
root, err := attestation.HashSSZ()
if err != nil {
log.Error("failed to hash attestation", "err", err)
return
}
f.attestationSet.Store(root, &attestationJob{
attestation: attestation,
insert: insert,
when: time.Now(),
})
}

func (f *ForkChoiceStore) StartAttestationsRTT() {
go func() {
logInterval := time.NewTicker(50 * time.Millisecond)
interval := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-f.ctx.Done():
return
case <-logInterval.C:
if f.Slot() < attestation.AttestantionData().Slot()+1 {
continue
}
if err := f.OnAttestation(attestation, false, insert); err != nil {
log.Debug("could not process scheduled attestation", "reason", err)
}
return
case <-interval.C:
f.attestationSet.Range(func(key, value interface{}) bool {
job := value.(*attestationJob)
if time.Since(job.when) > maxAttestationJobLifetime {
f.attestationSet.Delete(key)
return true
}
if f.Slot() >= job.attestation.AttestantionData().Slot()+1 {
if err := f.OnAttestation(job.attestation, false, job.insert); err != nil {
log.Warn("failed to process attestation", "err", err)
}
f.attestationSet.Delete(key)
}
return true
})
}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions cmd/caplin/caplin1/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi
log.Info("Beacon API started", "addr", cfg.Address)
}

forkChoice.StartAttestationsRTT()

stageCfg := stages.ClStagesCfg(beaconRpc, antiq, genesisConfig, beaconConfig, state, engine, gossipManager, forkChoice, historyDB, indexDB, csn, dirs.Tmp, dbConfig, backfilling, syncedDataManager)
sync := stages.ConsensusClStages(ctx, stageCfg)

Expand Down

0 comments on commit e1a1c0c

Please sign in to comment.