Skip to content

Commit

Permalink
feat(zetaclient): add generic rpc metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Jul 30, 2024
1 parent 5771df3 commit 29bb317
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 10 deletions.
8 changes: 7 additions & 1 deletion zetaclient/chains/evm/signer/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
ethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol"
Expand Down Expand Up @@ -863,11 +864,16 @@ func getEVMRPC(ctx context.Context, endpoint string) (interfaces.EVMRPCClient, e
client := &mocks.MockEvmClient{}
return client, ethSigner, nil
}
httpClient, err := metrics.GetInstrumentedHttpClient(endpoint)
if err != nil {
return nil, nil, err
}

client, err := ethclient.Dial(endpoint)
rpcClient, err := ethrpc.DialHTTPWithClient(endpoint, httpClient)
if err != nil {
return nil, nil, errors.Wrapf(err, "unable to dial EVM client (endpoint %q)", endpoint)
}
client := ethclient.NewClient(rpcClient)

chainID, err := client.ChainID(ctx)
if err != nil {
Expand Down
57 changes: 57 additions & 0 deletions zetaclient/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package metrics
import (
"context"
"net/http"
"net/url"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -112,6 +113,34 @@ var (
Help: "Histogram of the TSS keysign latency",
Buckets: []float64{1, 7, 15, 30, 60, 120, 240},
}, []string{"result"})

// RPCInProgress is a gauge that contains the number of RPCs requests in progress
RPCInProgress = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ZetaClientNamespace,
Name: "rpc_in_progress",
Help: "Number of RPC requests in progress",
}, []string{"host"})

// RPCCount is a counter that contains the number of total RPC requests
RPCCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: ZetaClientNamespace,
Name: "rpc_count",
Help: "A counter for number of total RPC requests",
},
[]string{"host", "code"},
)

// RPCLatency is a histogram of the RPC latency
RPCLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: ZetaClientNamespace,
Name: "rpc_duration_seconds",
Help: "A histogram of the RPC duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"host"},
)
)

// NewMetrics creates a new Metrics instance
Expand Down Expand Up @@ -151,3 +180,31 @@ func (m *Metrics) Stop() error {
defer cancel()
return m.s.Shutdown(ctx)
}

// getInstrumentedHttpClient sets up a http client that emits prometheus metrics
func GetInstrumentedHttpClient(endpoint string) (*http.Client, error) {

Check failure on line 185 in zetaclient/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: func GetInstrumentedHttpClient should be GetInstrumentedHTTPClient (revive)
host := endpoint
// try to parse as url (so that we do not expose auth uuid in metrics)
endpointUrl, err := url.Parse(endpoint)

Check failure on line 188 in zetaclient/metrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var endpointUrl should be endpointURL (revive)
if err == nil {
host = endpointUrl.Host
}
labels := prometheus.Labels{"host": host}
rpcCounterMetric, err := RPCCount.CurryWith(labels)
if err != nil {
return nil, err
}
rpcLatencyMetric, err := RPCLatency.CurryWith(labels)
if err != nil {
return nil, err
}

transport := http.DefaultTransport
transport = promhttp.InstrumentRoundTripperDuration(rpcLatencyMetric, transport)
transport = promhttp.InstrumentRoundTripperCounter(rpcCounterMetric, transport)
transport = promhttp.InstrumentRoundTripperInFlight(RPCInProgress.With(labels), transport)

return &http.Client{
Transport: transport,
}, nil
}
49 changes: 43 additions & 6 deletions zetaclient/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package metrics

import (
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
. "gopkg.in/check.v1"
)

Expand All @@ -23,20 +28,52 @@ func (ms *MetricsSuite) SetUpSuite(c *C) {
ms.m = m
}

// assert that the curried metric actually uses the same underlying storage
func (ms *MetricsSuite) TestCurryWith(c *C) {
rpcTotalsC := RPCCount.MustCurryWith(prometheus.Labels{"host": "test"})
rpcTotalsC.With(prometheus.Labels{"code": "400"}).Add(1.0)

rpcCtr := testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "test", "code": "400"}))
c.Assert(rpcCtr, Equals, 1.0)

RPCCount.Reset()
}

