From c3a5428734fd3abca6e17567361df44884ad92c4 Mon Sep 17 00:00:00 2001 From: Alex Gartner Date: Tue, 6 Aug 2024 09:41:45 -0700 Subject: [PATCH] feat(zetaclient): add generic rpc metrics (#2597) * feat(zetaclient): add generic rpc metrics * feedback * changelog * fmt --- changelog.md | 1 + zetaclient/chains/evm/signer/signer.go | 8 +++- zetaclient/metrics/metrics.go | 57 ++++++++++++++++++++++++++ zetaclient/metrics/metrics_test.go | 49 +++++++++++++++++++--- zetaclient/orchestrator/bootstrap.go | 10 ++++- 5 files changed, 116 insertions(+), 9 deletions(-) diff --git a/changelog.md b/changelog.md index 822cefe360..cf8a796932 100644 --- a/changelog.md +++ b/changelog.md @@ -5,6 +5,7 @@ ### Features * [2578](https://github.com/zeta-chain/node/pull/2578) - Add Gateway address in protocol contract list +* [2597](https://github.com/zeta-chain/node/pull/2597) - Add generic rpc metrics to zetaclient ## v19.0.0 diff --git a/zetaclient/chains/evm/signer/signer.go b/zetaclient/chains/evm/signer/signer.go index 9c928face1..5ec7df55a2 100644 --- a/zetaclient/chains/evm/signer/signer.go +++ b/zetaclient/chains/evm/signer/signer.go @@ -17,6 +17,7 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + ethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol" @@ -866,11 +867,16 @@ func getEVMRPC(ctx context.Context, endpoint string) (interfaces.EVMRPCClient, e client := &mocks.MockEvmClient{} return client, ethSigner, nil } + httpClient, err := metrics.GetInstrumentedHTTPClient(endpoint) + if err != nil { + return nil, nil, errors.Wrap(err, "unable to get instrumented HTTP client") + } - client, err := ethclient.Dial(endpoint) + rpcClient, err := ethrpc.DialHTTPWithClient(endpoint, httpClient) if err != nil { return nil, nil, errors.Wrapf(err, "unable to dial EVM client (endpoint %q)", endpoint) } + client := ethclient.NewClient(rpcClient) chainID, err := client.ChainID(ctx) if err != nil { diff --git a/zetaclient/metrics/metrics.go b/zetaclient/metrics/metrics.go index 50d88b398b..df956daa90 100644 --- a/zetaclient/metrics/metrics.go +++ b/zetaclient/metrics/metrics.go @@ -4,6 +4,7 @@ package metrics import ( "context" "net/http" + "net/url" "time" "github.com/prometheus/client_golang/prometheus" @@ -112,6 +113,34 @@ var ( Help: "Histogram of the TSS keysign latency", Buckets: []float64{1, 7, 15, 30, 60, 120, 240}, }, []string{"result"}) + + // RPCInProgress is a gauge that contains the number of RPCs requests in progress + RPCInProgress = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ZetaClientNamespace, + Name: "rpc_in_progress", + Help: "Number of RPC requests in progress", + }, []string{"host"}) + + // RPCCount is a counter that contains the number of total RPC requests + RPCCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: ZetaClientNamespace, + Name: "rpc_count", + Help: "A counter for number of total RPC requests", + }, + []string{"host", "code"}, + ) + + // RPCLatency is a histogram of the RPC latency + RPCLatency = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: ZetaClientNamespace, + Name: "rpc_duration_seconds", + Help: "A histogram of the RPC duration in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"host"}, + ) ) // NewMetrics creates a new Metrics instance @@ -151,3 +180,31 @@ func (m *Metrics) Stop() error { defer cancel() return m.s.Shutdown(ctx) } + +// GetInstrumentedHTTPClient sets up a http client that emits prometheus metrics +func GetInstrumentedHTTPClient(endpoint string) (*http.Client, error) { + host := endpoint + // try to parse as url (so that we do not expose auth uuid in metrics) + endpointURL, err := url.Parse(endpoint) + if err == nil { + host = endpointURL.Host + } + labels := prometheus.Labels{"host": host} + rpcCounterMetric, err := RPCCount.CurryWith(labels) + if err != nil { + return nil, err + } + rpcLatencyMetric, err := RPCLatency.CurryWith(labels) + if err != nil { + return nil, err + } + + transport := http.DefaultTransport + transport = promhttp.InstrumentRoundTripperDuration(rpcLatencyMetric, transport) + transport = promhttp.InstrumentRoundTripperCounter(rpcCounterMetric, transport) + transport = promhttp.InstrumentRoundTripperInFlight(RPCInProgress.With(labels), transport) + + return &http.Client{ + Transport: transport, + }, nil +} diff --git a/zetaclient/metrics/metrics_test.go b/zetaclient/metrics/metrics_test.go index b73bf00530..6be8bc30c0 100644 --- a/zetaclient/metrics/metrics_test.go +++ b/zetaclient/metrics/metrics_test.go @@ -1,10 +1,15 @@ package metrics import ( + "fmt" + "io" "net/http" + "strings" "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" . "gopkg.in/check.v1" ) @@ -23,20 +28,52 @@ func (ms *MetricsSuite) SetUpSuite(c *C) { ms.m = m } +// assert that the curried metric actually uses the same underlying storage +func (ms *MetricsSuite) TestCurryWith(c *C) { + rpcTotalsC := RPCCount.MustCurryWith(prometheus.Labels{"host": "test"}) + rpcTotalsC.With(prometheus.Labels{"code": "400"}).Add(1.0) + + rpcCtr := testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "test", "code": "400"})) + c.Assert(rpcCtr, Equals, 1.0) + + RPCCount.Reset() +} + func (ms *MetricsSuite) TestMetrics(c *C) { GetFilterLogsPerChain.WithLabelValues("chain1").Inc() GetFilterLogsPerChain.WithLabelValues("chain2").Inc() GetFilterLogsPerChain.WithLabelValues("chain2").Inc() time.Sleep(1 * time.Second) - res, err := http.Get("http://127.0.0.1:8886/metrics") + + chain1Ctr := testutil.ToFloat64(GetFilterLogsPerChain.WithLabelValues("chain1")) + c.Assert(chain1Ctr, Equals, 1.0) + + httpClient, err := GetInstrumentedHTTPClient("http://127.0.0.1:8886/myauthuuid") c.Assert(err, IsNil) - c.Assert(res.StatusCode, Equals, http.StatusOK) - defer res.Body.Close() - //out, err := ioutil.ReadAll(res.Body) - //fmt.Println(string(out)) - res, err = http.Get("http://127.0.0.1:8886") + res, err := httpClient.Get("http://127.0.0.1:8886") c.Assert(err, IsNil) + defer res.Body.Close() c.Assert(res.StatusCode, Equals, http.StatusOK) + + res, err = httpClient.Get("http://127.0.0.1:8886/metrics") + c.Assert(err, IsNil) defer res.Body.Close() + c.Assert(res.StatusCode, Equals, http.StatusOK) + body, err := io.ReadAll(res.Body) + c.Assert(err, IsNil) + metricsBody := string(body) + c.Assert(strings.Contains(metricsBody, fmt.Sprintf("%s_%s", ZetaClientNamespace, "rpc_count")), Equals, true) + + // assert that rpc count is being incremented at all + rpcCount := testutil.ToFloat64(RPCCount) + c.Assert(rpcCount, Equals, 2.0) + + // assert that rpc count is being incremented correctly + rpcCount = testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "127.0.0.1:8886", "code": "200"})) + c.Assert(rpcCount, Equals, 2.0) + + // assert that rpc count is not being incremented incorrectly + rpcCount = testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "127.0.0.1:8886", "code": "502"})) + c.Assert(rpcCount, Equals, 0.0) } diff --git a/zetaclient/orchestrator/bootstrap.go b/zetaclient/orchestrator/bootstrap.go index cd4d2a223c..ef4920f8b5 100644 --- a/zetaclient/orchestrator/bootstrap.go +++ b/zetaclient/orchestrator/bootstrap.go @@ -5,6 +5,7 @@ import ( ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" + ethrpc "github.com/ethereum/go-ethereum/rpc" solrpc "github.com/gagliardetto/solana-go/rpc" "github.com/pkg/errors" @@ -279,12 +280,17 @@ func syncObserverMap( continue } - // create EVM client - evmClient, err := ethclient.DialContext(ctx, cfg.Endpoint) + httpClient, err := metrics.GetInstrumentedHTTPClient(cfg.Endpoint) + if err != nil { + logger.Std.Error().Err(err).Str("rpc.endpoint", cfg.Endpoint).Msgf("Unable to create HTTP client") + continue + } + rpcClient, err := ethrpc.DialHTTPWithClient(cfg.Endpoint, httpClient) if err != nil { logger.Std.Error().Err(err).Str("rpc.endpoint", cfg.Endpoint).Msgf("Unable to dial EVM RPC") continue } + evmClient := ethclient.NewClient(rpcClient) database, err := db.NewFromSqlite(dbpath, chainName, true) if err != nil {