From feea10a820950ce55eadfdbba0bfeb7a6efaabd6 Mon Sep 17 00:00:00 2001 From: Arpit Temani Date: Wed, 13 Sep 2023 14:26:54 +0530 Subject: [PATCH] fix todos --- accounts/abi/topics.go | 2 +- core/state/statedb.go | 1 - core/txpool/legacypool/legacypool.go | 3 +- core/txpool/txpool.go | 14 ++- core/types/transaction.go | 162 +++++++++++++-------------- eth/api_backend.go | 27 +---- eth/backend.go | 18 ++- internal/ethapi/api.go | 1 - miner/worker.go | 59 ++++++---- 9 files changed, 147 insertions(+), 140 deletions(-) diff --git a/accounts/abi/topics.go b/accounts/abi/topics.go index 7144c50534..78e804bc21 100644 --- a/accounts/abi/topics.go +++ b/accounts/abi/topics.go @@ -76,7 +76,7 @@ func MakeTopics(query ...[]interface{}) ([][]common.Hash, error) { copy(topic[:], hash[:]) default: - // todo(rjl493456442) according solidity documentation, indexed event + // TODO (rjl493456442) according solidity documentation, indexed event // parameters that are not value types i.e. arrays and structs are not // stored directly but instead a keccak256-hash of an encoding is stored. // diff --git a/core/state/statedb.go b/core/state/statedb.go index d67f46f1b1..dccb185734 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -398,7 +398,6 @@ func (s *StateDB) ApplyMVWriteSet(writes []blockstm.WriteDescriptor) { s.SetNonce(addr, sr.GetNonce(addr)) case CodePath: s.SetCode(addr, sr.GetCode(addr)) - // TODO - Arpit ----------------- case SuicidePath: stateObject := sr.getDeletedStateObject(addr) if stateObject != nil && stateObject.deleted { diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 861bbd5d1b..d513f121e3 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -143,7 +143,8 @@ type Config struct { AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts - Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + AllowUnprotectedTxs bool // Allow non-EIP-155 transactions } // DefaultConfig contains the default configurations for the transaction pool. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 03e18738d2..7d4a794be2 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -319,7 +319,6 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction // SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending // events to the given channel. -// TODO - Arpit func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { subs := make([]event.Subscription, len(p.subpools)) for i, subpool := range p.subpools { @@ -359,6 +358,19 @@ func (p *TxPool) Nonce(addr common.Address) uint64 { return nonce } +// Stats retrieves the current pool stats, namely the number of pending and the +// number of queued (non-executable) transactions. +func (p *TxPool) Stats() (int, int) { + var runnable, blocked int + for _, subpool := range p.subpools { + run, block := subpool.Stats() + + runnable += run + blocked += block + } + return runnable, blocked +} + // // Stats retrieves the current pool stats, namely the number of pending and the // // number of queued (non-executable) transactions. // func (p *TxPool) Stats() (int, int) { diff --git a/core/types/transaction.go b/core/types/transaction.go index a3517d5458..93244c625f 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -18,7 +18,6 @@ package types import ( "bytes" - "container/heap" "errors" "io" "math/big" @@ -531,21 +530,20 @@ type TxWithMinerFee struct { minerFee *uint256.Int } -// TODO - Arpit -// NewTxWithMinerFee creates a wrapped transaction, calculating the effective -// miner gasTipCap if a base fee is provided. -// Returns error in case of a negative effective miner gasTipCap. -func NewTxWithMinerFee(tx *Transaction, baseFee *uint256.Int) (*TxWithMinerFee, error) { - // minerFee, err := tx.EffectiveGasTipUnit(baseFee) - // if err != nil { - // return nil, err - // } +// // NewTxWithMinerFee creates a wrapped transaction, calculating the effective +// // miner gasTipCap if a base fee is provided. +// // Returns error in case of a negative effective miner gasTipCap. +// func NewTxWithMinerFee(tx *Transaction, baseFee *uint256.Int) (*TxWithMinerFee, error) { +// // minerFee, err := tx.EffectiveGasTipUnit(baseFee) +// // if err != nil { +// // return nil, err +// // } - return &TxWithMinerFee{ - tx: tx, - minerFee: uint256.NewInt(1), - }, nil -} +// return &TxWithMinerFee{ +// tx: tx, +// minerFee: uint256.NewInt(1), +// }, nil +// } // TxByPriceAndTime implements both the sort and the heap interface, making it useful // for all at once sorting as well as individually adding and removing elements. @@ -623,70 +621,70 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa } }*/ -func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions, baseFee *uint256.Int) *TransactionsByPriceAndNonce { - // Initialize a price and received time based heap with the head transactions - heads := make(TxByPriceAndTime, 0, len(txs)) - - for from, accTxs := range txs { - if len(accTxs) == 0 { - continue - } - - acc, _ := Sender(signer, accTxs[0]) - wrapped, err := NewTxWithMinerFee(accTxs[0], baseFee) - - // Remove transaction if sender doesn't match from, or if wrapping fails. - if acc != from || err != nil { - delete(txs, from) - continue - } - - heads = append(heads, wrapped) - txs[from] = accTxs[1:] - } - - heap.Init(&heads) - - // Assemble and return the transaction set - return &TransactionsByPriceAndNonce{ - txs: txs, - heads: heads, - signer: signer, - baseFee: baseFee, - } -} - -// Peek returns the next transaction by price. -func (t *TransactionsByPriceAndNonce) Peek() *Transaction { - if len(t.heads) == 0 { - return nil - } - - return t.heads[0].tx -} - -// Shift replaces the current best head with the next one from the same account. -func (t *TransactionsByPriceAndNonce) Shift() { - acc, _ := Sender(t.signer, t.heads[0].tx) - if txs, ok := t.txs[acc]; ok && len(txs) > 0 { - if wrapped, err := NewTxWithMinerFee(txs[0], t.baseFee); err == nil { - t.heads[0], t.txs[acc] = wrapped, txs[1:] - heap.Fix(&t.heads, 0) - - return - } - } - - heap.Pop(&t.heads) -} - -func (t *TransactionsByPriceAndNonce) GetTxs() int { - return len(t.txs) -} - -// Pop removes the best transaction, *not* replacing it with the next one from -// the same account. This should be used when a transaction cannot be executed -// and hence all subsequent ones should be discarded from the same account. -func (t *TransactionsByPriceAndNonce) Pop() { - heap.Pop(&t.heads) -} +// func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions, baseFee *uint256.Int) *TransactionsByPriceAndNonce { +// // Initialize a price and received time based heap with the head transactions +// heads := make(TxByPriceAndTime, 0, len(txs)) + +// for from, accTxs := range txs { +// if len(accTxs) == 0 { +// continue +// } + +// acc, _ := Sender(signer, accTxs[0]) +// wrapped, err := NewTxWithMinerFee(accTxs[0], baseFee) + +// // Remove transaction if sender doesn't match from, or if wrapping fails. +// if acc != from || err != nil { +// delete(txs, from) +// continue +// } + +// heads = append(heads, wrapped) +// txs[from] = accTxs[1:] +// } + +// heap.Init(&heads) + +// // Assemble and return the transaction set +// return &TransactionsByPriceAndNonce{ +// txs: txs, +// heads: heads, +// signer: signer, +// baseFee: baseFee, +// } +// } + +// // Peek returns the next transaction by price. +// func (t *TransactionsByPriceAndNonce) Peek() *Transaction { +// if len(t.heads) == 0 { +// return nil +// } + +// return t.heads[0].tx +// } + +// // Shift replaces the current best head with the next one from the same account. +// func (t *TransactionsByPriceAndNonce) Shift() { +// acc, _ := Sender(t.signer, t.heads[0].tx) +// if txs, ok := t.txs[acc]; ok && len(txs) > 0 { +// if wrapped, err := NewTxWithMinerFee(txs[0], t.baseFee); err == nil { +// t.heads[0], t.txs[acc] = wrapped, txs[1:] +// heap.Fix(&t.heads, 0) + +// return +// } +// } + +// heap.Pop(&t.heads) +// } + +// func (t *TransactionsByPriceAndNonce) GetTxs() int { +// return len(t.txs) +// } + +// // Pop removes the best transaction, *not* replacing it with the next one from +// // the same account. This should be used when a transaction cannot be executed +// // and hence all subsequent ones should be discarded from the same account. +// func (t *TransactionsByPriceAndNonce) Pop() { +// heap.Pop(&t.heads) +// } diff --git a/eth/api_backend.go b/eth/api_backend.go index d6dcd9085f..734a100605 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -68,9 +68,6 @@ func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumb // Pending block is only known by the miner if number == rpc.PendingBlockNumber { block := b.eth.miner.PendingBlock() - if block == nil { - return nil, errors.New("pending block is not available") - } return block.Header(), nil } // Otherwise resolve and return the block @@ -139,9 +136,6 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe // Pending block is only known by the miner if number == rpc.PendingBlockNumber { block := b.eth.miner.PendingBlock() - if block == nil { - return nil, errors.New("pending block is not available") - } return block, nil } // Otherwise resolve and return the block @@ -151,17 +145,12 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe } if number == rpc.FinalizedBlockNumber { - // TODO - Arpit - // finalBlocknumber, err := getFinalizedBlockNumber(b.eth) - // if err != nil { - // return nil, errors.New("finalized block not found") - // } - - header := b.eth.blockchain.CurrentFinalBlock() - if header == nil { + finalBlocknumber, err := getFinalizedBlockNumber(b.eth) + if err != nil { return nil, errors.New("finalized block not found") } - return b.eth.blockchain.GetBlock(header.Hash(), header.Number.Uint64()), nil + + return b.eth.blockchain.CurrentFinalizedBlock(finalBlocknumber), nil } if number == rpc.SafeBlockNumber { @@ -170,9 +159,7 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe } header := b.eth.blockchain.CurrentSafeBlock() - if header == nil { - return nil, errors.New("safe block not found") - } + return b.eth.blockchain.GetBlock(header.Hash(), header.Number.Uint64()), nil } @@ -369,9 +356,7 @@ func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) ( } func (b *EthAPIBackend) Stats() (runnable int, blocked int) { - // TODO - Arpit - return 0, 0 - // return b.eth.txPool.Stats() + return b.eth.txPool.Stats() } func (b *EthAPIBackend) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { diff --git a/eth/backend.go b/eth/backend.go index b0b1d1ff15..4b62818fc2 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -155,7 +155,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } - // TODO - Check this - Arpit + + // START: Bor changes eth := &Ethereum{ config: config, merger: consensus.NewMerger(chainDb), @@ -175,12 +176,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil} - // TODO - Check this - Arpit - // if ethereum.APIBackend.allowUnprotectedTxs { - // log.Debug(" ###########", "Unprotected transactions allowed") + if eth.APIBackend.allowUnprotectedTxs { + log.Debug(" ###########", "Unprotected transactions allowed") - // config.TxPool.AllowUnprotectedTxs = true - // } + config.TxPool.AllowUnprotectedTxs = true + } gpoParams := config.GPO if gpoParams.Default == nil { @@ -832,12 +832,10 @@ func (s *Ethereum) Stop() error { // Close all bg processes close(s.closeCh) - // closing consensus engine first, as miner has deps on it - s.engine.Close() - // TODO - Check this Arpit - // s.txPool.Stop() + s.txPool.Close() s.miner.Close() s.blockchain.Stop() + s.engine.Close() // Clean shutdown marker as the last thing before closing db s.shutdownTracker.Stop() diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a857fe9309..7ef511c490 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -2107,7 +2107,6 @@ func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash commo func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { borTx := false - // TEST THIS --------------------------------- Arpit tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) if err != nil { // When the transaction doesn't exist, the RPC method should return JSON null diff --git a/miner/worker.go b/miner/worker.go index bc5238ea83..d6907ca5bf 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -168,6 +168,7 @@ type newWorkReq struct { //nolint:containedctx ctx context.Context interrupt *atomic.Int32 + noempty bool timestamp int64 } @@ -265,6 +266,13 @@ type worker struct { profileCount *int32 // Global count for profiling interruptCommitFlag bool // Interrupt commit ( Default true ) interruptedTxCache *vm.TxCache + + // noempty is the flag used to control whether the feature of pre-seal empty + // block is enabled. The default value is false(pre-seal is enabled by default). + // But in some special scenario the consensus engine will seal blocks instantaneously, + // in this case this feature will add all empty blocks into canonical chain + // non-stop and no real transaction will be included. + noempty atomic.Bool } //nolint:staticcheck @@ -292,6 +300,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), interruptCommitFlag: config.CommitInterruptFlag, } + worker.noempty.Store(true) worker.profileCount = new(int32) // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) @@ -307,10 +316,9 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus Cache: interruptedTxCache, } - // TODO - Arpit - // if !worker.interruptCommitFlag { - // worker.noempty.Store(false) - // } + if !worker.interruptCommitFlag { + worker.noempty.Store(false) + } // Sanitize recommit interval if the user-specified one is too short. recommit := worker.config.Recommit @@ -351,6 +359,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus return worker } +// disablePreseal disables pre-sealing feature +func (w *worker) disablePreseal() { + w.noempty.Store(true) +} + +// enablePreseal enables pre-sealing feature +func (w *worker) enablePreseal() { + w.noempty.Store(false) +} + // setEtherbase sets the etherbase used to initialize the block coinbase field. func (w *worker) setEtherbase(addr common.Address) { w.mu.Lock() @@ -481,7 +499,7 @@ func (w *worker) newWorkLoop(ctx context.Context, recommit time.Duration) { <-timer.C // discard the initial tick // commit aborts in-flight transaction execution with given signal and resubmits a new one. - commit := func(s int32) { + commit := func(noempty bool, s int32) { ctx, span := tracing.Trace(ctx, "worker.newWorkLoop.commit") tracing.EndSpan(span) if interrupt != nil { @@ -490,7 +508,7 @@ func (w *worker) newWorkLoop(ctx context.Context, recommit time.Duration) { interrupt = new(atomic.Int32) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, timestamp: timestamp, ctx: ctx}: + case w.newWorkCh <- &newWorkReq{interrupt: interrupt, timestamp: timestamp, ctx: ctx, noempty: noempty}: case <-w.exitCh: return } @@ -517,13 +535,13 @@ func (w *worker) newWorkLoop(ctx context.Context, recommit time.Duration) { clearPending(w.chain.CurrentBlock().Number.Uint64()) timestamp = time.Now().Unix() - commit(commitInterruptNewHead) + commit(false, commitInterruptNewHead) case head := <-w.chainHeadCh: clearPending(head.Block.NumberU64()) timestamp = time.Now().Unix() - commit(commitInterruptNewHead) + commit(false, commitInterruptNewHead) case <-timer.C: // If sealing is running resubmit a new work cycle periodically to pull in @@ -534,7 +552,7 @@ func (w *worker) newWorkLoop(ctx context.Context, recommit time.Duration) { timer.Reset(recommit) continue } - commit(commitInterruptResubmit) + commit(true, commitInterruptResubmit) } case interval := <-w.resubmitIntervalCh: @@ -594,11 +612,11 @@ func (w *worker) mainLoop(ctx context.Context) { if w.chainConfig.ChainID.Cmp(params.BorMainnetChainConfig.ChainID) == 0 || w.chainConfig.ChainID.Cmp(params.MumbaiChainConfig.ChainID) == 0 { if w.eth.PeerCount() > 0 { //nolint:contextcheck - w.commitWork(req.ctx, req.interrupt, req.timestamp) + w.commitWork(req.ctx, req.interrupt, req.noempty, req.timestamp) } } else { //nolint:contextcheck - w.commitWork(req.ctx, req.interrupt, req.timestamp) + w.commitWork(req.ctx, req.interrupt, req.noempty, req.timestamp) } case req := <-w.getWorkCh: @@ -645,7 +663,7 @@ func (w *worker) mainLoop(ctx context.Context) { // submit sealing work here since all empty submission will be rejected // by clique. Of course the advance sealing(empty submission) is disabled. if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { - w.commitWork(ctx, nil, time.Now().Unix()) + w.commitWork(ctx, nil, true, time.Now().Unix()) } } @@ -1509,7 +1527,7 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. -func (w *worker) commitWork(ctx context.Context, interrupt *atomic.Int32, timestamp int64) { +func (w *worker) commitWork(ctx context.Context, interrupt *atomic.Int32, noempty bool, timestamp int64) { // Abort committing if node is still syncing if w.syncing.Load() { return @@ -1550,9 +1568,7 @@ func (w *worker) commitWork(ctx context.Context, interrupt *atomic.Int32, timest stopFn() }() - // TODO - Arpit - if w.interruptCommitFlag { - // if !noempty && w.interruptCommitFlag { + if !noempty && w.interruptCommitFlag { block := w.chain.GetBlockByHash(w.chain.CurrentBlock().Hash()) interruptCtx, stopFn = getInterruptTimer(ctx, work, block) // nolint : staticcheck @@ -1567,12 +1583,11 @@ func (w *worker) commitWork(ctx context.Context, interrupt *atomic.Int32, timest attribute.Int("number", int(work.header.Number.Uint64())), ) - // TODO - Arpit - // // Create an empty block based on temporary copied state for - // // sealing in advance without waiting block execution finished. - // if !noempty && !w.noempty.Load() { - // _ = w.commit(ctx, work.copy(), nil, false, start) - // } + // Create an empty block based on temporary copied state for + // sealing in advance without waiting block execution finished. + if !noempty && !w.noempty.Load() { + _ = w.commit(ctx, work.copy(), nil, false, start) + } // Fill pending transactions from the txpool into the block. err = w.fillTransactions(ctx, interrupt, work, interruptCtx)