Skip to content

Commit

Permalink
feat: revert telemetry server changes (#2325)
Browse files Browse the repository at this point in the history
  • Loading branch information
skosito authored Jun 7, 2024
1 parent 3fd9c9a commit eed223c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 13 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* [2305](https://github.com/zeta-chain/node/pull/2305) - add new messages `MsgAddAuthorization` and `MsgRemoveAuthorization` that can be used to update the authorization list
* [2313](https://github.com/zeta-chain/node/pull/2313) - add `CheckAuthorization` function to replace the `IsAuthorized` function. The new function uses the authorization list to verify the signer's authorization.
* [2312](https://github.com/zeta-chain/node/pull/2312) - add queries `ShowAuthorization` and `ListAuthorizations`
* [2325](https://github.com/zeta-chain/node/pull/2325) - revert telemetry server changes

### Refactor

Expand Down
5 changes: 3 additions & 2 deletions zetaclient/chains/bitcoin/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ func (ob *Observer) GetLastBlockHeight() int64 {

func (ob *Observer) SetLastBlockHeightScanned(height int64) {
atomic.StoreInt64(&ob.lastBlockScanned, height)
metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(height))
// #nosec G701 checked as positive
ob.ts.SetLastScannedBlockNumber(ob.chain, uint64(height))
}

func (ob *Observer) GetLastBlockHeightScanned() int64 {
Expand Down Expand Up @@ -594,7 +595,7 @@ func (ob *Observer) FetchUTXOs() error {
}

ob.Mu.Lock()
metrics.NumberOfUTXO.Set(float64(len(utxosFiltered)))
ob.ts.SetNumberOfUTXOs(len(utxosFiltered))
ob.utxos = utxosFiltered
ob.Mu.Unlock()
return nil
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/evm/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func (ob *Observer) CheckTxInclusion(tx *ethtypes.Transaction, receipt *ethtypes
// SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused)
func (ob *Observer) SetLastBlockHeightScanned(height uint64) {
atomic.StoreUint64(&ob.lastBlockScanned, height)
metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(height))
ob.ts.SetLastScannedBlockNumber(ob.chain, height)
}

// GetLastBlockHeightScanned get last block height scanned (not necessarily caught up with external block; could be slow/paused)
Expand Down
128 changes: 119 additions & 9 deletions zetaclient/metrics/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand All @@ -11,23 +12,33 @@ import (
"github.com/gorilla/mux"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/pkg/constant"
"github.com/zeta-chain/zetacore/zetaclient/types"
)

// TelemetryServer provide http endpoint for Tss server
// TelemetryServer provides http endpoint for Tss server
type TelemetryServer struct {
logger zerolog.Logger
s *http.Server
p2pid string
mu sync.Mutex
ipAddress string
HotKeyBurnRate *BurnRate
logger zerolog.Logger
s *http.Server
p2pid string
lastScannedBlockNumber map[int64]uint64 // chainID => block number
lastCoreBlockNumber int64
mu sync.Mutex
lastStartTimestamp time.Time
status types.Status
ipAddress string
HotKeyBurnRate *BurnRate
}

// NewTelemetryServer should only listen to the loopback
func NewTelemetryServer() *TelemetryServer {
hs := &TelemetryServer{
logger: log.With().Str("module", "http").Logger(),
HotKeyBurnRate: NewBurnRate(100),
logger: log.With().Str("module", "http").Logger(),
lastScannedBlockNumber: make(map[int64]uint64),
lastStartTimestamp: time.Now(),
HotKeyBurnRate: NewBurnRate(100),
}
s := &http.Server{
Addr: ":8123",
Expand Down Expand Up @@ -67,6 +78,51 @@ func (t *TelemetryServer) GetIPAddress() string {
return t.ipAddress
}

// GetLastStartTimestamp returns last start timestamp
func (t *TelemetryServer) GetLastStartTimestamp() time.Time {
t.mu.Lock()
defer t.mu.Unlock()
return t.lastStartTimestamp
}

// SetLastScannedBlockNumber last scanned block number for chain in telemetry and metrics
func (t *TelemetryServer) SetLastScannedBlockNumber(chain chains.Chain, blockNumber uint64) {
t.mu.Lock()
t.lastScannedBlockNumber[chain.ChainId] = blockNumber
LastScannedBlockNumber.WithLabelValues(chain.ChainName.String()).Set(float64(blockNumber))
t.mu.Unlock()
}

// GetLastScannedBlockNumber returns last scanned block number for chain
func (t *TelemetryServer) GetLastScannedBlockNumber(chainID int64) uint64 {
t.mu.Lock()
defer t.mu.Unlock()
return t.lastScannedBlockNumber[chainID]
}

// SetCoreBlockNumber sets core block number in telemetry and metrics
func (t *TelemetryServer) SetCoreBlockNumber(blockNumber int64) {
t.mu.Lock()
t.lastCoreBlockNumber = blockNumber
LastCoreBlockNumber.Set(float64(blockNumber))
t.mu.Unlock()
}

// GetCoreBlockNumber returns core block number
func (t *TelemetryServer) GetCoreBlockNumber() int64 {
t.mu.Lock()
defer t.mu.Unlock()
return t.lastCoreBlockNumber
}

// SetNumberOfUTXOs sets number of UTXOs in telemetry and metrics
func (t *TelemetryServer) SetNumberOfUTXOs(numberOfUTXOs int) {
t.mu.Lock()
t.status.BTCNumberOfUTXOs = numberOfUTXOs
NumberOfUTXO.Set(float64(numberOfUTXOs))
t.mu.Unlock()
}

// AddFeeEntry adds fee entry
func (t *TelemetryServer) AddFeeEntry(block int64, amount int64) {
t.mu.Lock()
Expand All @@ -82,6 +138,11 @@ func (t *TelemetryServer) Handlers() http.Handler {
router := mux.NewRouter()
router.Handle("/ping", http.HandlerFunc(t.pingHandler)).Methods(http.MethodGet)
router.Handle("/p2p", http.HandlerFunc(t.p2pHandler)).Methods(http.MethodGet)
router.Handle("/version", http.HandlerFunc(t.versionHandler)).Methods(http.MethodGet)
router.Handle("/lastscannedblock", http.HandlerFunc(t.lastScannedBlockHandler)).Methods(http.MethodGet)
router.Handle("/laststarttimestamp", http.HandlerFunc(t.lastStartTimestampHandler)).Methods(http.MethodGet)
router.Handle("/lastcoreblock", http.HandlerFunc(t.lastCoreBlockHandler)).Methods(http.MethodGet)
router.Handle("/status", http.HandlerFunc(t.statusHandler)).Methods(http.MethodGet)
router.Handle("/ip", http.HandlerFunc(t.ipHandler)).Methods(http.MethodGet)
router.Handle("/hotkeyburnrate", http.HandlerFunc(t.hotKeyFeeBurnRate)).Methods(http.MethodGet)

Expand All @@ -90,6 +151,7 @@ func (t *TelemetryServer) Handlers() http.Handler {
return router
}

// Start starts telemetry server
func (t *TelemetryServer) Start() error {
if t.s == nil {
return errors.New("invalid http server instance")
Expand All @@ -103,6 +165,7 @@ func (t *TelemetryServer) Start() error {
return nil
}

// Stop stops telemetry server
func (t *TelemetryServer) Stop() error {
c, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -131,6 +194,53 @@ func (t *TelemetryServer) ipHandler(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintf(w, "%s", t.ipAddress)
}

func (t *TelemetryServer) lastScannedBlockHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")

t.mu.Lock()
defer t.mu.Unlock()
// Convert map to JSON
jsonBytes, err := json.Marshal(t.lastScannedBlockNumber)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = w.Write(jsonBytes)
if err != nil {
t.logger.Error().Err(err).Msg("Failed to write response")
}
}

func (t *TelemetryServer) lastCoreBlockHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
t.mu.Lock()
defer t.mu.Unlock()
fmt.Fprintf(w, "%d", t.lastCoreBlockNumber)
}

func (t *TelemetryServer) statusHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
t.mu.Lock()
defer t.mu.Unlock()
s, err := json.MarshalIndent(t.status, "", "\t")
if err != nil {
t.logger.Error().Err(err).Msg("Failed to marshal status")
}
fmt.Fprintf(w, "%s", s)
}

func (t *TelemetryServer) versionHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "%s", constant.Version)
}

func (t *TelemetryServer) lastStartTimestampHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
t.mu.Lock()
defer t.mu.Unlock()
fmt.Fprintf(w, "%s", t.lastStartTimestamp.Format(time.RFC3339))
}

func (t *TelemetryServer) hotKeyFeeBurnRate(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
t.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (oc *Orchestrator) StartCctxScheduler(appContext *context.AppContext) {

// update last processed block number
lastBlockNum = bn
metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))
oc.ts.SetCoreBlockNumber(lastBlockNum)
}
}
}
Expand Down

0 comments on commit eed223c

Please sign in to comment.