Skip to content

Commit

Permalink
Ingress Filtering for Interop Enabled Mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
axelKingsley committed Oct 29, 2024
1 parent 98b8443 commit 91583bf
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 10 deletions.
4 changes: 4 additions & 0 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ var (
// current network configuration.
ErrTxTypeNotSupported = types.ErrTxTypeNotSupported

// ErrTxFilteredOut indicates an ingress filter has rejected the transaction from
// being included in the pool.
ErrTxFilteredOut = errors.New("transaction filtered out")

// ErrTipAboveFeeCap is a sanity error to ensure no one is able to specify a
// transaction with a tip higher than the total fee cap.
ErrTipAboveFeeCap = errors.New("max priority fee per gas higher than max fee per gas")
Expand Down
52 changes: 52 additions & 0 deletions core/txpool/ingress_filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package txpool

import (
"context"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/interoptypes"
)

// IngressFilter is an interface that allows filtering of transactions before they are added to the transaction pool.
// Implementations of this interface can be used to filter transactions based on various criteria.
// FilterTx will return true if the transaction should be allowed, and false if it should be rejected.
type IngressFilter interface {
FilterTx(tx *types.Transaction) bool
}

type interopFilter struct {
logsFn func(tx *types.Transaction) ([]*types.Log, error)
checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error
}

func NewInteropFilter(
logsFn func(tx *types.Transaction) ([]*types.Log, error),
checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error) IngressFilter {
return &interopFilter{
logsFn: logsFn,
checkFn: checkFn,
}
}

// FilterTx implements IngressFilter.FilterTx
// it gets logs checks for message safety based on the function provided
func (f *interopFilter) FilterTx(tx *types.Transaction) bool {
logs, err := f.logsFn(tx)
if err != nil {
return true // default to allow if logs cannot be retrieved
}
if len(logs) == 0 {
return true // default to allow if there are no logs
}
ems, err := interoptypes.ExecutingMessagesFromLogs(logs)
if err != nil {
return true // default to allow if logs cannot be parsed
}
if len(ems) == 0 {
return true // default to allow if there are no executing messages
}

// check with the supervisor if the transaction should be allowed given the executing messages
ctx := context.Background()
return f.checkFn(ctx, ems, interoptypes.Unsafe) == nil
}
70 changes: 70 additions & 0 deletions core/txpool/ingress_filters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package txpool

import (
"context"
"errors"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/interoptypes"
"github.com/stretchr/testify/require"
)

func TestInteropFilter(t *testing.T) {
t.Run("Tx has no logs", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{}, nil
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs are empty
return errors.New("error")
}
// when there are no logs to process, the transaction should be allowed
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(&types.Transaction{}))
})
t.Run("Tx errored when getting logs", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{}, errors.New("error")
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs retrieval errored
return errors.New("error")
}
// when log retrieval errors, the transaction should be allowed
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(&types.Transaction{}))
})
t.Run("Tx has no executing messages", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
l1 := &types.Log{
Topics: []common.Hash{common.BytesToHash([]byte("topic1"))},
}
return []*types.Log{l1}, errors.New("error")
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs retrieval doesn't have executing messages
return errors.New("error")
}
// when no executing messages are included, the transaction should be allowed
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(&types.Transaction{}))
})
t.Run("Tx has valid executing message", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
// TODO: make executing messages here
l1 := &types.Log{
Topics: []common.Hash{common.BytesToHash([]byte("topic1"))},
}
return []*types.Log{l1}, errors.New("error")
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs retrieval doesn't have executing messages
return nil
}
// when no executing messages are included, the transaction should be allowed
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(&types.Transaction{}))
})
}
32 changes: 26 additions & 6 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,25 @@ type TxPool struct {
term chan struct{} // Termination channel to detect a closed pool

sync chan chan error // Testing / simulator channel to block until internal reset is done

ingressFilters []IngressFilter // List of filters to apply to incoming transactions
}

