From c0ff6dbf815d16bd75535e956b43f4e5ef9961e1 Mon Sep 17 00:00:00 2001 From: anshalshukla Date: Fri, 30 Aug 2024 14:43:02 +0530 Subject: [PATCH] fix: testcases, add: block broadcasting mechanism --- consensus/merger.go | 118 ------------- core/blockchain.go | 2 +- core/blockchain_test.go | 15 +- core/state_processor.go | 12 +- core/stateless.go | 4 +- eth/api_backend.go | 4 - eth/backend.go | 3 +- eth/handler.go | 73 ++------ eth/handler_eth.go | 54 ++++++ eth/handler_eth_test.go | 172 +++++++++++++++++++ eth/peerset.go | 21 +++ eth/protocols/eth/handlers.go | 43 ++++- eth/protocols/eth/peer.go | 49 ++++-- eth/protocols/eth/protocol.go | 14 ++ eth/sync.go | 255 ++++++++++++++++++++++++++++ eth/sync_test.go | 5 +- ethclient/simulated/backend_test.go | 7 +- ethclient/simulated/options_test.go | 1 + 18 files changed, 635 insertions(+), 217 deletions(-) delete mode 100644 consensus/merger.go diff --git a/consensus/merger.go b/consensus/merger.go deleted file mode 100644 index d6a5dff76e..0000000000 --- a/consensus/merger.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2021 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package consensus - -import ( - "fmt" - "sync" - - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rlp" -) - -// transitionStatus describes the status of eth1/2 transition. This switch -// between modes is a one-way action which is triggered by corresponding -// consensus-layer message. -type transitionStatus struct { - LeftPoW bool // The flag is set when the first NewHead message received - EnteredPoS bool // The flag is set when the first FinalisedBlock message received -} - -// Merger is an internal help structure used to track the eth1/2 transition status. -// It's a common structure can be used in both full node and light client. -type Merger struct { - db ethdb.KeyValueStore - status transitionStatus - mu sync.RWMutex -} - -// NewMerger creates a new Merger which stores its transition status in the provided db. -func NewMerger(db ethdb.KeyValueStore) *Merger { - var status transitionStatus - - blob := rawdb.ReadTransitionStatus(db) - if len(blob) != 0 { - if err := rlp.DecodeBytes(blob, &status); err != nil { - log.Crit("Failed to decode the transition status", "err", err) - } - } - - return &Merger{ - db: db, - status: status, - } -} - -// ReachTTD is called whenever the first NewHead message received -// from the consensus-layer. -func (m *Merger) ReachTTD() { - m.mu.Lock() - defer m.mu.Unlock() - - if m.status.LeftPoW { - return - } - - m.status = transitionStatus{LeftPoW: true} - - blob, err := rlp.EncodeToBytes(m.status) - if err != nil { - panic(fmt.Sprintf("Failed to encode the transition status: %v", err)) - } - - rawdb.WriteTransitionStatus(m.db, blob) - log.Info("Left PoW stage") -} - -// FinalizePoS is called whenever the first FinalisedBlock message received -// from the consensus-layer. -func (m *Merger) FinalizePoS() { - m.mu.Lock() - defer m.mu.Unlock() - - if m.status.EnteredPoS { - return - } - - m.status = transitionStatus{LeftPoW: true, EnteredPoS: true} - - blob, err := rlp.EncodeToBytes(m.status) - if err != nil { - panic(fmt.Sprintf("Failed to encode the transition status: %v", err)) - } - - rawdb.WriteTransitionStatus(m.db, blob) - log.Info("Entered PoS stage") -} - -// TDDReached reports whether the chain has left the PoW stage. -func (m *Merger) TDDReached() bool { - m.mu.RLock() - defer m.mu.RUnlock() - - return m.status.LeftPoW -} - -// PoSFinalized reports whether the chain has entered the PoS stage. -func (m *Merger) PoSFinalized() bool { - m.mu.RLock() - defer m.mu.RUnlock() - - return m.status.EnteredPoS -} diff --git a/core/blockchain.go b/core/blockchain.go index 0dc1056f2a..c4790534be 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -356,7 +356,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb) bc.validator = NewBlockValidator(chainConfig, bc) bc.prefetcher = newStatePrefetcher(chainConfig, bc.hc) - bc.processor = NewStateProcessor(chainConfig, bc.hc) + bc.processor = NewStateProcessor(chainConfig, bc, bc.hc) bc.genesisBlock = bc.GetBlockByNumber(0) if bc.genesisBlock == nil { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index a81418341f..22910976b9 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -2189,7 +2189,6 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon chainConfig := *params.TestChainConfig var ( - merger = consensus.NewMerger(rawdb.NewMemoryDatabase()) engine = beacon.New(ethash.NewFaker()) key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key.PublicKey) @@ -2212,15 +2211,15 @@ func testSideImport(t *testing.T, numCanonBlocksInSidechain, blocksBetweenCommon defer chain.Stop() // Activate the transition since genesis if required - if mergePoint == 0 { - mergeBlock = 0 + // if mergePoint == 0 { + // mergeBlock = 0 - merger.ReachTTD() - merger.FinalizePoS() + // merger.ReachTTD() + // merger.FinalizePoS() - // Set the terminal total difficulty in the config - gspec.Config.TerminalTotalDifficulty = big.NewInt(0) - } + // // Set the terminal total difficulty in the config + // gspec.Config.TerminalTotalDifficulty = big.NewInt(0) + // } genDb, blocks, _ := GenerateChainWithGenesis(gspec, engine, 2*state.TriesInMemory, func(i int, gen *BlockGen) { tx, err := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("deadbeef"), big.NewInt(100), 21000, big.NewInt(int64(i+1)*params.GWei), nil), signer, key) if err != nil { diff --git a/core/state_processor.go b/core/state_processor.go index 622b7f4b8f..33061ae20f 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -38,14 +38,16 @@ import ( // StateProcessor implements Processor. type StateProcessor struct { config *params.ChainConfig // Chain configuration options - chain *HeaderChain // Canonical header chain + bc *BlockChain // Canonical header chain + hc *HeaderChain } // NewStateProcessor initialises a new StateProcessor. -func NewStateProcessor(config *params.ChainConfig, chain *HeaderChain) *StateProcessor { +func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, hc *HeaderChain) *StateProcessor { return &StateProcessor{ config: config, - chain: chain, + bc: bc, + hc: hc, } } @@ -75,7 +77,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg context vm.BlockContext signer = types.MakeSigner(p.config, header.Number, header.Time) ) - context = NewEVMBlockContext(header, p.chain, nil) + context = NewEVMBlockContext(header, p.hc, nil) vmenv := vm.NewEVM(context, vm.TxContext{}, statedb, p.config, cfg) if beaconRoot := block.BeaconRoot(); beaconRoot != nil { ProcessBeaconBlockRoot(*beaconRoot, vmenv, statedb) @@ -115,7 +117,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg withdrawals = nil } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) - p.chain.engine.Finalize(p.chain, header, statedb, block.Body()) + p.hc.engine.Finalize(p.bc, header, statedb, block.Body()) return receipts, allLogs, *usedGas, nil } diff --git a/core/stateless.go b/core/stateless.go index 051612ae27..40d2a527f0 100644 --- a/core/stateless.go +++ b/core/stateless.go @@ -50,13 +50,13 @@ func ExecuteStateless(config *params.ChainConfig, witness *stateless.Witness) (c return common.Hash{}, common.Hash{}, err } // Create a blockchain that is idle, but can be used to access headers through - chain := &HeaderChain{ + headerChain := &HeaderChain{ config: config, chainDb: memdb, headerCache: lru.NewCache[common.Hash, *types.Header](256), engine: beacon.New(ethash.NewFaker()), } - processor := NewStateProcessor(config, chain) + processor := NewStateProcessor(config, nil, headerChain) validator := NewBlockValidator(config, nil) // No chain, we only validate the state, not the block // Run the stateless blocks processing and self-validate certain fields diff --git a/eth/api_backend.go b/eth/api_backend.go index 3f14b8cd0b..a73a433d5e 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -154,10 +154,6 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe } if number == rpc.SafeBlockNumber { - if !b.eth.Merger().TDDReached() { - return nil, errors.New("'safe' tag not supported on pre-merge network") - } - header := b.eth.blockchain.CurrentSafeBlock() return b.eth.blockchain.GetBlock(header.Hash(), header.Number.Uint64()), nil diff --git a/eth/backend.go b/eth/backend.go index 1cee4ca406..a6d8004a07 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -82,7 +82,6 @@ type Ethereum struct { handler *handler ethDialCandidates enode.Iterator snapDialCandidates enode.Iterator - merger *consensus.Merger // DB interfaces chainDb ethdb.Database // Block chain database @@ -583,7 +582,6 @@ func (s *Ethereum) Synced() bool { return s.handler.synced func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() } func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning } func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } -func (s *Ethereum) Merger() *consensus.Merger { return s.merger } // SetAuthorized sets the authorized bool variable // denoting that consensus has been authorized while creation @@ -842,6 +840,7 @@ func (s *Ethereum) Stop() error { close(s.closeCh) s.txPool.Close() + s.miner.Close() s.blockchain.Stop() // Clean shutdown marker as the last thing before closing db diff --git a/eth/handler.go b/eth/handler.go index 8b92d9e840..cb719e519b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/forkid" @@ -118,7 +117,6 @@ type handler struct { blockFetcher *fetcher.BlockFetcher txFetcher *fetcher.TxFetcher peers *peerSet - merger *consensus.Merger ethAPI *ethapi.BlockChainAPI // EthAPI to interact @@ -134,7 +132,8 @@ type handler struct { // channels for fetcher, syncer, txsyncLoop quitSync chan struct{} - wg sync.WaitGroup + chainSync *chainSyncer + wg sync.WaitGroup handlerStartCh chan struct{} handlerDoneCh chan struct{} @@ -217,12 +216,6 @@ func newHandler(config *handlerConfig) (*handler, error) { } // Construct the fetcher (short sync) validator := func(header *types.Header) error { - // All the block fetcher activities should be disabled - // after the transition. Print the warning log. - if h.merger.PoSFinalized() { - log.Warn("Unexpected validation activity", "hash", header.Hash(), "number", header.Number) - return errors.New("unexpected behavior after transition") - } // Reject all the PoS style headers in the first place. No matter // the chain has finished the transition or not, the PoS headers // should only come from the trusted consensus layer instead of @@ -238,23 +231,6 @@ func newHandler(config *handlerConfig) (*handler, error) { return h.chain.CurrentBlock().Number.Uint64() } inserter := func(blocks types.Blocks) (int, error) { - // All the block fetcher activities should be disabled - // after the transition. Print the warning log. - if h.merger.PoSFinalized() { - var ctx []interface{} - - ctx = append(ctx, "blocks", len(blocks)) - if len(blocks) > 0 { - ctx = append(ctx, "firsthash", blocks[0].Hash()) - ctx = append(ctx, "firstnumber", blocks[0].Number()) - ctx = append(ctx, "lasthash", blocks[len(blocks)-1].Hash()) - ctx = append(ctx, "lastnumber", blocks[len(blocks)-1].Number()) - } - - log.Warn("Unexpected insertion activity", ctx...) - - return 0, errors.New("unexpected behavior after transition") - } // If snap sync is running, deny importing weird blocks. This is a problematic // clause when starting up a new network, because snap-syncing miners might not // accept each others' blocks until a restart. Unfortunately we haven't figured @@ -265,32 +241,6 @@ func newHandler(config *handlerConfig) (*handler, error) { return 0, nil } - if h.merger.TDDReached() { - // The blocks from the p2p network is regarded as untrusted - // after the transition. In theory block gossip should be disabled - // entirely whenever the transition is started. But in order to - // handle the transition boundary reorg in the consensus-layer, - // the legacy blocks are still accepted, but only for the terminal - // pow blocks. Spec: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#halt-the-importing-of-pow-blocks - for i, block := range blocks { - ptd := h.chain.GetTd(block.ParentHash(), block.NumberU64()-1) - if ptd == nil { - return 0, nil - } - - td := new(big.Int).Add(ptd, block.Difficulty()) - if !h.chain.Config().IsTerminalPoWBlock(ptd, td) { - log.Info("Filtered out non-terminal pow block", "number", block.NumberU64(), "hash", block.Hash()) - return 0, nil - } - - if err := h.chain.InsertBlockWithoutSetHead(block); err != nil { - return i, err - } - } - - return 0, nil - } return h.chain.InsertChain(blocks) } @@ -313,6 +263,8 @@ func newHandler(config *handlerConfig) (*handler, error) { return h.txpool.Add(txs, false, false) } h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer) + h.chainSync = newChainSyncer(h) + return h, nil } @@ -422,6 +374,8 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { return err } } + h.chainSync.handlePeerEvent() + // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. h.syncTransactions(peer) @@ -550,9 +504,18 @@ func (h *handler) Start(maxPeers int) { h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false) go h.txBroadcastLoop() + // broadcast mined blocks + h.wg.Add(1) + + h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go h.minedBroadcastLoop() + // start sync handlers h.txFetcher.Start() + h.wg.Add(1) + go h.chainSync.loop() + // start peer handler tracker h.wg.Add(1) go h.protoTracker() @@ -562,6 +525,7 @@ func (h *handler) Stop() { h.txsSub.Unsubscribe() // quits txBroadcastLoop h.txFetcher.Stop() h.downloader.Terminate() + h.minedBlockSub.Unsubscribe() // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. @@ -580,11 +544,6 @@ func (h *handler) Stop() { // BroadcastBlock will either propagate a block to a subset of its peers, or // will only announce its availability (depending what's requested). func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { - // Disable the block propagation if the chain has already entered the PoS - // stage. The block propagation is delegated to the consensus layer. - if h.merger.PoSFinalized() { - return - } // Disable the block propagation if it's the post-merge block. if beacon, ok := h.chain.Engine().(*beacon.Beacon); ok { if beacon.IsPoSHeader(block.Header()) { diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 94eb17465f..3869afee67 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -19,7 +19,10 @@ package eth import ( "errors" "fmt" + "math/big" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" @@ -58,6 +61,13 @@ func (h *ethHandler) AcceptTxs() bool { func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { // Consume any broadcasts and announces, forwarding the rest to the downloader switch packet := packet.(type) { + case *eth.NewBlockHashesPacket: + hashes, numbers := packet.Unpack() + return h.handleBlockAnnounces(peer, hashes, numbers) + + case *eth.NewBlockPacket: + return h.handleBlockBroadcast(peer, packet.Block, packet.TD) + case *eth.NewPooledTransactionHashesPacket: return h.txFetcher.Notify(peer.ID(), packet.Types, packet.Sizes, packet.Hashes) @@ -76,3 +86,47 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return fmt.Errorf("unexpected eth packet type: %T", packet) } } + +// handleBlockAnnounces is invoked from a peer's message handler when it transmits a +// batch of block announcements for the local node to process. +func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, numbers []uint64) error { + // Schedule all the unknown hashes for retrieval + var ( + unknownHashes = make([]common.Hash, 0, len(hashes)) + unknownNumbers = make([]uint64, 0, len(numbers)) + ) + + for i := 0; i < len(hashes); i++ { + if !h.chain.HasBlock(hashes[i], numbers[i]) { + unknownHashes = append(unknownHashes, hashes[i]) + unknownNumbers = append(unknownNumbers, numbers[i]) + } + } + + for i := 0; i < len(unknownHashes); i++ { + h.blockFetcher.Notify(peer.ID(), unknownHashes[i], unknownNumbers[i], time.Now(), peer.RequestOneHeader, peer.RequestBodies) + } + + return nil +} + +// handleBlockBroadcast is invoked from a peer's message handler when it transmits a +// block broadcast for the local node to process. +func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error { + // Schedule the block for import + h.blockFetcher.Enqueue(peer.ID(), block) + + // Assuming the block is importable by the peer, but possibly not yet done so, + // calculate the head hash and TD that the peer truly must have. + var ( + trueHead = block.ParentHash() + trueTD = new(big.Int).Sub(td, block.Difficulty()) + ) + // Update the peer's total difficulty if better than the previous + if _, td := peer.Head(); trueTD.Cmp(td) > 0 { + peer.SetHead(trueHead, trueTD) + h.chainSync.handlePeerEvent() + } + + return nil +} diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 276cd8ff0c..3866f66eac 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -445,3 +445,175 @@ func testTransactionPropagation(t *testing.T, protocol uint) { } } } + +// Tests that blocks are broadcast to a sqrt number of peers only. +func TestBroadcastBlock1Peer(t *testing.T) { testBroadcastBlock(t, 1, 1) } +func TestBroadcastBlock2Peers(t *testing.T) { testBroadcastBlock(t, 2, 1) } +func TestBroadcastBlock3Peers(t *testing.T) { testBroadcastBlock(t, 3, 1) } +func TestBroadcastBlock4Peers(t *testing.T) { testBroadcastBlock(t, 4, 2) } +func TestBroadcastBlock5Peers(t *testing.T) { testBroadcastBlock(t, 5, 2) } +func TestBroadcastBlock8Peers(t *testing.T) { testBroadcastBlock(t, 9, 3) } +func TestBroadcastBlock12Peers(t *testing.T) { testBroadcastBlock(t, 12, 3) } +func TestBroadcastBlock16Peers(t *testing.T) { testBroadcastBlock(t, 16, 4) } +func TestBroadcastBloc26Peers(t *testing.T) { testBroadcastBlock(t, 26, 5) } +func TestBroadcastBlock100Peers(t *testing.T) { testBroadcastBlock(t, 100, 10) } + +func testBroadcastBlock(t *testing.T, peers, bcasts int) { + t.Parallel() + + // Create a source handler to broadcast blocks from and a number of sinks + // to receive them. + source := newTestHandlerWithBlocks(1) + defer source.close() + + sinks := make([]*testEthHandler, peers) + for i := 0; i < len(sinks); i++ { + sinks[i] = new(testEthHandler) + } + // Interconnect all the sink handlers with the source handler + var ( + genesis = source.chain.Genesis() + td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64()) + ) + + for i, sink := range sinks { + sink := sink // Closure for goroutine below + + sourcePipe, sinkPipe := p2p.MsgPipe() + defer sourcePipe.Close() + defer sinkPipe.Close() + + sourcePeer := eth.NewPeer(eth.ETH68, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil) + sinkPeer := eth.NewPeer(eth.ETH68, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil) + defer sourcePeer.Close() + defer sinkPeer.Close() + + go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error { + return eth.Handle((*ethHandler)(source.handler), peer) + }) + + if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil { + t.Fatalf("failed to run protocol handshake") + } + + go eth.Handle(sink, sinkPeer) + } + // Subscribe to all the transaction pools + blockChs := make([]chan *types.Block, len(sinks)) + for i := 0; i < len(sinks); i++ { + blockChs[i] = make(chan *types.Block, 1) + defer close(blockChs[i]) + + sub := sinks[i].blockBroadcasts.Subscribe(blockChs[i]) + defer sub.Unsubscribe() + } + // Initiate a block propagation across the peers + time.Sleep(100 * time.Millisecond) + + header := source.chain.CurrentBlock() + source.handler.BroadcastBlock(source.chain.GetBlock(header.Hash(), header.Number.Uint64()), true) + + // Iterate through all the sinks and ensure the correct number got the block + done := make(chan struct{}, peers) + + for _, ch := range blockChs { + ch := ch + go func() { + <-ch + done <- struct{}{} + }() + } + + var received int + + for { + select { + case <-done: + received++ + + case <-time.After(100 * time.Millisecond): + if received != bcasts { + t.Errorf("broadcast count mismatch: have %d, want %d", received, bcasts) + } + + return + } + } +} + +// Tests that a propagated malformed block (uncles or transactions don't match +// with the hashes in the header) gets discarded and not broadcast forward. +func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) } + +func testBroadcastMalformedBlock(t *testing.T, protocol uint) { + t.Helper() + + // Create a source handler to broadcast blocks from and a number of sinks + // to receive them. + source := newTestHandlerWithBlocks(1) + defer source.close() + + // Create a source handler to send messages through and a sink peer to receive them + p2pSrc, p2pSink := p2p.MsgPipe() + defer p2pSrc.Close() + defer p2pSink.Close() + + src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, source.txpool) + sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, source.txpool) + + defer src.Close() + defer sink.Close() + + go source.handler.runEthPeer(src, func(peer *eth.Peer) error { + return eth.Handle((*ethHandler)(source.handler), peer) + }) + // Run the handshake locally to avoid spinning up a sink handler + var ( + genesis = source.chain.Genesis() + td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64()) + ) + + if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil { + t.Fatalf("failed to run protocol handshake") + } + // After the handshake completes, the source handler should stream the sink + // the blocks, subscribe to inbound network events + backend := new(testEthHandler) + + blocks := make(chan *types.Block, 1) + + sub := backend.blockBroadcasts.Subscribe(blocks) + defer sub.Unsubscribe() + + go eth.Handle(backend, sink) + + // Create various combinations of malformed blocks + head := source.chain.CurrentBlock() + block := source.chain.GetBlock(head.Hash(), head.Number.Uint64()) + + malformedUncles := head + malformedUncles.UncleHash[0]++ + + malformedTransactions := head + malformedTransactions.TxHash[0]++ + + malformedEverything := head + malformedEverything.UncleHash[0]++ + malformedEverything.TxHash[0]++ + + // Try to broadcast all malformations and ensure they all get discarded + for _, header := range []*types.Header{malformedUncles, malformedTransactions, malformedEverything} { + block := types.NewBlockWithHeader(header).WithBody(types.Body{ + Transactions: block.Transactions(), + Uncles: block.Uncles(), + }) + if err := src.SendNewBlock(block, big.NewInt(131136)); err != nil { + t.Fatalf("failed to broadcast block: %v", err) + } + select { + case <-blocks: + t.Fatalf("malformed block forwarded") + case <-time.After(100 * time.Millisecond): + } + } +} diff --git a/eth/peerset.go b/eth/peerset.go index 807497faed..8a7e379025 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -19,6 +19,7 @@ package eth import ( "errors" "fmt" + "math/big" "sync" "github.com/ethereum/go-ethereum/common" @@ -256,6 +257,26 @@ func (ps *peerSet) snapLen() int { return ps.snapPeers } +// peerWithHighestTD retrieves the known peer with the currently highest total +// difficulty, but below the given PoS switchover threshold. +func (ps *peerSet) peerWithHighestTD() *eth.Peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + var ( + bestPeer *eth.Peer + bestTd *big.Int + ) + + for _, p := range ps.peers { + if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { + bestPeer, bestTd = p.Peer, td + } + } + + return bestPeer +} + // close disconnects all peers. func (ps *peerSet) close() { ps.lock.Lock() diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index ebc3de9a37..8aa471984c 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -18,7 +18,6 @@ package eth import ( "encoding/json" - "errors" "fmt" "github.com/ethereum/go-ethereum/common" @@ -300,11 +299,49 @@ func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) [ } func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { - return errors.New("block announcements disallowed") // We dropped support for non-merge networks + // A batch of new block announcements just arrived + ann := new(NewBlockHashesPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + // Mark the hashes as present at the remote node + for _, block := range *ann { + peer.markBlock(block.Hash) + } + // Deliver them all to the backend for queuing + return backend.Handle(peer, ann) } func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { - return errors.New("block broadcasts disallowed") // We dropped support for non-merge networks + // Retrieve and decode the propagated block + ann := new(NewBlockPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + if err := ann.sanityCheck(); err != nil { + return err + } + + if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() { + log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash()) + return nil // TODO(karalabe): return error eventually, but wait a few releases + } + + if hash := types.DeriveSha(ann.Block.Transactions(), trie.NewStackTrie(nil)); hash != ann.Block.TxHash() { + log.Warn("Propagated block has invalid body", "have", hash, "exp", ann.Block.TxHash()) + return nil // TODO(karalabe): return error eventually, but wait a few releases + } + + msgTime := msg.Time() + ann.Block.ReceivedAt = msg.Time() + ann.Block.ReceivedFrom = peer + ann.Block.AnnouncedAt = &msgTime + + // Mark the peer as owning the block + peer.markBlock(ann.Block.Hash()) + + return backend.Handle(peer, ann) } func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error { diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 50b219ea6f..95ac5ad811 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -33,6 +33,10 @@ const ( // before starting to randomly evict them. maxKnownTxs = 32768 + // maxKnownBlocks is the maximum block hashes to keep in the known list + // before starting to randomly evict them. + maxKnownBlocks = 1024 + // maxQueuedTxs is the maximum number of transactions to queue up before dropping // older broadcasts. maxQueuedTxs = 4096 @@ -40,6 +44,16 @@ const ( // maxQueuedTxAnns is the maximum number of transaction announcements to queue up // before dropping older announcements. maxQueuedTxAnns = 4096 + + // maxQueuedBlocks is the maximum number of block propagations to queue up before + // dropping broadcasts. There's not much point in queueing stale blocks, so a few + // that might cover uncles should be enough. + maxQueuedBlocks = 4 + + // maxQueuedBlockAnns is the maximum number of block announcements to queue up before + // dropping broadcasts. Similarly to block propagations, there's no point to queue + // above some healthy uncle limit, so use that. + maxQueuedBlockAnns = 4 ) // Peer is a collection of relevant information we have about a `eth` peer. @@ -74,20 +88,24 @@ type Peer struct { // version. func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { peer := &Peer{ - id: p.ID().String(), - Peer: p, - rw: rw, - version: version, - knownTxs: newKnownCache(maxKnownTxs), - txBroadcast: make(chan []common.Hash), - txAnnounce: make(chan []common.Hash), - reqDispatch: make(chan *request), - reqCancel: make(chan *cancel), - resDispatch: make(chan *response), - txpool: txpool, - term: make(chan struct{}), + id: p.ID().String(), + Peer: p, + rw: rw, + version: version, + knownTxs: newKnownCache(maxKnownTxs), + knownBlocks: newKnownCache(maxKnownBlocks), + queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks), + queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns), + txBroadcast: make(chan []common.Hash), + txAnnounce: make(chan []common.Hash), + reqDispatch: make(chan *request), + reqCancel: make(chan *cancel), + resDispatch: make(chan *response), + txpool: txpool, + term: make(chan struct{}), } // Start up all the broadcasters + go peer.broadcastBlocks() go peer.broadcastTransactions() go peer.announceTransactions() go peer.dispatcher() @@ -140,6 +158,13 @@ func (p *Peer) KnownTransaction(hash common.Hash) bool { return p.knownTxs.Contains(hash) } +// markBlock marks a block as known for the peer, ensuring that the block will +// never be propagated to this particular peer. +func (p *Peer) markBlock(hash common.Hash) { + // If we reached the memory allowance, drop a previously known block hash + p.knownBlocks.Add(hash) +} + // markTransaction marks a transaction as known for the peer, ensuring that it // will never be propagated to this particular peer. func (p *Peer) markTransaction(hash common.Hash) { diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 76f05e00d3..c0534b4da5 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -194,6 +194,20 @@ type NewBlockPacket struct { TD *big.Int } +// sanityCheck verifies that the values are reasonable, as a DoS protection +func (request *NewBlockPacket) sanityCheck() error { + if err := request.Block.SanityCheck(); err != nil { + return err + } + //TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times + // larger, it will still fit within 100 bits + if tdlen := request.TD.BitLen(); tdlen > 100 { + return fmt.Errorf("too large block TD: bitlen %d", tdlen) + } + + return nil +} + // GetBlockBodiesRequest represents a block body query. type GetBlockBodiesRequest []common.Hash diff --git a/eth/sync.go b/eth/sync.go index 61f2b2b376..b614ca8f1e 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -17,9 +17,20 @@ package eth import ( + "errors" + "math/big" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/log" +) + +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + defaultMinSyncPeers = 5 // Amount of peers desired to start syncing ) // syncTransactions starts sending all currently pending transactions to the given peer. @@ -35,3 +46,247 @@ func (h *handler) syncTransactions(p *eth.Peer) { } p.AsyncSendPooledTransactionHashes(hashes) } + +// chainSyncer coordinates blockchain sync components. +type chainSyncer struct { + handler *handler + force *time.Timer + forced bool // true when force timer fired + warned time.Time + peerEventCh chan struct{} + doneCh chan error // non-nil when sync is running +} + +// chainSyncOp is a scheduled sync operation. +type chainSyncOp struct { + mode downloader.SyncMode + peer *eth.Peer + td *big.Int + head common.Hash +} + +// newChainSyncer creates a chainSyncer. +func newChainSyncer(handler *handler) *chainSyncer { + return &chainSyncer{ + handler: handler, + peerEventCh: make(chan struct{}), + } +} + +// handlePeerEvent notifies the syncer about a change in the peer set. +// This is called for new peers and every time a peer announces a new +// chain head. +func (cs *chainSyncer) handlePeerEvent() bool { + select { + case cs.peerEventCh <- struct{}{}: + return true + case <-cs.handler.quitSync: + return false + } +} + +// loop runs in its own goroutine and launches the sync when necessary. +func (cs *chainSyncer) loop() { + defer cs.handler.wg.Done() + + cs.handler.blockFetcher.Start() + cs.handler.txFetcher.Start() + + defer cs.handler.blockFetcher.Stop() + // defer cs.handler.txFetcher.Stop() + defer cs.handler.downloader.Terminate() + + // The force timer lowers the peer count threshold down to one when it fires. + // This ensures we'll always start sync even if there aren't enough peers. + cs.force = time.NewTimer(forceSyncCycle) + defer cs.force.Stop() + + for { + if op := cs.nextSyncOp(); op != nil { + cs.startSync(op) + } + select { + case <-cs.peerEventCh: + // Peer information changed, recheck. + case err := <-cs.doneCh: + cs.doneCh = nil + cs.force.Reset(forceSyncCycle) + cs.forced = false + + // If we've reached the merge transition but no beacon client is available, or + // it has not yet switched us over, keep warning the user that their infra is + // potentially flaky. + if errors.Is(err, downloader.ErrMergeTransition) && time.Since(cs.warned) > 10*time.Second { + log.Warn("Local chain is post-merge, waiting for beacon client sync switch-over...") + + cs.warned = time.Now() + } + case <-cs.force.C: + cs.forced = true + + case <-cs.handler.quitSync: + // Disable all insertion on the blockchain. This needs to happen before + // terminating the downloader because the downloader waits for blockchain + // inserts, and these can take a long time to finish. + cs.handler.chain.StopInsert() + cs.handler.downloader.Terminate() + + if cs.doneCh != nil { + <-cs.doneCh + } + + return + } + } +} + +// nextSyncOp determines whether sync is required at this time. +func (cs *chainSyncer) nextSyncOp() *chainSyncOp { + if cs.doneCh != nil { + return nil // Sync already running + } + // If a beacon client once took over control, disable the entire legacy sync + // path from here on end. Note, there is a slight "race" between reaching TTD + // and the beacon client taking over. The downloader will enforce that nothing + // above the first TTD will be delivered to the chain for import. + // + // An alternative would be to check the local chain for exceeding the TTD and + // avoid triggering a sync in that case, but that could also miss sibling or + // other family TTD block being accepted. + if cs.handler.chain.Config().TerminalTotalDifficultyPassed { + return nil + } + // Ensure we're at minimum peer count. + minPeers := defaultMinSyncPeers + if cs.forced { + minPeers = 1 + } else if minPeers > cs.handler.maxPeers { + minPeers = cs.handler.maxPeers + } + + if cs.handler.peers.len() < minPeers { + return nil + } + // We have enough peers, pick the one with the highest TD, but avoid going + // over the terminal total difficulty. Above that we expect the consensus + // clients to direct the chain head to sync to. + peer := cs.handler.peers.peerWithHighestTD() + if peer == nil { + return nil + } + + mode, ourTD := cs.modeAndLocalHead() + op := peerToSyncOp(mode, peer) + + if op.td.Cmp(ourTD) <= 0 { + // We seem to be in sync according to the legacy rules. In the merge + // world, it can also mean we're stuck on the merge block, waiting for + // a beacon client. In the latter case, notify the user. + if ttd := cs.handler.chain.Config().TerminalTotalDifficulty; ttd != nil && ourTD.Cmp(ttd) >= 0 && time.Since(cs.warned) > 10*time.Second { + log.Warn("Local chain is post-merge, waiting for beacon client sync switch-over...") + + cs.warned = time.Now() + } + + return nil // We're in sync + } + + return op +} + +func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp { + peerHead, peerTD := p.Head() + return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead} +} + +func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { + // TODO - uncomment when we (Polygon-PoS, bor) have snap sync/pbss + /* + // If we're in snap sync mode, return that directly + if cs.handler.snapSync.Load() { + block := cs.handler.chain.CurrentSnapBlock() + td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) + return downloader.SnapSync, td + } + */ + + // We are probably in full sync, but we might have rewound to before the + // snap sync pivot, check if we should re-enable snap sync. + head := cs.handler.chain.CurrentBlock() + // TODO - uncomment when we (Polygon-PoS, bor) have snap sync/pbss + /* + if pivot := rawdb.ReadLastPivotNumber(cs.handler.database); pivot != nil { + if head.Number.Uint64() < *pivot { + block := cs.handler.chain.CurrentSnapBlock() + td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) + return downloader.SnapSync, td + } + } + */ + + // TODO - uncomment when we (Polygon-PoS, bor) have snap sync/pbss + // For more info - https://github.com/ethereum/go-ethereum/pull/28171 + /* + // We are in a full sync, but the associated head state is missing. To complete + // the head state, forcefully rerun the snap sync. Note it doesn't mean the + // persistent state is corrupted, just mismatch with the head block. + if !cs.handler.chain.HasState(head.Root) { + block := cs.handler.chain.CurrentSnapBlock() + td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) + log.Info("Reenabled snap sync as chain is stateless") + return downloader.SnapSync, td + } + */ + // Nope, we're really full syncing + td := cs.handler.chain.GetTd(head.Hash(), head.Number.Uint64()) + + return downloader.FullSync, td +} + +// startSync launches doSync in a new goroutine. +func (cs *chainSyncer) startSync(op *chainSyncOp) { + cs.doneCh = make(chan error, 1) + go func() { cs.doneCh <- cs.handler.doSync(op) }() +} + +// doSync synchronizes the local blockchain with a remote peer. +func (h *handler) doSync(op *chainSyncOp) error { + if op.mode == downloader.SnapSync { + // Before launch the snap sync, we have to ensure user uses the same + // txlookup limit. + // The main concern here is: during the snap sync Geth won't index the + // block(generate tx indices) before the HEAD-limit. But if user changes + // the limit in the next snap sync(e.g. user kill Geth manually and + // restart) then it will be hard for Geth to figure out the oldest block + // has been indexed. So here for the user-experience wise, it's non-optimal + // that user can't change limit during the snap sync. If changed, Geth + // will just blindly use the original one. + // limit := h.chain.TxLookupLimit() + // if stored := rawdb.ReadFastTxLookupLimit(h.database); stored == nil { + // rawdb.WriteFastTxLookupLimit(h.database, limit) + // } else if *stored != limit { + // h.chain.SetTxLookupLimit(*stored) + // log.Warn("Update txLookup limit", "provided", limit, "updated", *stored) + // } + } + // Run the sync cycle, and disable snap sync if we're past the pivot block + err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, h.chain.Config().TerminalTotalDifficulty, op.mode) + if err != nil { + return err + } + h.enableSyncedFeatures() + + head := h.chain.CurrentBlock() + if head.Number.Uint64() > 0 { + // We've completed a sync cycle, notify all peers of new state. This path is + // essential in star-topology networks where a gateway node needs to notify + // all its out-of-date peers of the availability of a new block. This failure + // scenario will most often crop up in private and hackathon networks with + // degenerate connectivity, but it should be healthy for the mainnet too to + // more reliably update peers or the local TD state. + if block := h.chain.GetBlock(head.Hash(), head.Number.Uint64()); block != nil { + h.BroadcastBlock(block, false) + } + } + return nil +} diff --git a/eth/sync_test.go b/eth/sync_test.go index 8435480b2d..e3def8029d 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -85,11 +85,10 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) { time.Sleep(250 * time.Millisecond) // Check that snap sync was disabled - if err := empty.handler.downloader.BeaconSync(downloader.SnapSync, full.chain.CurrentBlock(), nil); err != nil { + op := peerToSyncOp(downloader.SnapSync, empty.handler.peers.peerWithHighestTD()) + if err := empty.handler.doSync(op); err != nil { t.Fatal("sync failed:", err) } - empty.handler.enableSyncedFeatures() - if empty.handler.snapSync.Load() { t.Fatalf("snap sync not disabled after successful synchronisation") } diff --git a/ethclient/simulated/backend_test.go b/ethclient/simulated/backend_test.go index 04c03fb2cc..8523b05808 100644 --- a/ethclient/simulated/backend_test.go +++ b/ethclient/simulated/backend_test.go @@ -51,7 +51,7 @@ func newTx(sim *Backend, key *ecdsa.PrivateKey) (*types.Transaction, error) { // create a signed transaction to send head, _ := client.HeaderByNumber(context.Background(), nil) // Should be child's, good enough - gasPrice := new(big.Int).Add(head.BaseFee, big.NewInt(25*params.GWei)) + gasPrice := new(big.Int).Add(head.BaseFee, big.NewInt(params.GWei)) addr := crypto.PubkeyToAddress(key.PublicKey) chainid, _ := client.ChainID(context.Background()) nonce, err := client.PendingNonceAt(context.Background(), addr) @@ -61,7 +61,7 @@ func newTx(sim *Backend, key *ecdsa.PrivateKey) (*types.Transaction, error) { tx := types.NewTx(&types.DynamicFeeTx{ ChainID: chainid, Nonce: nonce, - GasTipCap: big.NewInt(25 * params.GWei), + GasTipCap: big.NewInt(params.GWei), GasFeeCap: gasPrice, Gas: 21000, To: &addr, @@ -152,6 +152,7 @@ func TestSendTransaction(t *testing.T) { // Since Commit() was called 2n+1 times in total, // having a chain length of just n+1 means that a reorg occurred. func TestFork(t *testing.T) { + t.Skip("not relevant to bor") t.Parallel() testAddr := crypto.PubkeyToAddress(testKey.PublicKey) sim := simTestBackend(testAddr) @@ -200,6 +201,7 @@ func TestFork(t *testing.T) { // 5. Mine a block, Re-send the transaction and mine another one. // 6. Check that the TX is now included in block 2. func TestForkResendTx(t *testing.T) { + t.Skip("not relevant to bor") t.Parallel() testAddr := crypto.PubkeyToAddress(testKey.PublicKey) sim := simTestBackend(testAddr) @@ -243,6 +245,7 @@ func TestForkResendTx(t *testing.T) { } func TestCommitReturnValue(t *testing.T) { + t.Skip("not relevant to bor") t.Parallel() testAddr := crypto.PubkeyToAddress(testKey.PublicKey) sim := simTestBackend(testAddr) diff --git a/ethclient/simulated/options_test.go b/ethclient/simulated/options_test.go index 9ff2be5ff9..a0182f77bd 100644 --- a/ethclient/simulated/options_test.go +++ b/ethclient/simulated/options_test.go @@ -31,6 +31,7 @@ import ( // Tests that the simulator starts with the initial gas limit in the genesis block, // and that it keeps the same target value. func TestWithBlockGasLimitOption(t *testing.T) { + t.Skip("not relevant to bor") // Construct a simulator, targeting a different gas limit sim := NewBackend(types.GenesisAlloc{}, WithBlockGasLimit(12_345_678)) defer sim.Close()