Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Solana RPC status check and some refactor around RPC status check #2751

Merged
merged 15 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [2681](https://github.com/zeta-chain/node/pull/2681) - implement `MsgUpdateERC20CustodyPauseStatus` to pause or unpause ERC20 Custody contract (to be used for the migration process for smart contract V2)
* [2644](https://github.com/zeta-chain/node/pull/2644) - add created_timestamp to cctx status
* [2673](https://github.com/zeta-chain/node/pull/2673) - add relayer key importer, encryption and decryption
* [2751](https://github.com/zeta-chain/node/pull/2751) - add RPC status check for Solana chain

### Refactor

Expand Down
10 changes: 10 additions & 0 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Observer struct {
// lastTxScanned is the last transaction hash scanned by the observer
lastTxScanned string

// rpcAlertLatency is the threshold of RPC latency (in seconds) to trigger an alert
rpcAlertLatency uint64
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved

// blockCache is the cache for blocks
blockCache *lru.Cache

Expand Down Expand Up @@ -92,6 +95,7 @@ func NewObserver(
tss interfaces.TSSSigner,
blockCacheSize int,
headerCacheSize int,
rpcAlertLatency uint64,
ts *metrics.TelemetryServer,
database *db.DB,
logger Logger,
Expand All @@ -104,6 +108,7 @@ func NewObserver(
lastBlock: 0,
lastBlockScanned: 0,
lastTxScanned: "",
rpcAlertLatency: rpcAlertLatency,
ts: ts,
db: database,
mu: &sync.Mutex{},
Expand Down Expand Up @@ -246,6 +251,11 @@ func (ob *Observer) WithLastTxScanned(txHash string) *Observer {
return ob
}

// RPCAlertLatency returns the RPC alert latency for the observer.
func (ob *Observer) RPCAlertLatency() uint64 {
return ob.rpcAlertLatency
}

// BlockCache returns the block cache for the observer.
func (ob *Observer) BlockCache() *lru.Cache {
return ob.blockCache
Expand Down
27 changes: 27 additions & 0 deletions zetaclient/chains/base/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func createObserver(t *testing.T, chain chains.Chain) *base.Observer {
tss,
base.DefaultBlockCacheSize,
base.DefaultHeaderCacheSize,
60,
nil,
database,
logger,
Expand Down Expand Up @@ -122,6 +123,7 @@ func TestNewObserver(t *testing.T) {
tt.tss,
tt.blockCacheSize,
tt.headerCacheSize,
60,
nil,
database,
base.DefaultLogger(),
Expand Down Expand Up @@ -159,6 +161,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithChain(chains.BscMainnet)
require.Equal(t, newChain, ob.Chain())
})

t.Run("should be able to update chain params", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -167,6 +170,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithChainParams(newChainParams)
require.True(t, observertypes.ChainParamsEqual(newChainParams, ob.ChainParams()))
})

t.Run("should be able to update zetacore client", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -175,6 +179,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithZetacoreClient(newZetacoreClient)
require.Equal(t, newZetacoreClient, ob.ZetacoreClient())
})

t.Run("should be able to update tss", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -183,6 +188,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithTSS(newTSS)
require.Equal(t, newTSS, ob.TSS())
})

t.Run("should be able to update last block", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -191,6 +197,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithLastBlock(newLastBlock)
require.Equal(t, newLastBlock, ob.LastBlock())
})

t.Run("should be able to update last block scanned", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -199,6 +206,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithLastBlockScanned(newLastBlockScanned)
require.Equal(t, newLastBlockScanned, ob.LastBlockScanned())
})

t.Run("should be able to update last tx scanned", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -207,6 +215,14 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithLastTxScanned(newLastTxScanned)
require.Equal(t, newLastTxScanned, ob.LastTxScanned())
})

t.Run("should be able to get rpc alert latency", func(t *testing.T) {
ob := createObserver(t, chain)

// get rpc alert latency
require.EqualValues(t, 60, ob.RPCAlertLatency())
})

t.Run("should be able to replace block cache", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -217,6 +233,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithBlockCache(newBlockCache)
require.Equal(t, newBlockCache, ob.BlockCache())
})

t.Run("should be able to replace header cache", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -227,6 +244,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithHeaderCache(newHeadersCache)
require.Equal(t, newHeadersCache, ob.HeaderCache())
})

