From 965e3669fef8a46dc7ce2cf4198a833911997b77 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Mon, 12 Feb 2024 18:17:19 -0600 Subject: [PATCH] skip Goerli BlobTxType transaction and Mumbai empty block --- go.mod | 1 + go.sum | 4 + zetaclient/evm_client.go | 249 ++++++++++++++++++++++++---------- zetaclient/evm_client_test.go | 37 +++++ 4 files changed, 221 insertions(+), 70 deletions(-) create mode 100644 zetaclient/evm_client_test.go diff --git a/go.mod b/go.mod index aff1f815c3..58504fbf19 100644 --- a/go.mod +++ b/go.mod @@ -86,6 +86,7 @@ require ( github.com/libp2p/go-yamux/v4 v4.0.0 // indirect github.com/linxGnu/grocksdb v1.7.15 // indirect github.com/miekg/pkcs11 v1.1.1 // indirect + github.com/onrik/ethrpc v1.2.0 // indirect github.com/onsi/ginkgo/v2 v2.9.7 // indirect github.com/prometheus/tsdb v0.7.1 // indirect github.com/quic-go/qpack v0.4.0 // indirect diff --git a/go.sum b/go.sum index 96ac6b81eb..56381b3da1 100644 --- a/go.sum +++ b/go.sum @@ -1848,6 +1848,7 @@ github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8 github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= +github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= @@ -2358,6 +2359,8 @@ github.com/olekukonko/tablewriter v0.0.2-0.20190409134802-7e037d187b0c/go.mod h1 github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/onrik/ethrpc v1.2.0 h1:BBcr1iWxW1RBP/eyZfzvSKtGgeqexq5qS0yyf4pmKbc= +github.com/onrik/ethrpc v1.2.0/go.mod h1:uvyqpn8+WbsTgBYfouImgEfpIMb0hR8fWGjwdgPHtFU= github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -2757,6 +2760,7 @@ github.com/sourcegraph/go-diff v0.5.1/go.mod h1:j2dHj3m8aZgQO8lMTcTnBcXkRRRqi34c github.com/sourcegraph/go-diff v0.5.3/go.mod h1:v9JDtjCE4HHHCZGId75rg8gkKKa98RVjBcBGsVmMmak= github.com/sourcegraph/go-diff v0.6.1/go.mod h1:iBszgVvyxdc8SFZ7gm69go2KDdt3ag071iBaWPF6cjs= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= +github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/zetaclient/evm_client.go b/zetaclient/evm_client.go index 36d5ee0570..63299003e1 100644 --- a/zetaclient/evm_client.go +++ b/zetaclient/evm_client.go @@ -1,6 +1,7 @@ package zetaclient import ( + "bytes" "context" "fmt" "math" @@ -8,6 +9,7 @@ import ( "os" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -22,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rlp" lru "github.com/hashicorp/golang-lru" + "github.com/onrik/ethrpc" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -70,6 +73,7 @@ type EVMChainClient struct { *ChainMetrics chain common.Chain evmClient EVMRPCClient + evmClientAlternate *ethrpc.EthRPC // a fallback rpc client KlaytnClient KlaytnRPCClient zetaClient ZetaCoreBridger Tss TSSSigner @@ -92,7 +96,9 @@ type EVMChainClient struct { params observertypes.ChainParams ts *TelemetryServer - BlockCache *lru.Cache + blockCache *lru.Cache + blockCacheV3 *lru.Cache // blockCacheV3 caches blocks containing type-3 (BlobTxType) transactions + headerCache *lru.Cache } var _ ChainClient = (*EVMChainClient)(nil) @@ -146,12 +152,24 @@ func NewEVMChainClient( return nil, err } ob.evmClient = client + ob.evmClientAlternate = ethrpc.NewEthRPC(evmCfg.Endpoint) - ob.BlockCache, err = lru.New(1000) + // create block header and block caches + ob.blockCache, err = lru.New(1000) if err != nil { ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache") return nil, err } + ob.blockCacheV3, err = lru.New(1000) + if err != nil { + ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache v3") + return nil, err + } + ob.headerCache, err = lru.New(1000) + if err != nil { + ob.logger.ChainLogger.Error().Err(err).Msg("failed to create header cache") + return nil, err + } if ob.chain.IsKlaytnChain() { client, err := Dial(evmCfg.Endpoint) @@ -756,26 +774,6 @@ func (ob *EVMChainClient) checkConfirmedTx(txHash string, nonce uint64) (*ethtyp return nil, nil, false } - // cross-check receipt against the block - block, err := ob.GetBlockByNumberCached(receipt.BlockNumber.Uint64()) - if err != nil { - log.Error().Err(err).Msgf("confirmTxByHash: GetBlockByNumberCached error, txHash %s nonce %d block %d", - txHash, nonce, receipt.BlockNumber) - return nil, nil, false - } - // #nosec G701 non negative value - if receipt.TransactionIndex >= uint(len(block.Transactions())) { - log.Error().Msgf("confirmTxByHash: transaction index %d out of range [0, %d), txHash %s nonce %d block %d", - receipt.TransactionIndex, len(block.Transactions()), txHash, nonce, receipt.BlockNumber) - return nil, nil, false - } - txAtIndex := block.Transactions()[receipt.TransactionIndex] - if txAtIndex.Hash() != transaction.Hash() { - log.Error().Msgf("confirmTxByHash: transaction at index %d has different hash %s, txHash %s nonce %d block %d", - receipt.TransactionIndex, txAtIndex.Hash().Hex(), txHash, nonce, receipt.BlockNumber) - return nil, nil, false - } - // check confirmations if !ob.HasEnoughConfirmations(receipt, ob.GetLastBlockHeight()) { log.Debug().Msgf("confirmTxByHash: txHash %s nonce %d included but not confirmed: receipt block %d, current block %d", @@ -783,9 +781,51 @@ func (ob *EVMChainClient) checkConfirmedTx(txHash string, nonce uint64) (*ethtyp return nil, nil, false } + // cross-check tx inclusion against the block + // Note: a guard for false BlockNumber in receipt. The blob-carrying tx won't come here + err = ob.checkTxInclusion(transaction, receipt.BlockNumber.Uint64(), receipt.TransactionIndex) + if err != nil { + log.Error().Err(err).Msgf("confirmTxByHash: checkTxInclusion error for txHash %s nonce %d", txHash, nonce) + return nil, nil, false + } + return receipt, transaction, true } +// checkTxInclusion returns nil only if tx is included in the block at blockNumber and txIndex +func (ob *EVMChainClient) checkTxInclusion(tx *ethtypes.Transaction, blockNumber uint64, txIndex uint) error { + block, blockRPC, fallBack, _, err := ob.GetBlockByNumberCached(blockNumber) + if err != nil { + return fmt.Errorf("GetBlockByNumberCached error for block %d txHash %s nonce %d: %w", blockNumber, tx.Hash(), tx.Nonce(), err) + } + if !fallBack { + // #nosec G701 non negative value + if txIndex >= uint(len(block.Transactions())) { + return fmt.Errorf("transaction index %d out of range [0, %d), txHash %s nonce %d block %d", + txIndex, len(block.Transactions()), tx.Hash(), tx.Nonce(), blockNumber) + } + txAtIndex := block.Transactions()[txIndex] + if txAtIndex.Hash() != tx.Hash() { + ob.RemoveCachedBlock(blockNumber) // clean stale block from cache + return fmt.Errorf("transaction at index %d has different hash %s, txHash %s nonce %d block %d", + txIndex, txAtIndex.Hash().Hex(), tx.Hash(), tx.Nonce(), blockNumber) + } + } else { // fell back on ETH RPC as ethclient failed to parse the block + // #nosec G701 non negative value + if txIndex >= uint(len(blockRPC.Transactions)) { + return fmt.Errorf("transaction index %d out of range [0, %d), txHash %s nonce %d block %d", + txIndex, len(block.Transactions()), tx.Hash(), tx.Nonce(), blockNumber) + } + txAtIndex := blockRPC.Transactions[txIndex] + if ethcommon.HexToHash(txAtIndex.Hash) != tx.Hash() { + ob.RemoveCachedBlock(blockNumber) // clean stale block from cache + return fmt.Errorf("transaction at index %d has different hash %s, txHash %s nonce %d block %d", + txIndex, txAtIndex.Hash, tx.Hash(), tx.Nonce(), blockNumber) + } + } + return nil +} + // SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused) func (ob *EVMChainClient) SetLastBlockHeightScanned(height uint64) { atomic.StoreUint64(&ob.lastBlockScanned, height) @@ -863,12 +903,12 @@ func (ob *EVMChainClient) postBlockHeader(tip uint64) error { return fmt.Errorf("postBlockHeader: must post block confirmed block header: %d > %d", bn, tip) } - block, err := ob.GetBlockByNumberCached(bn) + header, err := ob.GetBlockHeaderCached(bn) if err != nil { ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error getting block: %d", bn) return err } - headerRLP, err := rlp.EncodeToBytes(block.Header()) + headerRLP, err := rlp.EncodeToBytes(header) if err != nil { ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error encoding block header: %d", bn) return err @@ -876,8 +916,8 @@ func (ob *EVMChainClient) postBlockHeader(tip uint64) error { _, err = ob.zetaClient.PostAddBlockHeader( ob.chain.ChainId, - block.Hash().Bytes(), - block.Number().Int64(), + header.Hash().Bytes(), + header.Number.Int64(), common.NewEthereumHeader(headerRLP), ) if err != nil { @@ -1159,48 +1199,39 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags obse } // TODO: we can track the total number of 'getBlockByNumber' RPC calls made - block, err := ob.GetBlockByNumberCached(bn) + block, blockRPC, fallBack, skip, err := ob.GetBlockByNumberCached(bn) if err != nil { + if skip { + ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: skip block %d for chain %d", bn, ob.chain.ChainId) + continue + } ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: error getting block %d for chain %d", bn, ob.chain.ChainId) return bn - 1 // we have to re-scan from this block next time } - for _, tx := range block.Transactions() { - if tx.To() == nil { - continue - } - - if *tx.To() == tssAddress { - receipt, err := ob.evmClient.TransactionReceipt(context.Background(), tx.Hash()) - if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msgf( - "observeTssRecvd: TransactionReceipt error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) - return bn - 1 // we have to re-scan this block next time - } - if receipt.Status != 1 { // 1: successful, 0: failed - ob.logger.ExternalChainWatcher.Info().Msgf("observeTssRecvd: tx %s chain %d failed; don't act", tx.Hash().Hex(), ob.chain.ChainId) - continue - } - - sender, err := ob.GetTransactionSender(tx, block.Hash(), receipt.TransactionIndex) - if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msgf( - "observeTssRecvd: GetTransactionSender error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) - return bn - 1 // we have to re-scan this block next time - } - - msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx, sender, receipt.BlockNumber.Uint64()) - if msg == nil { - continue + if !fallBack { + for _, tx := range block.Transactions() { + if tx.To() != nil && *tx.To() == tssAddress { + if ok := ob.processIntxToTss(tx, bn, block.Hash()); !ok { + return bn - 1 // we have to re-scan this block next time + } } - zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(PostVoteInboundGasLimit, PostVoteInboundExecutionGasLimit, msg) - if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf( - "observeTssRecvd: error posting to zeta core for tx %s at height %d for chain %d", tx.Hash().Hex(), bn, ob.chain.ChainId) - return bn - 1 // we have to re-scan this block next time - } else if zetaHash != "" { - ob.logger.ExternalChainWatcher.Info().Msgf( - "observeTssRecvd: gas asset deposit detected in tx %s at height %d for chain %d, PostVoteInbound zeta tx: %s ballot %s", - tx.Hash().Hex(), bn, ob.chain.ChainId, zetaHash, ballot) + } + } else { // fell back on ETH RPC as ethclient failed to parse the block + ob.logger.ExternalChainWatcher.Info().Msgf("observeTssRecvd: processing block %d using fallback for chain %d", bn, ob.chain.ChainId) + for _, txRPC := range blockRPC.Transactions { + if ethcommon.HexToAddress(txRPC.To) == tssAddress { + tx, _, err := ob.evmClient.TransactionByHash(context.Background(), ethcommon.HexToHash(txRPC.Hash)) + if err != nil { + if strings.Contains(err.Error(), "transaction type not supported") { + ob.logger.ExternalChainWatcher.Err(err).Msgf( + "observeTssRecvd: transaction type not supported for tx %s chain %d", txRPC.Hash, ob.chain.ChainId) + continue // skip blob-carrying tx to TSS address + } + return bn - 1 // we have to re-scan this block next time + } + if ok := ob.processIntxToTss(tx, bn, ethcommon.HexToHash(blockRPC.Hash)); !ok { + return bn - 1 // we have to re-scan this block next time + } } } } @@ -1209,6 +1240,48 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags obse return toBlock } +// processIntxToTss processes the incoming tx to TSS address and posts to zetacore +// returns true if the tx is successfully processed, false otherwise +func (ob *EVMChainClient) processIntxToTss(tx *ethtypes.Transaction, bn uint64, blockHash ethcommon.Hash) bool { + receipt, err := ob.evmClient.TransactionReceipt(context.Background(), tx.Hash()) + if err != nil { + ob.logger.ExternalChainWatcher.Err(err).Msgf( + "processIntxToTss: TransactionReceipt error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) + return false // we have to re-scan this block next time + } + if receipt.Status != 1 { // 1: successful, 0: failed + ob.logger.ExternalChainWatcher.Info().Msgf("processIntxToTss: tx %s chain %d failed; don't act", tx.Hash().Hex(), ob.chain.ChainId) + return true // skip failed tx + } + if bytes.Equal(tx.Data(), []byte(DonationMessage)) { + ob.logger.ExternalChainWatcher.Info().Msgf( + "processIntxToTss: thank you rich folk for your donation!: %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) + return true // skip donation tx + } + sender, err := ob.GetTransactionSender(tx, blockHash, receipt.TransactionIndex) + if err != nil { + ob.logger.ExternalChainWatcher.Err(err).Msgf( + "processIntxToTss: GetTransactionSender error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) + return false // we have to re-scan this block next time + } + + msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx, sender, bn) + if msg == nil { + return true // should never happen, always non-nil + } + zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(PostVoteInboundGasLimit, PostVoteInboundExecutionGasLimit, msg) + if err != nil { + ob.logger.ExternalChainWatcher.Error().Err(err).Msgf( + "processIntxToTss: error posting to zeta core for tx %s at height %d for chain %d", tx.Hash().Hex(), bn, ob.chain.ChainId) + return false // we have to re-scan this block next time + } else if zetaHash != "" { + ob.logger.ExternalChainWatcher.Info().Msgf( + "processIntxToTss: gas asset deposit detected in tx %s at height %d for chain %d, PostSend zeta tx: %s ballot %s", + tx.Hash().Hex(), bn, ob.chain.ChainId, zetaHash, ballot) + } + return true +} + func (ob *EVMChainClient) WatchGasPrice() { ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice starting...") err := ob.PostGasPrice() @@ -1411,15 +1484,51 @@ func (ob *EVMChainClient) GetTxID(nonce uint64) string { return fmt.Sprintf("%d-%s-%d", ob.chain.ChainId, tssAddr, nonce) } -func (ob *EVMChainClient) GetBlockByNumberCached(blockNumber uint64) (*ethtypes.Block, error) { - if block, ok := ob.BlockCache.Get(blockNumber); ok { - return block.(*ethtypes.Block), nil +func (ob *EVMChainClient) GetBlockHeaderCached(blockNumber uint64) (*ethtypes.Header, error) { + if header, ok := ob.headerCache.Get(blockNumber); ok { + return header.(*ethtypes.Header), nil } - block, err := ob.evmClient.BlockByNumber(context.Background(), new(big.Int).SetUint64(blockNumber)) + header, err := ob.evmClient.HeaderByNumber(context.Background(), new(big.Int).SetUint64(blockNumber)) if err != nil { return nil, err } - ob.BlockCache.Add(blockNumber, block) - ob.BlockCache.Add(block.Hash(), block) - return block, nil + ob.headerCache.Add(blockNumber, header) + return header, nil +} + +// GetBlockByNumberCached get block by number from cache +// returns block, ethrpc.Block, isFallback, isSkip, error +func (ob *EVMChainClient) GetBlockByNumberCached(blockNumber uint64) (*ethtypes.Block, *ethrpc.Block, bool, bool, error) { + if block, ok := ob.blockCache.Get(blockNumber); ok { + return block.(*ethtypes.Block), nil, false, false, nil + } + if block, ok := ob.blockCacheV3.Get(blockNumber); ok { + return nil, block.(*ethrpc.Block), true, false, nil + } + block, err := ob.evmClient.BlockByNumber(context.Background(), new(big.Int).SetUint64(blockNumber)) + if err != nil { + if strings.Contains(err.Error(), "block header indicates no transactions") { + return nil, nil, false, true, err // it's ok skip empty block + } else if strings.Contains(err.Error(), "transaction type not supported") { + if blockNumber > math.MaxInt32 { + return nil, nil, true, false, fmt.Errorf("block number %d is too large", blockNumber) + } + // #nosec G701 always in range, checked above + rpcBlock, err := ob.evmClientAlternate.EthGetBlockByNumber(int(blockNumber), true) + if err != nil { + return nil, nil, true, false, err // fall back on ethRPC but still fail + } + ob.blockCacheV3.Add(blockNumber, rpcBlock) + return nil, rpcBlock, true, false, nil // fall back on ethRPC without error + } + return nil, nil, false, false, err + } + ob.blockCache.Add(blockNumber, block) + return block, nil, false, false, nil +} + +// RemoveCachedBlock remove block from cache +func (ob *EVMChainClient) RemoveCachedBlock(blockNumber uint64) { + ob.blockCache.Remove(blockNumber) + ob.blockCacheV3.Remove(blockNumber) } diff --git a/zetaclient/evm_client_test.go b/zetaclient/evm_client_test.go new file mode 100644 index 0000000000..f24848bc26 --- /dev/null +++ b/zetaclient/evm_client_test.go @@ -0,0 +1,37 @@ +package zetaclient + +import ( + "math/big" + "testing" + + ethtypes "github.com/ethereum/go-ethereum/core/types" + lru "github.com/hashicorp/golang-lru" + "github.com/stretchr/testify/require" +) + +func TestEVMBlockCache(t *testing.T) { + // create client + blockCache, err := lru.New(1000) + require.NoError(t, err) + blockCacheV3, err := lru.New(1000) + require.NoError(t, err) + ob := EVMChainClient{ + blockCache: blockCache, + blockCacheV3: blockCacheV3, + } + + // delete non-existing block should not panic + blockNumber := int64(10388180) + // #nosec G701 possible nummber + ob.RemoveCachedBlock(uint64(blockNumber)) + + // add a block + header := ðtypes.Header{ + Number: big.NewInt(blockNumber), + } + block := ethtypes.NewBlock(header, nil, nil, nil, nil) + ob.blockCache.Add(blockNumber, block) + + // delete the block should not panic + ob.RemoveCachedBlock(uint64(blockNumber)) +}