From 64c35e8a43d32be5a5ec007cbe283f3ec37b8cf6 Mon Sep 17 00:00:00 2001 From: ryan smith Date: Tue, 30 Apr 2024 16:47:37 -0700 Subject: [PATCH] shovel/web/metrics: add local/remote delta to prom metrics --- indexsupply.com/shovel/docs/index.md | 4 +++ shovel/web/web.go | 51 +++++++++++++--------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/indexsupply.com/shovel/docs/index.md b/indexsupply.com/shovel/docs/index.md index 1175730..83cdb5f 100644 --- a/indexsupply.com/shovel/docs/index.md +++ b/indexsupply.com/shovel/docs/index.md @@ -1145,6 +1145,10 @@ shovel_rpc_ping{src="mainnet"} 127 # HELP shovel_rpc_ping_error number of errors in making basic rpc api request # TYPE shovel_rpc_ping_error gauge shovel_rpc_ping_error{src="mainnet"} 0 + +# HELP shovel_delta number of blocks between the source and the shovel database +# TYPE shovel_delta gauge +shovel_delta{src="mainnet"} 0 ``` This endpoint will iterate through all the [eth sources](#ethereum-sources) and query for the latest block on both the eth source and the `shovel.task_updates` table. Each source will use a separate Prometheus label. diff --git a/shovel/web/web.go b/shovel/web/web.go index f039ccd..189e628 100644 --- a/shovel/web/web.go +++ b/shovel/web/web.go @@ -185,13 +185,14 @@ func (h *Handler) Prom(w http.ResponseWriter, r *http.Request) { h.diagLastReq = time.Now() h.diagLastReqMut.Unlock() - checkPG := func(srcName string) []string { + checkSource := func(srcName string, src shovel.Source) []string { var ( - res []string - start = time.Now() - latest uint64 - nerr int + res []string + start = time.Now() + srcLatest, pgLatest uint64 + pgErr, srcErr int ) + // PG const q = ` select num from shovel.task_updates @@ -199,13 +200,13 @@ func (h *Handler) Prom(w http.ResponseWriter, r *http.Request) { order by num desc limit 1 ` - err := h.pgp.QueryRow(r.Context(), q, srcName).Scan(&latest) + err := h.pgp.QueryRow(r.Context(), q, srcName).Scan(&pgLatest) if err != nil { - nerr++ + pgErr++ } res = append(res, "# HELP shovel_latest_block_local last block processed") res = append(res, "# TYPE shovel_latest_block_local gauge") - res = append(res, fmt.Sprintf(`shovel_latest_block_local{src="%s"} %d`, srcName, latest)) + res = append(res, fmt.Sprintf(`shovel_latest_block_local{src="%s"} %d`, srcName, pgLatest)) res = append(res, "# HELP shovel_pg_ping number of ms to make basic status query") res = append(res, "# TYPE shovel_pg_ping gauge") @@ -213,31 +214,30 @@ func (h *Handler) Prom(w http.ResponseWriter, r *http.Request) { res = append(res, "# HELP shovel_pg_ping_error number of errors in making basic status query") res = append(res, "# TYPE shovel_pg_ping_error gauge") - res = append(res, fmt.Sprintf(`shovel_pg_ping_error %d`, nerr)) - return res - } - checkSrc := func(sname string, src shovel.Source) []string { - var ( - start = time.Now() - res []string - nerr int - ) - n, _, err := src.Latest(r.Context(), 0) + res = append(res, fmt.Sprintf(`shovel_pg_ping_error %d`, pgErr)) + + // Source + start = time.Now() + srcLatest, _, err = src.Latest(r.Context(), 0) if err != nil { - nerr++ + srcErr++ } - res = append(res, "# HELP shovel_latest_block_remote latest block height from rpc api") res = append(res, "# TYPE shovel_latest_block_remote gauge") - res = append(res, fmt.Sprintf(`shovel_latest_block_remote{src="%s"} %d`, sname, n)) + res = append(res, fmt.Sprintf(`shovel_latest_block_remote{src="%s"} %d`, srcName, srcLatest)) res = append(res, "# HELP shovel_rpc_ping number of ms to make a basic http request to rpc api") res = append(res, "# TYPE shovel_rpc_ping gauge") - res = append(res, fmt.Sprintf(`shovel_rpc_ping{src="%s"} %d`, sname, uint64(time.Since(start)/time.Millisecond))) + res = append(res, fmt.Sprintf(`shovel_rpc_ping{src="%s"} %d`, srcName, uint64(time.Since(start)/time.Millisecond))) res = append(res, "# HELP shovel_rpc_ping_error number of errors in making basic rpc api request") res = append(res, "# TYPE shovel_rpc_ping_error gauge") - res = append(res, fmt.Sprintf(`shovel_rpc_ping_error{src="%s"} %d`, sname, nerr)) + res = append(res, fmt.Sprintf(`shovel_rpc_ping_error{src="%s"} %d`, srcName, srcErr)) + + // Delta + res = append(res, "# HELP shovel_delta number of blocks between the source and the shovel database") + res = append(res, "# TYPE shovel_delta gauge") + res = append(res, fmt.Sprintf(`shovel_delta{src="%s"} %d`, srcName, srcLatest-pgLatest)) return res } @@ -249,10 +249,7 @@ func (h *Handler) Prom(w http.ResponseWriter, r *http.Request) { var res []string for _, sc := range scs { src := jrpc2.New(sc.URL) - for _, line := range checkPG(sc.Name) { - res = append(res, line) - } - for _, line := range checkSrc(sc.Name, src) { + for _, line := range checkSource(sc.Name, src) { res = append(res, line) } }