Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(zetaclient): move rpc latency logs to metrics #3313

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 4 additions & 26 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@
// lastTxScanned is the last transaction hash scanned by the observer
lastTxScanned string

// rpcAlertLatency is the threshold of RPC latency to trigger an alert
rpcAlertLatency time.Duration

blockCache *lru.Cache

// db is the database to persist data
Expand Down Expand Up @@ -87,7 +84,6 @@
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
blockCacheSize int,
rpcAlertLatency int64,
ts *metrics.TelemetryServer,
database *db.DB,
logger Logger,
Expand All @@ -105,7 +101,6 @@
lastBlock: 0,
lastBlockScanned: 0,
lastTxScanned: "",
rpcAlertLatency: time.Duration(rpcAlertLatency) * time.Second,
ts: ts,
db: database,
blockCache: blockCache,
Expand Down Expand Up @@ -429,29 +424,12 @@
return ballot, nil
}

// AlertOnRPCLatency prints an alert if the RPC latency exceeds the threshold.
// Returns true if the RPC latency is too high.
func (ob *Observer) AlertOnRPCLatency(latestBlockTime time.Time, defaultAlertLatency time.Duration) bool {
// ReportBlockLatency records the latency between the current time
// and the latest block time for a chain as a metric
func (ob *Observer) ReportBlockLatency(latestBlockTime time.Time) {

Check warning on line 429 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L429

Added line #L429 was not covered by tests
elapsedTime := time.Since(latestBlockTime)

alertLatency := ob.rpcAlertLatency
if alertLatency == 0 {
alertLatency = defaultAlertLatency
}

lf := map[string]any{
"rpc_latency_alert_ms": alertLatency.Milliseconds(),
"rpc_latency_real_ms": elapsedTime.Milliseconds(),
}

if elapsedTime > alertLatency {
ob.logger.Chain.Error().Fields(lf).Msg("RPC latency is too high, please check the node or explorer")
return true
}

ob.logger.Chain.Info().Fields(lf).Msg("RPC latency is OK")

return false
metrics.LatestBlockLatency.WithLabelValues(ob.chain.Name).Set(elapsedTime.Seconds())

Check warning on line 432 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L432

Added line #L432 was not covered by tests
}

// EnvVarLatestBlockByChain returns the environment variable for the last block by chain.
Expand Down
103 changes: 25 additions & 78 deletions zetaclient/chains/base/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand All @@ -25,9 +24,6 @@ import (
)

const (
// defaultAlertLatency is the default alert latency (in seconds) for unit tests
defaultAlertLatency = 60

// defaultConfirmationCount is the default confirmation count for unit tests
defaultConfirmationCount = 2
)
Expand All @@ -40,7 +36,7 @@ type testSuite struct {
}

// newTestSuite creates a new observer for testing
func newTestSuite(t *testing.T, chain chains.Chain, alertLatency int64) *testSuite {
func newTestSuite(t *testing.T, chain chains.Chain) *testSuite {
// constructor parameters
chainParams := *sample.ChainParams(chain.ChainId)
chainParams.ConfirmationCount = defaultConfirmationCount
Expand All @@ -57,7 +53,6 @@ func newTestSuite(t *testing.T, chain chains.Chain, alertLatency int64) *testSui
zetacoreClient,
tss,
base.DefaultBlockCacheSize,
alertLatency,
nil,
database,
logger,
Expand Down Expand Up @@ -127,7 +122,6 @@ func TestNewObserver(t *testing.T) {
tt.zetacoreClient,
tt.tss,
tt.blockCacheSize,
60,
nil,
database,
base.DefaultLogger(),
Expand All @@ -147,7 +141,7 @@ func TestNewObserver(t *testing.T) {
func TestStop(t *testing.T) {
t.Run("should be able to stop observer", func(t *testing.T) {
// create observer and initialize db
ob := newTestSuite(t, chains.Ethereum, defaultAlertLatency)
ob := newTestSuite(t, chains.Ethereum)

// stop observer
ob.Stop()
Expand All @@ -158,7 +152,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
chain := chains.Ethereum

t.Run("should be able to update last block", func(t *testing.T) {
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// update last block
newLastBlock := uint64(100)
Expand All @@ -167,7 +161,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update last block scanned", func(t *testing.T) {
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// update last block scanned
newLastBlockScanned := uint64(100)
Expand All @@ -176,7 +170,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update last tx scanned", func(t *testing.T) {
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// update last tx scanned
newLastTxScanned := sample.EthAddress().String()
Expand All @@ -185,7 +179,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to get logger", func(t *testing.T) {
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)
logger := ob.Logger()

// should be able to print log
Expand Down Expand Up @@ -233,7 +227,7 @@ func TestTSSAddressString(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := newTestSuite(t, tt.chain, defaultAlertLatency)
ob := newTestSuite(t, tt.chain)

// get TSS address
addr := ob.TSSAddressString()
Expand Down Expand Up @@ -286,7 +280,7 @@ func TestIsBlockConfirmed(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := newTestSuite(t, tt.chain, defaultAlertLatency)
ob := newTestSuite(t, tt.chain)
ob.Observer.WithLastBlock(tt.lastBlock)

// check if block is confirmed
Expand Down Expand Up @@ -318,7 +312,7 @@ func TestOutboundID(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := newTestSuite(t, tt.chain, defaultAlertLatency)
ob := newTestSuite(t, tt.chain)

// get outbound id
outboundID := ob.OutboundID(tt.nonce)
Expand All @@ -336,7 +330,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should be able to load last block scanned", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write 100 as last block scanned
err := ob.WriteLastBlockScannedToDB(100)
Expand All @@ -350,7 +344,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("latest block scanned should be 0 if not found in db", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// read last block scanned
err := ob.LoadLastBlockScanned(log.Logger)
Expand All @@ -360,7 +354,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should overwrite last block scanned if env var is set", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write 100 as last block scanned
ob.WriteLastBlockScannedToDB(100)
Expand All @@ -376,7 +370,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("last block scanned should remain 0 if env var is set to latest", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write 100 as last block scanned
ob.WriteLastBlockScannedToDB(100)
Expand All @@ -392,7 +386,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should return error on invalid env var", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// set invalid env var
os.Setenv(envvar, "invalid")
Expand All @@ -406,7 +400,7 @@ func TestLoadLastBlockScanned(t *testing.T) {
func TestSaveLastBlockScanned(t *testing.T) {
t.Run("should be able to save last block scanned", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chains.Ethereum, defaultAlertLatency)
ob := newTestSuite(t, chains.Ethereum)

// save 100 as last block scanned
err := ob.SaveLastBlockScanned(100)
Expand All @@ -426,7 +420,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) {
chain := chains.Ethereum
t.Run("should be able to write and read last block scanned to db", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// write last block scanned
err := ob.WriteLastBlockScannedToDB(100)
Expand All @@ -439,7 +433,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) {

t.Run("should return error when last block scanned not found in db", func(t *testing.T) {
// create empty db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

lastScannedBlock, err := ob.ReadLastBlockScannedFromDB()
require.Error(t, err)
Expand All @@ -453,7 +447,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("should be able to load last tx scanned", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write sample hash as last tx scanned
ob.WriteLastTxScannedToDB(lastTx)
Expand All @@ -465,7 +459,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("latest tx scanned should be empty if not found in db", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// read last tx scanned
ob.LoadLastTxScanned()
Expand All @@ -474,7 +468,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("should overwrite last tx scanned if env var is set", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write sample hash as last tx scanned
ob.WriteLastTxScannedToDB(lastTx)
Expand All @@ -493,7 +487,7 @@ func TestSaveLastTxScanned(t *testing.T) {
chain := chains.SolanaDevnet
t.Run("should be able to save last tx scanned", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// save random tx hash
lastSlot := uint64(100)
Expand All @@ -516,7 +510,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {
chain := chains.SolanaDevnet
t.Run("should be able to write and read last tx scanned to db", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// write last tx scanned
lastTx := "5LuQMorgd11p8GWEw6pmyHCDtA26NUyeNFhLWPNk2oBoM9pkag1LzhwGSRos3j4TJLhKjswFhZkGtvSGdLDkmqsk"
Expand All @@ -530,7 +524,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {

t.Run("should return error when last tx scanned not found in db", func(t *testing.T) {
// create empty db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

lastTxScanned, err := ob.ReadLastTxScannedFromDB()
require.Error(t, err)
Expand All @@ -541,7 +535,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {
func TestPostVoteInbound(t *testing.T) {
t.Run("should be able to post vote inbound", func(t *testing.T) {
// create observer
ob := newTestSuite(t, chains.Ethereum, defaultAlertLatency)
ob := newTestSuite(t, chains.Ethereum)

ob.zetacore.WithPostVoteInbound("", "sampleBallotIndex")

Expand All @@ -554,7 +548,7 @@ func TestPostVoteInbound(t *testing.T) {

t.Run("should not post vote if message basic validation fails", func(t *testing.T) {
// create observer
ob := newTestSuite(t, chains.Ethereum, defaultAlertLatency)
ob := newTestSuite(t, chains.Ethereum)

// create sample message with long Message
msg := sample.InboundVote(coin.CoinType_Gas, chains.Ethereum.ChainId, chains.ZetaChainMainnet.ChainId)
Expand All @@ -567,53 +561,6 @@ func TestPostVoteInbound(t *testing.T) {
})
}

// TestAlertOnRPCLatency is a table-driven test for Observer.AlertOnRPCLatency.
// Each case builds an observer with a per-observer alert latency (in seconds),
// passes a block time that lies a fixed number of seconds in the past, and
// checks whether the method reports the latency as too high.
func TestAlertOnRPCLatency(t *testing.T) {
// reference instant; every case expresses its block time relative to it
now := time.Now()

tests := []struct {
name string
// blockTime is the latest block time handed to AlertOnRPCLatency
blockTime time.Time
// alertLatency is the observer threshold in seconds given to newTestSuite
alertLatency int64
// alerted is the expected return value of AlertOnRPCLatency
alerted bool
}{
{
// block is 60s old, threshold is 55s
name: "should alert on high RPC latency",
blockTime: now.Add(-60 * time.Second),
alertLatency: 55,
alerted: true,
},
{
// block is 60s old, threshold is 65s
name: "should not alert on normal RPC latency",
blockTime: now.Add(-60 * time.Second),
alertLatency: 65,
alerted: false,
},
{
// block is 65s old; no observer threshold, so the default passed below applies
name: "should alert on higher RPC latency then default",
blockTime: now.Add(-65 * time.Second),
alertLatency: 0, // 0 means not set
alerted: true,
},
{
// block is 55s old; no observer threshold, so the default passed below applies
name: "should not alert on normal RPC latency when compared to default",
blockTime: now.Add(-55 * time.Second),
alertLatency: 0, // 0 means not set
alerted: false,
},
}

// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := newTestSuite(t, chains.Ethereum, tt.alertLatency)

// the second argument is the fallback threshold: defaultAlertLatency seconds
alerted := ob.AlertOnRPCLatency(tt.blockTime, time.Duration(defaultAlertLatency)*time.Second)
require.Equal(t, tt.alerted, alerted)
})
}
}

func createDatabase(t *testing.T) *db.DB {
sqlDatabase, err := db.NewFromSqliteInMemory(true)
require.NoError(t, err)
Expand Down
2 changes: 0 additions & 2 deletions zetaclient/chains/bitcoin/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func NewObserver(
chainParams observertypes.ChainParams,
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
rpcAlertLatency int64,
database *db.DB,
logger base.Logger,
ts *metrics.TelemetryServer,
Expand All @@ -107,7 +106,6 @@ func NewObserver(
zetacoreClient,
tss,
btcBlocksPerDay,
rpcAlertLatency,
ts,
database,
logger,
Expand Down
Loading
Loading