Skip to content

Commit

Permalink
feat(torch): add func to generate the metrics if we already have them…
Browse files Browse the repository at this point in the history
… in the db

Signed-off-by: Jose Ramon Mañes <[email protected]>
  • Loading branch information
tty47 committed Oct 25, 2023
1 parent 252b8f1 commit 493e1c5
Showing 1 changed file with 50 additions and 23 deletions.
73 changes: 50 additions & 23 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"github.com/celestiaorg/torch/config"
"github.com/celestiaorg/torch/pkg/db/redis"
"github.com/celestiaorg/torch/pkg/k8s"
"github.com/celestiaorg/torch/pkg/metrics"
"github.com/celestiaorg/torch/pkg/nodes"

Expand Down Expand Up @@ -79,9 +81,21 @@ func Run(cfg config.MutualPeersConfig) {
log.Info("Server Started...")
log.Info("Listening on port: " + httpPort)

// Initialize the goroutine to check the nodes in the queue.
log.Info("Initializing queues to process the nodes...")
go nodes.ProcessTaskQueue()

// Initialize the goroutine to add a watcher to the StatefulSets in the namespace.
log.Info("Initializing goroutine to watch over the StatefulSets...")
go k8s.WatchStatefulSets()

// Check if we already have some multi addresses in the DB and expose them, there might be a situation where Torch
// get restarted, and we already have the nodes IDs, so we can expose them.
err = RegisterMetrics(cfg)
if err != nil {
log.Error("Couldn't generate the metrics...", err)
}

<-done
log.Info("Server Stopped")

Expand Down Expand Up @@ -115,26 +129,39 @@ func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) bool {
return false
}

//
//// RegisterMetrics generates and registers the metrics for all nodes in the configuration.
//func RegisterMetrics(cfg config.MutualPeersConfig) error {
// log.Info("Generating initial metrics for all the nodes...")
//
// var nodeNames []string
//
// // Adding nodes from config to register the initial metrics
// for _, n := range cfg.MutualPeers {
// for _, no := range n.Peers {
// nodeNames = append(nodeNames, no.NodeName)
// }
// }
//
// // Generate the metrics for all nodes
// _, err := nodes.GenerateAllTrustedPeersAddr(cfg, nodeNames)
// if err != nil {
// log.Errorf("Error GenerateAllTrustedPeersAddr: %v", err)
// return err
// }
//
// return nil
//}
// RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB.
func RegisterMetrics(cfg config.MutualPeersConfig) error {
red := redis.InitRedisConfig()
ctx := context.TODO()

log.Info("Generating metrics from existing nodes...")

// Adding nodes from config to register the initial metrics
for _, n := range cfg.MutualPeers {
for _, no := range n.Peers {
// checking the node in the DB first
ma, err := redis.CheckIfNodeExistsInDB(red, ctx, no.NodeName)
if err != nil {
log.Error("Error CheckIfNodeExistsInDB : [", no.NodeName, "]", err)
return err
}

// check if the multi address is not empty
if ma != "" {
log.Info("Node: [", no.NodeName, "], found in the DB generating metric: ", " [", ma, "]")

// Register a multi-address metric
m := metrics.MultiAddrs{
ServiceName: "torch",
NodeName: no.NodeName,
MultiAddr: ma,
Namespace: k8s.GetCurrentNamespace(),
Value: 1,
}
k8s.RegisterMetric(m)
}
}
}

return nil
}

0 comments on commit 493e1c5

Please sign in to comment.