diff --git a/.circleci/config.yml b/.circleci/config.yml index 8789560333..d7a6545148 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,6 +4,11 @@ orbs: gcp-cli: circleci/gcp-cli@3.0.1 slack: circleci/slack@4.10.1 +parameters: + go_version: + type: string + default: 1.22.7 # update CI Go version here + commands: gcp-oidc-authenticate: description: "Authenticate with GCP using a CircleCI OIDC token." @@ -134,7 +139,7 @@ jobs: build-geth: docker: - - image: cimg/go:1.21 + - image: cimg/go:<> resource_class: xlarge steps: - checkout @@ -143,7 +148,7 @@ jobs: unit-test: resource_class: xlarge docker: - - image: cimg/go:1.21 + - image: cimg/go:<> steps: - checkout - run: @@ -151,7 +156,7 @@ jobs: lint-geth: resource_class: medium docker: - - image: cimg/go:1.21 + - image: cimg/go:<> steps: - checkout - run: @@ -159,14 +164,14 @@ jobs: tidy-geth: resource_class: small docker: - - image: cimg/go:1.21 + - image: cimg/go:<> steps: - checkout - run: command: go mod tidy && git diff --exit-code check-releases: docker: - - image: cimg/go:1.21 + - image: cimg/go:<> steps: - checkout - run: diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 2020df3991..30c7df3b84 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -159,6 +159,8 @@ var ( utils.RollupSequencerTxConditionalCostRateLimitFlag, utils.RollupHistoricalRPCFlag, utils.RollupHistoricalRPCTimeoutFlag, + utils.RollupInteropRPCFlag, + utils.RollupInteropMempoolFilteringFlag, utils.RollupDisableTxPoolGossipFlag, utils.RollupComputePendingBlock, utils.RollupHaltOnIncompatibleProtocolVersionFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 026bcf7e62..8eda534783 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -939,6 +939,18 @@ var ( Category: flags.RollupCategory, } + RollupInteropRPCFlag = &cli.StringFlag{ + Name: "rollup.interoprpc", + Usage: "RPC endpoint for interop message verification (experimental).", + Category: flags.RollupCategory, + } + + RollupInteropMempoolFilteringFlag = &cli.BoolFlag{ + Name: "rollup.interopmempoolfiltering", + Usage: "If using interop, transactions are checked for interop validity before being added to the mempool (experimental).", + Category: flags.RollupCategory, + } + RollupDisableTxPoolGossipFlag = &cli.BoolFlag{ Name: "rollup.disabletxpoolgossip", Usage: "Disable transaction pool gossip.", @@ -1941,6 +1953,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(RollupHistoricalRPCTimeoutFlag.Name) { cfg.RollupHistoricalRPCTimeout = ctx.Duration(RollupHistoricalRPCTimeoutFlag.Name) } + if ctx.IsSet(RollupInteropRPCFlag.Name) { + cfg.InteropMessageRPC = ctx.String(RollupInteropRPCFlag.Name) + } + if ctx.IsSet(RollupInteropMempoolFilteringFlag.Name) { + cfg.InteropMempoolFiltering = ctx.Bool(RollupInteropMempoolFilteringFlag.Name) + } cfg.RollupDisableTxPoolGossip = ctx.Bool(RollupDisableTxPoolGossipFlag.Name) cfg.RollupDisableTxPoolAdmission = cfg.RollupSequencerHTTP != "" && !ctx.Bool(RollupEnableTxPoolAdmissionFlag.Name) cfg.RollupHaltOnIncompatibleProtocolVersion = ctx.String(RollupHaltOnIncompatibleProtocolVersionFlag.Name) diff --git a/core/error.go b/core/error.go index e6ad999bdd..b8b00121eb 100644 --- a/core/error.go +++ b/core/error.go @@ -84,6 +84,10 @@ var ( // current network configuration. ErrTxTypeNotSupported = types.ErrTxTypeNotSupported + // ErrTxFilteredOut indicates an ingress filter has rejected the transaction from + // being included in the pool. + ErrTxFilteredOut = errors.New("transaction filtered out") + // ErrTipAboveFeeCap is a sanity error to ensure no one is able to specify a // transaction with a tip higher than the total fee cap. ErrTipAboveFeeCap = errors.New("max priority fee per gas higher than max fee per gas") diff --git a/core/state_processor.go b/core/state_processor.go index 5b41e7fa5a..a1042cf868 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -210,10 +210,21 @@ func MakeReceipt(evm *vm.EVM, result *ExecutionResult, statedb *state.StateDB, b // for the transaction, gas used and an error if the transaction failed, // indicating the block was invalid. func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, error) { + return ApplyTransactionExtended(config, bc, author, gp, statedb, header, tx, usedGas, cfg, nil) +} + +type ApplyTransactionOpts struct { + PostValidation func(evm *vm.EVM, result *ExecutionResult) error +} + +func ApplyTransactionExtended(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config, extraOpts *ApplyTransactionOpts) (*types.Receipt, error) { msg, err := TransactionToMessage(tx, types.MakeSigner(config, header.Number, header.Time), header.BaseFee) if err != nil { return nil, err } + if extraOpts != nil { + msg.PostValidation = extraOpts.PostValidation + } // Create a new context to be used in the EVM environment blockContext := NewEVMBlockContext(header, bc, author, config, statedb) txContext := NewEVMTxContext(msg) diff --git a/core/state_transition.go b/core/state_transition.go index ce8d5aea94..2de1cad762 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -154,6 +154,8 @@ type Message struct { IsDepositTx bool // IsDepositTx indicates the message is force-included and can persist a mint. Mint *big.Int // Mint is the amount to mint before EVM processing, or nil if there is no minting. RollupCostData types.RollupCostData // RollupCostData caches data to compute the fee we charge for data availability + + PostValidation func(evm *vm.EVM, result *ExecutionResult) error } // TransactionToMessage converts a transaction into a Message. @@ -447,6 +449,13 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) { } err = nil } + + if st.msg.PostValidation != nil { + if err := st.msg.PostValidation(st.evm, result); err != nil { + return nil, err + } + } + return result, err } diff --git a/core/txpool/ingress_filters.go b/core/txpool/ingress_filters.go new file mode 100644 index 0000000000..317585f7a8 --- /dev/null +++ b/core/txpool/ingress_filters.go @@ -0,0 +1,57 @@ +package txpool + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/interoptypes" + "github.com/ethereum/go-ethereum/log" +) + +// IngressFilter is an interface that allows filtering of transactions before they are added to the transaction pool. +// Implementations of this interface can be used to filter transactions based on various criteria. +// FilterTx will return true if the transaction should be allowed, and false if it should be rejected. +type IngressFilter interface { + FilterTx(ctx context.Context, tx *types.Transaction) bool +} + +type interopFilter struct { + logsFn func(tx *types.Transaction) ([]*types.Log, error) + checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error +} + +func NewInteropFilter( + logsFn func(tx *types.Transaction) ([]*types.Log, error), + checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error) IngressFilter { + return &interopFilter{ + logsFn: logsFn, + checkFn: checkFn, + } +} + +// FilterTx implements IngressFilter.FilterTx +// it gets logs checks for message safety based on the function provided +func (f *interopFilter) FilterTx(ctx context.Context, tx *types.Transaction) bool { + logs, err := f.logsFn(tx) + if err != nil { + log.Debug("Failed to retrieve logs of tx", "txHash", tx.Hash(), "err", err) + return false // default to deny if logs cannot be retrieved + } + if len(logs) == 0 { + return true // default to allow if there are no logs + } + ems, err := interoptypes.ExecutingMessagesFromLogs(logs) + if err != nil { + log.Debug("Failed to parse executing messages of tx", "txHash", tx.Hash(), "err", err) + return false // default to deny if logs cannot be parsed + } + if len(ems) == 0 { + return true // default to allow if there are no executing messages + } + + ctx, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + // check with the supervisor if the transaction should be allowed given the executing messages + return f.checkFn(ctx, ems, interoptypes.Unsafe) == nil +} diff --git a/core/txpool/ingress_filters_test.go b/core/txpool/ingress_filters_test.go new file mode 100644 index 0000000000..b2fcfd9018 --- /dev/null +++ b/core/txpool/ingress_filters_test.go @@ -0,0 +1,188 @@ +package txpool + +import ( + "context" + "errors" + "math/big" + "net" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/interoptypes" + "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/require" +) + +func TestInteropFilter(t *testing.T) { + // some placeholder transaction to test with + tx := types.NewTx(&types.DynamicFeeTx{ + ChainID: big.NewInt(1), + Nonce: 1, + To: &common.Address{}, + Value: big.NewInt(1), + Data: []byte{}, + }) + t.Run("Tx has no logs", func(t *testing.T) { + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + return []*types.Log{}, nil + } + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + // make this return error, but it won't be called because logs are empty + return errors.New("error") + } + // when there are no logs to process, the transaction should be allowed + filter := NewInteropFilter(logFn, checkFn) + require.True(t, filter.FilterTx(context.Background(), tx)) + }) + t.Run("Tx errored when getting logs", func(t *testing.T) { + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + return []*types.Log{}, errors.New("error") + } + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + // make this return error, but it won't be called because logs retrieval errored + return errors.New("error") + } + // when log retrieval errors, the transaction should be denied + filter := NewInteropFilter(logFn, checkFn) + require.False(t, filter.FilterTx(context.Background(), tx)) + }) + t.Run("Tx has no executing messages", func(t *testing.T) { + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + l1 := &types.Log{ + Topics: []common.Hash{common.BytesToHash([]byte("topic1"))}, + } + return []*types.Log{l1}, nil + } + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + // make this return error, but it won't be called because logs retrieval doesn't have executing messages + return errors.New("error") + } + // when no executing messages are included, the transaction should be allowed + filter := NewInteropFilter(logFn, checkFn) + require.True(t, filter.FilterTx(context.Background(), tx)) + }) + t.Run("Tx has valid executing message", func(t *testing.T) { + // build a basic executing message + // the executing message must pass basic decode validation, + // but the validity check is done by the checkFn + l1 := &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]), + common.BytesToHash([]byte("payloadHash")), + }, + Data: []byte{}, + } + // using all 0s for data allows all takeZeros to pass + for i := 0; i < 32*5; i++ { + l1.Data = append(l1.Data, 0) + } + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + return []*types.Log{l1}, nil + } + var spyEMs []interoptypes.Message + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + spyEMs = ems + return nil + } + // when there is one executing message, the transaction should be allowed + // if the checkFn returns nil + filter := NewInteropFilter(logFn, checkFn) + require.True(t, filter.FilterTx(context.Background(), tx)) + // confirm that one executing message was passed to the checkFn + require.Equal(t, 1, len(spyEMs)) + }) + t.Run("Tx has invalid executing message", func(t *testing.T) { + // build a basic executing message + // the executing message must pass basic decode validation, + // but the validity check is done by the checkFn + l1 := &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]), + common.BytesToHash([]byte("payloadHash")), + }, + Data: []byte{}, + } + // using all 0s for data allows all takeZeros to pass + for i := 0; i < 32*5; i++ { + l1.Data = append(l1.Data, 0) + } + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + return []*types.Log{l1}, nil + } + var spyEMs []interoptypes.Message + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + spyEMs = ems + return errors.New("error") + } + // when there is one executing message, and the checkFn returns an error, + // (ie the supervisor rejects the transaction) the transaction should be denied + filter := NewInteropFilter(logFn, checkFn) + require.False(t, filter.FilterTx(context.Background(), tx)) + // confirm that one executing message was passed to the checkFn + require.Equal(t, 1, len(spyEMs)) + }) +} + +func TestInteropFilterRPCFailures(t *testing.T) { + tests := []struct { + name string + networkErr bool + timeout bool + invalidResp bool + }{ + { + name: "Network Error", + networkErr: true, + }, + { + name: "Timeout", + timeout: true, + }, + { + name: "Invalid Response", + invalidResp: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock log function that always returns our test log + logFn := func(tx *types.Transaction) ([]*types.Log, error) { + log := &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]), + common.BytesToHash([]byte("payloadHash")), + }, + Data: make([]byte, 32*5), + } + return []*types.Log{log}, nil + } + + // Create mock check function that simulates RPC failures + checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error { + if tt.networkErr { + return &net.OpError{Op: "dial", Err: errors.New("connection refused")} + } + + if tt.timeout { + return context.DeadlineExceeded + } + + if tt.invalidResp { + return errors.New("invalid response format") + } + + return nil + } + + // Create and test filter + filter := NewInteropFilter(logFn, checkFn) + result := filter.FilterTx(context.Background(), &types.Transaction{}) + require.Equal(t, false, result, "FilterTx result mismatch") + }) + } +} diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 3f30965a48..2579104e5a 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -570,8 +570,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] } if filter.MaxDATxSize != nil && !pool.locals.contains(addr) { for i, tx := range txs { - estimate := types.EstimatedL1SizeScaled(tx.RollupCostData()) - estimate = estimate.Div(estimate, big.NewInt(1e6)) + estimate := tx.RollupCostData().EstimatedDASize() if estimate.Cmp(filter.MaxDATxSize) > 0 { log.Debug("filtering tx that exceeds max da tx size", "hash", tx.Hash(), "txda", estimate, "dalimit", filter.MaxDATxSize) @@ -583,8 +582,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] if len(txs) > 0 { lazies := make([]*txpool.LazyTransaction, len(txs)) for i := 0; i < len(txs); i++ { - daBytes := types.EstimatedL1SizeScaled(txs[i].RollupCostData()) - daBytes = daBytes.Div(daBytes, big.NewInt(1e6)) + daBytes := txs[i].RollupCostData().EstimatedDASize() lazies[i] = &txpool.LazyTransaction{ Pool: pool, Hash: txs[i].Hash(), diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index ff31ffc200..db4ee13ee4 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -17,6 +17,7 @@ package txpool import ( + "context" "errors" "fmt" "math/big" @@ -77,22 +78,32 @@ type TxPool struct { term chan struct{} // Termination channel to detect a closed pool sync chan chan error // Testing / simulator channel to block until internal reset is done + + ingressFilters []IngressFilter // List of filters to apply to incoming transactions + + filterCtx context.Context // Filters may use external resources + filterCancel context.CancelFunc // Filter calls are cancelled on shutdown } // New creates a new transaction pool to gather, sort and filter inbound // transactions from the network. -func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { +func New(gasTip uint64, chain BlockChain, subpools []SubPool, poolFilters []IngressFilter) (*TxPool, error) { // Retrieve the current head so that all subpools and this main coordinator // pool will have the same starting state, even if the chain moves forward // during initialization. head := chain.CurrentBlock() + filterCtx, filterCancel := context.WithCancel(context.Background()) + pool := &TxPool{ - subpools: subpools, - reservations: make(map[common.Address]SubPool), - quit: make(chan chan error), - term: make(chan struct{}), - sync: make(chan chan error), + subpools: subpools, + reservations: make(map[common.Address]SubPool), + quit: make(chan chan error), + term: make(chan struct{}), + sync: make(chan chan error), + ingressFilters: poolFilters, + filterCtx: filterCtx, + filterCancel: filterCancel, } for i, subpool := range subpools { if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil { @@ -156,6 +167,8 @@ func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver { func (p *TxPool) Close() error { var errs []error + p.filterCancel() // Cancel filter work, these in-flight txs will be not be allowed through before shutdown + // Terminate the reset loop and wait for it to finish errc := make(chan error) p.quit <- errc @@ -319,11 +332,23 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { // so we can piece back the returned errors into the original order. txsets := make([][]*types.Transaction, len(p.subpools)) splits := make([]int, len(txs)) + filtered_out := make([]bool, len(txs)) for i, tx := range txs { // Mark this transaction belonging to no-subpool splits[i] = -1 + // Filter the transaction through the ingress filters + for _, f := range p.ingressFilters { + if !f.FilterTx(p.filterCtx, tx) { + filtered_out[i] = true + } + } + // if the transaction is filtered out, don't add it to any subpool + if filtered_out[i] { + continue + } + // Try to find a subpool that accepts the transaction for j, subpool := range p.subpools { if subpool.Filter(tx) { @@ -341,6 +366,11 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { } errs := make([]error, len(txs)) for i, split := range splits { + // If the transaction was filtered out, mark it as such + if filtered_out[i] { + errs[i] = core.ErrTxFilteredOut + continue + } // If the transaction was rejected by all subpools, mark it unsupported if split == -1 { errs[i] = core.ErrTxTypeNotSupported diff --git a/core/types/interoptypes/interop.go b/core/types/interoptypes/interop.go new file mode 100644 index 0000000000..742ecefdcd --- /dev/null +++ b/core/types/interoptypes/interop.go @@ -0,0 +1,169 @@ +package interoptypes + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math" + + "github.com/holiman/uint256" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/params" +) + +var ExecutingMessageEventTopic = crypto.Keccak256Hash([]byte("ExecutingMessage(bytes32,(address,uint256,uint256,uint256,uint256))")) + +type Message struct { + Identifier Identifier `json:"identifier"` + PayloadHash common.Hash `json:"payloadHash"` +} + +func (m *Message) DecodeEvent(topics []common.Hash, data []byte) error { + if len(topics) != 2 { // event hash, indexed payloadHash + return fmt.Errorf("unexpected number of event topics: %d", len(topics)) + } + if topics[0] != ExecutingMessageEventTopic { + return fmt.Errorf("unexpected event topic %q", topics[0]) + } + if len(data) != 32*5 { + return fmt.Errorf("unexpected identifier data length: %d", len(data)) + } + take := func(length uint) []byte { + taken := data[:length] + data = data[length:] + return taken + } + takeZeroes := func(length uint) error { + for _, v := range take(length) { + if v != 0 { + return errors.New("expected zero") + } + } + return nil + } + if err := takeZeroes(12); err != nil { + return fmt.Errorf("invalid address padding: %w", err) + } + m.Identifier.Origin = common.Address(take(20)) + if err := takeZeroes(32 - 8); err != nil { + return fmt.Errorf("invalid block number padding: %w", err) + } + m.Identifier.BlockNumber = binary.BigEndian.Uint64(take(8)) + if err := takeZeroes(32 - 4); err != nil { + return fmt.Errorf("invalid log index padding: %w", err) + } + m.Identifier.LogIndex = binary.BigEndian.Uint32(take(4)) + if err := takeZeroes(32 - 8); err != nil { + return fmt.Errorf("invalid timestamp padding: %w", err) + } + m.Identifier.Timestamp = binary.BigEndian.Uint64(take(8)) + m.Identifier.ChainID.SetBytes32(take(32)) + m.PayloadHash = topics[1] + return nil +} + +func ExecutingMessagesFromLogs(logs []*types.Log) ([]Message, error) { + var executingMessages []Message + for i, l := range logs { + if l.Address == params.InteropCrossL2InboxAddress { + // ignore events that do not match this + if len(l.Topics) == 0 || l.Topics[0] != ExecutingMessageEventTopic { + continue + } + var msg Message + if err := msg.DecodeEvent(l.Topics, l.Data); err != nil { + return nil, fmt.Errorf("invalid executing message %d, tx-log %d: %w", len(executingMessages), i, err) + } + executingMessages = append(executingMessages, msg) + } + } + return executingMessages, nil +} + +type Identifier struct { + Origin common.Address + BlockNumber uint64 + LogIndex uint32 + Timestamp uint64 + ChainID uint256.Int // flat, not a pointer, to make Identifier safe as map key +} + +type identifierMarshaling struct { + Origin common.Address `json:"origin"` + BlockNumber hexutil.Uint64 `json:"blockNumber"` + LogIndex hexutil.Uint64 `json:"logIndex"` + Timestamp hexutil.Uint64 `json:"timestamp"` + ChainID hexutil.U256 `json:"chainID"` +} + +func (id Identifier) MarshalJSON() ([]byte, error) { + var enc identifierMarshaling + enc.Origin = id.Origin + enc.BlockNumber = hexutil.Uint64(id.BlockNumber) + enc.LogIndex = hexutil.Uint64(id.LogIndex) + enc.Timestamp = hexutil.Uint64(id.Timestamp) + enc.ChainID = (hexutil.U256)(id.ChainID) + return json.Marshal(&enc) +} + +func (id *Identifier) UnmarshalJSON(input []byte) error { + var dec identifierMarshaling + if err := json.Unmarshal(input, &dec); err != nil { + return err + } + id.Origin = dec.Origin + id.BlockNumber = uint64(dec.BlockNumber) + if dec.LogIndex > math.MaxUint32 { + return fmt.Errorf("log index too large: %d", dec.LogIndex) + } + id.LogIndex = uint32(dec.LogIndex) + id.Timestamp = uint64(dec.Timestamp) + id.ChainID = (uint256.Int)(dec.ChainID) + return nil +} + +type SafetyLevel string + +func (lvl SafetyLevel) String() string { + return string(lvl) +} + +// Valid returns if the safety level is a well-formatted safety level. +func (lvl SafetyLevel) wellFormatted() bool { + switch lvl { + case Finalized, Safe, LocalSafe, CrossUnsafe, Unsafe, Invalid: + return true + default: + return false + } +} + +func (lvl SafetyLevel) MarshalText() ([]byte, error) { + return []byte(lvl), nil +} + +func (lvl *SafetyLevel) UnmarshalText(text []byte) error { + if lvl == nil { + return errors.New("cannot unmarshal into nil SafetyLevel") + } + x := SafetyLevel(text) + if !x.wellFormatted() { + return fmt.Errorf("unrecognized safety level: %q", text) + } + *lvl = x + return nil +} + +const ( + Finalized SafetyLevel = "finalized" + Safe SafetyLevel = "safe" + LocalSafe SafetyLevel = "local-safe" + CrossUnsafe SafetyLevel = "cross-unsafe" + Unsafe SafetyLevel = "unsafe" + Invalid SafetyLevel = "invalid" +) diff --git a/core/types/interoptypes/interop_test.go b/core/types/interoptypes/interop_test.go new file mode 100644 index 0000000000..5bb303fecf --- /dev/null +++ b/core/types/interoptypes/interop_test.go @@ -0,0 +1,178 @@ +package interoptypes + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/params" +) + +func FuzzMessage_DecodeEvent(f *testing.F) { + f.Fuzz(func(t *testing.T, validEvTopic bool, numTopics uint8, data []byte) { + if len(data) < 32 { + return + } + if len(data) > 100_000 { + return + } + if validEvTopic { // valid even signature topic implies a topic to be there + numTopics += 1 + } + if numTopics > 4 { // There can be no more than 4 topics per log event + return + } + if int(numTopics)*32 > len(data) { + return + } + var topics []common.Hash + if validEvTopic { + topics = append(topics, ExecutingMessageEventTopic) + } + for i := 0; i < int(numTopics); i++ { + var topic common.Hash + copy(topic[:], data[:]) + data = data[32:] + } + require.NotPanics(t, func() { + var m Message + _ = m.DecodeEvent(topics, data) + }) + }) +} + +func TestSafetyLevel(t *testing.T) { + require.True(t, Invalid.wellFormatted()) + require.True(t, Unsafe.wellFormatted()) + require.True(t, CrossUnsafe.wellFormatted()) + require.True(t, LocalSafe.wellFormatted()) + require.True(t, Safe.wellFormatted()) + require.True(t, Finalized.wellFormatted()) + require.False(t, SafetyLevel("hello").wellFormatted()) + require.False(t, SafetyLevel("").wellFormatted()) +} + +func TestInteropMessageFormatEdgeCases(t *testing.T) { + tests := []struct { + name string + log *types.Log + expectedError string + }{ + { + name: "Empty Topics", + log: &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{}, + Data: make([]byte, 32*5), + }, + expectedError: "unexpected number of event topics: 0", + }, + { + name: "Wrong Event Topic", + log: &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash([]byte("wrong topic")), + common.BytesToHash([]byte("payloadHash")), + }, + Data: make([]byte, 32*5), + }, + expectedError: "unexpected event topic", + }, + { + name: "Missing PayloadHash Topic", + log: &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(ExecutingMessageEventTopic[:]), + }, + Data: make([]byte, 32*5), + }, + expectedError: "unexpected number of event topics: 1", + }, + { + name: "Too Many Topics", + log: &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(ExecutingMessageEventTopic[:]), + common.BytesToHash([]byte("payloadHash")), + common.BytesToHash([]byte("extra")), + }, + Data: make([]byte, 32*5), + }, + expectedError: "unexpected number of event topics: 3", + }, + { + name: "Data Too Short", + log: &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(ExecutingMessageEventTopic[:]), + common.BytesToHash([]byte("payloadHash")), + }, + Data: make([]byte, 32*4), // One word too short + }, + expectedError: "unexpected identifier data length: 128", + }, + { + name: "Data Too Long", + log: &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(ExecutingMessageEventTopic[:]), + common.BytesToHash([]byte("payloadHash")), + }, + Data: make([]byte, 32*6), // One word too long + }, + expectedError: "unexpected identifier data length: 192", + }, + { + name: "Invalid Address Padding", + log: &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(ExecutingMessageEventTopic[:]), + common.BytesToHash([]byte("payloadHash")), + }, + Data: func() []byte { + data := make([]byte, 32*5) + data[0] = 1 // Add non-zero byte in address padding + return data + }(), + }, + expectedError: "invalid address padding", + }, + { + name: "Invalid Block Number Padding", + log: &types.Log{ + Address: params.InteropCrossL2InboxAddress, + Topics: []common.Hash{ + common.BytesToHash(ExecutingMessageEventTopic[:]), + common.BytesToHash([]byte("payloadHash")), + }, + Data: func() []byte { + data := make([]byte, 32*5) + data[32+23] = 1 // Add non-zero byte in block number padding + return data + }(), + }, + expectedError: "invalid block number padding", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var msg Message + err := msg.DecodeEvent(tt.log.Topics, tt.log.Data) + if tt.expectedError != "" { + require.Error(t, err) + require.ErrorContains(t, err, tt.expectedError) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/core/types/rollup_cost.go b/core/types/rollup_cost.go index 183ff12489..f398c43af1 100644 --- a/core/types/rollup_cost.go +++ b/core/types/rollup_cost.go @@ -364,7 +364,7 @@ func NewL1CostFuncFjord(l1BaseFee, l1BlobBaseFee, baseFeeScalar, blobFeeScalar * calldataCostPerByte := new(big.Int).Mul(scaledL1BaseFee, sixteen) blobCostPerByte := new(big.Int).Mul(blobFeeScalar, l1BlobBaseFee) l1FeeScaled := new(big.Int).Add(calldataCostPerByte, blobCostPerByte) - estimatedSize := EstimatedL1SizeScaled(costData) + estimatedSize := costData.estimatedDASizeScaled() l1CostScaled := new(big.Int).Mul(estimatedSize, l1FeeScaled) l1Cost := new(big.Int).Div(l1CostScaled, fjordDivisor) @@ -375,10 +375,10 @@ func NewL1CostFuncFjord(l1BaseFee, l1BlobBaseFee, baseFeeScalar, blobFeeScalar * } } -// EstimatedL1Size estimates the number of bytes the transaction will occupy in its L1 batch using -// the Fjord linear regression model, and returns this value scaled up by 1e6. -func EstimatedL1SizeScaled(costData RollupCostData) *big.Int { - fastLzSize := new(big.Int).SetUint64(costData.FastLzSize) +// estimatedDASizeScaled estimates the number of bytes the transaction will occupy in the DA batch using the Fjord +// linear regression model, and returns this value scaled up by 1e6. +func (cd RollupCostData) estimatedDASizeScaled() *big.Int { + fastLzSize := new(big.Int).SetUint64(cd.FastLzSize) estimatedSize := new(big.Int).Add(L1CostIntercept, new(big.Int).Mul(L1CostFastlzCoef, fastLzSize)) if estimatedSize.Cmp(MinTransactionSizeScaled) < 0 { @@ -387,6 +387,13 @@ func EstimatedL1SizeScaled(costData RollupCostData) *big.Int { return estimatedSize } +// EstimatedDASize estimates the number of bytes the transaction will occupy in its DA batch using the Fjord linear +// regression model. +func (cd RollupCostData) EstimatedDASize() *big.Int { + b := cd.estimatedDASizeScaled() + return b.Div(b, big.NewInt(1e6)) +} + func extractEcotoneFeeParams(l1FeeParams []byte) (l1BaseFeeScalar, l1BlobBaseFeeScalar *big.Int) { offset := scalarSectionStart l1BaseFeeScalar = new(big.Int).SetBytes(l1FeeParams[offset : offset+4]) diff --git a/eth/backend.go b/eth/backend.go index 860ecc9149..79453719c6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -42,6 +42,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/eth/interop" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/tracers" @@ -79,6 +80,8 @@ type Ethereum struct { seqRPCService *rpc.Client historicalRPCService *rpc.Client + interopRPC *interop.InteropClient + // DB interfaces chainDb ethdb.Database // Block chain database @@ -270,7 +273,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { blobPool := blobpool.New(config.BlobPool, eth.blockchain) txPools = append(txPools, blobPool) } - eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, txPools) + // if interop is enabled, establish an Interop Filter connected to this Ethereum instance's + // simulated logs and message safety check functions + poolFilters := []txpool.IngressFilter{} + if config.InteropMessageRPC != "" && config.InteropMempoolFiltering { + poolFilters = append(poolFilters, txpool.NewInteropFilter(eth.SimLogs, eth.CheckMessages)) + } + eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, txPools, poolFilters) if err != nil { return nil, err } @@ -320,6 +329,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.historicalRPCService = client } + if config.InteropMessageRPC != "" { + eth.interopRPC = interop.NewInteropClient(config.InteropMessageRPC) + } + // Start the RPC service eth.netRPCService = ethapi.NewNetAPI(eth.p2pServer, networkID) @@ -483,6 +496,12 @@ func (s *Ethereum) Stop() error { if s.historicalRPCService != nil { s.historicalRPCService.Close() } + if s.interopRPC != nil { + s.interopRPC.Close() + } + if s.miner != nil { + s.miner.Close() + } // Clean shutdown marker as the last thing before closing db s.shutdownTracker.Stop() diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index f3415b2284..29eca7bb28 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -181,6 +181,9 @@ type Config struct { RollupDisableTxPoolGossip bool RollupDisableTxPoolAdmission bool RollupHaltOnIncompatibleProtocolVersion string + + InteropMessageRPC string `toml:",omitempty"` + InteropMempoolFiltering bool `toml:",omitempty"` } // CreateConsensusEngine creates a consensus engine for the given chain config. diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index 864f4afdbd..72f0e1dc73 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -67,6 +67,8 @@ func (c Config) MarshalTOML() (interface{}, error) { RollupDisableTxPoolGossip bool RollupDisableTxPoolAdmission bool RollupHaltOnIncompatibleProtocolVersion string + InteropMessageRPC string `toml:",omitempty"` + InteropMempoolFiltering bool `toml:",omitempty"` } var enc Config enc.Genesis = c.Genesis @@ -119,6 +121,8 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.RollupDisableTxPoolGossip = c.RollupDisableTxPoolGossip enc.RollupDisableTxPoolAdmission = c.RollupDisableTxPoolAdmission enc.RollupHaltOnIncompatibleProtocolVersion = c.RollupHaltOnIncompatibleProtocolVersion + enc.InteropMessageRPC = c.InteropMessageRPC + enc.InteropMempoolFiltering = c.InteropMempoolFiltering return &enc, nil } @@ -175,6 +179,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { RollupDisableTxPoolGossip *bool RollupDisableTxPoolAdmission *bool RollupHaltOnIncompatibleProtocolVersion *string + InteropMessageRPC *string `toml:",omitempty"` + InteropMempoolFiltering *bool `toml:",omitempty"` } var dec Config if err := unmarshal(&dec); err != nil { @@ -330,5 +336,11 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.RollupHaltOnIncompatibleProtocolVersion != nil { c.RollupHaltOnIncompatibleProtocolVersion = *dec.RollupHaltOnIncompatibleProtocolVersion } + if dec.InteropMessageRPC != nil { + c.InteropMessageRPC = *dec.InteropMessageRPC + } + if dec.InteropMempoolFiltering != nil { + c.InteropMempoolFiltering = *dec.InteropMempoolFiltering + } return nil } diff --git a/eth/interop.go b/eth/interop.go new file mode 100644 index 0000000000..f044f20120 --- /dev/null +++ b/eth/interop.go @@ -0,0 +1,57 @@ +package eth + +import ( + "context" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/interoptypes" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/internal/ethapi" +) + +func (s *Ethereum) CheckMessages(ctx context.Context, messages []interoptypes.Message, minSafety interoptypes.SafetyLevel) error { + if s.interopRPC == nil { + return errors.New("cannot check interop messages, no RPC available") + } + return s.interopRPC.CheckMessages(ctx, messages, minSafety) +} + +// SimLogs simulates the logs that would be generated by a transaction if it were executed on the current state. +// This is used by the interop filter to determine if a transaction should be allowed. +// if errors are encountered, no logs are returned. +func (s *Ethereum) SimLogs(tx *types.Transaction) ([]*types.Log, error) { + chainConfig := s.APIBackend.ChainConfig() + if !chainConfig.IsOptimism() { + return nil, errors.New("expected OP-Stack chain config, SimLogs is an OP-Stack feature") + } + header := s.BlockChain().CurrentBlock() + if chainConfig.InteropTime == nil { + return nil, errors.New("expected Interop fork to be configured, SimLogs is unavailable pre-interop") + } + state, err := s.BlockChain().StateAt(header.Root) + if err != nil { + return nil, fmt.Errorf("state %s (block %d) is unavailable for log simulation: %w", header.Root, header.Number.Uint64(), err) + } + var vmConf vm.Config + signer := types.MakeSigner(chainConfig, header.Number, header.Time) + message, err := core.TransactionToMessage(tx, signer, header.BaseFee) + if err != nil { + return nil, fmt.Errorf("cannot convert tx to message for log simulation: %w", err) + } + chainCtx := ethapi.NewChainContext(context.Background(), s.APIBackend) + blockCtx := core.NewEVMBlockContext(header, chainCtx, &header.Coinbase, chainConfig, state) + txCtx := core.NewEVMTxContext(message) + vmenv := vm.NewEVM(blockCtx, txCtx, state, chainConfig, vmConf) + state.SetTxContext(tx.Hash(), 0) + result, err := core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(header.GasLimit)) + if err != nil { + return nil, fmt.Errorf("failed to execute tx: %w", err) + } + if result.Failed() { // failed txs do not have log events + return nil, nil + } + return state.GetLogs(tx.Hash(), header.Number.Uint64(), header.Hash()), nil +} diff --git a/eth/interop/interop.go b/eth/interop/interop.go new file mode 100644 index 0000000000..ae93189ac4 --- /dev/null +++ b/eth/interop/interop.go @@ -0,0 +1,57 @@ +package interop + +import ( + "context" + "errors" + "sync" + + "github.com/ethereum/go-ethereum/core/types/interoptypes" + "github.com/ethereum/go-ethereum/rpc" +) + +type InteropClient struct { + mu sync.Mutex + client *rpc.Client + endpoint string + closed bool // don't allow lazy-dials after Close +} + +// maybeDial dials the endpoint if it was not already. +func (cl *InteropClient) maybeDial(ctx context.Context) error { + cl.mu.Lock() + defer cl.mu.Unlock() + if cl.closed { + return errors.New("client is closed") + } + if cl.client != nil { + return nil + } + rpcClient, err := rpc.DialContext(ctx, cl.endpoint) + if err != nil { + return err + } + cl.client = rpcClient + return nil +} + +func (cl *InteropClient) Close() { + cl.mu.Lock() + defer cl.mu.Unlock() + if cl.client != nil { + cl.client.Close() + } + cl.closed = true +} + +func NewInteropClient(rpcEndpoint string) *InteropClient { + return &InteropClient{endpoint: rpcEndpoint} +} + +// CheckMessages checks if the given messages meet the given minimum safety level. +func (cl *InteropClient) CheckMessages(ctx context.Context, messages []interoptypes.Message, minSafety interoptypes.SafetyLevel) error { + // we lazy-dial the endpoint, so we can start geth, and build blocks, without supervisor endpoint availability. + if err := cl.maybeDial(ctx); err != nil { // a single dial attempt is made, the next call may retry. + return err + } + return cl.client.CallContext(ctx, nil, "supervisor_checkMessages", messages, minSafety) +} diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index fc82b42947..0acd50ed5b 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -117,7 +117,7 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int, txconfig.Journal = "" // Don't litter the disk with test journals pool := legacypool.New(txconfig, chain) - txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool}) + txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool}, nil) return &testBackend{ db: db, diff --git a/fork.yaml b/fork.yaml index 5ba914d9c7..cf31c24316 100644 --- a/fork.yaml +++ b/fork.yaml @@ -119,6 +119,7 @@ def: The block-building code (in the "miner" package because of Proof-Of-Work legacy of ethereum) implements the changes to support the transaction-inclusion, tx-pool toggle, gaslimit, and EIP-1559 parameters of the Engine API. + This also includes experimental support for interop executing-messages to be verified through an RPC. globs: - "miner/*" - title: "Tx-pool tx cost updates" @@ -230,6 +231,14 @@ def: globs: - "core/state/workers.go" - "trie/hasher.go" + - title: "Interop message checking" + description: | + The interop upgrade introduces cross-chain message. + Transactions are checked for cross-chain message safety before and during inclusion into a block. + This also includes tx-pool ingress filtering. + globs: + - "eth/interop.go" + - "core/txpool/ingress_filters.go" - title: "User API enhancements" description: "Encode the Deposit Tx properties, the L1 costs, and daisy-chain RPC-calls for pre-Bedrock historical data" sub: diff --git a/go.mod b/go.mod index 9b65187b18..0273ac7963 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/ethereum/go-ethereum go 1.22.0 -toolchain go1.22.8 +toolchain go1.22.7 require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 @@ -23,7 +23,7 @@ require ( github.com/deckarep/golang-set/v2 v2.6.0 github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0 github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 - github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20240828144951-4e6edcb7d36c + github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20241119111730-bee358f6d6e6 github.com/ethereum/c-kzg-4844 v1.0.0 github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 github.com/fatih/color v1.16.0 diff --git a/miner/miner.go b/miner/miner.go index 14a6a7e1fa..7cbfebf564 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/interoptypes" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/params" ) @@ -46,6 +47,10 @@ type BackendWithHistoricalState interface { StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) } +type BackendWithInterop interface { + CheckMessages(ctx context.Context, messages []interoptypes.Message, minSafety interoptypes.SafetyLevel) error +} + // Config is the configuration parameters of mining. type Config struct { Etherbase common.Address `toml:"-"` // Deprecated @@ -88,10 +93,14 @@ type Miner struct { pendingMu sync.Mutex // Lock protects the pending block backend Backend + + lifeCtxCancel context.CancelFunc + lifeCtx context.Context } // New creates a new miner with provided config. func New(eth Backend, config Config, engine consensus.Engine) *Miner { + ctx, cancel := context.WithCancel(context.Background()) return &Miner{ backend: eth, config: &config, @@ -100,6 +109,9 @@ func New(eth Backend, config Config, engine consensus.Engine) *Miner { txpool: eth.TxPool(), chain: eth.BlockChain(), pending: &pending{}, + // To interrupt background tasks that may be attached to external processes + lifeCtxCancel: cancel, + lifeCtx: ctx, } } @@ -208,3 +220,7 @@ func (miner *Miner) getPending() *newPayloadResult { miner.pending.update(header.Hash(), ret) return ret } + +func (miner *Miner) Close() { + miner.lifeCtxCancel() +} diff --git a/miner/miner_test.go b/miner/miner_test.go index 46a14e97f3..d365ee18ac 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -164,7 +164,7 @@ func createMiner(t *testing.T) *Miner { blockchain := &testBlockChain{bc.Genesis().Root(), chainConfig, statedb, 10000000, new(event.Feed)} pool := legacypool.New(testTxPoolConfig, blockchain) - txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, blockchain, []txpool.SubPool{pool}) + txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, blockchain, []txpool.SubPool{pool}, nil) // Create Miner backend := NewMockBackend(bc, txpool) diff --git a/miner/payload_building.go b/miner/payload_building.go index 30ecfff720..2dd574c15c 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -17,6 +17,7 @@ package miner import ( + "context" "crypto/sha256" "encoding/binary" "errors" @@ -106,10 +107,14 @@ type Payload struct { err error stopOnce sync.Once interrupt *atomic.Int32 // interrupt signal shared with worker + + rpcCtx context.Context // context to limit RPC-coupled payload checks + rpcCancel context.CancelFunc } // newPayload initializes the payload object. -func newPayload(empty *types.Block, witness *stateless.Witness, id engine.PayloadID) *Payload { +func newPayload(lifeCtx context.Context, empty *types.Block, witness *stateless.Witness, id engine.PayloadID) *Payload { + rpcCtx, rpcCancel := context.WithCancel(lifeCtx) payload := &Payload{ id: id, empty: empty, @@ -117,6 +122,9 @@ func newPayload(empty *types.Block, witness *stateless.Witness, id engine.Payloa stop: make(chan struct{}), interrupt: new(atomic.Int32), + + rpcCtx: rpcCtx, + rpcCancel: rpcCancel, } log.Info("Starting work on payload", "id", payload.id) payload.cond = sync.NewCond(&payload.lock) @@ -253,6 +261,7 @@ func (payload *Payload) resolve(onlyFull bool) *engine.ExecutionPayloadEnvelope // the update anyways. // interruptBuilding is safe to be called concurrently. func (payload *Payload) interruptBuilding() { + payload.rpcCancel() // Set the interrupt if not interrupted already. // It's ok if it has either already been interrupted by payload resolution earlier, // or by the timeout timer set to commitInterruptTimeout. @@ -269,6 +278,7 @@ func (payload *Payload) interruptBuilding() { // transactions with interruptBuilding. // stopBuilding is safe to be called concurrently. func (payload *Payload) stopBuilding() { + payload.rpcCancel() // Concurrent Resolve calls should only stop once. payload.stopOnce.Do(func() { log.Debug("Stop payload building.", "id", payload.id) @@ -295,12 +305,14 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload txs: args.Transactions, gasLimit: args.GasLimit, eip1559Params: args.EIP1559Params, + // No RPC requests allowed. + rpcCtx: nil, } empty := miner.generateWork(emptyParams, witness) if empty.err != nil { return nil, empty.err } - payload := newPayload(empty.block, empty.witness, args.Id()) + payload := newPayload(miner.lifeCtx, empty.block, empty.witness, args.Id()) // make sure to make it appear as full, otherwise it will wait indefinitely for payload building to complete. payload.full = empty.block payload.fullFees = empty.fees @@ -329,9 +341,10 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload return nil, err } - payload := newPayload(nil, nil, args.Id()) + payload := newPayload(miner.lifeCtx, nil, nil, args.Id()) // set shared interrupt fullParams.interrupt = payload.interrupt + fullParams.rpcCtx = payload.rpcCtx // Spin up a routine for updating the payload in background. This strategy // can maximum the revenue for including transactions with highest fee. @@ -375,6 +388,8 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload var lastDuration time.Duration for { select { + case <-miner.lifeCtx.Done(): + stopReason = "miner-shutdown" case <-timer.C: // We have to prioritize the stop signal because the recommit timer // might have fired while stop also got closed. diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index 49c2578604..ee5e2e6944 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -140,7 +140,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine t.Fatalf("core.NewBlockChain failed: %v", err) } pool := legacypool.New(testTxPoolConfig, chain) - txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, chain, []txpool.SubPool{pool}) + txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, chain, []txpool.SubPool{pool}, nil) return &testWorkerBackend{ db: db, @@ -192,9 +192,9 @@ func TestDAFilters(t *testing.T) { // The first transaction has size 100, all other DA test txs are bigger due to random Data, so should get filtered. t.Run("with-tx-filter-all-but-first", func(t *testing.T) { testDAFilters(t, big.NewInt(100), nil, 1) }) t.Run("with-block-filter-all-but-first", func(t *testing.T) { testDAFilters(t, nil, big.NewInt(100), 1) }) - // Zero/nil values for these parameters means we should mean never filter + // Zero/nil values for these parameters means we should never filter t.Run("with-zero-tx-filters", func(t *testing.T) { testDAFilters(t, big.NewInt(0), big.NewInt(0), totalTxs) }) - t.Run("with-zero-tx-filters", func(t *testing.T) { testDAFilters(t, nil, nil, totalTxs) }) + t.Run("with-nil-tx-filters", func(t *testing.T) { testDAFilters(t, nil, nil, totalTxs) }) } func holoceneConfig() *params.ChainConfig { diff --git a/miner/worker.go b/miner/worker.go index 864c5ae2ff..a4f297c6c6 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/stateless" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/interoptypes" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/log" @@ -57,6 +58,8 @@ var ( txConditionalRejectedCounter = metrics.NewRegisteredCounter("miner/transactionConditional/rejected", nil) txConditionalMinedTimer = metrics.NewRegisteredTimer("miner/transactionConditional/elapsedtime", nil) + + txInteropRejectedCounter = metrics.NewRegisteredCounter("miner/transactionInterop/rejected", nil) ) // environment is the worker's current environment and holds all @@ -75,6 +78,9 @@ type environment struct { blobs int witness *stateless.Witness + + noTxs bool // true if we are reproducing a block, and do not have to check interop txs + rpcCtx context.Context // context to control block-building RPC work. No RPC allowed if nil. } const ( @@ -112,6 +118,8 @@ type generateParams struct { eip1559Params []byte // Optional EIP-1559 parameters interrupt *atomic.Int32 // Optional interruption signal to pass down to worker.generateWork isUpdate bool // Optional flag indicating that this is building a discardable update + + rpcCtx context.Context // context to control block-building RPC work. No RPC allowed if nil. } // generateWork generates a sealing block based on the given parameters. @@ -144,7 +152,6 @@ func (miner *Miner) generateWork(params *generateParams, witness bool) *newPaylo if err != nil { return &newPayloadResult{err: fmt.Errorf("failed to force-include tx: %s type: %d sender: %s nonce: %d, err: %w", tx.Hash(), tx.Type(), from, tx.Nonce(), err)} } - work.tcount++ } if !params.noTxs { // use shared interrupt if present @@ -288,11 +295,12 @@ func (miner *Miner) prepareWork(genParams *generateParams, witness bool) (*envir // Could potentially happen if starting to mine in an odd state. // Note genParams.coinbase can be different with header.Coinbase // since clique algorithm can modify the coinbase field in header. - env, err := miner.makeEnv(parent, header, genParams.coinbase, witness) + env, err := miner.makeEnv(parent, header, genParams.coinbase, witness, genParams.rpcCtx) if err != nil { log.Error("Failed to create sealing context", "err", err) return nil, err } + env.noTxs = genParams.noTxs if header.ParentBeaconRoot != nil { context := core.NewEVMBlockContext(header, miner.chain, nil, miner.chainConfig, env.state) vmenv := vm.NewEVM(context, vm.TxContext{}, env.state, miner.chainConfig, vm.Config{}) @@ -307,7 +315,7 @@ func (miner *Miner) prepareWork(genParams *generateParams, witness bool) (*envir } // makeEnv creates a new environment for the sealing block. -func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase common.Address, witness bool) (*environment, error) { +func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase common.Address, witness bool, rpcCtx context.Context) (*environment, error) { // Retrieve the parent state to execute on top. state, err := miner.chain.StateAt(parent.Root) if err != nil { @@ -340,6 +348,7 @@ func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase coinbase: coinbase, header: header, witness: state.Witness(), + rpcCtx: rpcCtx, }, nil } @@ -396,13 +405,37 @@ func (miner *Miner) commitBlobTransaction(env *environment, tx *types.Transactio return nil } +type LogInspector interface { + GetLogs(hash common.Hash, blockNumber uint64, blockHash common.Hash) []*types.Log +} + // applyTransaction runs the transaction. If execution fails, state and gas pool are reverted. func (miner *Miner) applyTransaction(env *environment, tx *types.Transaction) (*types.Receipt, error) { var ( snap = env.state.Snapshot() gp = env.gasPool.Gas() ) - receipt, err := core.ApplyTransaction(miner.chainConfig, miner.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vm.Config{}) + var extraOpts *core.ApplyTransactionOpts + // If not just reproducing the block, check the interop executing messages. + if !env.noTxs && miner.chain.Config().IsInterop(env.header.Time) { + // Whenever there are `noTxs` it means we are building a block from pre-determined txs. There are two cases: + // (1) it's derived from L1, and will be verified asynchronously by the op-node. + // (2) it is a deposits-only empty-block by the sequencer, in which case there are no interop-txs to verify (as deposits do not emit any). + + // We have to insert as call-back, since we cannot revert the snapshot + // after the tx is deemed successful and the journal has been cleared already. + extraOpts = &core.ApplyTransactionOpts{ + PostValidation: func(evm *vm.EVM, result *core.ExecutionResult) error { + logInspector, ok := evm.StateDB.(LogInspector) + if !ok { + return fmt.Errorf("cannot get logs from StateDB type %T", evm.StateDB) + } + logs := logInspector.GetLogs(tx.Hash(), env.header.Number.Uint64(), common.Hash{}) + return miner.checkInterop(env.rpcCtx, tx, result.Failed(), logs) + }, + } + } + receipt, err := core.ApplyTransactionExtended(miner.chainConfig, miner.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vm.Config{}, extraOpts) if err != nil { env.state.RevertToSnapshot(snap) env.gasPool.SetGas(gp) @@ -410,6 +443,42 @@ func (miner *Miner) applyTransaction(env *environment, tx *types.Transaction) (* return receipt, err } +func (miner *Miner) checkInterop(ctx context.Context, tx *types.Transaction, failed bool, logs []*types.Log) error { + if tx.Type() == types.DepositTxType { + return nil // deposit-txs are always safe + } + if failed { + return nil // failed txs don't persist any logs + } + if tx.Rejected() { + return errors.New("transaction was previously rejected") + } + b, ok := miner.backend.(BackendWithInterop) + if !ok { + return fmt.Errorf("cannot mine interop txs without interop backend, got backend type %T", miner.backend) + } + if ctx == nil { // check if the miner was set up correctly to interact with an RPC + return errors.New("need RPC context to check executing messages") + } + executingMessages, err := interoptypes.ExecutingMessagesFromLogs(logs) + if err != nil { + return fmt.Errorf("cannot parse interop messages from receipt of %s: %w", tx.Hash(), err) + } + if len(executingMessages) == 0 { + return nil // avoid an RPC check if there are no executing messages to verify. + } + if err := b.CheckMessages(ctx, executingMessages, interoptypes.CrossUnsafe); err != nil { + if ctx.Err() != nil { // don't reject transactions permanently on RPC timeouts etc. + log.Debug("CheckMessages timed out", "err", ctx.Err()) + return err + } + txInteropRejectedCounter.Inc(1) + tx.SetRejected() // Mark the tx as rejected: it will not be welcome in the tx-pool anymore. + return err + } + return nil +} + func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { gasLimit := env.header.GasLimit if env.gasPool == nil { @@ -516,6 +585,14 @@ func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *tran log.Warn("Skipping account, transaction with failed conditional", "sender", from, "hash", ltx.Hash, "err", err) txs.Pop() + case env.rpcCtx != nil && env.rpcCtx.Err() != nil && errors.Is(err, env.rpcCtx.Err()): + log.Warn("Transaction processing aborted due to RPC context error", "err", err) + txs.Pop() // RPC timeout. Tx could not be checked, and thus not included, but not rejected yet. + + case err != nil && tx.Rejected(): + log.Warn("Transaction was rejected during block-building", "hash", ltx.Hash, "err", err) + txs.Pop() + case errors.Is(err, nil): // Everything ok, collect the logs and shift in the next transaction from the same account blockDABytes = daBytesAfter diff --git a/params/interop.go b/params/interop.go new file mode 100644 index 0000000000..148ece53fe --- /dev/null +++ b/params/interop.go @@ -0,0 +1,5 @@ +package params + +import "github.com/ethereum/go-ethereum/common" + +var InteropCrossL2InboxAddress = common.HexToAddress("0x4200000000000000000000000000000000000022") diff --git a/params/superchain.go b/params/superchain.go index f2ceeda954..233c1e117d 100644 --- a/params/superchain.go +++ b/params/superchain.go @@ -11,7 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common" ) -var OPStackSupport = ProtocolVersionV0{Build: [8]byte{}, Major: 8, Minor: 0, Patch: 0, PreRelease: 0}.Encode() +var OPStackSupport = ProtocolVersionV0{Build: [8]byte{}, Major: 9, Minor: 0, Patch: 0, PreRelease: 1}.Encode() func init() { for id, ch := range superchain.OPChains {