Skip to content

Commit

Permalink
eth/fetcher: remove light mode in block fetcher (#2804)
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 authored Dec 16, 2024
1 parent e052b5d commit 87055b5
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 224 deletions.
93 changes: 5 additions & 88 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ var (
blockInsertFailGauge = metrics.NewRegisteredGauge("chain/insert/failed", nil)
)

// HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
type HeaderRetrievalFn func(common.Hash) *types.Header

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

Expand All @@ -96,9 +93,6 @@ type chainHeightFn func() uint64
// chainFinalizedHeightFn is a callback type to retrieve the current chain finalized height.
type chainFinalizedHeightFn func() uint64

// headersInsertFn is a callback type to insert a batch of headers into the local chain.
type headersInsertFn func(headers []*types.Header) (int, error)

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

Expand Down Expand Up @@ -163,8 +157,6 @@ func (inject *blockOrHeaderInject) hash() common.Hash {
// BlockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type BlockFetcher struct {
light bool // The indicator whether it's a light fetcher or normal one.

// Various event channels
notify chan *blockAnnounce
inject chan *blockOrHeaderInject
Expand All @@ -190,13 +182,11 @@ type BlockFetcher struct {
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
chainFinalizedHeight chainFinalizedHeightFn // Retrieves the current chain's finalized height
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving

Expand All @@ -209,11 +199,9 @@ type BlockFetcher struct {
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn,
broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn,
insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn,
chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
Expand All @@ -229,13 +217,11 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
chainFinalizedHeight: chainFinalizedHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
Expand Down Expand Up @@ -382,15 +368,11 @@ func (f *BlockFetcher) loop() {
}
// Otherwise if fresh and still unknown, try and import
finalizedHeight := f.chainFinalizedHeight()
if (number+maxUncleDist < height) || number <= finalizedHeight || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
if (number+maxUncleDist < height) || number <= finalizedHeight || f.getBlock(hash) != nil {
f.forgetBlock(hash)
continue
}
if f.light {
f.importHeaders(op)
} else {
f.importBlocks(op)
}
f.importBlocks(op)
}
// Wait for an outside event to occur
select {
Expand Down Expand Up @@ -457,12 +439,6 @@ func (f *BlockFetcher) loop() {
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
blockBroadcastInMeter.Mark(1)

// Now only direct block injection is allowed, drop the header injection
// here silently if we receive.
if f.light {
continue
}
f.enqueue(op.origin, nil, op.block)

case hash := <-f.done:
Expand All @@ -478,16 +454,13 @@ func (f *BlockFetcher) loop() {
// In current LES protocol(les2/les3), only header announce is
// available, no need to wait too much time for header broadcast.
timeout := arriveTimeout - gatherSlack
if f.light {
timeout = 0
}
if time.Since(announces[0].time) > timeout {
// Pick a random peer to retrieve from, reset all others
announce := announces[rand.Intn(len(announces))]
f.forgetHash(hash)

// If the block still didn't arrive, queue for fetching
if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
if f.getBlock(hash) == nil {
request[announce.origin] = append(request[announce.origin], hash)
f.fetching[hash] = announce
}
Expand Down Expand Up @@ -621,16 +594,6 @@ func (f *BlockFetcher) loop() {
f.forgetHash(hash)
continue
}
// Collect all headers only if we are running in light
// mode and the headers are not imported by other means.
if f.light {
if f.getHeader(hash) == nil {
announce.header = header
lightHeaders = append(lightHeaders, announce)
}
f.forgetHash(hash)
continue
}
// Only keep if not imported by other means
if f.getBlock(hash) == nil {
announce.header = header
Expand Down Expand Up @@ -766,12 +729,6 @@ func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
if len(f.announced) == 0 {
return
}
// Schedule announcement retrieval quickly for light mode
// since server won't send any headers to client.
if f.light {
fetch.Reset(lightTimeout)
return
}
// Otherwise find the earliest expiring announcement
earliest := time.Now()
for _, announces := range f.announced {
Expand Down Expand Up @@ -851,46 +808,6 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
}
}

// importHeaders spawns a new goroutine to run a header insertion into the chain.
// If the header's number is at the same height as the current import phase, it
// updates the phase states accordingly.
func (f *BlockFetcher) importHeaders(op *blockOrHeaderInject) {
peer := op.origin
header := op.header
hash := header.Hash()
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

go func() {
// If the parent's unknown, abort insertion
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
// forget block first, then re-queue
f.done <- hash
time.Sleep(reQueueBlockTimeout)
f.requeue <- op
return
}

defer func() { f.done <- hash }()
// Validate the header and if something went wrong, drop the peer
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
f.dropPeer(peer)
return
}
// Run the actual import and log any issues
if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
return
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(header, nil)
}
}()
}

// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
Expand Down
Loading

0 comments on commit 87055b5

Please sign in to comment.