diff --git a/avssync/avssync.go b/avssync/avssync.go index d7ac34f..0af292d 100644 --- a/avssync/avssync.go +++ b/avssync/avssync.go @@ -30,6 +30,7 @@ type AvsSync struct { readerTimeoutDuration time.Duration writerTimeoutDuration time.Duration prometheusServerAddr string + Metrics *Metrics } // NewAvsSync creates a new AvsSync object @@ -44,7 +45,10 @@ func NewAvsSync( quorums []byte, fetchQuorumsDynamically bool, retrySyncNTimes int, readerTimeoutDuration time.Duration, writerTimeoutDuration time.Duration, prometheusServerAddr string, + prometheusRegistry *prometheus.Registry, ) *AvsSync { + metrics := NewMetrics(prometheusRegistry) + return &AvsSync{ AvsReader: avsReader, AvsWriter: avsWriter, @@ -58,6 +62,7 @@ func NewAvsSync( readerTimeoutDuration: readerTimeoutDuration, writerTimeoutDuration: writerTimeoutDuration, prometheusServerAddr: prometheusServerAddr, + Metrics: metrics, } } @@ -76,7 +81,7 @@ func (a *AvsSync) Start(ctx context.Context) { ) if a.prometheusServerAddr != "" { - StartMetricsServer(a.prometheusServerAddr) + a.Metrics.Start(a.prometheusServerAddr) } else { a.logger.Info("Prometheus server address not set, not starting metrics server") } @@ -129,11 +134,13 @@ func (a *AvsSync) updateStakes() { receipt, err := a.AvsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators) if err != nil { // no quorum label means we are updating all quorums - updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError), "quorum": ""}).Inc() + for _, quorum := range a.quorums { + a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusError, strconv.Itoa(int(quorum))) + } a.logger.Error("Error updating stakes of operator subset for all quorums", err) return } else if receipt.Status == gethtypes.ReceiptStatusFailed { - txRevertedTotal.Inc() + a.Metrics.TxRevertedTotalInc() a.logger.Error("Update stakes of operator subset for all quorums reverted") return } @@ -178,7 +185,6 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte, } var operators []common.Address operators = append(operators, operatorAddrsPerQuorum[0]...) - operatorsUpdated.With(prometheus.Labels{"quorum": strconv.Itoa(int(quorum))}).Set(float64(len(operators))) sort.Slice(operators, func(i, j int) bool { return operators[i].Big().Cmp(operators[j].Big()) < 0 }) @@ -191,14 +197,20 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte, continue } if receipt.Status == gethtypes.ReceiptStatusFailed { - txRevertedTotal.Inc() + a.Metrics.TxRevertedTotalInc() a.logger.Error("Update stakes of entire operator set for quorum reverted", "quorum", int(quorum)) continue } - updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusSucceed), "quorum": strconv.Itoa(int(quorum))}).Inc() + + // Update metrics on success + a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusSucceed, strconv.Itoa(int(quorum))) + a.Metrics.OperatorsUpdatedSet(strconv.Itoa(int(quorum)), len(operators)) + return } - updateStakeAttempt.With(prometheus.Labels{"status": string(UpdateStakeStatusError), "quorum": strconv.Itoa(int(quorum))}).Inc() + + // Update metrics on failure + a.Metrics.UpdateStakeAttemptInc(UpdateStakeStatusError, strconv.Itoa(int(quorum))) a.logger.Error("Giving up after retrying", "retryNTimes", retryNTimes) } diff --git a/avssync/metrics.go b/avssync/metrics.go index ac97774..1934d14 100644 --- a/avssync/metrics.go +++ b/avssync/metrics.go @@ -17,28 +17,54 @@ const ( UpdateStakeStatusSucceed UpdateStakeStatus = "succeed" ) -var ( - updateStakeAttempt = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Name: "update_stake_attempt", - Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).", - }, []string{"status", "quorum"}) - txRevertedTotal = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Name: "tx_reverted_total", - Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)", - }) - operatorsUpdated = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Name: "operators_updated", - Help: "The total number of operators updated (during the last quorum sync)", - }, []string{"quorum"}) -) +type Metrics struct { + updateStakeAttempts *prometheus.CounterVec + txRevertedTotal prometheus.Counter + operatorsUpdated *prometheus.GaugeVec + + registry *prometheus.Registry +} + +func NewMetrics(reg *prometheus.Registry) *Metrics { + metrics := &Metrics{ + updateStakeAttempts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "update_stake_attempt", + Help: "Result from an update stake attempt. Either succeed or error (either tx was mined but reverted, or failed to get processed by chain).", + }, []string{"status", "quorum"}), + + txRevertedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "tx_reverted_total", + Help: "The total number of transactions that made it onchain but reverted (most likely because out of gas)", + }), + + operatorsUpdated: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "operators_updated", + Help: "The total number of operators updated (during the last quorum sync)", + }, []string{"quorum"}), + + registry: reg, + } + + return metrics +} + +func (g *Metrics) UpdateStakeAttemptInc(status UpdateStakeStatus, quorum string) { + g.updateStakeAttempts.WithLabelValues(string(status), quorum).Inc() +} + +func (g *Metrics) TxRevertedTotalInc() { + g.txRevertedTotal.Inc() +} + +func (g *Metrics) OperatorsUpdatedSet(quorum string, operators int) { + g.operatorsUpdated.WithLabelValues(quorum).Set(float64(operators)) +} -func StartMetricsServer(metricsAddr string) { - registry := prometheus.NewRegistry() - registry.MustRegister(updateStakeAttempt, txRevertedTotal, operatorsUpdated) - http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) +func (g *Metrics) Start(metricsAddr string) { + http.Handle("/metrics", promhttp.HandlerFor(g.registry, promhttp.HandlerOpts{})) // not sure if we need to handle this error, since if metric server errors, then we will get alerts from grafana go func() { _ = http.ListenAndServe(metricsAddr, nil) diff --git a/integration_test.go b/integration_test.go index 8b2c93c..6a18fdc 100644 --- a/integration_test.go +++ b/integration_test.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/common" gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "go.uber.org/mock/gomock" @@ -274,6 +275,7 @@ func NewAvsSyncComponents(t *testing.T, anvilHttpEndpoint string, contractAddres time.Second, time.Second, "", // no metrics server (otherwise parallel tests all try to start server at same endpoint and error out) + prometheus.NewRegistry(), ) return &AvsSyncComponents{ avsSync: avsSync, diff --git a/main.go b/main.go index 6fc78e4..c7f0bb4 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "github.com/Layr-Labs/eigensdk-go/signerv2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" ) @@ -208,6 +209,10 @@ func avsSyncMain(cliCtx *cli.Context) error { sleepBeforeFirstSyncDuration = firstSyncTime.Sub(now) } logger.Infof("Sleeping for %v before first sync, so that it happens at %v", sleepBeforeFirstSyncDuration, time.Now().Add(sleepBeforeFirstSyncDuration)) + + // Create new prometheus registry + reg := prometheus.NewRegistry() + avsSync := avssync.NewAvsSync( logger, avsReader, @@ -221,6 +226,7 @@ func avsSyncMain(cliCtx *cli.Context) error { readerTimeout, writerTimeout, cliCtx.String(MetricsAddrFlag.Name), + reg, ) avsSync.Start(context.Background())