
feat(torch): merge watchdog - update docs
Signed-off-by: Jose Ramon Mañes <[email protected]>
tty47 committed Nov 14, 2023
1 parent 6ae5bd1 commit 70ea48d
Showing 6 changed files with 357 additions and 43 deletions.
67 changes: 66 additions & 1 deletion README.md
@@ -10,6 +10,11 @@ You can use Torch to manage the nodes connections from a config file and Torch w

Torch uses the Kubernetes API to manage the nodes: it retrieves their multi-address information and stores it in a Redis instance. It also exposes the node IDs as metrics through the `/metrics` endpoint.

Torch automatically detects LoadBalancer resources in a Kubernetes cluster and exposes metrics about them.
The service uses OpenTelemetry to instrument the metrics and Prometheus to expose them.
Torch registers a watcher against the Kubernetes API server to receive service events, then filters them to keep only services of type **LoadBalancer**.
For each LoadBalancer service found, it retrieves the LoadBalancer's public IP and name and generates metrics with custom labels. These metrics are then exposed via a Prometheus endpoint, making them available for monitoring and visualization in Grafana or other monitoring tools.

---

## Workflow
@@ -243,6 +248,66 @@ Torch uses [Redis](https://redis.io/) as a DB, so to use Torch, you need to have

We are using Redis in two different ways:
- To store the node IDs so they can be reused.
- As a message broker: Torch uses a producer & consumer approach to process data asynchronously (see the sketch below).
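As an illustration of the broker pattern, here is a minimal, self-contained sketch using a Redis list as the queue. The queue name, node name, and client setup are illustrative assumptions, not Torch's actual implementation:

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Producer: push a node name onto the queue.
	if err := rdb.LPush(ctx, "k8s", "da-node-0").Err(); err != nil {
		panic(err)
	}

	// Consumer: block until an item is available, then process it.
	// A timeout of 0 means "wait indefinitely".
	res, err := rdb.BRPop(ctx, 0, "k8s").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("processing node:", res[1]) // res[0] is the queue name, res[1] the value
}
```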

---

## Metrics

### Multi Address

Custom metric that exposes the nodes' multi-addresses:

- `multiaddr`: This metric represents a node's multi-address and includes the following labels:
  - `service_name`: The service name. In this case, it is set to **torch**.
  - `node_name`: The name of the node.
  - `multiaddress`: The node's multi-address.
  - `namespace`: The namespace in which Torch is deployed.
  - `value`: The value of the metric. In this example, it is set to 1.
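For instance, a scrape of the `/metrics` endpoint might expose a line like the following (the node name, address, and namespace are made-up values):

```
multiaddr{service_name="torch",node_name="da-bridge-0",multiaddress="/ip4/192.0.2.10/tcp/2121/p2p/12D3KooW...",namespace="celestia"} 1
```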

### BlockHeight

Custom metric that exposes the first block height of the chain:

- `block_height_1`: This metric represents the first block height of the chain and includes the following labels:
  - `service_name`: The service name. In this case, it is set to **torch**.
  - `block_height_1`: The ID of the first block generated.
  - `earliest_block_time`: Timestamp at which the chain was created.
  - `days_running`: Number of days the chain has been running.
  - `namespace`: The namespace in which Torch is deployed.
  - `value`: The value of the metric. In this example, it is set to 1.
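A scraped sample might look like this (all label values are illustrative):

```
block_height_1{service_name="torch",block_height_1="1",earliest_block_time="2023-01-01T00:00:00Z",days_running="317",namespace="celestia"} 1
```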

### Load Balancer

Custom metric that exposes the LoadBalancer public IPs:

- `load_balancer`: This metric represents a LoadBalancer resource and includes the following labels:
  - `service_name`: The service name. In this case, it is set to **torch**.
  - `load_balancer_name`: The name of the LoadBalancer service.
  - `load_balancer_ip`: The public IP address of the LoadBalancer.
  - `namespace`: The namespace in which the LoadBalancer is deployed.
  - `value`: The value of the metric. In this example, it is set to 1, but it can be customized to represent different load-balancing states.
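A scraped sample might look like this (the service name, IP, and namespace are illustrative):

```
load_balancer{service_name="torch",load_balancer_name="da-bridge-lb",load_balancer_ip="203.0.113.10",namespace="celestia"} 1
```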


---

## Monitoring and Visualization

Torch exposes some custom metrics through the Prometheus endpoint.
You can use Grafana to connect to Prometheus and create custom dashboards to visualize these metrics.

To access the Prometheus and Grafana dashboards and view the metrics, follow these steps:

1. Access the Prometheus dashboard:
   - Open a web browser and navigate to the Prometheus server's URL (e.g., `http://prometheus-server:9090`).
   - In the Prometheus web interface, you can explore and query the metrics collected by the Torch service.

2. Access the Grafana dashboard:
   - Open a web browser and navigate to the Grafana server's URL (e.g., `http://grafana-server:3000`).
   - Log in to Grafana using your credentials.
   - Create a new dashboard or import an existing one to visualize the LoadBalancer metrics from Prometheus.
   - Use the `load_balancer` metric and its labels to filter and display the relevant information.

Customizing dashboards and setting up alerts in Grafana will help you monitor the performance and health of your LoadBalancer resources effectively.
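For example, assuming the `load_balancer` metric and labels described above, a Grafana table panel could list the current LoadBalancer IPs per service with a query like:

```promql
sum by (load_balancer_name, load_balancer_ip) (load_balancer{service_name="torch"})
```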

---
99 changes: 78 additions & 21 deletions pkg/http/server.go
@@ -19,6 +19,10 @@ import (
"github.com/celestiaorg/torch/pkg/nodes"
)

const (
retryInterval = 10 * time.Second // retryInterval is the wait between retries when generating the consensus metric.
)

// GetHttpPort retrieves the HTTP port on which the server will listen
func GetHttpPort() string {
port := os.Getenv("HTTP_PORT")
@@ -57,13 +61,6 @@ func Run(cfg config.MutualPeersConfig) {
return
}

// Create the server
server := &http.Server{
Addr: ":" + httpPort,
@@ -81,6 +78,10 @@
log.Info("Server Started...")
log.Info("Listening on port: " + httpPort)

// Start the background goroutines that generate the metrics.
BackgroundGenerateHashMetric(cfg)
BackgroundGenerateLBMetric()

// Initialize the goroutine to check the nodes in the queue.
log.Info("Initializing queues to process the nodes...")
// Create a new context without timeout as we want to keep this goroutine running forever, if we specify a timeout,
@@ -127,23 +128,79 @@ func Run(cfg config.MutualPeersConfig) {
log.Info("Server Exited Properly")
}

// BackgroundGenerateHashMetric checks whether the consensus node is defined in the config; if so, it creates a
// goroutine to generate the metric.
func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) {
// Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data.
if cfg.MutualPeers[0].ConsensusNode != "" {
log.Info("Initializing goroutine to generate the metric: block_height_1")
// Initialize the goroutine to generate the metric in the background, only if the node is specified in the config.
go func() {
log.Info("Consensus node defined to get the first block")
for {
err := GenerateHashMetrics(cfg)
// If err is nil, Torch was able to generate the metric.
if err == nil {
log.Info("Metric generated for the first block...")
// The metric was successfully generated, stop the retries.
break
}

// Wait for the retry interval before the next attempt.
time.Sleep(retryInterval)
}
}()
}
}

// BackgroundGenerateLBMetric initializes a goroutine to generate the load_balancer metric.
func BackgroundGenerateLBMetric() {
log.Info("Initializing goroutine to generate the metric: load_balancer")

// Retrieve the list of Load Balancers and generate the initial metrics.
_, err := k8s.RetrieveAndGenerateMetrics()
if err != nil {
log.Error("Failed to update metrics: ", err)
}

// Start watching for changes to the services in a separate goroutine
done := make(chan error)
go k8s.WatchServices(done)

// Handle errors from WatchServices; the loop ends when the channel is closed,
// which avoids spinning on receives from a closed channel.
for err := range done {
if err != nil {
log.Error("Error in WatchServices: ", err)
}
}
}

// GenerateHashMetrics generates the metric by getting the first block and calculating the days.
func GenerateHashMetrics(cfg config.MutualPeersConfig) error {
log.Info("Generating the metric for the first block generated...")

// Get the genesisHash
blockHash, earliestBlockTime, err := nodes.GenesisHash(cfg.MutualPeers[0].ConsensusNode)
if err != nil {
return err
}

// Generate the metric using the block hash and the earliest block time.
err = metrics.WithMetricsBlockHeight(
blockHash,
earliestBlockTime,
cfg.MutualPeers[0].ConsensusNode,
os.Getenv("POD_NAMESPACE"),
)
if err != nil {
log.Error("Error registering metric block_height_1: ", err)
return err
}

return nil
}

// RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB.
131 changes: 131 additions & 0 deletions pkg/k8s/services.go
@@ -0,0 +1,131 @@
package k8s

import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/celestiaorg/torch/pkg/metrics"
)

// RetrieveAndGenerateMetrics retrieves the list of Load Balancers and generates metrics
func RetrieveAndGenerateMetrics() ([]metrics.LoadBalancer, error) {
log.Info("Retrieving the list of Load Balancers")

// Get the list of services in the namespace
svc, err := ListServices()
if err != nil {
log.Printf("Failed to retrieve the LoadBalancers: %v", err)
return nil, err
}

// Filter the list to keep only the LoadBalancers
loadBalancers := GetLoadBalancers(svc)

// Generate the metrics with the LBs
err = metrics.WithMetricsLoadBalancer(loadBalancers)
if err != nil {
log.Printf("Failed to update metrics: %v", err)
return nil, err
}

return loadBalancers, nil
}

// ListServices retrieves the list of services in a namespace
func ListServices() (*corev1.ServiceList, error) {
// Authentication in cluster - using Service Account, Role, RoleBinding
config, err := rest.InClusterConfig()
if err != nil {
log.Error("ERROR: ", err)
return nil, err
}

// Create the Kubernetes clientSet
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error("ERROR: ", err)
return nil, err
}

// Get all services in the namespace
services, err := clientSet.CoreV1().Services(GetCurrentNamespace()).List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error("ERROR: ", err)
return nil, err
}

return services, nil
}

// GetLoadBalancers filters the list of services to include only Load Balancers and returns a list of them
func GetLoadBalancers(svcList *corev1.ServiceList) []metrics.LoadBalancer {
var loadBalancers []metrics.LoadBalancer

// Rename the parameter to svcList so the loop variable does not shadow it.
for _, svc := range svcList.Items {
if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
for _, ingress := range svc.Status.LoadBalancer.Ingress {
log.Info(fmt.Sprintf("Updating metrics for service: [%s] with IP: [%s]", svc.Name, ingress.IP))

// Create a LoadBalancer struct and append it to the loadBalancers list
loadBalancer := metrics.LoadBalancer{
ServiceName: "torch",
LoadBalancerName: svc.Name,
LoadBalancerIP: ingress.IP,
Namespace: svc.Namespace,
Value: 1, // Set the value of the metric here (e.g., 1)
}
loadBalancers = append(loadBalancers, loadBalancer)
}
}
}

return loadBalancers
}

// WatchServices watches for changes to the services in the specified namespace and updates the metrics accordingly
func WatchServices(done chan<- error) {
defer close(done)

// Authentication in cluster - using Service Account, Role, RoleBinding
config, err := rest.InClusterConfig()
if err != nil {
log.Error("Failed to get in-cluster config: ", err)
done <- err
return
}

// Create the Kubernetes clientSet
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error("Failed to create Kubernetes clientSet: ", err)
done <- err
return
}

// Create a service watcher
watcher, err := clientSet.CoreV1().Services(GetCurrentNamespace()).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error("Failed to create service watcher: ", err)
done <- err
return
}

// Watch for events on the watcher channel
for event := range watcher.ResultChan() {
if service, ok := event.Object.(*corev1.Service); ok {
if service.Spec.Type == corev1.ServiceTypeLoadBalancer {
loadBalancers := GetLoadBalancers(&corev1.ServiceList{Items: []corev1.Service{*service}})
if err := metrics.WithMetricsLoadBalancer(loadBalancers); err != nil {
log.Error("Failed to update metrics with load balancers: ", err)
done <- err
return
}
}
}
}
}
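Note that both `ListServices` and `WatchServices` rely on `rest.InClusterConfig()`, so the pod's ServiceAccount must be allowed to list and watch services. A minimal RBAC sketch follows; the resource names and namespace are illustrative assumptions, not taken from this repository:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: torch-services-reader   # illustrative name
  namespace: celestia           # illustrative namespace
rules:
  - apiGroups: [""]
    resources: ["services"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: torch-services-reader
  namespace: celestia
subjects:
  - kind: ServiceAccount
    name: torch                 # illustrative ServiceAccount name
    namespace: celestia
roleRef:
  kind: Role
  name: torch-services-reader
  apiGroup: rbac.authorization.k8s.io
```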
21 changes: 16 additions & 5 deletions pkg/k8s/statefulsets.go
@@ -13,7 +13,10 @@ import (
"github.com/celestiaorg/torch/pkg/db/redis"
)

const queueK8SNodes = "k8s"
const (
queueK8SNodes = "k8s" // queueK8SNodes name of the queue.
daNodePrefix = "da" // daNodePrefix name prefix that Torch will use to filter the StatefulSets.
)

// WatchStatefulSets watches for changes to the StatefulSets in the specified namespace and updates the metrics accordingly
func WatchStatefulSets() error {
@@ -42,11 +45,11 @@ func WatchStatefulSets() error {

// Watch for events on the watcher channel
for event := range watcher.ResultChan() {
// Check if the event object is of type *v1.StatefulSet
if statefulSet, ok := event.Object.(*v1.StatefulSet); ok {
// Check if the StatefulSet is valid based on the conditions
if isStatefulSetValid(statefulSet) {
// Perform necessary actions, such as adding the node to the Redis queue
err := redis.Producer(statefulSet.Name, queueK8SNodes)
if err != nil {
log.Error("ERROR adding the node to the queue: ", err)
Expand All @@ -58,3 +61,11 @@ func WatchStatefulSets() error {

return nil
}

// isStatefulSetValid validates the StatefulSet received:
// it checks that the name starts with the daNodePrefix and that all of its replicas are ready.
func isStatefulSetValid(statefulSet *v1.StatefulSet) bool {
return strings.HasPrefix(statefulSet.Name, daNodePrefix) &&
statefulSet.Status.CurrentReplicas > 0 &&
statefulSet.Status.Replicas == statefulSet.Status.ReadyReplicas
}