From 2a3281f47e91be1f155e11ad5e565c1c480b9d43 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Mon, 13 May 2024 17:40:25 -0700 Subject: [PATCH 1/3] Refactor metrics on AVS sync --- avssync/avssync.go | 26 +++++++++++++----- avssync/metrics.go | 67 +++++++++++++++++++++++++++++++--------------- 2 files changed, 65 insertions(+), 28 deletions(-) diff --git a/avssync/avssync.go b/avssync/avssync.go index d7ac34f..017d581 100644 --- a/avssync/avssync.go +++ b/avssync/avssync.go @@ -30,6 +30,7 @@ type AvsSync struct { readerTimeoutDuration time.Duration writerTimeoutDuration time.Duration prometheusServerAddr string + Metrics *Metrics } // NewAvsSync creates a new AvsSync object @@ -45,6 +46,9 @@ func NewAvsSync( readerTimeoutDuration time.Duration, writerTimeoutDuration time.Duration, prometheusServerAddr string, ) *AvsSync { + promReg := prometheus.NewRegistry() + metrics := NewMetrics(promReg) + return &AvsSync{ AvsReader: avsReader, AvsWriter: avsWriter, @@ -58,6 +62,7 @@ func NewAvsSync( readerTimeoutDuration: readerTimeoutDuration, writerTimeoutDuration: writerTimeoutDuration, prometheusServerAddr: prometheusServerAddr, + Metrics: metrics, } } @@ -76,7 +81,7 @@ func (a *AvsSync) Start(ctx context.Context) { ) if a.prometheusServerAddr != "" { - StartMetricsServer(a.prometheusServerAddr) + a.Metrics.Start(a.prometheusServerAddr) } else { a.logger.Info("Prometheus server address not set, not starting metrics server") } @@ -129,11 +134,13 @@ func (a *AvsSync) updateStakes() { receipt, err := a.AvsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators) if err != nil { // no quorum label means we are updating all quorums - updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError), "quorum": ""}).Inc() + for _, quorum := range a.quorums { + a.Metrics.UpdateStakeAttempt(UpdateStakeStatusError, strconv.Itoa(int(quorum))) + } a.logger.Error("Error updating stakes of operator subset for all quorums", err) return } else if receipt.Status == gethtypes.ReceiptStatusFailed { - txRevertedTotal.Inc() + a.Metrics.TxRevertedTotalInc() a.logger.Error("Update stakes of operator subset for all quorums reverted") return } @@ -178,7 +185,6 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte, } var operators []common.Address operators = append(operators, operatorAddrsPerQuorum[0]...) - operatorsUpdated.With(prometheus.Labels{"quorum": strconv.Itoa(int(quorum))}).Set(float64(len(operators))) sort.Slice(operators, func(i, j int) bool { return operators[i].Big().Cmp(operators[j].Big()) < 0 }) @@ -191,14 +197,20 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte, continue } if receipt.Status == gethtypes.ReceiptStatusFailed { - txRevertedTotal.Inc() + a.Metrics.TxRevertedTotalInc() a.logger.Error("Update stakes of entire operator set for quorum reverted", "quorum", int(quorum)) continue } - updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusSucceed), "quorum": strconv.Itoa(int(quorum))}).Inc() + + // Update metrics on success + a.Metrics.UpdateStakeAttempt(UpdateStakeStatusSucceed, strconv.Itoa(int(quorum))) + a.Metrics.OperatorsUpdatedSet(strconv.Itoa(int(quorum)), len(operators)) + return } - updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError), "quorum": strconv.Itoa(int(quorum))}).Inc() + + // Update metrics on failure + a.Metrics.UpdateStakeAttempt(UpdateStakeStatusError, strconv.Itoa(int(quorum))) a.logger.Error("Giving up after retrying", "retryNTimes", retryNTimes) } diff --git a/avssync/metrics.go b/avssync/metrics.go index ac97774..8cce264 100644 --- a/avssync/metrics.go +++ b/avssync/metrics.go @@ -17,28 +17,53 @@ const ( UpdateStakeStatusSucceed UpdateStakeStatus = "succeed" ) -var ( - updateStakeAttempt = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Name: "update_stake_attempt", - Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).", - }, []string{"status", "quorum"}) - txRevertedTotal = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Name: "tx_reverted_total", - Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)", - }) - operatorsUpdated = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Name: "operators_updated", - Help: "The total number of operators updated (during the last quorum sync)", - }, []string{"quorum"}) -) +type Metrics struct { + UpdateStakeAttempts *prometheus.CounterVec + TxRevertedTotal prometheus.Counter + OperatorsUpdated *prometheus.GaugeVec + + registry *prometheus.Registry +} + +func NewMetrics(reg *prometheus.Registry) *Metrics { + metrics := &Metrics{ + UpdateStakeAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "update_stake_attempt", + Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).", + }, []string{"status", "quorum"}), + + TxRevertedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "tx_reverted_total", + Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)", + }), + + OperatorsUpdated: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "operators_updated", + Help: "The total number of operators updated (during the last quorum sync)", + }, []string{"quorum"}), + } + + return metrics +} + +func (g *Metrics) UpdateStakeAttempt(status UpdateStakeStatus, quorum string) { + g.UpdateStakeAttempts.WithLabelValues(string(status), quorum).Inc() +} + +func (g *Metrics) TxRevertedTotalInc() { + g.TxRevertedTotal.Inc() +} + +// operatorsUpdated.With(prometheus.Labels{"quorum": strconv.Itoa(int(quorum))}).Set(float64(len(operators))) +func (g *Metrics) OperatorsUpdatedSet(quorum string, operators int) { + g.OperatorsUpdated.WithLabelValues(quorum).Set(float64(operators)) +} -func StartMetricsServer(metricsAddr string) { - registry := prometheus.NewRegistry() - registry.MustRegister(updateStakeAttempt, txRevertedTotal, operatorsUpdated) - http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) +func (g *Metrics) Start(metricsAddr string) { + http.Handle("/metrics", promhttp.HandlerFor(g.registry, promhttp.HandlerOpts{})) // not sure if we need to handle this error, since if metric server errors, then we will get alerts from grafana go func() { _ = http.ListenAndServe(metricsAddr, nil) From b2662ce6cd6f24130d4b1b4682fb23891316c608 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Mon, 13 May 2024 17:53:12 -0700 Subject: [PATCH 2/3] Init prometheus registry in main.go --- avssync/avssync.go | 4 ++-- avssync/metrics.go | 2 ++ integration_test.go | 2 ++ main.go | 6 ++++++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/avssync/avssync.go b/avssync/avssync.go index 017d581..7fcc097 100644 --- a/avssync/avssync.go +++ b/avssync/avssync.go @@ -45,9 +45,9 @@ func NewAvsSync( quorums []byte, fetchQuorumsDynamically bool, retrySyncNTimes int, readerTimeoutDuration time.Duration, writerTimeoutDuration time.Duration, prometheusServerAddr string, + prometheusRegistry *prometheus.Registry, ) *AvsSync { - promReg := prometheus.NewRegistry() - metrics := NewMetrics(promReg) + metrics := NewMetrics(prometheusRegistry, prometheusServerAddr) return &AvsSync{ AvsReader: avsReader, diff --git a/avssync/metrics.go b/avssync/metrics.go index 8cce264..6c72e16 100644 --- a/avssync/metrics.go +++ b/avssync/metrics.go @@ -44,6 +44,8 @@ func NewMetrics(reg *prometheus.Registry) *Metrics { Name: "operators_updated", Help: "The total number of operators updated (during the last quorum sync)", }, []string{"quorum"}), + + registry: reg, } return metrics diff --git a/integration_test.go b/integration_test.go index 8b2c93c..6a18fdc 100644 --- a/integration_test.go +++ b/integration_test.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/common" gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "go.uber.org/mock/gomock" @@ -274,6 +275,7 @@ func NewAvsSyncComponents(t *testing.T, anvilHttpEndpoint string, contractAddres time.Second, time.Second, "", // no metrics server (otherwise parallel tests all try to start server at same endpoint and error out) + prometheus.NewRegistry(), ) return &AvsSyncComponents{ avsSync: avsSync, diff --git a/main.go b/main.go index 6fc78e4..c7f0bb4 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "github.com/Layr-Labs/eigensdk-go/signerv2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" ) @@ -208,6 +209,10 @@ func avsSyncMain(cliCtx *cli.Context) error { sleepBeforeFirstSyncDuration = firstSyncTime.Sub(now) } logger.Infof("Sleeping for %v before first sync, so that it happens at %v", sleepBeforeFirstSyncDuration, time.Now().Add(sleepBeforeFirstSyncDuration)) + + // Create new prometheus registry + reg := prometheus.NewRegistry() + avsSync := avssync.NewAvsSync( logger, avsReader, @@ -221,6 +226,7 @@ func avsSyncMain(cliCtx *cli.Context) error { readerTimeout, writerTimeout, cliCtx.String(MetricsAddrFlag.Name), + reg, ) avsSync.Start(context.Background()) From 0130407eb3b766c8e6a879095601251be1af3f48 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 14 May 2024 09:37:06 -0700 Subject: [PATCH 3/3] Address pr feedback --- avssync/avssync.go | 8 ++++---- avssync/metrics.go | 21 ++++++++++----------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/avssync/avssync.go b/avssync/avssync.go index 7fcc097..0af292d 100644 --- a/avssync/avssync.go +++ b/avssync/avssync.go @@ -47,7 +47,7 @@ func NewAvsSync( prometheusServerAddr string, prometheusRegistry *prometheus.Registry, ) *AvsSync { - metrics := NewMetrics(prometheusRegistry, prometheusServerAddr) + metrics := NewMetrics(prometheusRegistry) return &AvsSync{ AvsReader: avsReader, @@ -135,7 +135,7 @@ func (a *AvsSync) updateStakes() { if err != nil { // no quorum label means we are updating all quorums for _, quorum := range a.quorums { - a.Metrics.UpdateStakeAttempt(UpdateStakeStatusError, strconv.Itoa(int(quorum))) + a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusError, strconv.Itoa(int(quorum))) } a.logger.Error("Error updating stakes of operator subset for all quorums", err) return @@ -203,14 +203,14 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte, } // Update metrics on success - a.Metrics.UpdateStakeAttempt(UpdateStakeStatusSucceed, strconv.Itoa(int(quorum))) + a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusSucceed, strconv.Itoa(int(quorum))) a.Metrics.OperatorsUpdatedSet(strconv.Itoa(int(quorum)), len(operators)) return } // Update metrics on failure - a.Metrics.UpdateStakeAttempt(UpdateStakeStatusError, strconv.Itoa(int(quorum))) + a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusError, strconv.Itoa(int(quorum))) a.logger.Error("Giving up after retrying", "retryNTimes", retryNTimes) } diff --git a/avssync/metrics.go b/avssync/metrics.go index 6c72e16..1934d14 100644 --- a/avssync/metrics.go +++ b/avssync/metrics.go @@ -18,28 +18,28 @@ const ( ) type Metrics struct { - UpdateStakeAttempts *prometheus.CounterVec - TxRevertedTotal prometheus.Counter - OperatorsUpdated *prometheus.GaugeVec + updateStakeAttempts *prometheus.CounterVec + txRevertedTotal prometheus.Counter + operatorsUpdated *prometheus.GaugeVec registry *prometheus.Registry } func NewMetrics(reg *prometheus.Registry) *Metrics { metrics := &Metrics{ - UpdateStakeAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + updateStakeAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Name: "update_stake_attempt", Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).", }, []string{"status", "quorum"}), - TxRevertedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + txRevertedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Name: "tx_reverted_total", Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)", }), - OperatorsUpdated: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + operatorsUpdated: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricsNamespace, Name: "operators_updated", Help: "The total number of operators updated (during the last quorum sync)", @@ -51,17 +51,16 @@ func NewMetrics(reg *prometheus.Registry) *Metrics { return metrics } -func (g *Metrics) UpdateStakeAttempt(status UpdateStakeStatus, quorum string) { - g.UpdateStakeAttempts.WithLabelValues(string(status), quorum).Inc() +func (g *Metrics) UpdateStakeAttemptInc(status UpdateStakeStatus, quorum string) { + g.updateStakeAttempts.WithLabelValues(string(status), quorum).Inc() } func (g *Metrics) TxRevertedTotalInc() { - g.TxRevertedTotal.Inc() + g.txRevertedTotal.Inc() } -// operatorsUpdated.With(prometheus.Labels{"quorum": strconv.Itoa(int(quorum))}).Set(float64(len(operators))) func (g *Metrics) OperatorsUpdatedSet(quorum string, operators int) { - g.OperatorsUpdated.WithLabelValues(quorum).Set(float64(operators)) + g.operatorsUpdated.WithLabelValues(quorum).Set(float64(operators)) } func (g *Metrics) Start(metricsAddr string) {