t.Run("should be able to update telemetry server", func(t *testing.T) {
ob := createObserver(t, chain)

Expand All @@ -235,6 +253,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithTelemetryServer(newServer)
require.Equal(t, newServer, ob.TelemetryServer())
})

t.Run("should be able to get logger", func(t *testing.T) {
ob := createObserver(t, chain)
logger := ob.Logger()
Expand Down Expand Up @@ -307,6 +326,7 @@ func TestLoadLastBlockScanned(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 100, ob.LastBlockScanned())
})

t.Run("latest block scanned should be 0 if not found in db", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
Expand All @@ -316,6 +336,7 @@ func TestLoadLastBlockScanned(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 0, ob.LastBlockScanned())
})

t.Run("should overwrite last block scanned if env var is set", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
Expand All @@ -331,6 +352,7 @@ func TestLoadLastBlockScanned(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 101, ob.LastBlockScanned())
})

t.Run("last block scanned should remain 0 if env var is set to latest", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
Expand All @@ -346,6 +368,7 @@ func TestLoadLastBlockScanned(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 0, ob.LastBlockScanned())
})

t.Run("should return error on invalid env var", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
Expand Down Expand Up @@ -392,6 +415,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 100, lastBlockScanned)
})

t.Run("should return error when last block scanned not found in db", func(t *testing.T) {
// create empty db
ob := createObserver(t, chain)
Expand All @@ -417,6 +441,7 @@ func TestLoadLastTxScanned(t *testing.T) {
ob.LoadLastTxScanned()
require.EqualValues(t, lastTx, ob.LastTxScanned())
})

t.Run("latest tx scanned should be empty if not found in db", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
Expand All @@ -425,6 +450,7 @@ func TestLoadLastTxScanned(t *testing.T) {
ob.LoadLastTxScanned()
require.Empty(t, ob.LastTxScanned())
})

t.Run("should overwrite last tx scanned if env var is set", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
Expand Down Expand Up @@ -480,6 +506,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, lastTx, lastTxScanned)
})

