Skip to content

Commit

Permalink
skip Goerli BlobTxType transaction and Mumbai empty block
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed Feb 13, 2024
1 parent 66d1010 commit 965e366
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 70 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ require (
github.com/libp2p/go-yamux/v4 v4.0.0 // indirect
github.com/linxGnu/grocksdb v1.7.15 // indirect
github.com/miekg/pkcs11 v1.1.1 // indirect
github.com/onrik/ethrpc v1.2.0 // indirect
github.com/onsi/ginkgo/v2 v2.9.7 // indirect
github.com/prometheus/tsdb v0.7.1 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,7 @@ github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8=
github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ=
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
Expand Down Expand Up @@ -2358,6 +2359,8 @@ github.com/olekukonko/tablewriter v0.0.2-0.20190409134802-7e037d187b0c/go.mod h1
github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onrik/ethrpc v1.2.0 h1:BBcr1iWxW1RBP/eyZfzvSKtGgeqexq5qS0yyf4pmKbc=
github.com/onrik/ethrpc v1.2.0/go.mod h1:uvyqpn8+WbsTgBYfouImgEfpIMb0hR8fWGjwdgPHtFU=
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -2757,6 +2760,7 @@ github.com/sourcegraph/go-diff v0.5.1/go.mod h1:j2dHj3m8aZgQO8lMTcTnBcXkRRRqi34c
github.com/sourcegraph/go-diff v0.5.3/go.mod h1:v9JDtjCE4HHHCZGId75rg8gkKKa98RVjBcBGsVmMmak=
github.com/sourcegraph/go-diff v0.6.1/go.mod h1:iBszgVvyxdc8SFZ7gm69go2KDdt3ag071iBaWPF6cjs=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
Expand Down
249 changes: 179 additions & 70 deletions zetaclient/evm_client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package zetaclient

import (
"bytes"
"context"
"fmt"
"math"
"math/big"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
"github.com/onrik/ethrpc"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -70,6 +73,7 @@ type EVMChainClient struct {
*ChainMetrics
chain common.Chain
evmClient EVMRPCClient
evmClientAlternate *ethrpc.EthRPC // a fallback rpc client
KlaytnClient KlaytnRPCClient
zetaClient ZetaCoreBridger
Tss TSSSigner
Expand All @@ -92,7 +96,9 @@ type EVMChainClient struct {
params observertypes.ChainParams
ts *TelemetryServer

BlockCache *lru.Cache
blockCache *lru.Cache
blockCacheV3 *lru.Cache // blockCacheV3 caches blocks containing type-3 (BlobTxType) transactions
headerCache *lru.Cache
}

var _ ChainClient = (*EVMChainClient)(nil)
Expand Down Expand Up @@ -146,12 +152,24 @@ func NewEVMChainClient(
return nil, err
}
ob.evmClient = client
ob.evmClientAlternate = ethrpc.NewEthRPC(evmCfg.Endpoint)

ob.BlockCache, err = lru.New(1000)
// create block header and block caches
ob.blockCache, err = lru.New(1000)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache")
return nil, err
}
ob.blockCacheV3, err = lru.New(1000)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache v3")
return nil, err
}
ob.headerCache, err = lru.New(1000)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("failed to create header cache")
return nil, err
}

