diff --git a/eth/backend.go b/eth/backend.go index d2115eca93..fad49c3a49 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -511,9 +511,6 @@ func (s *Ethereum) StartMining() error { } parlia.Authorize(eb, wallet.SignData, wallet.SignTx) } - // If mining is started, we can disable the transaction rejection mechanism - // introduced to speed sync times. - s.handler.enableSyncedFeatures() go s.miner.Start() } diff --git a/eth/handler.go b/eth/handler.go index f4b51c574b..498021c863 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -136,6 +136,7 @@ type handler struct { snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) + acceptTxs atomic.Bool directBroadcast bool database ethdb.Database @@ -240,7 +241,7 @@ func newHandler(config *handlerConfig) (*handler, error) { return nil, errors.New("snap sync not supported with snapshots disabled") } // Construct the downloader (long sync) - h.downloader = downloader.New(config.Database, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures) + h.downloader = downloader.New(config.Database, h.eventMux, h.chain, h.removePeer, nil) // Construct the fetcher (short sync) validator := func(header *types.Header) error { @@ -978,6 +979,10 @@ func (h *handler) voteBroadcastLoop() { func (h *handler) enableSyncedFeatures() { // Mark the local node as synced. h.synced.Store(true) + if !h.acceptTxs.Load() { + h.acceptTxs.Store(true) + log.Info("Enable transaction acceptance when synced.") + } // If we were running snap sync and it finished, disable doing another // round on next sync cycle diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 8722d97c03..dc1f3e744b 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -52,7 +52,7 @@ func (h *ethHandler) PeerInfo(id enode.ID) interface{} { // AcceptTxs retrieves whether transaction processing is enabled on the node // or if inbound transactions should simply be dropped. func (h *ethHandler) AcceptTxs() bool { - return h.synced.Load() + return h.acceptTxs.Load() } // Handle is invoked from a peer's message handler when it receives a new remote diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index b4f1bd0706..0852f62bec 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -257,7 +257,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { handler := newTestHandler() defer handler.close() - handler.handler.synced.Store(true) // mark synced to accept transactions + handler.handler.acceptTxs.Store(true) // mark synced to accept transactions txs := make(chan core.NewTxsEvent) sub := handler.txpool.SubscribeTransactions(txs, false) @@ -480,7 +480,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { sinks[i] = newTestHandler() defer sinks[i].close() - sinks[i].handler.synced.Store(true) // mark synced to accept transactions + sinks[i].handler.acceptTxs.Store(true) // mark synced to accept transactions } // Interconnect all the sink handlers with the source handler for i, sink := range sinks { @@ -542,7 +542,7 @@ func TestTransactionPendingReannounce(t *testing.T) { sink := newTestHandler() defer sink.close() - sink.handler.synced.Store(true) // mark synced to accept transactions + sink.handler.acceptTxs.Store(true) // mark synced to accept transactions sourcePipe, sinkPipe := p2p.MsgPipe() defer sourcePipe.Close() diff --git a/eth/sync.go b/eth/sync.go index f98eccb067..0b7b2232eb 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -18,6 +18,7 @@ package eth import ( "math/big" + "math/rand" "time" "github.com/ethereum/go-ethereum/common" @@ -163,6 +164,11 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp { mode, ourTD := cs.modeAndLocalHead() op := peerToSyncOp(mode, peer) if op.td.Cmp(ourTD) <= 0 { + if !cs.handler.acceptTxs.Load() { + // Occurs only during a quick restart. + cs.handler.acceptTxs.Store(true) + log.Info("Enable transaction acceptance for already in sync.") + } // We seem to be in sync according to the legacy rules. In the merge // world, it can also mean we're stuck on the merge block, waiting for // a beacon client. In the latter case, notify the user. @@ -171,6 +177,13 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp { cs.warned = time.Now() } return nil // We're in sync + } else if op.td.Cmp(new(big.Int).Add(ourTD, new(big.Int).SetUint64(10*2))) > 0 { + if cs.handler.acceptTxs.Load() && rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(10) < 1 { + // There is only a 1/10 probability of disabling transaction acceptance. + // This randomness helps protect against attacks where a malicious node falsely claims to have higher blocks. + cs.handler.acceptTxs.Store(false) + log.Info("Disable transaction acceptance randomly for the delay exceeding 10 blocks.") + } } return op }