t.Run("should return error when last tx scanned not found in db", func(t *testing.T) {
// create empty db
ob := createObserver(t, chain)
Expand Down
69 changes: 3 additions & 66 deletions zetaclient/chains/bitcoin/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math"
"math/big"
"sort"
"time"

"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg"
Expand Down Expand Up @@ -115,6 +114,7 @@ func NewObserver(
chainParams observertypes.ChainParams,
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
rpcAlertLatency uint64,
database *db.DB,
logger base.Logger,
ts *metrics.TelemetryServer,
Expand All @@ -127,6 +127,7 @@ func NewObserver(
tss,
btcBlocksPerDay,
base.DefaultHeaderCacheSize,
rpcAlertLatency,
ts,
database,
logger,
Expand Down Expand Up @@ -224,70 +225,6 @@ func (ob *Observer) Start(ctx context.Context) {
bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain))
}

// WatchRPCStatus watches the RPC status of the Bitcoin chain
// TODO(revamp): move ticker related functions to a specific file
// TODO(revamp): move inner logic in a separate function
func (ob *Observer) WatchRPCStatus(_ context.Context) error {
ob.logger.Chain.Info().Msgf("RPCStatus is starting")
ticker := time.NewTicker(60 * time.Second)

for {
select {
case <-ticker.C:
if !ob.GetChainParams().IsSupported {
continue
}

bn, err := ob.btcClient.GetBlockCount()
if err != nil {
ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ")
continue
}

hash, err := ob.btcClient.GetBlockHash(bn)
if err != nil {
ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ")
continue
}

header, err := ob.btcClient.GetBlockHeader(hash)
if err != nil {
ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ")
continue
}

blockTime := header.Timestamp
elapsedSeconds := time.Since(blockTime).Seconds()
if elapsedSeconds > 1200 {
ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ")
continue
}

tssAddr := ob.TSS().BTCAddressWitnessPubkeyHash()
res, err := ob.btcClient.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddr})
if err != nil {
ob.logger.Chain.Error().
Err(err).
Msg("RPC status check: can't list utxos of TSS address; wallet or loaded? TSS address is not imported? ")
continue
}

if len(res) == 0 {
ob.logger.Chain.Error().
Err(err).
Msg("RPC status check: TSS address has no utxos; TSS address is not imported? ")
continue
}

ob.logger.Chain.Info().
Msgf("[OK] RPC status check: latest block number %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddr, len(res))

case <-ob.StopChannel():
return nil
}
}
}

// GetPendingNonce returns the artificial pending nonce
// Note: pending nonce is accessed concurrently
func (ob *Observer) GetPendingNonce() uint64 {
Expand Down Expand Up @@ -399,12 +336,12 @@ func (ob *Observer) PostGasPrice(ctx context.Context) error {
// TODO(revamp): move in upper package to separate file (e.g., rpc.go)
func GetSenderAddressByVin(rpcClient interfaces.BTCRPCClient, vin btcjson.Vin, net *chaincfg.Params) (string, error) {
// query previous raw transaction by txid
// GetTransaction requires reconfiguring the bitcoin node (txindex=1), so we use GetRawTransaction instead
hash, err := chainhash.NewHashFromStr(vin.Txid)
if err != nil {
return "", err
}

// this requires running bitcoin node with 'txindex=1'
tx, err := rpcClient.GetRawTransaction(hash)
if err != nil {
return "", errors.Wrapf(err, "error getting raw transaction %s", vin.Txid)
Expand Down
2 changes: 2 additions & 0 deletions zetaclient/chains/bitcoin/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func MockBTCObserver(
params,
nil,
nil,
60,
database,
base.Logger{},
nil,
Expand Down Expand Up @@ -169,6 +170,7 @@ func Test_NewObserver(t *testing.T) {
tt.chainParams,
tt.coreClient,
tt.tss,
60,
database,
tt.logger,
tt.ts,
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/bitcoin/observer/outbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func MockBTCObserverMainnet(t *testing.T) *Observer {
require.NoError(t, err)

// create Bitcoin observer
ob, err := NewObserver(chain, btcClient, params, nil, tss, database, base.Logger{}, nil)
ob, err := NewObserver(chain, btcClient, params, nil, tss, 60, database, base.Logger{}, nil)
require.NoError(t, err)

return ob
Expand Down
34 changes: 34 additions & 0 deletions zetaclient/chains/bitcoin/observer/rpc_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package observer

import (
"context"
"time"

"github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc"
"github.com/zeta-chain/zetacore/zetaclient/common"
)

// WatchRPCStatus watches the RPC status of the Bitcoin chain
func (ob *Observer) WatchRPCStatus(_ context.Context) error {
ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId)

Check warning on line 13 in zetaclient/chains/bitcoin/observer/rpc_status.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/bitcoin/observer/rpc_status.go#L12-L13

Added lines #L12 - L13 were not covered by tests
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved

ticker := time.NewTicker(common.RPCStatusCheckInterval)
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-ticker.C:
if !ob.GetChainParams().IsSupported {
continue

Check warning on line 20 in zetaclient/chains/bitcoin/observer/rpc_status.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/bitcoin/observer/rpc_status.go#L15-L20

Added lines #L15 - L20 were not covered by tests
}

alertLatency := ob.RPCAlertLatency()
tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash()
err := rpc.CheckRPCStatus(ob.btcClient, alertLatency, tssAddress, ob.Logger().Chain)
if err != nil {
ob.Logger().Chain.Error().Err(err).Msg("RPC Status error")

Check warning on line 27 in zetaclient/chains/bitcoin/observer/rpc_status.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/bitcoin/observer/rpc_status.go#L23-L27

Added lines #L23 - L27 were not covered by tests
}

case <-ob.StopChannel():
return nil

Check warning on line 31 in zetaclient/chains/bitcoin/observer/rpc_status.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/bitcoin/observer/rpc_status.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}
}
}
Loading
Loading