From eed223c8f445e1fd9001655938063da5d26b2530 Mon Sep 17 00:00:00 2001 From: skosito Date: Fri, 7 Jun 2024 17:58:34 +0100 Subject: [PATCH] feat: revert telemetry server changes (#2325) --- changelog.md | 1 + .../chains/bitcoin/observer/observer.go | 5 +- zetaclient/chains/evm/observer/observer.go | 2 +- zetaclient/metrics/telemetry.go | 128 ++++++++++++++++-- zetaclient/orchestrator/orchestrator.go | 2 +- 5 files changed, 125 insertions(+), 13 deletions(-) diff --git a/changelog.md b/changelog.md index 598ce8dd6a..e2a0db4753 100644 --- a/changelog.md +++ b/changelog.md @@ -25,6 +25,7 @@ * [2305](https://github.com/zeta-chain/node/pull/2305) - add new messages `MsgAddAuthorization` and `MsgRemoveAuthorization` that can be used to update the authorization list * [2313](https://github.com/zeta-chain/node/pull/2313) - add `CheckAuthorization` function to replace the `IsAuthorized` function. The new function uses the authorization list to verify the signer's authorization. * [2312](https://github.com/zeta-chain/node/pull/2312) - add queries `ShowAuthorization` and `ListAuthorizations` +* [2325](https://github.com/zeta-chain/node/pull/2325) - revert telemetry server changes ### Refactor diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 4726dc595a..062f00c2c7 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -356,7 +356,8 @@ func (ob *Observer) GetLastBlockHeight() int64 { func (ob *Observer) SetLastBlockHeightScanned(height int64) { atomic.StoreInt64(&ob.lastBlockScanned, height) - metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(height)) + // #nosec G701 checked as positive + ob.ts.SetLastScannedBlockNumber(ob.chain, uint64(height)) } func (ob *Observer) GetLastBlockHeightScanned() int64 { @@ -594,7 +595,7 @@ func (ob *Observer) FetchUTXOs() error { } ob.Mu.Lock() - metrics.NumberOfUTXO.Set(float64(len(utxosFiltered))) + ob.ts.SetNumberOfUTXOs(len(utxosFiltered)) ob.utxos = utxosFiltered ob.Mu.Unlock() return nil diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index 4a38a62d12..795aa3b13b 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -422,7 +422,7 @@ func (ob *Observer) CheckTxInclusion(tx *ethtypes.Transaction, receipt *ethtypes // SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused) func (ob *Observer) SetLastBlockHeightScanned(height uint64) { atomic.StoreUint64(&ob.lastBlockScanned, height) - metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(height)) + ob.ts.SetLastScannedBlockNumber(ob.chain, height) } // GetLastBlockHeightScanned get last block height scanned (not necessarily caught up with external block; could be slow/paused) diff --git a/zetaclient/metrics/telemetry.go b/zetaclient/metrics/telemetry.go index bdedbd9c50..f49c1bf7b1 100644 --- a/zetaclient/metrics/telemetry.go +++ b/zetaclient/metrics/telemetry.go @@ -2,6 +2,7 @@ package metrics import ( "context" + "encoding/json" "errors" "fmt" "net/http" @@ -11,23 +12,33 @@ import ( "github.com/gorilla/mux" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + + "github.com/zeta-chain/zetacore/pkg/chains" + "github.com/zeta-chain/zetacore/pkg/constant" + "github.com/zeta-chain/zetacore/zetaclient/types" ) -// TelemetryServer provide http endpoint for Tss server +// TelemetryServer provides http endpoint for Tss server type TelemetryServer struct { - logger zerolog.Logger - s *http.Server - p2pid string - mu sync.Mutex - ipAddress string - HotKeyBurnRate *BurnRate + logger zerolog.Logger + s *http.Server + p2pid string + lastScannedBlockNumber map[int64]uint64 // chainID => block number + lastCoreBlockNumber int64 + mu sync.Mutex + lastStartTimestamp time.Time + status types.Status + ipAddress string + HotKeyBurnRate *BurnRate } // NewTelemetryServer should only listen to the loopback func NewTelemetryServer() *TelemetryServer { hs := &TelemetryServer{ - logger: log.With().Str("module", "http").Logger(), - HotKeyBurnRate: NewBurnRate(100), + logger: log.With().Str("module", "http").Logger(), + lastScannedBlockNumber: make(map[int64]uint64), + lastStartTimestamp: time.Now(), + HotKeyBurnRate: NewBurnRate(100), } s := &http.Server{ Addr: ":8123", @@ -67,6 +78,51 @@ func (t *TelemetryServer) GetIPAddress() string { return t.ipAddress } +// GetLastStartTimestamp returns last start timestamp +func (t *TelemetryServer) GetLastStartTimestamp() time.Time { + t.mu.Lock() + defer t.mu.Unlock() + return t.lastStartTimestamp +} + +// SetLastScannedBlockNumber last scanned block number for chain in telemetry and metrics +func (t *TelemetryServer) SetLastScannedBlockNumber(chain chains.Chain, blockNumber uint64) { + t.mu.Lock() + t.lastScannedBlockNumber[chain.ChainId] = blockNumber + LastScannedBlockNumber.WithLabelValues(chain.ChainName.String()).Set(float64(blockNumber)) + t.mu.Unlock() +} + +// GetLastScannedBlockNumber returns last scanned block number for chain +func (t *TelemetryServer) GetLastScannedBlockNumber(chainID int64) uint64 { + t.mu.Lock() + defer t.mu.Unlock() + return t.lastScannedBlockNumber[chainID] +} + +// SetCoreBlockNumber sets core block number in telemetry and metrics +func (t *TelemetryServer) SetCoreBlockNumber(blockNumber int64) { + t.mu.Lock() + t.lastCoreBlockNumber = blockNumber + LastCoreBlockNumber.Set(float64(blockNumber)) + t.mu.Unlock() +} + +// GetCoreBlockNumber returns core block number +func (t *TelemetryServer) GetCoreBlockNumber() int64 { + t.mu.Lock() + defer t.mu.Unlock() + return t.lastCoreBlockNumber +} + +// SetNumberOfUTXOs sets number of UTXOs in telemetry and metrics +func (t *TelemetryServer) SetNumberOfUTXOs(numberOfUTXOs int) { + t.mu.Lock() + t.status.BTCNumberOfUTXOs = numberOfUTXOs + NumberOfUTXO.Set(float64(numberOfUTXOs)) + t.mu.Unlock() +} + // AddFeeEntry adds fee entry func (t *TelemetryServer) AddFeeEntry(block int64, amount int64) { t.mu.Lock() @@ -82,6 +138,11 @@ func (t *TelemetryServer) Handlers() http.Handler { router := mux.NewRouter() router.Handle("/ping", http.HandlerFunc(t.pingHandler)).Methods(http.MethodGet) router.Handle("/p2p", http.HandlerFunc(t.p2pHandler)).Methods(http.MethodGet) + router.Handle("/version", http.HandlerFunc(t.versionHandler)).Methods(http.MethodGet) + router.Handle("/lastscannedblock", http.HandlerFunc(t.lastScannedBlockHandler)).Methods(http.MethodGet) + router.Handle("/laststarttimestamp", http.HandlerFunc(t.lastStartTimestampHandler)).Methods(http.MethodGet) + router.Handle("/lastcoreblock", http.HandlerFunc(t.lastCoreBlockHandler)).Methods(http.MethodGet) + router.Handle("/status", http.HandlerFunc(t.statusHandler)).Methods(http.MethodGet) router.Handle("/ip", http.HandlerFunc(t.ipHandler)).Methods(http.MethodGet) router.Handle("/hotkeyburnrate", http.HandlerFunc(t.hotKeyFeeBurnRate)).Methods(http.MethodGet) @@ -90,6 +151,7 @@ func (t *TelemetryServer) Handlers() http.Handler { return router } +// Start starts telemetry server func (t *TelemetryServer) Start() error { if t.s == nil { return errors.New("invalid http server instance") @@ -103,6 +165,7 @@ func (t *TelemetryServer) Start() error { return nil } +// Stop stops telemetry server func (t *TelemetryServer) Stop() error { c, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -131,6 +194,53 @@ func (t *TelemetryServer) ipHandler(w http.ResponseWriter, _ *http.Request) { fmt.Fprintf(w, "%s", t.ipAddress) } +func (t *TelemetryServer) lastScannedBlockHandler(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + + t.mu.Lock() + defer t.mu.Unlock() + // Convert map to JSON + jsonBytes, err := json.Marshal(t.lastScannedBlockNumber) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _, err = w.Write(jsonBytes) + if err != nil { + t.logger.Error().Err(err).Msg("Failed to write response") + } +} + +func (t *TelemetryServer) lastCoreBlockHandler(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + t.mu.Lock() + defer t.mu.Unlock() + fmt.Fprintf(w, "%d", t.lastCoreBlockNumber) +} + +func (t *TelemetryServer) statusHandler(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + t.mu.Lock() + defer t.mu.Unlock() + s, err := json.MarshalIndent(t.status, "", "\t") + if err != nil { + t.logger.Error().Err(err).Msg("Failed to marshal status") + } + fmt.Fprintf(w, "%s", s) +} + +func (t *TelemetryServer) versionHandler(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "%s", constant.Version) +} + +func (t *TelemetryServer) lastStartTimestampHandler(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + t.mu.Lock() + defer t.mu.Unlock() + fmt.Fprintf(w, "%s", t.lastStartTimestamp.Format(time.RFC3339)) +} + func (t *TelemetryServer) hotKeyFeeBurnRate(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) t.mu.Lock() diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index 7967231de0..1407557d3e 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -325,7 +325,7 @@ func (oc *Orchestrator) StartCctxScheduler(appContext *context.AppContext) { // update last processed block number lastBlockNum = bn - metrics.LastCoreBlockNumber.Set(float64(lastBlockNum)) + oc.ts.SetCoreBlockNumber(lastBlockNum) } } }