if ob.chain.IsKlaytnChain() {
client, err := Dial(evmCfg.Endpoint)
Expand Down Expand Up @@ -756,36 +774,58 @@ func (ob *EVMChainClient) checkConfirmedTx(txHash string, nonce uint64) (*ethtyp
return nil, nil, false
}

// cross-check receipt against the block
block, err := ob.GetBlockByNumberCached(receipt.BlockNumber.Uint64())
if err != nil {
log.Error().Err(err).Msgf("confirmTxByHash: GetBlockByNumberCached error, txHash %s nonce %d block %d",
txHash, nonce, receipt.BlockNumber)
return nil, nil, false
}
// #nosec G701 non negative value
if receipt.TransactionIndex >= uint(len(block.Transactions())) {
log.Error().Msgf("confirmTxByHash: transaction index %d out of range [0, %d), txHash %s nonce %d block %d",
receipt.TransactionIndex, len(block.Transactions()), txHash, nonce, receipt.BlockNumber)
return nil, nil, false
}
txAtIndex := block.Transactions()[receipt.TransactionIndex]
if txAtIndex.Hash() != transaction.Hash() {
log.Error().Msgf("confirmTxByHash: transaction at index %d has different hash %s, txHash %s nonce %d block %d",
receipt.TransactionIndex, txAtIndex.Hash().Hex(), txHash, nonce, receipt.BlockNumber)
return nil, nil, false
}

// check confirmations
if !ob.HasEnoughConfirmations(receipt, ob.GetLastBlockHeight()) {
log.Debug().Msgf("confirmTxByHash: txHash %s nonce %d included but not confirmed: receipt block %d, current block %d",
txHash, nonce, receipt.BlockNumber, ob.GetLastBlockHeight())
return nil, nil, false
}

// cross-check tx inclusion against the block
// Note: a guard for false BlockNumber in receipt. The blob-carrying tx won't come here
err = ob.checkTxInclusion(transaction, receipt.BlockNumber.Uint64(), receipt.TransactionIndex)
if err != nil {
log.Error().Err(err).Msgf("confirmTxByHash: checkTxInclusion error for txHash %s nonce %d", txHash, nonce)
return nil, nil, false
}

return receipt, transaction, true
}

// checkTxInclusion returns nil only if tx is included in the block at blockNumber and txIndex
func (ob *EVMChainClient) checkTxInclusion(tx *ethtypes.Transaction, blockNumber uint64, txIndex uint) error {
block, blockRPC, fallBack, _, err := ob.GetBlockByNumberCached(blockNumber)
if err != nil {
return fmt.Errorf("GetBlockByNumberCached error for block %d txHash %s nonce %d: %w", blockNumber, tx.Hash(), tx.Nonce(), err)
}
if !fallBack {
// #nosec G701 non negative value
if txIndex >= uint(len(block.Transactions())) {
return fmt.Errorf("transaction index %d out of range [0, %d), txHash %s nonce %d block %d",
txIndex, len(block.Transactions()), tx.Hash(), tx.Nonce(), blockNumber)
}
txAtIndex := block.Transactions()[txIndex]
if txAtIndex.Hash() != tx.Hash() {
ob.RemoveCachedBlock(blockNumber) // clean stale block from cache
return fmt.Errorf("transaction at index %d has different hash %s, txHash %s nonce %d block %d",
txIndex, txAtIndex.Hash().Hex(), tx.Hash(), tx.Nonce(), blockNumber)
}
} else { // fell back on ETH RPC as ethclient failed to parse the block
// #nosec G701 non negative value
if txIndex >= uint(len(blockRPC.Transactions)) {
return fmt.Errorf("transaction index %d out of range [0, %d), txHash %s nonce %d block %d",
txIndex, len(block.Transactions()), tx.Hash(), tx.Nonce(), blockNumber)
}
txAtIndex := blockRPC.Transactions[txIndex]
if ethcommon.HexToHash(txAtIndex.Hash) != tx.Hash() {
ob.RemoveCachedBlock(blockNumber) // clean stale block from cache
return fmt.Errorf("transaction at index %d has different hash %s, txHash %s nonce %d block %d",
txIndex, txAtIndex.Hash, tx.Hash(), tx.Nonce(), blockNumber)
}
}
return nil
}

// SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused)
func (ob *EVMChainClient) SetLastBlockHeightScanned(height uint64) {
atomic.StoreUint64(&ob.lastBlockScanned, height)
Expand Down Expand Up @@ -863,21 +903,21 @@ func (ob *EVMChainClient) postBlockHeader(tip uint64) error {
return fmt.Errorf("postBlockHeader: must post block confirmed block header: %d > %d", bn, tip)
}

block, err := ob.GetBlockByNumberCached(bn)
header, err := ob.GetBlockHeaderCached(bn)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error getting block: %d", bn)
return err
}
headerRLP, err := rlp.EncodeToBytes(block.Header())
headerRLP, err := rlp.EncodeToBytes(header)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error encoding block header: %d", bn)
return err
}

_, err = ob.zetaClient.PostAddBlockHeader(
ob.chain.ChainId,
block.Hash().Bytes(),
block.Number().Int64(),
header.Hash().Bytes(),
header.Number.Int64(),
common.NewEthereumHeader(headerRLP),
)
if err != nil {
Expand Down Expand Up @@ -1159,48 +1199,39 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags obse
}

