diff --git a/.github/workflows/publish-release.yml b/.github/workflows/publish-release.yml index 532962d547..d2b75106e4 100644 --- a/.github/workflows/publish-release.yml +++ b/.github/workflows/publish-release.yml @@ -52,7 +52,7 @@ jobs: - name: Run Gosec Security Scanner if: ${{ github.event.inputs.skip_checks != 'true' }} - uses: securego/gosec@master + uses: securego/gosec@v2.19.0 with: args: ./... diff --git a/.github/workflows/sast-linters.yml b/.github/workflows/sast-linters.yml index bd8910893f..b2f4d7f646 100644 --- a/.github/workflows/sast-linters.yml +++ b/.github/workflows/sast-linters.yml @@ -30,7 +30,7 @@ jobs: go-version: '1.20' - name: Run Gosec Security Scanner - uses: securego/gosec@master + uses: securego/gosec@v2.19.0 with: args: ./... diff --git a/app/ante/handler_options.go b/app/ante/handler_options.go index b620f01d9b..9ae24bf56e 100644 --- a/app/ante/handler_options.go +++ b/app/ante/handler_options.go @@ -130,6 +130,7 @@ func newCosmosAnteHandlerForSystemTx(options HandlerOptions) sdk.AnteHandler { ante.NewValidateMemoDecorator(options.AccountKeeper), ante.NewConsumeGasForTxSizeDecorator(options.AccountKeeper), ante.NewDeductFeeDecorator(options.AccountKeeper, options.BankKeeper, options.FeegrantKeeper, options.TxFeeChecker), + NewSystemPriorityDecorator(), // SetPubKeyDecorator must be called before all signature verification decorators ante.NewSetPubKeyDecorator(options.AccountKeeper), ante.NewValidateSigCountDecorator(options.AccountKeeper), diff --git a/app/ante/system_tx_priority_decorator.go b/app/ante/system_tx_priority_decorator.go new file mode 100644 index 0000000000..5b560a1416 --- /dev/null +++ b/app/ante/system_tx_priority_decorator.go @@ -0,0 +1,29 @@ +package ante + +import ( + "math" + + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ sdk.AnteDecorator = SystemPriorityDecorator{} + +// SystemPriorityDecorator adds bigger priority for system messages +type SystemPriorityDecorator struct { +} + +// NewSystemPriorityDecorator creates a decorator to add bigger priority for system messages +func NewSystemPriorityDecorator() SystemPriorityDecorator { + return SystemPriorityDecorator{} +} + +// AnteHandle implements AnteDecorator +func (vad SystemPriorityDecorator) AnteHandle( + ctx sdk.Context, + tx sdk.Tx, + simulate bool, + next sdk.AnteHandler, +) (sdk.Context, error) { + newCtx := ctx.WithPriority(math.MaxInt64) + return next(newCtx, tx, simulate) +} diff --git a/app/ante/system_tx_priority_decorator_test.go b/app/ante/system_tx_priority_decorator_test.go new file mode 100644 index 0000000000..f777a98ba1 --- /dev/null +++ b/app/ante/system_tx_priority_decorator_test.go @@ -0,0 +1,45 @@ +package ante_test + +import ( + "math" + "math/rand" + "testing" + "time" + + simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/stretchr/testify/require" + "github.com/zeta-chain/zetacore/app" + "github.com/zeta-chain/zetacore/app/ante" + "github.com/zeta-chain/zetacore/testutil/sample" +) + +func TestSystemTxPriorityDecorator_AnteHandle(t *testing.T) { + txConfig := app.MakeEncodingConfig().TxConfig + + testPrivKey, _ := sample.PrivKeyAddressPair() + + decorator := ante.NewSystemPriorityDecorator() + mmd := MockAnteHandler{} + // set priority to 10 before ante handler + ctx := sdk.Context{}.WithIsCheckTx(true).WithPriority(10) + + tx, err := simtestutil.GenSignedMockTx( + rand.New(rand.NewSource(time.Now().UnixNano())), + txConfig, + []sdk.Msg{}, + sdk.NewCoins(), + simtestutil.DefaultGenTxGas, + "testing-chain-id", + []uint64{0}, + []uint64{0}, + testPrivKey, + ) + require.NoError(t, err) + ctx, err = decorator.AnteHandle(ctx, tx, false, mmd.AnteHandle) + require.NoError(t, err) + + // check that priority is set to max int64 + priorityAfter := ctx.Priority() + require.Equal(t, math.MaxInt64, int(priorityAfter)) +} diff --git a/app/app.go b/app/app.go index df81af9fac..94327c1faf 100644 --- a/app/app.go +++ b/app/app.go @@ -332,6 +332,7 @@ func New( bApp.SetCommitMultiStoreTracer(traceStore) bApp.SetVersion(version.Version) bApp.SetInterfaceRegistry(interfaceRegistry) + bApp.SetTxEncoder(encodingConfig.TxConfig.TxEncoder()) keys := sdk.NewKVStoreKeys( authtypes.StoreKey, @@ -385,6 +386,9 @@ func New( app.ConsensusParamsKeeper = consensusparamkeeper.NewKeeper(appCodec, keys[consensusparamtypes.StoreKey], authAddr) bApp.SetParamStore(&app.ConsensusParamsKeeper) + customProposalHandler := NewCustomProposalHandler(bApp.Mempool(), bApp) + app.SetPrepareProposal(customProposalHandler.PrepareProposalHandler()) + // add capability keeper and ScopeToModule for ibc module app.CapabilityKeeper = capabilitykeeper.NewKeeper( appCodec, diff --git a/app/custom_proposal_handler.go b/app/custom_proposal_handler.go new file mode 100644 index 0000000000..e6f9e4512f --- /dev/null +++ b/app/custom_proposal_handler.go @@ -0,0 +1,318 @@ +package app + +// This proposal handler is taken from https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/baseapp/abci_utils.go +// Only difference is extraction of senders and nonce from tx. In latest version of cosmos, there is a way to provide adapter for this, but in 0.47.10 this is the only way. +// TODO: remove this once cosmos is upgraded: https://github.com/zeta-chain/node/issues/2156 + +import ( + "fmt" + + "github.com/cockroachdb/errors" + abci "github.com/cometbft/cometbft/abci/types" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" + zetamempool "github.com/zeta-chain/zetacore/app/mempool" +) + +type ( + // GasTx defines the contract that a transaction with a gas limit must implement. + GasTx interface { + GetGas() uint64 + } + + // ProposalTxVerifier defines the interface that is implemented by BaseApp, + // that any custom ABCI PrepareProposal and ProcessProposal handler can use + // to verify a transaction. + ProposalTxVerifier interface { + PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error) + ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) + } + + // CustomProposalHandler defines the default ABCI PrepareProposal and + // ProcessProposal handlers. + CustomProposalHandler struct { + mempool mempool.Mempool + txVerifier ProposalTxVerifier + txSelector TxSelector + } +) + +func NewCustomProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier) *CustomProposalHandler { + return &CustomProposalHandler{ + mempool: mp, + txVerifier: txVerifier, + txSelector: NewDefaultTxSelector(), + } +} + +// SetTxSelector sets the TxSelector function on the CustomProposalHandler. +func (h *CustomProposalHandler) SetTxSelector(ts TxSelector) { + h.txSelector = ts +} + +// PrepareProposalHandler returns the default implementation for processing an +// ABCI proposal. The application's mempool is enumerated and all valid +// transactions are added to the proposal. Transactions are valid if they: +// +// 1) Successfully encode to bytes. +// 2) Are valid (i.e. pass runTx, AnteHandler only). +// +// Enumeration is halted once RequestPrepareProposal.MaxBytes of transactions is +// reached or the mempool is exhausted. +// +// Note: +// +// - Step (2) is identical to the validation step performed in +// DefaultProcessProposal. It is very important that the same validation logic +// is used in both steps, and applications must ensure that this is the case in +// non-default handlers. +// +// - If no mempool is set or if the mempool is a no-op mempool, the transactions +// requested from CometBFT will simply be returned, which, by default, are in +// FIFO order. +func (h *CustomProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { + return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { + var maxBlockGas uint64 + if b := ctx.ConsensusParams().Block; b != nil { + // #nosec G701 range checked, cosmos-sdk forked code + maxBlockGas = uint64(b.MaxGas) + } + + defer h.txSelector.Clear() + + // If the mempool is nil or NoOp we simply return the transactions + // requested from CometBFT, which, by default, should be in FIFO order. + // + // Note, we still need to ensure the transactions returned respect req.MaxTxBytes. + _, isNoOp := h.mempool.(mempool.NoOpMempool) + if h.mempool == nil || isNoOp { + for _, txBz := range req.Txs { + // XXX: We pass nil as the memTx because we have no way of decoding the + // txBz. We'd need to break (update) the ProposalTxVerifier interface. + // As a result, we CANNOT account for block max gas. + // #nosec G701 range checked, cosmos-sdk forked code + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, nil, txBz) + if stop { + break + } + } + + return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} + } + + iterator := h.mempool.Select(ctx, req.Txs) + selectedTxsSignersSeqs := make(map[string]uint64) + var selectedTxsNums int + for iterator != nil { + memTx := iterator.Tx() + + sendersWithNonce, err := zetamempool.GetSendersWithNonce(memTx) + if err != nil { + panic(fmt.Errorf("failed to get signatures: %w", err)) + } + + // If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before + // so we add them and continue given that we don't need to check the sequence. + shouldAdd := true + txSignersSeqs := make(map[string]uint64) + for _, sig := range sendersWithNonce { + signer := sig.Sender + nonce := sig.Nonce + seq, ok := selectedTxsSignersSeqs[signer] + if !ok { + txSignersSeqs[signer] = nonce + continue + } + + // If we have seen this signer before in this block, we must make + // sure that the current sequence is seq+1; otherwise is invalid + // and we skip it. + if seq+1 != nonce { + shouldAdd = false + break + } + txSignersSeqs[signer] = nonce + } + if !shouldAdd { + iterator = iterator.Next() + continue + } + + // NOTE: Since transaction verification was already executed in CheckTx, + // which calls mempool.Insert, in theory everything in the pool should be + // valid. But some mempool implementations may insert invalid txs, so we + // check again. + txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx) + if err != nil { + err := h.mempool.Remove(memTx) + if err != nil && !errors.Is(err, mempool.ErrTxNotFound) { + panic(err) + } + } else { + // #nosec G701 range checked, cosmos-sdk forked code + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz) + if stop { + break + } + + txsLen := len(h.txSelector.SelectedTxs()) + for sender, seq := range txSignersSeqs { + // If txsLen != selectedTxsNums is true, it means that we've + // added a new tx to the selected txs, so we need to update + // the sequence of the sender. + if txsLen != selectedTxsNums { + selectedTxsSignersSeqs[sender] = seq + } else if _, ok := selectedTxsSignersSeqs[sender]; !ok { + // The transaction hasn't been added but it passed the + // verification, so we know that the sequence is correct. + // So we set this sender's sequence to seq-1, in order + // to avoid unnecessary calls to PrepareProposalVerifyTx. + selectedTxsSignersSeqs[sender] = seq - 1 + } + } + selectedTxsNums = txsLen + } + + iterator = iterator.Next() + } + + return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} + } +} + +// ProcessProposalHandler returns the default implementation for processing an +// ABCI proposal. Every transaction in the proposal must pass 2 conditions: +// +// 1. The transaction bytes must decode to a valid transaction. +// 2. The transaction must be valid (i.e. pass runTx, AnteHandler only) +// +// If any transaction fails to pass either condition, the proposal is rejected. +// Note that step (2) is identical to the validation step performed in +// DefaultPrepareProposal. It is very important that the same validation logic +// is used in both steps, and applications must ensure that this is the case in +// non-default handlers. +func (h *CustomProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { + // If the mempool is nil or NoOp we simply return ACCEPT, + // because PrepareProposal may have included txs that could fail verification. + _, isNoOp := h.mempool.(mempool.NoOpMempool) + if h.mempool == nil || isNoOp { + return NoOpProcessProposal() + } + + return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal { + var totalTxGas uint64 + + var maxBlockGas int64 + if b := ctx.ConsensusParams().Block; b != nil { + maxBlockGas = b.MaxGas + } + + for _, txBytes := range req.Txs { + tx, err := h.txVerifier.ProcessProposalVerifyTx(txBytes) + if err != nil { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + } + + if maxBlockGas > 0 { + gasTx, ok := tx.(GasTx) + if ok { + totalTxGas += gasTx.GetGas() + } + + // #nosec G701 range checked, cosmos-sdk forked code + if totalTxGas > uint64(maxBlockGas) { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + } + } + } + + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } +} + +// NoOpPrepareProposal defines a no-op PrepareProposal handler. It will always +// return the transactions sent by the client's request. +func NoOpPrepareProposal() sdk.PrepareProposalHandler { + return func(_ sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { + return abci.ResponsePrepareProposal{Txs: req.Txs} + } +} + +// NoOpProcessProposal defines a no-op ProcessProposal Handler. It will always +// return ACCEPT. +func NoOpProcessProposal() sdk.ProcessProposalHandler { + return func(_ sdk.Context, _ abci.RequestProcessProposal) abci.ResponseProcessProposal { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } +} + +// TxSelector defines a helper type that assists in selecting transactions during +// mempool transaction selection in PrepareProposal. It keeps track of the total +// number of bytes and total gas of the selected transactions. It also keeps +// track of the selected transactions themselves. +type TxSelector interface { + // SelectedTxs should return a copy of the selected transactions. + SelectedTxs() [][]byte + + // Clear should clear the TxSelector, nulling out all relevant fields. + Clear() + + // SelectTxForProposal should attempt to select a transaction for inclusion in + // a proposal based on inclusion criteria defined by the TxSelector. It must + // return if the caller should halt the transaction selection loop + // (typically over a mempool) or otherwise. + SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool +} + +type defaultTxSelector struct { + totalTxBytes uint64 + totalTxGas uint64 + selectedTxs [][]byte +} + +func NewDefaultTxSelector() TxSelector { + return &defaultTxSelector{} +} + +func (ts *defaultTxSelector) SelectedTxs() [][]byte { + txs := make([][]byte, len(ts.selectedTxs)) + copy(txs, ts.selectedTxs) + return txs +} + +func (ts *defaultTxSelector) Clear() { + ts.totalTxBytes = 0 + ts.totalTxGas = 0 + ts.selectedTxs = nil +} + +func (ts *defaultTxSelector) SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool { + txSize := uint64(len(txBz)) + + var txGasLimit uint64 + if memTx != nil { + if gasTx, ok := memTx.(GasTx); ok { + txGasLimit = gasTx.GetGas() + } + } + + // only add the transaction to the proposal if we have enough capacity + if (txSize + ts.totalTxBytes) <= maxTxBytes { + // If there is a max block gas limit, add the tx only if the limit has + // not been met. + if maxBlockGas > 0 { + if (txGasLimit + ts.totalTxGas) <= maxBlockGas { + ts.totalTxGas += txGasLimit + ts.totalTxBytes += txSize + ts.selectedTxs = append(ts.selectedTxs, txBz) + } + } else { + ts.totalTxBytes += txSize + ts.selectedTxs = append(ts.selectedTxs, txBz) + } + } + + // check if we've reached capacity; if so, we cannot select any more transactions + return ts.totalTxBytes >= maxTxBytes || (maxBlockGas > 0 && (ts.totalTxGas >= maxBlockGas)) +} diff --git a/app/mempool/priority_nonce_mempool.go b/app/mempool/priority_nonce_mempool.go new file mode 100644 index 0000000000..cb6ed8748f --- /dev/null +++ b/app/mempool/priority_nonce_mempool.go @@ -0,0 +1,445 @@ +// This proposal handler is taken from https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/priority_nonce.go +// Only difference is extraction of senders and nonce from tx. In latest version of cosmos, there is a way to provide adapter for this, but in 0.47.10 this is the only way. +// TODO: remove this once cosmos is upgraded: https://github.com/zeta-chain/node/issues/2156 + +package mempool + +import ( + "context" + "fmt" + "math" + + "github.com/huandu/skiplist" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" +) + +var ( + _ mempool.Mempool = (*PriorityNonceMempool)(nil) + _ mempool.Iterator = (*PriorityNonceIterator)(nil) +) + +// PriorityNonceMempool is a mempool implementation that stores txs +// in a partially ordered set by 2 dimensions: priority, and sender-nonce +// (sequence number). Internally it uses one priority ordered skip list and one +// skip list per sender ordered by sender-nonce (sequence number). When there +// are multiple txs from the same sender, they are not always comparable by +// priority to other sender txs and must be partially ordered by both sender-nonce +// and priority. +type PriorityNonceMempool struct { + priorityIndex *skiplist.SkipList + priorityCounts map[int64]int + senderIndices map[string]*skiplist.SkipList + scores map[txMeta]txMeta + onRead func(tx sdk.Tx) + txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool + maxTx int +} + +type PriorityNonceIterator struct { + senderCursors map[string]*skiplist.Element + nextPriority int64 + sender string + priorityNode *skiplist.Element + mempool *PriorityNonceMempool +} + +// txMeta stores transaction metadata used in indices +type txMeta struct { + // nonce is the sender's sequence number + nonce uint64 + // priority is the transaction's priority + priority int64 + // sender is the transaction's sender + sender string + // weight is the transaction's weight, used as a tiebreaker for transactions with the same priority + weight int64 + // senderElement is a pointer to the transaction's element in the sender index + senderElement *skiplist.Element +} + +// txMetaLess is a comparator for txKeys that first compares priority, then weight, +// then sender, then nonce, uniquely identifying a transaction. +// +// Note, txMetaLess is used as the comparator in the priority index. +func txMetaLess(a, b any) int { + keyA := a.(txMeta) + keyB := b.(txMeta) + res := skiplist.Int64.Compare(keyA.priority, keyB.priority) + if res != 0 { + return res + } + + // Weight is used as a tiebreaker for transactions with the same priority. + // Weight is calculated in a single pass in .Select(...) and so will be 0 + // on .Insert(...). + res = skiplist.Int64.Compare(keyA.weight, keyB.weight) + if res != 0 { + return res + } + + // Because weight will be 0 on .Insert(...), we must also compare sender and + // nonce to resolve priority collisions. If we didn't then transactions with + // the same priority would overwrite each other in the priority index. + res = skiplist.String.Compare(keyA.sender, keyB.sender) + if res != 0 { + return res + } + + return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce) +} + +type PriorityNonceMempoolOption func(*PriorityNonceMempool) + +// PriorityNonceWithOnRead sets a callback to be called when a tx is read from +// the mempool. +func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.onRead = onRead + } +} + +// PriorityNonceWithTxReplacement sets a callback to be called when duplicated +// transaction nonce detected during mempool insert. An application can define a +// transaction replacement rule based on tx priority or certain transaction fields. +func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.txReplacement = txReplacementRule + } +} + +// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the +// mempool with the semantics: +// +// <0: disabled, `Insert` is a no-op +// 0: unlimited +// >0: maximum number of transactions allowed +func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.maxTx = maxTx + } +} + +// DefaultPriorityMempool returns a priorityNonceMempool with no options. +func DefaultPriorityMempool() mempool.Mempool { + return NewPriorityMempool() +} + +// NewPriorityMempool returns the SDK's default mempool implementation which +// returns txs in a partial order by 2 dimensions; priority, and sender-nonce. +func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool { + mp := &PriorityNonceMempool{ + priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)), + priorityCounts: make(map[int64]int), + senderIndices: make(map[string]*skiplist.SkipList), + scores: make(map[txMeta]txMeta), + } + + for _, opt := range opts { + opt(mp) + } + + return mp +} + +// NextSenderTx returns the next transaction for a given sender by nonce order, +// i.e. the next valid transaction for the sender. If no such transaction exists, +// nil will be returned. +func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx { + senderIndex, ok := mp.senderIndices[sender] + if !ok { + return nil + } + + cursor := senderIndex.Front() + return cursor.Value.(sdk.Tx) +} + +// Insert attempts to insert a Tx into the app-side mempool in O(log n) time, +// returning an error if unsuccessful. Sender and nonce are derived from the +// transaction's first signature. +// +// Transactions are unique by sender and nonce. Inserting a duplicate tx is an +// O(log n) no-op. +// +// Inserting a duplicate tx with a different priority overwrites the existing tx, +// changing the total order of the mempool. +func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { + if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx { + return mempool.ErrMempoolTxMaxCapacity + } else if mp.maxTx < 0 { + return nil + } + + sendersWithNonce, err := GetSendersWithNonce(tx) + if err != nil { + return err + } + + sender := sendersWithNonce[0].Sender + nonce := sendersWithNonce[0].Nonce + + sdkContext := sdk.UnwrapSDKContext(ctx) + priority := sdkContext.Priority() + key := txMeta{nonce: nonce, priority: priority, sender: sender} + + senderIndex, ok := mp.senderIndices[sender] + if !ok { + senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int { + return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce) + })) + + // initialize sender index if not found + mp.senderIndices[sender] = senderIndex + } + + // Since mp.priorityIndex is scored by priority, then sender, then nonce, a + // changed priority will create a new key, so we must remove the old key and + // re-insert it to avoid having the same tx with different priorityIndex indexed + // twice in the mempool. + // + // This O(log n) remove operation is rare and only happens when a tx's priority + // changes. + sk := txMeta{nonce: nonce, sender: sender} + if oldScore, txExists := mp.scores[sk]; txExists { + if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) { + return fmt.Errorf( + "tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v", + oldScore.priority, + priority, + senderIndex.Get(key).Value.(sdk.Tx), + tx, + ) + } + + mp.priorityIndex.Remove(txMeta{ + nonce: nonce, + sender: sender, + priority: oldScore.priority, + weight: oldScore.weight, + }) + mp.priorityCounts[oldScore.priority]-- + } + + mp.priorityCounts[priority]++ + + // Since senderIndex is scored by nonce, a changed priority will overwrite the + // existing key. + key.senderElement = senderIndex.Set(key, tx) + + mp.scores[sk] = txMeta{priority: priority} + mp.priorityIndex.Set(key, tx) + + return nil +} + +func (i *PriorityNonceIterator) iteratePriority() mempool.Iterator { + // beginning of priority iteration + if i.priorityNode == nil { + i.priorityNode = i.mempool.priorityIndex.Front() + } else { + i.priorityNode = i.priorityNode.Next() + } + + // end of priority iteration + if i.priorityNode == nil { + return nil + } + + i.sender = i.priorityNode.Key().(txMeta).sender + + nextPriorityNode := i.priorityNode.Next() + if nextPriorityNode != nil { + i.nextPriority = nextPriorityNode.Key().(txMeta).priority + } else { + i.nextPriority = math.MinInt64 + } + + return i.Next() +} + +func (i *PriorityNonceIterator) Next() mempool.Iterator { + if i.priorityNode == nil { + return nil + } + + cursor, ok := i.senderCursors[i.sender] + if !ok { + // beginning of sender iteration + cursor = i.mempool.senderIndices[i.sender].Front() + } else { + // middle of sender iteration + cursor = cursor.Next() + } + + // end of sender iteration + if cursor == nil { + return i.iteratePriority() + } + + key := cursor.Key().(txMeta) + + // We've reached a transaction with a priority lower than the next highest + // priority in the pool. + if key.priority < i.nextPriority { + return i.iteratePriority() + } else if key.priority == i.nextPriority && i.priorityNode.Next() != nil { + // Weight is incorporated into the priority index key only (not sender index) + // so we must fetch it here from the scores map. + weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight + if weight < i.priorityNode.Next().Key().(txMeta).weight { + return i.iteratePriority() + } + } + + i.senderCursors[i.sender] = cursor + return i +} + +func (i *PriorityNonceIterator) Tx() sdk.Tx { + return i.senderCursors[i.sender].Value.(sdk.Tx) +} + +// Select returns a set of transactions from the mempool, ordered by priority +// and sender-nonce in O(n) time. The passed in list of transactions are ignored. +// This is a readonly operation, the mempool is not modified. +// +// The maxBytes parameter defines the maximum number of bytes of transactions to +// return. +// +// NOTE: It is not safe to use this iterator while removing transactions from +// the underlying mempool. +func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator { + if mp.priorityIndex.Len() == 0 { + return nil + } + + mp.reorderPriorityTies() + + iterator := &PriorityNonceIterator{ + mempool: mp, + senderCursors: make(map[string]*skiplist.Element), + } + + return iterator.iteratePriority() +} + +type reorderKey struct { + deleteKey txMeta + insertKey txMeta + tx sdk.Tx +} + +func (mp *PriorityNonceMempool) reorderPriorityTies() { + node := mp.priorityIndex.Front() + + var reordering []reorderKey + for node != nil { + key := node.Key().(txMeta) + if mp.priorityCounts[key.priority] > 1 { + newKey := key + newKey.weight = senderWeight(key.senderElement) + reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) + } + + node = node.Next() + } + + for _, k := range reordering { + mp.priorityIndex.Remove(k.deleteKey) + delete(mp.scores, txMeta{nonce: k.deleteKey.nonce, sender: k.deleteKey.sender}) + mp.priorityIndex.Set(k.insertKey, k.tx) + mp.scores[txMeta{nonce: k.insertKey.nonce, sender: k.insertKey.sender}] = k.insertKey + } +} + +// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is +// defined as the first (nonce-wise) same sender tx with a priority not equal to +// t. It is used to resolve priority collisions, that is when 2 or more txs from +// different senders have the same priority. +func senderWeight(senderCursor *skiplist.Element) int64 { + if senderCursor == nil { + return 0 + } + + weight := senderCursor.Key().(txMeta).priority + senderCursor = senderCursor.Next() + for senderCursor != nil { + p := senderCursor.Key().(txMeta).priority + if p != weight { + weight = p + } + + senderCursor = senderCursor.Next() + } + + return weight +} + +// CountTx returns the number of transactions in the mempool. +func (mp *PriorityNonceMempool) CountTx() int { + return mp.priorityIndex.Len() +} + +// Remove removes a transaction from the mempool in O(log n) time, returning an +// error if unsuccessful. +func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { + sendersWithNonce, err := GetSendersWithNonce(tx) + if err != nil { + return err + } + + sender := sendersWithNonce[0].Sender + nonce := sendersWithNonce[0].Nonce + + scoreKey := txMeta{nonce: nonce, sender: sender} + score, ok := mp.scores[scoreKey] + if !ok { + return mempool.ErrTxNotFound + } + tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight} + + senderTxs, ok := mp.senderIndices[sender] + if !ok { + return fmt.Errorf("sender %s not found", sender) + } + + mp.priorityIndex.Remove(tk) + senderTxs.Remove(tk) + delete(mp.scores, scoreKey) + mp.priorityCounts[score.priority]-- + + return nil +} + +func IsEmpty(mempool mempool.Mempool) error { + mp := mempool.(*PriorityNonceMempool) + if mp.priorityIndex.Len() != 0 { + return fmt.Errorf("priorityIndex not empty") + } + + var countKeys = make([]int64, 0, len(mp.priorityCounts)) + for k := range mp.priorityCounts { + countKeys = append(countKeys, k) + } + + for _, k := range countKeys { + if mp.priorityCounts[k] != 0 { + return fmt.Errorf("priorityCounts not zero at %v, got %v", k, mp.priorityCounts[k]) + } + } + + var senderKeys = make([]string, 0, len(mp.senderIndices)) + for k := range mp.senderIndices { + senderKeys = append(senderKeys, k) + } + + for _, k := range senderKeys { + if mp.senderIndices[k].Len() != 0 { + return fmt.Errorf("senderIndex not empty for sender %v", k) + } + } + + return nil +} diff --git a/app/mempool/senders_with_nonce.go b/app/mempool/senders_with_nonce.go new file mode 100644 index 0000000000..9ffc272f8c --- /dev/null +++ b/app/mempool/senders_with_nonce.go @@ -0,0 +1,75 @@ +// TODO: use with signer extractor once available https://github.com/zeta-chain/node/issues/2156 + +package mempool + +import ( + "fmt" + + sdk "github.com/cosmos/cosmos-sdk/types" + authante "github.com/cosmos/cosmos-sdk/x/auth/ante" + "github.com/cosmos/cosmos-sdk/x/auth/signing" + evmtypes "github.com/evmos/ethermint/x/evm/types" +) + +// GetSendersWithNonce is used to extract sender and nonce information txs +// if tx is ethermint, it is extracted using from and nonce field +// if it's cosmos tx, default cosmos way using signatures is used +func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { + const extensionOptionsEthereumTxTypeURL = "/ethermint.evm.v1.ExtensionOptionsEthereumTx" + if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { + opts := txWithExtensions.GetExtensionOptions() + if len(opts) > 0 && opts[0].GetTypeUrl() == extensionOptionsEthereumTxTypeURL { + return getSendersWithNonceEthermint(tx) + } + } + + return getSendersWithNonceCosmos(tx) +} + +// getSendersWithNonceEthermint gets senders and nonces from signatures in ethertmint txs +func getSendersWithNonceEthermint(tx sdk.Tx) ([]SenderWithNonce, error) { + for _, msg := range tx.GetMsgs() { + if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { + return []SenderWithNonce{ + { + Sender: ethMsg.GetFrom().String(), + Nonce: ethMsg.AsTransaction().Nonce(), + }, + }, nil + } + } + return nil, fmt.Errorf("ethermint sender with nonce not found") +} + +type SenderWithNonce struct { + Sender string + Nonce uint64 +} + +// getSendersWithNonceCosmos gets senders and nonces from signatures in cosmos txs +func getSendersWithNonceCosmos(tx sdk.Tx) ([]SenderWithNonce, error) { + sendersWithNonce := []SenderWithNonce{} + + sigTx, ok := tx.(signing.SigVerifiableTx) + if !ok { + return nil, fmt.Errorf("tx of type %T does not implement SigVerifiableTx", tx) + } + + sigs, err := sigTx.GetSignaturesV2() + if err != nil { + return nil, err + } + + if len(sigs) == 0 { + return nil, fmt.Errorf("tx must have at least one signer") + } + + for _, sig := range sigs { + sendersWithNonce = append(sendersWithNonce, SenderWithNonce{ + Sender: sig.PubKey.Address().String(), + Nonce: sig.Sequence, + }) + } + + return sendersWithNonce, nil +} diff --git a/changelog.md b/changelog.md index 43dbe77311..98e37b4b94 100644 --- a/changelog.md +++ b/changelog.md @@ -13,6 +13,7 @@ * [2100](https://github.com/zeta-chain/node/pull/2100) - cosmos v0.47 upgrade * [2145](https://github.com/zeta-chain/node/pull/2145) - add `ibc` and `ibc-transfer` modules * [2135](https://github.com/zeta-chain/node/pull/2135) - add develop build version logic +* [2152](https://github.com/zeta-chain/node/pull/2152) - custom priority nonce mempool * [2113](https://github.com/zeta-chain/node/pull/2113) - add zetaclientd-supervisor process * [2154](https://github.com/zeta-chain/node/pull/2154) - add `ibccrosschain` module diff --git a/cmd/zetacored/root.go b/cmd/zetacored/root.go index d5adb514db..1a674e699f 100644 --- a/cmd/zetacored/root.go +++ b/cmd/zetacored/root.go @@ -8,7 +8,6 @@ import ( "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client/snapshot" - "github.com/cosmos/cosmos-sdk/types/mempool" appparams "cosmossdk.io/simapp/params" tmcfg "github.com/cometbft/cometbft/config" @@ -40,6 +39,7 @@ import ( ethermintclient "github.com/evmos/ethermint/client" "github.com/spf13/cast" "github.com/spf13/cobra" + zetamempool "github.com/zeta-chain/zetacore/app/mempool" ) const EnvPrefix = "zetacore" @@ -233,7 +233,7 @@ func (ac appCreator) newApp( ) servertypes.Application { baseappOptions := server.DefaultBaseappOptions(appOpts) baseappOptions = append(baseappOptions, func(app *baseapp.BaseApp) { - app.SetMempool(mempool.NoOpMempool{}) + app.SetMempool(zetamempool.DefaultPriorityMempool()) }) skipUpgradeHeights := make(map[int64]bool) for _, h := range cast.ToIntSlice(appOpts.Get(server.FlagUnsafeSkipUpgrades)) {