From ad42b527def0ede5804a0b27137628b2278a78e9 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Fri, 19 Jan 2024 14:38:17 -0600 Subject: [PATCH] skip Goerli unknown tx type and Mumbai empty block --- zetaclient/ethrpc/ethrpc.go | 137 ++++++++++++++++++++++++++++ zetaclient/ethrpc/helper.go | 40 +++++++++ zetaclient/ethrpc/types.go | 157 ++++++++++++++++++++++++++++++++ zetaclient/evm_client.go | 173 ++++++++++++++++++++++++------------ 4 files changed, 452 insertions(+), 55 deletions(-) create mode 100644 zetaclient/ethrpc/ethrpc.go create mode 100644 zetaclient/ethrpc/helper.go create mode 100644 zetaclient/ethrpc/types.go diff --git a/zetaclient/ethrpc/ethrpc.go b/zetaclient/ethrpc/ethrpc.go new file mode 100644 index 0000000000..88283254f2 --- /dev/null +++ b/zetaclient/ethrpc/ethrpc.go @@ -0,0 +1,137 @@ +package ethrpc + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" +) + +// EthError - ethereum error +type EthError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (err EthError) Error() string { + return fmt.Sprintf("Error %d (%s)", err.Code, err.Message) +} + +type ethResponse struct { + ID int `json:"id"` + JSONRPC string `json:"jsonrpc"` + Result json.RawMessage `json:"result"` + Error *EthError `json:"error"` +} + +type ethRequest struct { + ID int `json:"id"` + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params []interface{} `json:"params"` +} + +// EthRPC - Ethereum rpc client +type EthRPC struct { + url string + client *http.Client +} + +// New create new rpc client with given url +func New(url string, options ...func(rpc *EthRPC)) *EthRPC { + rpc := &EthRPC{ + url: url, + client: http.DefaultClient, + } + for _, option := range options { + option(rpc) + } + + return rpc +} + +// NewEthRPC create new rpc client with given url +func NewEthRPC(url string, options ...func(rpc *EthRPC)) *EthRPC { + return New(url, options...) +} + +// URL returns client url +func (rpc *EthRPC) URL() string { + return rpc.url +} + +// Call returns raw response of method call +func (rpc *EthRPC) Call(method string, params ...interface{}) (json.RawMessage, error) { + request := ethRequest{ + ID: 1, + JSONRPC: "2.0", + Method: method, + Params: params, + } + + body, err := json.Marshal(request) + if err != nil { + return nil, err + } + + response, err := rpc.client.Post(rpc.url, "application/json", bytes.NewBuffer(body)) + if response != nil { + defer response.Body.Close() + } + if err != nil { + return nil, err + } + + data, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + resp := new(ethResponse) + if err := json.Unmarshal(data, resp); err != nil { + return nil, err + } + + if resp.Error != nil { + return nil, *resp.Error + } + + return resp.Result, nil + +} + +// RawCall returns raw response of method call (Deprecated) +func (rpc *EthRPC) RawCall(method string, params ...interface{}) (json.RawMessage, error) { + return rpc.Call(method, params...) +} + +func (rpc *EthRPC) getBlock(method string, withTransactions bool, params ...interface{}) (*Block, error) { + result, err := rpc.RawCall(method, params...) + if err != nil { + return nil, err + } + if bytes.Equal(result, []byte("null")) { + return nil, nil + } + + var response proxyBlock + if withTransactions { + response = new(proxyBlockWithTransactions) + } else { + response = new(proxyBlockWithoutTransactions) + } + + err = json.Unmarshal(result, response) + if err != nil { + return nil, err + } + + block := response.toBlock() + return &block, nil +} + +// EthGetBlockByNumber returns information about a block by block number. +func (rpc *EthRPC) EthGetBlockByNumber(number uint64, withTransactions bool) (*Block, error) { + return rpc.getBlock("eth_getBlockByNumber", withTransactions, IntToHex(number), withTransactions) +} diff --git a/zetaclient/ethrpc/helper.go b/zetaclient/ethrpc/helper.go new file mode 100644 index 0000000000..c826bf94cd --- /dev/null +++ b/zetaclient/ethrpc/helper.go @@ -0,0 +1,40 @@ +package ethrpc + +import ( + "fmt" + "math/big" + "strconv" + "strings" +) + +// ParseInt parse hex string value to uint64 +func ParseInt(value string) (uint64, error) { + i, err := strconv.ParseUint(strings.TrimPrefix(value, "0x"), 16, 64) + if err != nil { + return 0, err + } + + return i, nil +} + +// ParseBigInt parse hex string value to big.Int +func ParseBigInt(value string) (big.Int, error) { + i := big.Int{} + _, err := fmt.Sscan(value, &i) + + return i, err +} + +// IntToHex convert int to hexadecimal representation +func IntToHex(i uint64) string { + return fmt.Sprintf("0x%x", i) +} + +// BigToHex covert big.Int to hexadecimal representation +func BigToHex(bigInt big.Int) string { + if bigInt.BitLen() == 0 { + return "0x0" + } + + return "0x" + strings.TrimPrefix(fmt.Sprintf("%x", bigInt.Bytes()), "0") +} diff --git a/zetaclient/ethrpc/types.go b/zetaclient/ethrpc/types.go new file mode 100644 index 0000000000..6146e0e724 --- /dev/null +++ b/zetaclient/ethrpc/types.go @@ -0,0 +1,157 @@ +package ethrpc + +import ( + "bytes" + "math/big" + "unsafe" +) + +// Transaction - transaction object +type Transaction struct { + Hash string + Nonce int + BlockHash string + BlockNumber *int + TransactionIndex *int + From string + To string + Value big.Int + Gas int + GasPrice big.Int + Input string +} + +// Block - block object +type Block struct { + Number uint64 + Hash string + ParentHash string + Nonce string + Sha3Uncles string + LogsBloom string + TransactionsRoot string + StateRoot string + Miner string + Difficulty big.Int + TotalDifficulty big.Int + ExtraData string + Size int + GasLimit int + GasUsed int + Timestamp int + Uncles []string + Transactions []Transaction +} + +type hexInt uint64 + +func (i *hexInt) UnmarshalJSON(data []byte) error { + result, err := ParseInt(string(bytes.Trim(data, `"`))) + *i = hexInt(result) + + return err +} + +type hexBig big.Int + +func (i *hexBig) UnmarshalJSON(data []byte) error { + result, err := ParseBigInt(string(bytes.Trim(data, `"`))) + *i = hexBig(result) + + return err +} + +type proxyBlock interface { + toBlock() Block +} + +type proxyBlockWithTransactions struct { + Number hexInt `json:"number"` + Hash string `json:"hash"` + ParentHash string `json:"parentHash"` + Nonce string `json:"nonce"` + Sha3Uncles string `json:"sha3Uncles"` + LogsBloom string `json:"logsBloom"` + TransactionsRoot string `json:"transactionsRoot"` + StateRoot string `json:"stateRoot"` + Miner string `json:"miner"` + Difficulty hexBig `json:"difficulty"` + TotalDifficulty hexBig `json:"totalDifficulty"` + ExtraData string `json:"extraData"` + Size hexInt `json:"size"` + GasLimit hexInt `json:"gasLimit"` + GasUsed hexInt `json:"gasUsed"` + Timestamp hexInt `json:"timestamp"` + Uncles []string `json:"uncles"` + Transactions []proxyTransaction `json:"transactions"` +} + +type proxyBlockWithoutTransactions struct { + Number hexInt `json:"number"` + Hash string `json:"hash"` + ParentHash string `json:"parentHash"` + Nonce string `json:"nonce"` + Sha3Uncles string `json:"sha3Uncles"` + LogsBloom string `json:"logsBloom"` + TransactionsRoot string `json:"transactionsRoot"` + StateRoot string `json:"stateRoot"` + Miner string `json:"miner"` + Difficulty hexBig `json:"difficulty"` + TotalDifficulty hexBig `json:"totalDifficulty"` + ExtraData string `json:"extraData"` + Size hexInt `json:"size"` + GasLimit hexInt `json:"gasLimit"` + GasUsed hexInt `json:"gasUsed"` + Timestamp hexInt `json:"timestamp"` + Uncles []string `json:"uncles"` + Transactions []string `json:"transactions"` +} + +func (proxy *proxyBlockWithoutTransactions) toBlock() Block { + block := Block{ + Number: uint64(proxy.Number), + Hash: proxy.Hash, + ParentHash: proxy.ParentHash, + Nonce: proxy.Nonce, + Sha3Uncles: proxy.Sha3Uncles, + LogsBloom: proxy.LogsBloom, + TransactionsRoot: proxy.TransactionsRoot, + StateRoot: proxy.StateRoot, + Miner: proxy.Miner, + Difficulty: big.Int(proxy.Difficulty), + TotalDifficulty: big.Int(proxy.TotalDifficulty), + ExtraData: proxy.ExtraData, + Size: int(proxy.Size), + GasLimit: int(proxy.GasLimit), + GasUsed: int(proxy.GasUsed), + Timestamp: int(proxy.Timestamp), + Uncles: proxy.Uncles, + } + + block.Transactions = make([]Transaction, len(proxy.Transactions)) + for i := range proxy.Transactions { + block.Transactions[i] = Transaction{ + Hash: proxy.Transactions[i], + } + } + + return block +} + +type proxyTransaction struct { + Hash string `json:"hash"` + Nonce hexInt `json:"nonce"` + BlockHash string `json:"blockHash"` + BlockNumber *hexInt `json:"blockNumber"` + TransactionIndex *hexInt `json:"transactionIndex"` + From string `json:"from"` + To string `json:"to"` + Value hexBig `json:"value"` + Gas hexInt `json:"gas"` + GasPrice hexBig `json:"gasPrice"` + Input string `json:"input"` +} + +func (proxy *proxyBlockWithTransactions) toBlock() Block { + return *(*Block)(unsafe.Pointer(proxy)) +} diff --git a/zetaclient/evm_client.go b/zetaclient/evm_client.go index 8a68ac121e..0e426c35c4 100644 --- a/zetaclient/evm_client.go +++ b/zetaclient/evm_client.go @@ -9,6 +9,7 @@ import ( "os" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -32,6 +33,7 @@ import ( "github.com/zeta-chain/zetacore/x/crosschain/types" observertypes "github.com/zeta-chain/zetacore/x/observer/types" "github.com/zeta-chain/zetacore/zetaclient/config" + "github.com/zeta-chain/zetacore/zetaclient/ethrpc" metricsPkg "github.com/zeta-chain/zetacore/zetaclient/metrics" clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" "gorm.io/driver/sqlite" @@ -71,6 +73,7 @@ type EVMChainClient struct { *ChainMetrics chain common.Chain evmClient EVMRPCClient + rpcClient *ethrpc.EthRPC // a fallback rpc client KlaytnClient KlaytnRPCClient zetaClient ZetaCoreBridger Tss TSSSigner @@ -93,7 +96,8 @@ type EVMChainClient struct { params observertypes.ChainParams ts *TelemetryServer - BlockCache *lru.Cache + BlockCache *lru.Cache + HeaderCache *lru.Cache } var _ ChainClient = (*EVMChainClient)(nil) @@ -147,12 +151,19 @@ func NewEVMChainClient( return nil, err } ob.evmClient = client + ob.rpcClient = ethrpc.NewEthRPC(evmCfg.Endpoint) + // create block header and block caches ob.BlockCache, err = lru.New(1000) if err != nil { ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache") return nil, err } + ob.HeaderCache, err = lru.New(1000) + if err != nil { + ob.logger.ChainLogger.Error().Err(err).Msg("failed to create header cache") + return nil, err + } if ob.chain.IsKlaytnChain() { client, err := Dial(evmCfg.Endpoint) @@ -813,12 +824,12 @@ func (ob *EVMChainClient) postBlockHeader(tip uint64) error { return fmt.Errorf("postBlockHeader: must post block confirmed block header: %d > %d", bn, tip) } - block, err := ob.GetBlockByNumberCached(bn) + header, err := ob.GetBlockHeaderCached(bn) if err != nil { ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error getting block: %d", bn) return err } - headerRLP, err := rlp.EncodeToBytes(block.Header()) + headerRLP, err := rlp.EncodeToBytes(header) if err != nil { ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error encoding block header: %d", bn) return err @@ -826,8 +837,8 @@ func (ob *EVMChainClient) postBlockHeader(tip uint64) error { _, err = ob.zetaClient.PostAddBlockHeader( ob.chain.ChainId, - block.Hash().Bytes(), - block.Number().Int64(), + header.Hash().Bytes(), + header.Number.Int64(), common.NewEthereumHeader(headerRLP), ) if err != nil { @@ -1090,58 +1101,39 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags obse } // TODO: we can track the total number of 'getBlockByNumber' RPC calls made - block, err := ob.GetBlockByNumberCached(bn) + block, blockRPC, fallBack, skip, err := ob.GetBlockByNumberCached(bn) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: error getting block %d for chain %d", bn, ob.chain.ChainId) - return startBlock - 1 // we have to re-scan from this block next time - } - for _, tx := range block.Transactions() { - if tx.To() == nil { - continue - } - if bytes.Equal(tx.Data(), []byte(DonationMessage)) { - ob.logger.ExternalChainWatcher.Info().Msgf( - "observeTssRecvd: thank you rich folk for your donation!: %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) + if skip { + ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: skip block %d for chain %d", bn, ob.chain.ChainId) continue } - - if *tx.To() == tssAddress { - receipt, err := ob.evmClient.TransactionReceipt(context.Background(), tx.Hash()) - if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msgf( - "observeTssRecvd: TransactionReceipt error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) - return startBlock - 1 // we have to re-scan this block next time - } - if receipt.Status != 1 { // 1: successful, 0: failed - ob.logger.ExternalChainWatcher.Info().Msgf("observeTssRecvd: tx %s chain %d failed; don't act", tx.Hash().Hex(), ob.chain.ChainId) - continue + ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: error getting block %d for chain %d", bn, ob.chain.ChainId) + return bn - 1 // we have to re-scan from this block next time + } + if !fallBack { + for _, tx := range block.Transactions() { + if tx.To() != nil && *tx.To() == tssAddress { + if ok := ob.processIntxToTss(tx, bn, block.Hash()); !ok { + return bn - 1 // we have to re-scan this block next time + } } - - from, err := ob.evmClient.TransactionSender(context.Background(), tx, block.Hash(), receipt.TransactionIndex) - if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msgf("observeTssRecvd: TransactionSender error for tx %s", tx.Hash().Hex()) - // trying local recovery (assuming LondonSigner dynamic fee tx type) of sender address - signer := ethtypes.NewLondonSigner(big.NewInt(ob.chain.ChainId)) - from, err = signer.Sender(tx) + } + } else { // fell back on ETH RPC as ethclient failed to parse the block + ob.logger.ExternalChainWatcher.Info().Msgf("observeTssRecvd: processing block %d using fallback for chain %d", bn, ob.chain.ChainId) + for _, txRPC := range blockRPC.Transactions { + if ethcommon.HexToAddress(txRPC.To) == tssAddress { + tx, _, err := ob.evmClient.TransactionByHash(context.Background(), ethcommon.HexToHash(txRPC.Hash)) if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msgf( - "observeTssRecvd: local recovery of sender address failed for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) - continue + if strings.Contains(err.Error(), "transaction type not supported") { + ob.logger.ExternalChainWatcher.Err(err).Msgf( + "observeTssRecvd: transaction type not supported for tx %s chain %d", txRPC.Hash, ob.chain.ChainId) + continue // skip blob-carrying tx to TSS address + } + return bn - 1 // we have to re-scan this block next time + } + if ok := ob.processIntxToTss(tx, blockRPC.Number, ethcommon.HexToHash(blockRPC.Hash)); !ok { + return bn - 1 // we have to re-scan this block next time } - } - msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx.Hash(), tx.Value(), receipt, from, tx.Data()) - if msg == nil { - continue - } - zetaHash, ballot, err := ob.zetaClient.PostSend(PostSendEVMGasLimit, msg) - if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf( - "observeTssRecvd: error posting to zeta core for tx %s at height %d for chain %d", tx.Hash().Hex(), bn, ob.chain.ChainId) - return startBlock - 1 // we have to re-scan this block next time - } else if zetaHash != "" { - ob.logger.ExternalChainWatcher.Info().Msgf( - "observeTssRecvd: gas asset deposit detected in tx %s at height %d for chain %d, PostSend zeta tx: %s ballot %s", - tx.Hash().Hex(), bn, ob.chain.ChainId, zetaHash, ballot) } } } @@ -1150,6 +1142,54 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags obse return toBlock } +// processIntxToTss processes the incoming tx to TSS address and posts to zetacore +// returns true if the tx is successfully processed, false otherwise +func (ob *EVMChainClient) processIntxToTss(tx *ethtypes.Transaction, bn uint64, blockHash ethcommon.Hash) bool { + receipt, err := ob.evmClient.TransactionReceipt(context.Background(), tx.Hash()) + if err != nil { + ob.logger.ExternalChainWatcher.Err(err).Msgf( + "processIntxToTss: TransactionReceipt error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) + return false // we have to re-scan this block next time + } + if receipt.Status != 1 { // 1: successful, 0: failed + ob.logger.ExternalChainWatcher.Info().Msgf("processIntxToTss: tx %s chain %d failed; don't act", tx.Hash().Hex(), ob.chain.ChainId) + return true // skip failed tx + } + if bytes.Equal(tx.Data(), []byte(DonationMessage)) { + ob.logger.ExternalChainWatcher.Info().Msgf( + "processIntxToTss: thank you rich folk for your donation!: %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) + return true // skip donation tx + } + + from, err := ob.evmClient.TransactionSender(context.Background(), tx, blockHash, receipt.TransactionIndex) + if err != nil { + ob.logger.ExternalChainWatcher.Err(err).Msgf("processIntxToTss: TransactionSender error for tx %s", tx.Hash().Hex()) + // trying local recovery (assuming LondonSigner dynamic fee tx type) of sender address + signer := ethtypes.NewLondonSigner(big.NewInt(ob.chain.ChainId)) + from, err = signer.Sender(tx) + if err != nil { + ob.logger.ExternalChainWatcher.Err(err).Msgf( + "processIntxToTss: local recovery of sender address failed for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId) + return false // we have to re-scan this block next time + } + } + msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx.Hash(), tx.Value(), receipt, from, tx.Data()) + if msg == nil { + return true // should never happen, always non-nil + } + zetaHash, ballot, err := ob.zetaClient.PostSend(PostSendEVMGasLimit, msg) + if err != nil { + ob.logger.ExternalChainWatcher.Error().Err(err).Msgf( + "processIntxToTss: error posting to zeta core for tx %s at height %d for chain %d", tx.Hash().Hex(), bn, ob.chain.ChainId) + return false // we have to re-scan this block next time + } else if zetaHash != "" { + ob.logger.ExternalChainWatcher.Info().Msgf( + "processIntxToTss: gas asset deposit detected in tx %s at height %d for chain %d, PostSend zeta tx: %s ballot %s", + tx.Hash().Hex(), bn, ob.chain.ChainId, zetaHash, ballot) + } + return true +} + func (ob *EVMChainClient) WatchGasPrice() { err := ob.PostGasPrice() @@ -1351,15 +1391,38 @@ func (ob *EVMChainClient) GetTxID(nonce uint64) string { return fmt.Sprintf("%d-%s-%d", ob.chain.ChainId, tssAddr, nonce) } -func (ob *EVMChainClient) GetBlockByNumberCached(blockNumber uint64) (*ethtypes.Block, error) { +func (ob *EVMChainClient) GetBlockHeaderCached(blockNumber uint64) (*ethtypes.Header, error) { + if header, ok := ob.HeaderCache.Get(blockNumber); ok { + return header.(*ethtypes.Header), nil + } + header, err := ob.evmClient.HeaderByNumber(context.Background(), new(big.Int).SetUint64(blockNumber)) + if err != nil { + return nil, err + } + ob.HeaderCache.Add(blockNumber, header) + return header, nil +} + +// GetBlockByNumberCached get block by number from cache +// returns block, ethrpc.Block, isFallback, isSkip, error +func (ob *EVMChainClient) GetBlockByNumberCached(blockNumber uint64) (*ethtypes.Block, *ethrpc.Block, bool, bool, error) { if block, ok := ob.BlockCache.Get(blockNumber); ok { - return block.(*ethtypes.Block), nil + return block.(*ethtypes.Block), nil, false, false, nil } block, err := ob.evmClient.BlockByNumber(context.Background(), new(big.Int).SetUint64(blockNumber)) if err != nil { - return nil, err + if strings.Contains(err.Error(), "block header indicates no transactions") { + return nil, nil, false, true, err // it's ok skip empty block + } else if strings.Contains(err.Error(), "transaction type not supported") { + rpcBlock, err := ob.rpcClient.EthGetBlockByNumber(blockNumber, true) + if err != nil { + return nil, nil, true, false, err // fall back on ethRPC but still fail + } + return nil, rpcBlock, true, false, nil // fall back on ethRPC without error + } + return nil, nil, false, false, err } ob.BlockCache.Add(blockNumber, block) ob.BlockCache.Add(block.Hash(), block) - return block, nil + return block, nil, false, false, nil }