Skip to content

Commit

Permalink
add Solana RPC status check and refactor EVM and bitcoin RPC status c…
Browse files Browse the repository at this point in the history
…heck
  • Loading branch information
ws4charlie committed Aug 20, 2024
1 parent 53883f5 commit 989ccca
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 119 deletions.
67 changes: 1 addition & 66 deletions zetaclient/chains/bitcoin/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math"
"math/big"
"sort"
"time"

"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg"
Expand Down Expand Up @@ -224,70 +223,6 @@ func (ob *Observer) Start(ctx context.Context) {
bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain))
}

// WatchRPCStatus watches the RPC status of the Bitcoin chain
// TODO(revamp): move ticker related functions to a specific file
// TODO(revamp): move inner logic in a separate function
func (ob *Observer) WatchRPCStatus(_ context.Context) error {
ob.logger.Chain.Info().Msgf("RPCStatus is starting")
ticker := time.NewTicker(60 * time.Second)

for {
select {
case <-ticker.C:
if !ob.GetChainParams().IsSupported {
continue
}

bn, err := ob.btcClient.GetBlockCount()
if err != nil {
ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ")
continue
}

hash, err := ob.btcClient.GetBlockHash(bn)
if err != nil {
ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ")
continue
}

header, err := ob.btcClient.GetBlockHeader(hash)
if err != nil {
ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ")
continue
}

blockTime := header.Timestamp
elapsedSeconds := time.Since(blockTime).Seconds()
if elapsedSeconds > 1200 {
ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ")
continue
}

tssAddr := ob.TSS().BTCAddressWitnessPubkeyHash()
res, err := ob.btcClient.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddr})
if err != nil {
ob.logger.Chain.Error().
Err(err).
Msg("RPC status check: can't list utxos of TSS address; wallet or loaded? TSS address is not imported? ")
continue
}

if len(res) == 0 {
ob.logger.Chain.Error().
Err(err).
Msg("RPC status check: TSS address has no utxos; TSS address is not imported? ")
continue
}

ob.logger.Chain.Info().
Msgf("[OK] RPC status check: latest block number %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddr, len(res))

case <-ob.StopChannel():
return nil
}
}
}

