Skip to content

Commit

Permalink
eth: make transaction acceptance depends on syncing status
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 committed Dec 30, 2024
1 parent 76689de commit 027de39
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 8 deletions.
3 changes: 0 additions & 3 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,6 @@ func (s *Ethereum) StartMining() error {
}
parlia.Authorize(eb, wallet.SignData, wallet.SignTx)
}
// If mining is started, we can disable the transaction rejection mechanism
// introduced to speed sync times.
s.handler.enableSyncedFeatures()

go s.miner.Start()
}
Expand Down
7 changes: 6 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type handler struct {

snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
acceptTxs atomic.Bool
directBroadcast bool

database ethdb.Database
Expand Down Expand Up @@ -240,7 +241,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
return nil, errors.New("snap sync not supported with snapshots disabled")
}
// Construct the downloader (long sync)
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures)
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, h.removePeer, nil)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down Expand Up @@ -978,6 +979,10 @@ func (h *handler) voteBroadcastLoop() {
func (h *handler) enableSyncedFeatures() {
// Mark the local node as synced.
h.synced.Store(true)
if !h.acceptTxs.Load() {
h.acceptTxs.Store(true)
log.Info("Enable transaction acceptance when synced.")
}

// If we were running snap sync and it finished, disable doing another
// round on next sync cycle
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
// AcceptTxs retrieves whether transaction processing is enabled on the node
// or if inbound transactions should simply be dropped.
func (h *ethHandler) AcceptTxs() bool {
return h.synced.Load()
return h.acceptTxs.Load()
}

// Handle is invoked from a peer's message handler when it receives a new remote
Expand Down
6 changes: 3 additions & 3 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
handler := newTestHandler()
defer handler.close()

handler.handler.synced.Store(true) // mark synced to accept transactions
handler.handler.acceptTxs.Store(true) // mark synced to accept transactions

txs := make(chan core.NewTxsEvent)
sub := handler.txpool.SubscribeTransactions(txs, false)
Expand Down Expand Up @@ -480,7 +480,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
sinks[i] = newTestHandler()
defer sinks[i].close()

sinks[i].handler.synced.Store(true) // mark synced to accept transactions
sinks[i].handler.acceptTxs.Store(true) // mark synced to accept transactions
}
// Interconnect all the sink handlers with the source handler
for i, sink := range sinks {
Expand Down Expand Up @@ -542,7 +542,7 @@ func TestTransactionPendingReannounce(t *testing.T) {

sink := newTestHandler()
defer sink.close()
sink.handler.synced.Store(true) // mark synced to accept transactions
sink.handler.acceptTxs.Store(true) // mark synced to accept transactions

sourcePipe, sinkPipe := p2p.MsgPipe()
defer sourcePipe.Close()
Expand Down
13 changes: 13 additions & 0 deletions eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package eth

import (
"math/big"
"math/rand"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -163,6 +164,11 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp {
mode, ourTD := cs.modeAndLocalHead()
op := peerToSyncOp(mode, peer)
if op.td.Cmp(ourTD) <= 0 {
if !cs.handler.acceptTxs.Load() {
// Occurs only during a quick restart.
cs.handler.acceptTxs.Store(true)
log.Info("Enable transaction acceptance for already in sync.")
}
// We seem to be in sync according to the legacy rules. In the merge
// world, it can also mean we're stuck on the merge block, waiting for
// a beacon client. In the latter case, notify the user.
Expand All @@ -171,6 +177,13 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp {
cs.warned = time.Now()
}
return nil // We're in sync
} else if op.td.Cmp(new(big.Int).Add(ourTD, new(big.Int).SetUint64(10*2))) > 0 {
if cs.handler.acceptTxs.Load() && rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(10) < 1 {
// There is only a 1/10 probability of disabling transaction acceptance.
// This randomness helps protect against attacks where a malicious node falsely claims to have higher blocks.
cs.handler.acceptTxs.Store(false)
log.Info("Disable transaction acceptance randomly for the delay exceeding 10 blocks.")
}
}
return op
}
Expand Down

0 comments on commit 027de39

Please sign in to comment.