From 29bb317e3d4e3ef789d0cf4777e62c13ea315de1 Mon Sep 17 00:00:00 2001 From: Alex Gartner Date: Tue, 30 Jul 2024 10:36:49 -0700 Subject: [PATCH] feat(zetaclient): add generic rpc metrics --- zetaclient/chains/evm/signer/signer.go | 8 +++- zetaclient/metrics/metrics.go | 57 ++++++++++++++++++++++++++ zetaclient/metrics/metrics_test.go | 49 +++++++++++++++++++--- zetaclient/orchestrator/bootstrap.go | 21 ++++++++-- 4 files changed, 125 insertions(+), 10 deletions(-) diff --git a/zetaclient/chains/evm/signer/signer.go b/zetaclient/chains/evm/signer/signer.go index 7235aa4456..a704a9e638 100644 --- a/zetaclient/chains/evm/signer/signer.go +++ b/zetaclient/chains/evm/signer/signer.go @@ -17,6 +17,7 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + ethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol" @@ -863,11 +864,16 @@ func getEVMRPC(ctx context.Context, endpoint string) (interfaces.EVMRPCClient, e client := &mocks.MockEvmClient{} return client, ethSigner, nil } + httpClient, err := metrics.GetInstrumentedHttpClient(endpoint) + if err != nil { + return nil, nil, err + } - client, err := ethclient.Dial(endpoint) + rpcClient, err := ethrpc.DialHTTPWithClient(endpoint, httpClient) if err != nil { return nil, nil, errors.Wrapf(err, "unable to dial EVM client (endpoint %q)", endpoint) } + client := ethclient.NewClient(rpcClient) chainID, err := client.ChainID(ctx) if err != nil { diff --git a/zetaclient/metrics/metrics.go b/zetaclient/metrics/metrics.go index 50d88b398b..0dd7b162e4 100644 --- a/zetaclient/metrics/metrics.go +++ b/zetaclient/metrics/metrics.go @@ -4,6 +4,7 @@ package metrics import ( "context" "net/http" + "net/url" "time" "github.com/prometheus/client_golang/prometheus" @@ -112,6 +113,34 @@ var ( Help: "Histogram of the TSS keysign 
latency", Buckets: []float64{1, 7, 15, 30, 60, 120, 240}, }, []string{"result"}) + + // RPCInProgress is a gauge that contains the number of RPC requests in progress + RPCInProgress = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ZetaClientNamespace, + Name: "rpc_in_progress", + Help: "Number of RPC requests in progress", + }, []string{"host"}) + + // RPCCount is a counter that contains the number of total RPC requests + RPCCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: ZetaClientNamespace, + Name: "rpc_count", + Help: "A counter for number of total RPC requests", + }, + []string{"host", "code"}, + ) + + // RPCLatency is a histogram of the RPC latency + RPCLatency = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: ZetaClientNamespace, + Name: "rpc_duration_seconds", + Help: "A histogram of the RPC duration in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"host"}, + ) ) // NewMetrics creates a new Metrics instance @@ -151,3 +180,31 @@ func (m *Metrics) Stop() error { defer cancel() return m.s.Shutdown(ctx) } + +// GetInstrumentedHttpClient sets up an HTTP client that emits prometheus metrics +func GetInstrumentedHttpClient(endpoint string) (*http.Client, error) { + host := endpoint + // try to parse as url (so that we do not expose auth uuid in metrics) + endpointUrl, err := url.Parse(endpoint) + if err == nil { + host = endpointUrl.Host + } + labels := prometheus.Labels{"host": host} + rpcCounterMetric, err := RPCCount.CurryWith(labels) + if err != nil { + return nil, err + } + rpcLatencyMetric, err := RPCLatency.CurryWith(labels) + if err != nil { + return nil, err + } + + transport := http.DefaultTransport + transport = promhttp.InstrumentRoundTripperDuration(rpcLatencyMetric, transport) + transport = promhttp.InstrumentRoundTripperCounter(rpcCounterMetric, transport) + transport = promhttp.InstrumentRoundTripperInFlight(RPCInProgress.With(labels), transport) + + return &http.Client{ + Transport: 
transport, + }, nil +} diff --git a/zetaclient/metrics/metrics_test.go b/zetaclient/metrics/metrics_test.go index b73bf00530..596d0aaeab 100644 --- a/zetaclient/metrics/metrics_test.go +++ b/zetaclient/metrics/metrics_test.go @@ -1,10 +1,15 @@ package metrics import ( + "fmt" + "io" "net/http" + "strings" "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" . "gopkg.in/check.v1" ) @@ -23,20 +28,52 @@ func (ms *MetricsSuite) SetUpSuite(c *C) { ms.m = m } +// assert that the curried metric actually uses the same underlying storage +func (ms *MetricsSuite) TestCurryWith(c *C) { + rpcTotalsC := RPCCount.MustCurryWith(prometheus.Labels{"host": "test"}) + rpcTotalsC.With(prometheus.Labels{"code": "400"}).Add(1.0) + + rpcCtr := testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "test", "code": "400"})) + c.Assert(rpcCtr, Equals, 1.0) + + RPCCount.Reset() +} + func (ms *MetricsSuite) TestMetrics(c *C) { GetFilterLogsPerChain.WithLabelValues("chain1").Inc() GetFilterLogsPerChain.WithLabelValues("chain2").Inc() GetFilterLogsPerChain.WithLabelValues("chain2").Inc() time.Sleep(1 * time.Second) - res, err := http.Get("http://127.0.0.1:8886/metrics") + + chain1Ctr := testutil.ToFloat64(GetFilterLogsPerChain.WithLabelValues("chain1")) + c.Assert(chain1Ctr, Equals, 1.0) + + httpClient, err := GetInstrumentedHttpClient("http://127.0.0.1:8886") c.Assert(err, IsNil) - c.Assert(res.StatusCode, Equals, http.StatusOK) - defer res.Body.Close() - //out, err := ioutil.ReadAll(res.Body) - //fmt.Println(string(out)) - res, err = http.Get("http://127.0.0.1:8886") + res, err := httpClient.Get("http://127.0.0.1:8886") c.Assert(err, IsNil) + defer res.Body.Close() c.Assert(res.StatusCode, Equals, http.StatusOK) + + res, err = httpClient.Get("http://127.0.0.1:8886/metrics") + c.Assert(err, IsNil) defer res.Body.Close() + c.Assert(res.StatusCode, Equals, http.StatusOK) + body, err := io.ReadAll(res.Body) + 
c.Assert(err, IsNil) + metricsBody := string(body) + c.Assert(strings.Contains(metricsBody, fmt.Sprintf("%s_%s", ZetaClientNamespace, "rpc_count")), Equals, true) + + // assert that rpc count is being incremented at all + rpcCount := testutil.ToFloat64(RPCCount) + c.Assert(rpcCount, Equals, 2.0) + + // assert that rpc count is being incremented correctly + rpcCount = testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "127.0.0.1:8886", "code": "200"})) + c.Assert(rpcCount, Equals, 2.0) + + // assert that rpc count is not being incremented incorrectly + rpcCount = testutil.ToFloat64(RPCCount.With(prometheus.Labels{"host": "127.0.0.1:8886", "code": "502"})) + c.Assert(rpcCount, Equals, 0.0) } diff --git a/zetaclient/orchestrator/bootstrap.go b/zetaclient/orchestrator/bootstrap.go index 04cd4646ea..8815fb3e6c 100644 --- a/zetaclient/orchestrator/bootstrap.go +++ b/zetaclient/orchestrator/bootstrap.go @@ -5,7 +5,9 @@ import ( ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" + ethrpc "github.com/ethereum/go-ethereum/rpc" solrpc "github.com/gagliardetto/solana-go/rpc" + "github.com/gagliardetto/solana-go/rpc/jsonrpc" "github.com/pkg/errors" "github.com/zeta-chain/zetacore/pkg/chains" @@ -253,12 +255,17 @@ func syncObserverMap( continue } - // create EVM client - evmClient, err := ethclient.DialContext(ctx, evmConfig.Endpoint) + httpClient, err := metrics.GetInstrumentedHttpClient(evmConfig.Endpoint) + if err != nil { + logger.Std.Error().Err(err).Str("rpc.endpoint", evmConfig.Endpoint).Msgf("Unable to create HTTP client") + continue + } + rpcClient, err := ethrpc.DialHTTPWithClient(evmConfig.Endpoint, httpClient) if err != nil { logger.Std.Error().Err(err).Str("rpc.endpoint", evmConfig.Endpoint).Msgf("Unable to dial EVM RPC") continue } + evmClient := ethclient.NewClient(rpcClient) database, err := db.NewFromSqlite(dbpath, chain.Name, true) if err != nil { @@ -378,7 +385,15 @@ func syncObserverMap( continue } - 
rpcClient := solrpc.New(solConfig.Endpoint) + httpClient, err := metrics.GetInstrumentedHttpClient(solConfig.Endpoint) + if err != nil { + logger.Std.Error().Err(err).Str("rpc.endpoint", solConfig.Endpoint).Msgf("Unable to create HTTP client") + continue + } + jsonRpcClient := jsonrpc.NewClientWithOpts(solConfig.Endpoint, &jsonrpc.RPCClientOpts{ + HTTPClient: httpClient, + }) + rpcClient := solrpc.NewWithCustomRPCClient(jsonRpcClient) if rpcClient == nil { // should never happen logger.Std.Error().Msg("solana create Solana client error")