// TODO: we can track the total number of 'getBlockByNumber' RPC calls made
block, err := ob.GetBlockByNumberCached(bn)
block, blockRPC, fallBack, skip, err := ob.GetBlockByNumberCached(bn)
if err != nil {
if skip {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: skip block %d for chain %d", bn, ob.chain.ChainId)
continue
}
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeTssRecvd: error getting block %d for chain %d", bn, ob.chain.ChainId)
return bn - 1 // we have to re-scan from this block next time
}
for _, tx := range block.Transactions() {
if tx.To() == nil {
continue
}

if *tx.To() == tssAddress {
receipt, err := ob.evmClient.TransactionReceipt(context.Background(), tx.Hash())
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"observeTssRecvd: TransactionReceipt error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return bn - 1 // we have to re-scan this block next time
}
if receipt.Status != 1 { // 1: successful, 0: failed
ob.logger.ExternalChainWatcher.Info().Msgf("observeTssRecvd: tx %s chain %d failed; don't act", tx.Hash().Hex(), ob.chain.ChainId)
continue
}

sender, err := ob.GetTransactionSender(tx, block.Hash(), receipt.TransactionIndex)
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"observeTssRecvd: GetTransactionSender error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return bn - 1 // we have to re-scan this block next time
}

msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx, sender, receipt.BlockNumber.Uint64())
if msg == nil {
continue
if !fallBack {
for _, tx := range block.Transactions() {
if tx.To() != nil && *tx.To() == tssAddress {
if ok := ob.processIntxToTss(tx, bn, block.Hash()); !ok {
return bn - 1 // we have to re-scan this block next time
}
}
zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(PostVoteInboundGasLimit, PostVoteInboundExecutionGasLimit, msg)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf(
"observeTssRecvd: error posting to zeta core for tx %s at height %d for chain %d", tx.Hash().Hex(), bn, ob.chain.ChainId)
return bn - 1 // we have to re-scan this block next time
} else if zetaHash != "" {
ob.logger.ExternalChainWatcher.Info().Msgf(
"observeTssRecvd: gas asset deposit detected in tx %s at height %d for chain %d, PostVoteInbound zeta tx: %s ballot %s",
tx.Hash().Hex(), bn, ob.chain.ChainId, zetaHash, ballot)
}
} else { // fell back on ETH RPC as ethclient failed to parse the block
ob.logger.ExternalChainWatcher.Info().Msgf("observeTssRecvd: processing block %d using fallback for chain %d", bn, ob.chain.ChainId)
for _, txRPC := range blockRPC.Transactions {
if ethcommon.HexToAddress(txRPC.To) == tssAddress {
tx, _, err := ob.evmClient.TransactionByHash(context.Background(), ethcommon.HexToHash(txRPC.Hash))
if err != nil {
if strings.Contains(err.Error(), "transaction type not supported") {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"observeTssRecvd: transaction type not supported for tx %s chain %d", txRPC.Hash, ob.chain.ChainId)
continue // skip blob-carrying tx to TSS address
}
return bn - 1 // we have to re-scan this block next time
}
if ok := ob.processIntxToTss(tx, bn, ethcommon.HexToHash(blockRPC.Hash)); !ok {
return bn - 1 // we have to re-scan this block next time
}
}
}
}
Expand All @@ -1209,6 +1240,48 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags obse
return toBlock
}

// processIntxToTss processes the incoming tx to TSS address and posts to zetacore
// returns true if the tx is successfully processed, false otherwise
func (ob *EVMChainClient) processIntxToTss(tx *ethtypes.Transaction, bn uint64, blockHash ethcommon.Hash) bool {
receipt, err := ob.evmClient.TransactionReceipt(context.Background(), tx.Hash())
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"processIntxToTss: TransactionReceipt error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return false // we have to re-scan this block next time
}
if receipt.Status != 1 { // 1: successful, 0: failed
ob.logger.ExternalChainWatcher.Info().Msgf("processIntxToTss: tx %s chain %d failed; don't act", tx.Hash().Hex(), ob.chain.ChainId)
return true // skip failed tx
}
if bytes.Equal(tx.Data(), []byte(DonationMessage)) {
ob.logger.ExternalChainWatcher.Info().Msgf(
"processIntxToTss: thank you rich folk for your donation!: %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return true // skip donation tx
}
sender, err := ob.GetTransactionSender(tx, blockHash, receipt.TransactionIndex)
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf(
"processIntxToTss: GetTransactionSender error for tx %s chain %d", tx.Hash().Hex(), ob.chain.ChainId)
return false // we have to re-scan this block next time
}

msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx, sender, bn)
if msg == nil {
return true // should never happen, always non-nil
}
zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(PostVoteInboundGasLimit, PostVoteInboundExecutionGasLimit, msg)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf(
"processIntxToTss: error posting to zeta core for tx %s at height %d for chain %d", tx.Hash().Hex(), bn, ob.chain.ChainId)
return false // we have to re-scan this block next time
} else if zetaHash != "" {
ob.logger.ExternalChainWatcher.Info().Msgf(
"processIntxToTss: gas asset deposit detected in tx %s at height %d for chain %d, PostSend zeta tx: %s ballot %s",
tx.Hash().Hex(), bn, ob.chain.ChainId, zetaHash, ballot)
}
return true
}

func (ob *EVMChainClient) WatchGasPrice() {
ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice starting...")
err := ob.PostGasPrice()
Expand Down Expand Up @@ -1411,15 +1484,51 @@ func (ob *EVMChainClient) GetTxID(nonce uint64) string {
return fmt.Sprintf("%d-%s-%d", ob.chain.ChainId, tssAddr, nonce)
}

func (ob *EVMChainClient) GetBlockByNumberCached(blockNumber uint64) (*ethtypes.Block, error) {
if block, ok := ob.BlockCache.Get(blockNumber); ok {
return block.(*ethtypes.Block), nil
func (ob *EVMChainClient) GetBlockHeaderCached(blockNumber uint64) (*ethtypes.Header, error) {
if header, ok := ob.headerCache.Get(blockNumber); ok {
return header.(*ethtypes.Header), nil
}
block, err := ob.evmClient.BlockByNumber(context.Background(), new(big.Int).SetUint64(blockNumber))
header, err := ob.evmClient.HeaderByNumber(context.Background(), new(big.Int).SetUint64(blockNumber))
if err != nil {
return nil, err
}
ob.BlockCache.Add(blockNumber, block)
ob.BlockCache.Add(block.Hash(), block)
return block, nil
ob.headerCache.Add(blockNumber, header)
return header, nil
}

// GetBlockByNumberCached get block by number from cache
// returns block, ethrpc.Block, isFallback, isSkip, error
func (ob *EVMChainClient) GetBlockByNumberCached(blockNumber uint64) (*ethtypes.Block, *ethrpc.Block, bool, bool, error) {
if block, ok := ob.blockCache.Get(blockNumber); ok {
return block.(*ethtypes.Block), nil, false, false, nil
}
if block, ok := ob.blockCacheV3.Get(blockNumber); ok {
return nil, block.(*ethrpc.Block), true, false, nil
}
block, err := ob.evmClient.BlockByNumber(context.Background(), new(big.Int).SetUint64(blockNumber))
if err != nil {
if strings.Contains(err.Error(), "block header indicates no transactions") {
return nil, nil, false, true, err // it's ok skip empty block
} else if strings.Contains(err.Error(), "transaction type not supported") {
if blockNumber > math.MaxInt32 {
return nil, nil, true, false, fmt.Errorf("block number %d is too large", blockNumber)
}
// #nosec G701 always in range, checked above
rpcBlock, err := ob.evmClientAlternate.EthGetBlockByNumber(int(blockNumber), true)
if err != nil {
return nil, nil, true, false, err // fall back on ethRPC but still fail
}
ob.blockCacheV3.Add(blockNumber, rpcBlock)
return nil, rpcBlock, true, false, nil // fall back on ethRPC without error
}
return nil, nil, false, false, err
}
ob.blockCache.Add(blockNumber, block)
return block, nil, false, false, nil
}

// RemoveCachedBlock remove block from cache
func (ob *EVMChainClient) RemoveCachedBlock(blockNumber uint64) {
ob.blockCache.Remove(blockNumber)
ob.blockCacheV3.Remove(blockNumber)
}
Loading

0 comments on commit 965e366

Please sign in to comment.