// New creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
func New(gasTip uint64, chain BlockChain, subpools []SubPool, poolFilters []IngressFilter) (*TxPool, error) {
// Retrieve the current head so that all subpools and this main coordinator
// pool will have the same starting state, even if the chain moves forward
// during initialization.
head := chain.CurrentBlock()

pool := &TxPool{
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
ingressFilters: poolFilters,
}
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
Expand Down Expand Up @@ -319,11 +322,23 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// so we can piece back the returned errors into the original order.
txsets := make([][]*types.Transaction, len(p.subpools))
splits := make([]int, len(txs))
filtered_out := make([]bool, len(txs))

for i, tx := range txs {
// Mark this transaction belonging to no-subpool
splits[i] = -1

// Filter the transaction through the ingress filters
for _, f := range p.ingressFilters {
if !f.FilterTx(tx) {
filtered_out[i] = true
}
}
// if the transaction is filtered out, don't add it to any subpool
if filtered_out[i] {
continue
}

// Try to find a subpool that accepts the transaction
for j, subpool := range p.subpools {
if subpool.Filter(tx) {
Expand All @@ -341,6 +356,11 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
}
errs := make([]error, len(txs))
for i, split := range splits {
// If the transaction was filtered out, mark it as such
if filtered_out[i] {
errs[i] = core.ErrTxFilteredOut
continue
}
// If the transaction was rejected by all subpools, mark it unsupported
if split == -1 {
errs[i] = core.ErrTxTypeNotSupported
Expand Down
56 changes: 55 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
blobPool := blobpool.New(config.BlobPool, eth.blockchain)
txPools = append(txPools, blobPool)
}
eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, txPools)
// if interop is enabled, establish an Interop Filter connected to this Ethereum instance's
// simulated logs and message safety check functions
poolFilters := []txpool.IngressFilter{}
if config.InteropMessageRPC != "" {
poolFilters = append(poolFilters, txpool.NewInteropFilter(eth.SimLogs, eth.CheckMessages))
}
eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, txPools, poolFilters)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -567,3 +573,51 @@ func (s *Ethereum) CheckMessages(ctx context.Context, messages []interoptypes.Me
}
return s.interopRPC.CheckMessages(ctx, messages, minSafety)
}

// simLogs simulates the logs that would be generated by a transaction if it were executed on the current state.
// This is used by the interop filter to determine if a transaction should be allowed.
// if errors are encountered, no logs are returned.
func (s *Ethereum) SimLogs(tx *types.Transaction) ([]*types.Log, error) {
// prepare all necessary resources for the transaction simulation
state, err := s.BlockChain().State()
header := s.BlockChain().CurrentBlock()
if err != nil {
return nil, err
}
var vmConf vm.Config
chainConfig := s.APIBackend.ChainConfig()
signer := types.MakeSigner(chainConfig, header.Number, header.Time)
message, err := core.TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
return nil, err
}
ctx := context.Background()
chainCtx := ethapi.NewChainContext(ctx, s.APIBackend)
blockCtx := core.NewEVMBlockContext(
header,
chainCtx,
nil,
chainConfig,
state)
txCtx := core.NewEVMTxContext(message)
vmenv := vm.NewEVM(
blockCtx,
txCtx,
state,
chainConfig,
vmConf,
)

// prepare the state and apply the transaction to it
state.SetTxContext(tx.Hash(), 0)
_, err = core.ApplyMessage(
vmenv,
message,
new(core.GasPool).AddGas(message.GasLimit))
if err != nil {
return nil, err
}

return state.GetLogs(tx.Hash(), header.Number.Uint64(), header.Hash()), nil

}
2 changes: 1 addition & 1 deletion eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int,
txconfig.Journal = "" // Don't litter the disk with test journals

pool := legacypool.New(txconfig, chain)
txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool})
txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool}, nil)

return &testBackend{
db: db,
Expand Down
2 changes: 1 addition & 1 deletion miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func createMiner(t *testing.T) *Miner {
blockchain := &testBlockChain{bc.Genesis().Root(), chainConfig, statedb, 10000000, new(event.Feed)}

pool := legacypool.New(testTxPoolConfig, blockchain)
txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, blockchain, []txpool.SubPool{pool})
txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, blockchain, []txpool.SubPool{pool}, nil)

// Create Miner
backend := NewMockBackend(bc, txpool)
Expand Down
2 changes: 1 addition & 1 deletion miner/payload_building_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
t.Fatalf("core.NewBlockChain failed: %v", err)
}
pool := legacypool.New(testTxPoolConfig, chain)
txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, chain, []txpool.SubPool{pool})
txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, chain, []txpool.SubPool{pool}, nil)

return &testWorkerBackend{
db: db,
Expand Down

0 comments on commit 91583bf

Please sign in to comment.