func (ms *MetricsSuite) TestMetrics(c *C) {
GetFilterLogsPerChain.WithLabelValues("chain1").Inc()
GetFilterLogsPerChain.WithLabelValues("chain2").Inc()
GetFilterLogsPerChain.WithLabelValues("chain2").Inc()
time.Sleep(1 * time.Second)
res, err := http.Get("http://127.0.0.1:8886/metrics")

chain1Ctr := testutil.ToFloat64(GetFilterLogsPerChain.WithLabelValues("chain1"))
c.Assert(chain1Ctr, Equals, 1.0)

httpClient, err := GetInstrumentedHttpClient("http://127.0.0.1:8886")
c.Assert(err, IsNil)
c.Assert(res.StatusCode, Equals, http.StatusOK)
defer res.Body.Close()
//out, err := ioutil.ReadAll(res.Body)
//fmt.Println(string(out))

res, err = http.Get("http://127.0.0.1:8886")
res, err := httpClient.Get("http://127.0.0.1:8886")
c.Assert(err, IsNil)
defer res.Body.Close()
c.Assert(res.StatusCode, Equals, http.StatusOK)

res, err = httpClient.Get("http://127.0.0.1:8886/metrics")
c.Assert(err, IsNil)
defer res.Body.Close()
c.Assert(res.StatusCode, Equals, http.StatusOK)
body, err := io.ReadAll(res.Body)
c.Assert(err, IsNil)
metricsBody := string(body)
c.Assert(strings.Contains(metricsBody, fmt.Sprintf("%s_%s", ZetaClientNamespace, "rpc_count")), Equals, true)

// assert that rpc count is being incremented at all
rpcCount := testutil.ToFloat64(RPCCount)
c.Assert(rpcCount, Equals, 2.0)

// assert that rpc count is being incremented correctly
rpcCount = testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "127.0.0.1:8886", "code": "200"}))
c.Assert(rpcCount, Equals, 2.0)

// assert that rpc count is not being incremented incorrectly
rpcCount = testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "127.0.0.1:8886", "code": "502"}))
c.Assert(rpcCount, Equals, 0.0)
}
21 changes: 18 additions & 3 deletions zetaclient/orchestrator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
ethrpc "github.com/ethereum/go-ethereum/rpc"
solrpc "github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/jsonrpc"
"github.com/pkg/errors"

"github.com/zeta-chain/zetacore/pkg/chains"
Expand Down Expand Up @@ -253,12 +255,17 @@ func syncObserverMap(
continue
}

// create EVM client
evmClient, err := ethclient.DialContext(ctx, evmConfig.Endpoint)
httpClient, err := metrics.GetInstrumentedHttpClient(evmConfig.Endpoint)
if err != nil {
logger.Std.Error().Err(err).Str("rpc.endpoint", evmConfig.Endpoint).Msgf("Unable to create HTTP client")
continue
}
rpcClient, err := ethrpc.DialHTTPWithClient(evmConfig.Endpoint, httpClient)
if err != nil {
logger.Std.Error().Err(err).Str("rpc.endpoint", evmConfig.Endpoint).Msgf("Unable to dial EVM RPC")
continue
}
evmClient := ethclient.NewClient(rpcClient)

database, err := db.NewFromSqlite(dbpath, chain.Name, true)
if err != nil {
Expand Down Expand Up @@ -378,7 +385,15 @@ func syncObserverMap(
continue
}

rpcClient := solrpc.New(solConfig.Endpoint)
httpClient, err := metrics.GetInstrumentedHttpClient(solConfig.Endpoint)
if err != nil {
logger.Std.Error().Err(err).Str("rpc.endpoint", solConfig.Endpoint).Msgf("Unable to create HTTP client")
continue
}
jsonRpcClient := jsonrpc.NewClientWithOpts(solConfig.Endpoint, &jsonrpc.RPCClientOpts{

Check failure on line 393 in zetaclient/orchestrator/bootstrap.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var jsonRpcClient should be jsonRPCClient (revive)
HTTPClient: httpClient,
})
rpcClient := solrpc.NewWithCustomRPCClient(jsonRpcClient)
if rpcClient == nil {
// should never happen
logger.Std.Error().Msg("solana create Solana client error")
Expand Down

0 comments on commit 29bb317

Please sign in to comment.