// GetPendingNonce returns the artificial pending nonce
// Note: pending nonce is accessed concurrently
func (ob *Observer) GetPendingNonce() uint64 {
Expand Down Expand Up @@ -399,12 +334,12 @@ func (ob *Observer) PostGasPrice(ctx context.Context) error {
// TODO(revamp): move in upper package to separate file (e.g., rpc.go)
func GetSenderAddressByVin(rpcClient interfaces.BTCRPCClient, vin btcjson.Vin, net *chaincfg.Params) (string, error) {
// query previous raw transaction by txid
// GetTransaction requires reconfiguring the bitcoin node (txindex=1), so we use GetRawTransaction instead
hash, err := chainhash.NewHashFromStr(vin.Txid)
if err != nil {
return "", err
}

// this requires running bitcoin node with 'txindex=1'
tx, err := rpcClient.GetRawTransaction(hash)
if err != nil {
return "", errors.Wrapf(err, "error getting raw transaction %s", vin.Txid)
Expand Down
33 changes: 33 additions & 0 deletions zetaclient/chains/bitcoin/observer/rpc_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package observer

import (
"context"
"time"

"github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc"
"github.com/zeta-chain/zetacore/zetaclient/common"
)

// WatchRPCStatus watches the RPC status of the Bitcoin chain
func (ob *Observer) WatchRPCStatus(_ context.Context) error {
ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId)

ticker := time.NewTicker(common.RPCStatusCheckInterval)
for {
select {
case <-ticker.C:
if !ob.GetChainParams().IsSupported {
continue
}

tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash()
err := rpc.CheckRPCStatus(ob.btcClient, tssAddress, ob.Logger().Chain)
if err != nil {
ob.Logger().Chain.Error().Err(err).Msg("RPC Status error")
}

case <-ob.StopChannel():
return nil
}
}
}
54 changes: 54 additions & 0 deletions zetaclient/chains/bitcoin/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package rpc

import (
"fmt"
"time"

"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcutil"
"github.com/pkg/errors"
"github.com/rs/zerolog"

"github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
Expand All @@ -20,6 +23,10 @@ const (

// defaultTestnetFeeRate is the default fee rate for testnet, 10 sat/byte
defaultTestnetFeeRate = 10

// rpcLatencyThreshold is the threshold for RPC latency to be considered unhealthy
// Bitcoin block time is 10 minutes, 1200s (20 minutes) is a reasonable threshold for Bitcoin
rpcLatencyThreshold = 1200
)

// NewRPCClient creates a new RPC client by the given config.
Expand Down Expand Up @@ -157,3 +164,50 @@ func GetRecentFeeRate(rpcClient interfaces.BTCRPCClient, netParams *chaincfg.Par
// #nosec G115 always in range
return uint64(highestRate), nil
}

// CheckRPCStatus checks the RPC status of the evm chain
func CheckRPCStatus(client interfaces.BTCRPCClient, tssAddress btcutil.Address, logger zerolog.Logger) error {
// query latest block number
bn, err := client.GetBlockCount()
if err != nil {
return errors.Wrap(err, "GetBlockCount error: RPC down?")
}

// query latest block header
hash, err := client.GetBlockHash(bn)
if err != nil {
return errors.Wrap(err, "GetBlockHash error: RPC down?")
}

// query latest block header thru hash
header, err := client.GetBlockHeader(hash)
if err != nil {
return errors.Wrap(err, "GetBlockHeader error: RPC down?")
}

// latest block should not be too old
blockTime := header.Timestamp
elapsedSeconds := time.Since(blockTime).Seconds()
if elapsedSeconds > rpcLatencyThreshold {
return errors.Errorf(
"Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?",
bn,
elapsedSeconds,
)
}

// should be able to list utxos owned by TSS address
res, err := client.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddress})
if err != nil {
return errors.Wrap(err, "can't list utxos of TSS address; TSS address is not imported?")
}

// TSS address should have utxos
if len(res) == 0 {
return errors.New("TSS address has no utxos; TSS address is not imported?")
}

logger.Info().
Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddress, len(res))
return nil
}
19 changes: 18 additions & 1 deletion zetaclient/chains/bitcoin/rpc/rpc_live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func TestBitcoinObserverLive(t *testing.T) {
// suite.Run(t, new(BitcoinClientTestSuite))

// LiveTestNewRPCClient(t)
// LiveTestCheckRPCStatus(t)
// LiveTestGetBlockHeightByHash(t)
// LiveTestBitcoinFeeRate(t)
// LiveTestAvgFeeRateMainnetMempoolSpace(t)
Expand All @@ -232,7 +233,7 @@ func LiveTestNewRPCClient(t *testing.T) {
btcConfig := config.BTCConfig{
RPCUsername: "user",
RPCPassword: "pass",
RPCHost: "bitcoin.rpc.zetachain.com/6315704c-49bc-4649-8b9d-e9278a1dfeb3",
RPCHost: os.Getenv("BTC_RPC_TESTNET"),
RPCParams: "mainnet",
}

Expand All @@ -246,6 +247,22 @@ func LiveTestNewRPCClient(t *testing.T) {
require.Greater(t, bn, int64(0))
}

// LiveTestCheckRPCStatus checks the RPC status of the Bitcoin chain
func LiveTestCheckRPCStatus(t *testing.T) {
// setup Bitcoin client
chainID := chains.BitcoinMainnet.ChainId
client, err := createRPCClient(chainID)
require.NoError(t, err)

// decode tss address
tssAddress, err := chains.DecodeBtcAddress(testutils.TSSAddressBTCMainnet, chainID)
require.NoError(t, err)

// check RPC status
err = rpc.CheckRPCStatus(client, tssAddress, log.Logger)
require.NoError(t, err)
}

// LiveTestGetBlockHeightByHash queries Bitcoin block height by hash
func LiveTestGetBlockHeightByHash(t *testing.T) {
// setup Bitcoin client
Expand Down
44 changes: 0 additions & 44 deletions zetaclient/chains/evm/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math"
"math/big"
"strings"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -202,49 +201,6 @@ func (ob *Observer) Start(ctx context.Context) {
bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain))
}

// WatchRPCStatus watches the RPC status of the evm chain
// TODO(revamp): move ticker to ticker file
// TODO(revamp): move inner logic to a separate function
func (ob *Observer) WatchRPCStatus(ctx context.Context) error {
ob.Logger().Chain.Info().Msgf("Starting RPC status check for chain %d", ob.Chain().ChainId)
ticker := time.NewTicker(60 * time.Second)
for {
select {
case <-ticker.C:
if !ob.GetChainParams().IsSupported {
continue
}
bn, err := ob.evmClient.BlockNumber(ctx)
if err != nil {
ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?")
continue
}
gasPrice, err := ob.evmClient.SuggestGasPrice(ctx)
if err != nil {
ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?")
continue
}
header, err := ob.evmClient.HeaderByNumber(ctx, new(big.Int).SetUint64(bn))
if err != nil {
ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?")
continue
}
// #nosec G115 always in range
blockTime := time.Unix(int64(header.Time), 0).UTC()
elapsedSeconds := time.Since(blockTime).Seconds()
if elapsedSeconds > 100 {
ob.Logger().Chain.Warn().
Msgf("RPC Status Check warning: RPC stale or chain stuck (check explorer)? Latest block %d timestamp is %.0fs ago", bn, elapsedSeconds)
continue
}
ob.Logger().Chain.Info().
Msgf("[OK] RPC status: latest block num %d, timestamp %s ( %.0fs ago), suggested gas price %d", header.Number, blockTime.String(), elapsedSeconds, gasPrice.Uint64())
case <-ob.StopChannel():
return nil
}
}
}

// SetTxNReceipt sets the receipt and transaction in memory
func (ob *Observer) SetTxNReceipt(nonce uint64, receipt *ethtypes.Receipt, transaction *ethtypes.Transaction) {
ob.Mu().Lock()
Expand Down
32 changes: 32 additions & 0 deletions zetaclient/chains/evm/observer/rpc_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Package observer implements the EVM chain observer
package observer

import (
"context"
"time"

"github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc"
"github.com/zeta-chain/zetacore/zetaclient/common"
)

// WatchRPCStatus watches the RPC status of the evm chain
func (ob *Observer) WatchRPCStatus(ctx context.Context) error {
ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId)

ticker := time.NewTicker(common.RPCStatusCheckInterval)
for {
select {
case <-ticker.C:
if !ob.GetChainParams().IsSupported {
continue
}

err := rpc.CheckRPCStatus(ctx, ob.evmClient, ob.Logger().Chain)
if err != nil {
ob.Logger().Chain.Error().Err(err).Msg("RPC Status error")
}
case <-ob.StopChannel():
return nil
}
}
}
46 changes: 46 additions & 0 deletions zetaclient/chains/evm/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@ package rpc

import (
"context"
"math/big"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/rs/zerolog"

"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
)

const (
// rpcLatencyThreshold is the threshold for RPC latency to be considered unhealthy
// 100s is a reasonable threshold for most EVM chains
rpcLatencyThreshold = 100
)

// IsTxConfirmed checks if the transaction is confirmed with given confirmations
func IsTxConfirmed(
ctx context.Context,
Expand Down Expand Up @@ -50,3 +59,40 @@ func IsTxConfirmed(

return blocks >= confirmations, nil
}

// CheckRPCStatus checks the RPC status of the evm chain
func CheckRPCStatus(ctx context.Context, client interfaces.EVMRPCClient, logger zerolog.Logger) error {
// query latest block number
bn, err := client.BlockNumber(ctx)
if err != nil {
return errors.Wrap(err, "BlockNumber error: RPC down?")
}

// query suggested gas price
gasPrice, err := client.SuggestGasPrice(ctx)
if err != nil {
return errors.Wrap(err, "SuggestGasPrice error: RPC down?")
}

// query latest block header
header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(bn))
if err != nil {
return errors.Wrap(err, "HeaderByNumber error: RPC down?")
}

// latest block should not be too old
// #nosec G115 always in range
blockTime := time.Unix(int64(header.Time), 0).UTC()
elapsedSeconds := time.Since(blockTime).Seconds()
if elapsedSeconds > rpcLatencyThreshold {
return errors.Errorf(
"Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?",
bn,
elapsedSeconds,
)
}

logger.Info().
Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.0fs ago), gas price %s", header.Number, blockTime.String(), elapsedSeconds, gasPrice.String())
return nil
}
Loading

0 comments on commit 989ccca

Please sign in to comment.