Skip to content

Commit

Permalink
merge develop
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinssgh committed Jan 30, 2024
2 parents 0fa67e2 + 39dedf3 commit 51ff66c
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 88 deletions.
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* [1642](https://github.com/zeta-chain/node/pull/1642) - Change WhitelistERC20 authorization from group1 to group2
* [1610](https://github.com/zeta-chain/node/issues/1610) - add pending outtx hash to tracker after monitoring for 10 minutes
* [1656](https://github.com/zeta-chain/node/issues/1656) - schedule bitcoin keysign with intervals to avoid keysign failures
* [1662](https://github.com/zeta-chain/node/issues/1662) - skip Goerli BlobTxType transactions introduced in Dencun upgrade
* [1663](https://github.com/zeta-chain/node/issues/1663) - skip Mumbai empty block if ethclient sanity check fails
* [1661](https://github.com/zeta-chain/node/issues/1661) - use estimated SegWit tx size for Bitcoin gas fee calculation
* [1667](https://github.com/zeta-chain/node/issues/1667) - estimate SegWit tx size in uinit of vByte

Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ require (
gorm.io/gorm v1.24.6
)

require github.com/binance-chain/tss-lib v0.0.0-20201118045712-70b2cb4bf916
require (
github.com/binance-chain/tss-lib v0.0.0-20201118045712-70b2cb4bf916
github.com/onrik/ethrpc v1.2.0
)

require (
github.com/DataDog/zstd v1.5.2 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,7 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jaguilar/vt100 v0.0.0-20150826170717-2703a27b14ea/go.mod h1:QMdK4dGB3YhEW2BmA1wgGpPYI3HZy/5gD705PXKUVSg=
github.com/jarcoal/httpmock v1.0.5/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik=
github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk=
Expand Down Expand Up @@ -2358,6 +2359,8 @@ github.com/olekukonko/tablewriter v0.0.2-0.20190409134802-7e037d187b0c/go.mod h1
github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onrik/ethrpc v1.2.0 h1:BBcr1iWxW1RBP/eyZfzvSKtGgeqexq5qS0yyf4pmKbc=
github.com/onrik/ethrpc v1.2.0/go.mod h1:uvyqpn8+WbsTgBYfouImgEfpIMb0hR8fWGjwdgPHtFU=
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
241 changes: 171 additions & 70 deletions zetaclient/evm/evm_client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package evm

import (
"bytes"
"context"
"fmt"
"math"
"math/big"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
"github.com/onrik/ethrpc"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -78,6 +81,7 @@ type ChainClient struct {
KlaytnClient interfaces.KlaytnRPCClient
zetaClient interfaces.ZetaCoreBridger
Tss interfaces.TSSSigner
evmClientAlternate *ethrpc.EthRPC // a fallback rpc client
lastBlockScanned uint64
lastBlock uint64
BlockTimeExternalChain uint64 // block time in seconds
Expand All @@ -97,7 +101,9 @@ type ChainClient struct {
params observertypes.ChainParams
ts *metricsPkg.TelemetryServer

BlockCache *lru.Cache
blockCache *lru.Cache
blockCacheV3 *lru.Cache // blockCacheV3 caches blocks containing type-3 (BlobTxType) transactions
headerCache *lru.Cache
}

var _ interfaces.ChainClient = (*ChainClient)(nil)
Expand Down Expand Up @@ -151,12 +157,24 @@ func NewEVMChainClient(
return nil, err
}
ob.evmClient = client
ob.evmClientAlternate = ethrpc.NewEthRPC(evmCfg.Endpoint)

ob.BlockCache, err = lru.New(1000)
// create block header and block caches
ob.blockCache, err = lru.New(1000)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache")
return nil, err
}
ob.blockCacheV3, err = lru.New(1000)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache v3")
return nil, err
}
ob.headerCache, err = lru.New(1000)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("failed to create header cache")
return nil, err
}

if ob.chain.IsKlaytnChain() {
client, err := klaytn.Dial(evmCfg.Endpoint)
Expand Down Expand Up @@ -761,36 +779,56 @@ func (ob *ChainClient) checkConfirmedTx(txHash string, nonce uint64) (*ethtypes.
return nil, nil, false
}

// cross-check receipt against the block
block, err := ob.GetBlockByNumberCached(receipt.BlockNumber.Uint64())
if err != nil {
log.Error().Err(err).Msgf("confirmTxByHash: GetBlockByNumberCached error, txHash %s nonce %d block %d",
txHash, nonce, receipt.BlockNumber)
return nil, nil, false
}
// #nosec G701 non negative value
if receipt.TransactionIndex >= uint(len(block.Transactions())) {
log.Error().Msgf("confirmTxByHash: transaction index %d out of range [0, %d), txHash %s nonce %d block %d",
receipt.TransactionIndex, len(block.Transactions()), txHash, nonce, receipt.BlockNumber)
return nil, nil, false
}
txAtIndex := block.Transactions()[receipt.TransactionIndex]
if txAtIndex.Hash() != transaction.Hash() {
log.Error().Msgf("confirmTxByHash: transaction at index %d has different hash %s, txHash %s nonce %d block %d",
receipt.TransactionIndex, txAtIndex.Hash().Hex(), txHash, nonce, receipt.BlockNumber)
return nil, nil, false
}

// check confirmations
if !ob.HasEnoughConfirmations(receipt, ob.GetLastBlockHeight()) {
log.Debug().Msgf("confirmTxByHash: txHash %s nonce %d included but not confirmed: receipt block %d, current block %d",
txHash, nonce, receipt.BlockNumber, ob.GetLastBlockHeight())
return nil, nil, false
}

// cross-check tx inclusion against the block
// Note: a guard for false BlockNumber in receipt. The blob-carrying tx won't come here
err = ob.checkTxInclusion(transaction, receipt.BlockNumber.Uint64(), receipt.TransactionIndex)
if err != nil {
log.Error().Err(err).Msgf("confirmTxByHash: checkTxInclusion error for txHash %s nonce %d", txHash, nonce)
return nil, nil, false
}

return receipt, transaction, true
}

// checkTxInclusion returns nil only if tx is included in the block at blockNumber and txIndex
func (ob *ChainClient) checkTxInclusion(tx *ethtypes.Transaction, blockNumber uint64, txIndex uint) error {
block, blockRPC, fallBack, _, err := ob.GetBlockByNumberCached(blockNumber)
if err != nil {
return fmt.Errorf("GetBlockByNumberCached error for block %d txHash %s nonce %d: %w", blockNumber, tx.Hash(), tx.Nonce(), err)
}
if !fallBack {
// #nosec G701 non negative value
if txIndex >= uint(len(block.Transactions())) {
return fmt.Errorf("transaction index %d out of range [0, %d), txHash %s nonce %d block %d",
txIndex, len(block.Transactions()), tx.Hash(), tx.Nonce(), blockNumber)
}
txAtIndex := block.Transactions()[txIndex]
if txAtIndex.Hash() != tx.Hash() {
return fmt.Errorf("transaction at index %d has different hash %s, txHash %s nonce %d block %d",
txIndex, txAtIndex.Hash().Hex(), tx.Hash(), tx.Nonce(), blockNumber)
}
} else { // fell back on ETH RPC as ethclient failed to parse the block
// #nosec G701 non negative value
if txIndex >= uint(len(blockRPC.Transactions)) {
return fmt.Errorf("transaction index %d out of range [0, %d), txHash %s nonce %d block %d",
txIndex, len(block.Transactions()), tx.Hash(), tx.Nonce(), blockNumber)
}
txAtIndex := blockRPC.Transactions[txIndex]
if ethcommon.HexToHash(txAtIndex.Hash) != tx.Hash() {
return fmt.Errorf("transaction at index %d has different hash %s, txHash %s nonce %d block %d",
txIndex, txAtIndex.Hash, tx.Hash(), tx.Nonce(), blockNumber)
}
}
return nil
}

// SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused)
func (ob *ChainClient) SetLastBlockHeightScanned(height uint64) {
atomic.StoreUint64(&ob.lastBlockScanned, height)
Expand Down Expand Up @@ -868,21 +906,21 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error {
return fmt.Errorf("postBlockHeader: must post block confirmed block header: %d > %d", bn, tip)
}

block, err := ob.GetBlockByNumberCached(bn)
header, err := ob.GetBlockHeaderCached(bn)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error getting block: %d", bn)
return err
}
headerRLP, err := rlp.EncodeToBytes(block.Header())
headerRLP, err := rlp.EncodeToBytes(header)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error encoding block header: %d", bn)
return err
}

_, err = ob.zetaClient.PostAddBlockHeader(
ob.chain.ChainId,
block.Hash().Bytes(),
block.Number().Int64(),
header.Hash().Bytes(),
header.Number.Int64(),
common.NewEthereumHeader(headerRLP),
)
if err != nil {
Expand Down Expand Up @@ -1164,48 +1202,39 @@ func (ob *ChainClient) observeTssRecvd(startBlock, toBlock uint64, flags observe
}

// TODO: we can track the total number of 'getBlockByNumber' RPC calls made
block, err := ob.GetBlockByNumberCached(bn)
block, blockRPC, fallBack, skip, err := ob.GetBlockByNumberCached(bn)
if err != nil {
if skip {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: skip block %d for chain %d", bn, ob.chain.ChainId)
continue
}
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: error getting block %d for chain %d", bn, ob.chain.ChainId)
return bn - 1 // we have to re-scan from this block next time
}
for _, tx := range block.Transactions() {
if tx.To() == nil {
continue
}

if *tx.To() == tssAddress {
receipt, err := ob.evmClient.TransactionReceipt(context.Background(), tx.Hash())
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"observeTssRecvd: TransactionReceipt error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return bn - 1 // we have to re-scan this block next time
}
if receipt.Status != 1 { // 1: successful, 0: failed
ob.logger.ExternalChainWatcher.Info().Msgf("observeTssRecvd: tx %s chain %d failed; don't act", tx.Hash().Hex(), ob.chain.ChainId)
continue
}

sender, err := ob.GetTransactionSender(tx, block.Hash(), receipt.TransactionIndex)
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"observeTssRecvd: GetTransactionSender error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return bn - 1 // we have to re-scan this block next time
}

msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx, sender, receipt.BlockNumber.Uint64())
if msg == nil {
continue
if !fallBack {
for _, tx := range block.Transactions() {
if tx.To() != nil && *tx.To() == tssAddress {
if ok := ob.processIntxToTss(tx, bn, block.Hash()); !ok {
return bn - 1 // we have to re-scan this block next time
}
}
zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(zetabridge.PostVoteInboundGasLimit, zetabridge.PostVoteInboundExecutionGasLimit, msg)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf(
"observeTssRecvd: error posting to zeta core for tx %s at height %d for chain %d", tx.Hash().Hex(), bn, ob.chain.ChainId)
return bn - 1 // we have to re-scan this block next time
} else if zetaHash != "" {
ob.logger.ExternalChainWatcher.Info().Msgf(
"observeTssRecvd: gas asset deposit detected in tx %s at height %d for chain %d, PostVoteInbound zeta tx: %s ballot %s",
tx.Hash().Hex(), bn, ob.chain.ChainId, zetaHash, ballot)
}
} else { // fell back on ETH RPC as ethclient failed to parse the block
ob.logger.ExternalChainWatcher.Info().Msgf("observeTssRecvd: processing block %d using fallback for chain %d", bn, ob.chain.ChainId)
for _, txRPC := range blockRPC.Transactions {
if ethcommon.HexToAddress(txRPC.To) == tssAddress {
tx, _, err := ob.evmClient.TransactionByHash(context.Background(), ethcommon.HexToHash(txRPC.Hash))
if err != nil {
if strings.Contains(err.Error(), "transaction type not supported") {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"observeTssRecvd: transaction type not supported for tx %s chain %d", txRPC.Hash, ob.chain.ChainId)
continue // skip blob-carrying tx to TSS address
}
return bn - 1 // we have to re-scan this block next time
}
if ok := ob.processIntxToTss(tx, bn, ethcommon.HexToHash(blockRPC.Hash)); !ok {
return bn - 1 // we have to re-scan this block next time
}
}
}
}
Expand All @@ -1214,6 +1243,48 @@ func (ob *ChainClient) observeTssRecvd(startBlock, toBlock uint64, flags observe
return toBlock
}

// processIntxToTss processes the incoming tx to TSS address and posts to zetacore
// returns true if the tx is successfully processed, false otherwise
func (ob *ChainClient) processIntxToTss(tx *ethtypes.Transaction, bn uint64, blockHash ethcommon.Hash) bool {
receipt, err := ob.evmClient.TransactionReceipt(context.Background(), tx.Hash())
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"processIntxToTss: TransactionReceipt error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return false // we have to re-scan this block next time
}
if receipt.Status != 1 { // 1: successful, 0: failed
ob.logger.ExternalChainWatcher.Info().Msgf("processIntxToTss: tx %s chain %d failed; don't act", tx.Hash().Hex(), ob.chain.ChainId)
return true // skip failed tx
}
if bytes.Equal(tx.Data(), []byte(DonationMessage)) {
ob.logger.ExternalChainWatcher.Info().Msgf(
"processIntxToTss: thank you rich folk for your donation!: %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return true // skip donation tx
}
sender, err := ob.GetTransactionSender(tx, blockHash, receipt.TransactionIndex)
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"processIntxToTss: GetTransactionSender error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return false // we have to re-scan this block next time
}

msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx, sender, bn)
if msg == nil {
return true // should never happen, always non-nil
}
zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(zetabridge.PostVoteInboundGasLimit, zetabridge.PostVoteInboundExecutionGasLimit, msg)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf(
"processIntxToTss: error posting to zeta core for tx %s at height %d for chain %d", tx.Hash().Hex(), bn, ob.chain.ChainId)
return false // we have to re-scan this block next time
} else if zetaHash != "" {
ob.logger.ExternalChainWatcher.Info().Msgf(
"processIntxToTss: gas asset deposit detected in tx %s at height %d for chain %d, PostSend zeta tx: %s ballot %s",
tx.Hash().Hex(), bn, ob.chain.ChainId, zetaHash, ballot)
}
return true
}

func (ob *ChainClient) WatchGasPrice() {
ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice starting...")
err := ob.PostGasPrice()
Expand Down Expand Up @@ -1416,15 +1487,45 @@ func (ob *ChainClient) GetTxID(nonce uint64) string {
return fmt.Sprintf("%d-%s-%d", ob.chain.ChainId, tssAddr, nonce)
}

func (ob *ChainClient) GetBlockByNumberCached(blockNumber uint64) (*ethtypes.Block, error) {
if block, ok := ob.BlockCache.Get(blockNumber); ok {
return block.(*ethtypes.Block), nil
func (ob *ChainClient) GetBlockHeaderCached(blockNumber uint64) (*ethtypes.Header, error) {
if header, ok := ob.headerCache.Get(blockNumber); ok {
return header.(*ethtypes.Header), nil
}
block, err := ob.evmClient.BlockByNumber(context.Background(), new(big.Int).SetUint64(blockNumber))
header, err := ob.evmClient.HeaderByNumber(context.Background(), new(big.Int).SetUint64(blockNumber))
if err != nil {
return nil, err
}
ob.BlockCache.Add(blockNumber, block)
ob.BlockCache.Add(block.Hash(), block)
return block, nil
ob.headerCache.Add(blockNumber, header)
return header, nil
}

// GetBlockByNumberCached get block by number from cache
// returns block, ethrpc.Block, isFallback, isSkip, error
func (ob *ChainClient) GetBlockByNumberCached(blockNumber uint64) (*ethtypes.Block, *ethrpc.Block, bool, bool, error) {
if block, ok := ob.blockCache.Get(blockNumber); ok {
return block.(*ethtypes.Block), nil, false, false, nil
}
if block, ok := ob.blockCacheV3.Get(blockNumber); ok {
return nil, block.(*ethrpc.Block), true, false, nil
}
block, err := ob.evmClient.BlockByNumber(context.Background(), new(big.Int).SetUint64(blockNumber))
if err != nil {
if strings.Contains(err.Error(), "block header indicates no transactions") {
return nil, nil, false, true, err // it's ok skip empty block
} else if strings.Contains(err.Error(), "transaction type not supported") {
if blockNumber > math.MaxInt32 {
return nil, nil, true, false, fmt.Errorf("block number %d is too large", blockNumber)
}
// #nosec G701 always in range, checked above
rpcBlock, err := ob.evmClientAlternate.EthGetBlockByNumber(int(blockNumber), true)
if err != nil {
return nil, nil, true, false, err // fall back on ethRPC but still fail
}
ob.blockCacheV3.Add(blockNumber, rpcBlock)
return nil, rpcBlock, true, false, nil // fall back on ethRPC without error
}
return nil, nil, false, false, err
}
ob.blockCache.Add(blockNumber, block)
return block, nil, false, false, nil
}
Loading

0 comments on commit 51ff66c

Please sign in to comment.