diff --git a/README.md b/README.md index 28d72c7..d88a191 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,11 @@ You can use Torch to manage the nodes connections from a config file and Torch w Torch uses the Kubernetes API to manage the nodes, it gets their multi addresses information and stores them in a Redis instance, also, it provides some metrics to expose the node's IDs through the `/metrics` endpoint. +Torch automatically detects Load Balancer resources in a Kubernetes cluster and exposes metrics related to these Load Balancers. +The service uses OpenTelemetry to instrument the metrics and Prometheus to expose them. +It uses the Kubernetes API server with a watcher to receive events from it. Then filters the list to include only services of type **LoadBalancer**. +For each LoadBalancer service found, it retrieves the LoadBalancer public IP and name and generates metrics with custom labels. These metrics are then exposed via a Prometheus endpoint, making them available for monitoring and visualization in Grafana or other monitoring tools. + --- ## Workflow @@ -243,6 +248,66 @@ Torch uses [Redis](https://redis.io/) as a DB, so to use Torch, you need to have We are using Redis in two different ways: - Store the Nodes IDs and reuse them. -- As a message broker, where Torch uses Producer & Consumer approach to process data async. +- As a message broker, Torch uses the Producer & Consumer approach to process data async. + +--- + +## Metrics + +### Multi Address + +Custom metrics to expose the nodes multi-address: + +- `multiaddr`: This metric represents the nodes Multi Address: + - `service_name`: The service name. In this case, it is set to **torch**. + - `node_name`: The name of the node. + - `multiaddress`: Node Multi Address. + - `namespace`: The namespace in which the torch is deployed. + - `value`: The value of the metric. In this example, it is set to 1. + +### BlockHeight + +Custom metrics to expose the first block height of the chain: + +- `block_height_1`: Name of the metric to represent the first block height of the chain: + - `service_name`: The service name. In this case, it is set to **torch**. + - `block_height_1`: First block id generated + - `earliest_block_time`: Timestamp when the chain was created. + - `days_running`: Number of days that the chain is running. + - `namespace`: The namespace in which the torch is deployed. + - `value`: The value of the metric. In this example, it is set to 1. + +### Load Balancer + +Custom metrics to expose the LoadBalancer public IPs: + +- `load_balancer`: This metric represents the LoadBalancer resource and includes the following labels: + - `service_name`: The service name. In this case, it is set to **torch**. + - `load_balancer_name`: The name of the LoadBalancer service. + - `load_balancer_ip`: The IP address of the LoadBalancer. + - `namespace`: The namespace in which the LoadBalancer is deployed. + - `value`: The value of the metric. In this example, it is set to 1, but it can be customized to represent different load balancing states. + + +--- + +## Monitoring and Visualization + +Torch exposes some custom metrics through the Prometheus endpoint. +You can use Grafana to connect to Prometheus and create custom dashboards to visualize these metrics. + +To access the Prometheus and Grafana dashboards and view the metrics, follow these steps: + +1. Access the Prometheus dashboard: +- Open a web browser and navigate to the Prometheus server's URL (e.g., `http://prometheus-server:9090`). +- In the Prometheus web interface, you can explore and query the metrics collected by the Service Torch. + +2. Access the Grafana dashboard: +- Open a web browser and navigate to the Grafana server's URL (e.g., `http://grafana-server:3000`). +- Log in to Grafana using your credentials. +- Create a new dashboard or import an existing one to visualize the LoadBalancer metrics from Prometheus. +- Use the `load_balancer` metric and its labels to filter and display the relevant information. + +Customizing dashboards and setting up alerts in Grafana will help you monitor the performance and health of your LoadBalancer resources effectively. --- diff --git a/pkg/http/server.go b/pkg/http/server.go index 1c5682a..75d7811 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -19,6 +19,10 @@ import ( "github.com/celestiaorg/torch/pkg/nodes" ) +const ( + retryInterval = 10 * time.Second // retryInterval Retry interval in seconds to generate the consensus metric. +) + // GetHttpPort GetPort retrieves the namespace where the service will be deployed func GetHttpPort() string { port := os.Getenv("HTTP_PORT") @@ -57,13 +61,6 @@ func Run(cfg config.MutualPeersConfig) { return } - // generate the metric from the Genesis Hash data - notOk := GenerateHashMetrics(cfg, err) - if notOk { - log.Error("Error registering metric block_height_1") - return - } - // Create the server server := &http.Server{ Addr: ":" + httpPort, @@ -81,6 +78,10 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Started...") log.Info("Listening on port: " + httpPort) + // check if Torch has to generate the metric or not. + BackgroundGenerateHashMetric(cfg) + BackgroundGenerateLBMetric() + // Initialize the goroutine to check the nodes in the queue. log.Info("Initializing queues to process the nodes...") // Create a new context without timeout as we want to keep this goroutine running forever, if we specify a timeout, @@ -127,23 +128,79 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Exited Properly") } -func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) bool { - // Get the genesisHash - // check if the config has the consensusNode field defined +// BackgroundGenerateHashMetric check if we have defined the consensus in the config, if so, it creates a goroutine +// to generate the metric +func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) { + // Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data. if cfg.MutualPeers[0].ConsensusNode != "" { - blockHash, earliestBlockTime := nodes.GenesisHash(cfg) - err = metrics.WithMetricsBlockHeight( - blockHash, - earliestBlockTime, - cfg.MutualPeers[0].ConsensusNode, - os.Getenv("POD_NAMESPACE"), - ) - if err != nil { - log.Errorf("Error registering metric block_height_1: %v", err) - return true + log.Info("Initializing goroutine to generate the metric: block_height_1") + // Initialise the goroutine to generate the metric in the background, only if we specify the node in the config. + go func() { + log.Info("Consensus node defined to get the first block") + for { + err := GenerateHashMetrics(cfg) + // check if err is nil, if so, that means that Torch was able to generate the metric. + if err == nil { + log.Info("Metric generated for the first block...") + // The metric was successfully generated, stop the retries. + break + } + + // Wait for the retry interval before the next execution + time.Sleep(retryInterval) + } + }() + } +} + +// BackgroundGenerateLBMetric initializes a goroutine to generate the load_balancer metric. +func BackgroundGenerateLBMetric() { + log.Info("Initializing goroutine to generate the metric: load_balancer ") + + // Retrieve the list of Load Balancers + _, err := k8s.RetrieveAndGenerateMetrics() + if err != nil { + log.Printf("Failed to update metrics: %v", err) + } + + // Start watching for changes to the services in a separate goroutine + done := make(chan error) + go k8s.WatchServices(done) + + // Handle errors from WatchServices + for { + select { + case err := <-done: + if err != nil { + log.Error("Error in WatchServices: ", err) + } } } - return false +} + +// GenerateHashMetrics generates the metric by getting the first block and calculating the days. +func GenerateHashMetrics(cfg config.MutualPeersConfig) error { + log.Info("Generating the metric for the first block generated...") + + // Get the genesisHash + blockHash, earliestBlockTime, err := nodes.GenesisHash(cfg.MutualPeers[0].ConsensusNode) + if err != nil { + return err + } + + // check if earliestBlockTime is not empty, otherwise torch skips this process for now. + err = metrics.WithMetricsBlockHeight( + blockHash, + earliestBlockTime, + cfg.MutualPeers[0].ConsensusNode, + os.Getenv("POD_NAMESPACE"), + ) + if err != nil { + log.Error("Error registering metric block_height_1: ", err) + return err + } + + return nil } // RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB. diff --git a/pkg/k8s/services.go b/pkg/k8s/services.go new file mode 100644 index 0000000..e368e56 --- /dev/null +++ b/pkg/k8s/services.go @@ -0,0 +1,131 @@ +package k8s + +import ( + "context" + "fmt" + log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/celestiaorg/torch/pkg/metrics" +) + +// RetrieveAndGenerateMetrics retrieves the list of Load Balancers and generates metrics +func RetrieveAndGenerateMetrics() ([]metrics.LoadBalancer, error) { + log.Info("Retrieving the list of Load Balancers") + + // Get list of LBs + svc, err := ListServices() + if err != nil { + log.Printf("Failed to retrieve the LoadBalancers: %v", err) + return nil, err + } + + // Get the list of the LBs + loadBalancers := GetLoadBalancers(svc) + + // Generate the metrics with the LBs + err = metrics.WithMetricsLoadBalancer(loadBalancers) + if err != nil { + log.Printf("Failed to update metrics: %v", err) + return nil, err + } + + return loadBalancers, nil +} + +// ListServices retrieves the list of services in a namespace +func ListServices() (*corev1.ServiceList, error) { + // Authentication in cluster - using Service Account, Role, RoleBinding + config, err := rest.InClusterConfig() + if err != nil { + log.Error("ERROR: ", err) + return nil, err + } + + // Create the Kubernetes clientSet + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error("ERROR: ", err) + return nil, err + } + + // Get all services in the namespace + services, err := clientSet.CoreV1().Services(GetCurrentNamespace()).List(context.Background(), metav1.ListOptions{}) + if err != nil { + log.Error("ERROR: ", err) + return nil, err + } + + return services, nil +} + +// GetLoadBalancers filters the list of services to include only Load Balancers and returns a list of them +func GetLoadBalancers(svc *corev1.ServiceList) []metrics.LoadBalancer { + var loadBalancers []metrics.LoadBalancer + + for _, svc := range svc.Items { + if svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + for _, ingress := range svc.Status.LoadBalancer.Ingress { + log.Info(fmt.Sprintf("Updating metrics for service: [%s] with IP: [%s]", svc.Name, ingress.IP)) + + // Create a LoadBalancer struct and append it to the loadBalancers list + loadBalancer := metrics.LoadBalancer{ + ServiceName: "torch", + LoadBalancerName: svc.Name, + LoadBalancerIP: ingress.IP, + Namespace: svc.Namespace, + Value: 1, // Set the value of the metric here (e.g., 1) + } + loadBalancers = append(loadBalancers, loadBalancer) + } + } + } + + return loadBalancers +} + +// WatchServices watches for changes to the services in the specified namespace and updates the metrics accordingly +func WatchServices(done chan<- error) { + defer close(done) + + // Authentication in cluster - using Service Account, Role, RoleBinding + config, err := rest.InClusterConfig() + if err != nil { + log.Error("Failed to get in-cluster config: ", err) + done <- err + return + } + + // Create the Kubernetes clientSet + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error("Failed to create Kubernetes clientSet: ", err) + done <- err + return + } + + // Create a service watcher + watcher, err := clientSet.CoreV1().Services(GetCurrentNamespace()).Watch(context.Background(), metav1.ListOptions{}) + if err != nil { + log.Error("Failed to create service watcher: ", err) + done <- err + return + } + + // Watch for events on the watcher channel + for event := range watcher.ResultChan() { + if service, ok := event.Object.(*corev1.Service); ok { + if service.Spec.Type == corev1.ServiceTypeLoadBalancer { + loadBalancers := GetLoadBalancers(&corev1.ServiceList{Items: []corev1.Service{*service}}) + if err := metrics.WithMetricsLoadBalancer(loadBalancers); err != nil { + log.Error("Failed to update metrics with load balancers: ", err) + done <- err + return + } + } + } + } +} diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go index b556847..9defb87 100644 --- a/pkg/k8s/statefulsets.go +++ b/pkg/k8s/statefulsets.go @@ -13,7 +13,10 @@ import ( "github.com/celestiaorg/torch/pkg/db/redis" ) -const queueK8SNodes = "k8s" +const ( + queueK8SNodes = "k8s" // queueK8SNodes name of the queue. + daNodePrefix = "da" // daNodePrefix name prefix that Torch will use to filter the StatefulSets. +) // WatchStatefulSets watches for changes to the StatefulSets in the specified namespace and updates the metrics accordingly func WatchStatefulSets() error { @@ -42,11 +45,11 @@ func WatchStatefulSets() error { // Watch for events on the watcher channel for event := range watcher.ResultChan() { + // Check if the event object is of type *v1.StatefulSet if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { - //log.Info("StatefulSet containers: ", statefulSet.Spec.Template.Spec.Containers) - - // check if the node is DA, if so, send it to the queue to generate the multi address - if strings.HasPrefix(statefulSet.Name, "da") { + // Check if the StatefulSet is valid based on the conditions + if isStatefulSetValid(statefulSet) { + // Perform necessary actions, such as adding the node to the Redis queue err := redis.Producer(statefulSet.Name, queueK8SNodes) if err != nil { log.Error("ERROR adding the node to the queue: ", err) @@ -58,3 +61,11 @@ func WatchStatefulSets() error { return nil } + +// isStatefulSetValid validates the StatefulSet received. +// checks if the StatefulSet name contains the daNodePrefix, and if the StatefulSet is in the "Running" state. +func isStatefulSetValid(statefulSet *v1.StatefulSet) bool { + return strings.HasPrefix(statefulSet.Name, daNodePrefix) && + statefulSet.Status.CurrentReplicas > 0 && + statefulSet.Status.Replicas == statefulSet.Status.ReadyReplicas +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index ba712ce..c9b2e3d 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -77,6 +77,14 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa log.Fatalf(err.Error()) return err } + + // Calculate the days that the chain is live. + daysRunning, err := calculateDaysDifference(earliestBlockTime) + if err != nil { + log.Error("ERROR: ", err) + return err + } + callback := func(ctx context.Context, observer metric.Observer) error { // Define the callback function that will be called periodically to observe metrics. // Create labels with attributes for each block_height_1. @@ -84,7 +92,7 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa attribute.String("service_name", serviceName), attribute.String("block_height_1", blockHeight), attribute.String("earliest_block_time", earliestBlockTime), - attribute.Int("days_running", CalculateDaysDifference(earliestBlockTime)), + attribute.Int("days_running", daysRunning), attribute.String("namespace", namespace), ) // Observe the float64 value for the current block_height_1 with the associated labels. @@ -98,18 +106,61 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa return err } -// CalculateDaysDifference based on the date received, returns the number of days since this day. -func CalculateDaysDifference(inputTimeString string) int { +// calculateDaysDifference based on the date received, returns the number of days since this day. +func calculateDaysDifference(inputTimeString string) (int, error) { layout := "2006-01-02T15:04:05.999999999Z" inputTime, err := time.Parse(layout, inputTimeString) if err != nil { log.Error("Error parsing time: [", inputTimeString, "]", err) - return -1 + return -1, err } currentTime := time.Now() timeDifference := currentTime.Sub(inputTime) daysDifference := int(timeDifference.Hours() / 24) - return daysDifference + return daysDifference, nil +} + +// LoadBalancer represents the information for a load balancer. +type LoadBalancer struct { + ServiceName string // ServiceName Name of the service associated with the load balancer. + LoadBalancerName string // LoadBalancerName Name of the load balancer. + LoadBalancerIP string // LoadBalancerIP IP address of the load balancer. + Namespace string // Namespace where the service is deployed. + Value float64 // Value to be observed for the load balancer. +} + +// WithMetricsLoadBalancer creates a callback function to observe metrics for multiple load balancers. +func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error { + // Create a Float64ObservableGauge named "load_balancer" with a description for the metric. + loadBalancersGauge, err := meter.Float64ObservableGauge( + "load_balancer", + metric.WithDescription("Torch - Load Balancers"), + ) + if err != nil { + log.Fatalf(err.Error()) + return err + } + + // Define the callback function that will be called periodically to observe metrics. + callback := func(ctx context.Context, observer metric.Observer) error { + for _, lb := range loadBalancers { + // Create labels with attributes for each load balancer. + labels := metric.WithAttributes( + attribute.String("service_name", lb.ServiceName), + attribute.String("load_balancer_name", lb.LoadBalancerName), + attribute.String("load_balancer_ip", lb.LoadBalancerIP), + attribute.String("namespace", lb.Namespace), + ) + // Observe the float64 value for the current load balancer with the associated labels. + observer.ObserveFloat64(loadBalancersGauge, lb.Value, labels) + } + + return nil + } + + // Register the callback with the meter and the Float64ObservableGauge. + _, err = meter.RegisterCallback(callback, loadBalancersGauge) + return err } diff --git a/pkg/nodes/consensus.go b/pkg/nodes/consensus.go index 03d3a18..a265be2 100644 --- a/pkg/nodes/consensus.go +++ b/pkg/nodes/consensus.go @@ -15,7 +15,7 @@ import ( var ( consContainerSetupName = "consensus-setup" // consContainerSetupName initContainer that we use to configure the nodes. consContainerName = "consensus" // consContainerName container name which the pod runs. - namespace = k8s.GetCurrentNamespace() // ns namespace of the node. + namespace = k8s.GetCurrentNamespace() // namespace of the node. ) // SetConsNodeDefault sets all the default values in case they are empty @@ -34,26 +34,25 @@ func SetConsNodeDefault(peer config.Peer) config.Peer { // GenesisHash connects to the node specified in: config.MutualPeersConfig.ConsensusNode // makes a request to the API and gets the info about the genesis and return it -func GenesisHash(pods config.MutualPeersConfig) (string, string) { - consensusNode := pods.MutualPeers[0].ConsensusNode +func GenesisHash(consensusNode string) (string, string, error) { url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode) response, err := http.Get(url) if err != nil { - log.Error("Error making GET request:", err) - return "", "" + log.Error("Error making the request to the node [", consensusNode, "] - ", err) + return "", "", err } defer response.Body.Close() if response.StatusCode != http.StatusOK { log.Error("Non-OK response:", response.Status) - return "", "" + return "", "", err } bodyBytes, err := ioutil.ReadAll(response.Body) if err != nil { log.Error("Error reading response body:", err) - return "", "" + return "", "", err } bodyString := string(bodyBytes) @@ -64,26 +63,26 @@ func GenesisHash(pods config.MutualPeersConfig) (string, string) { err = json.Unmarshal([]byte(bodyString), &jsonResponse) if err != nil { log.Error("Error parsing JSON:", err) - return "", "" + return "", "", err } // Access and print the .block_id.hash field blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string) if !ok { log.Error("Unable to access .block_id.hash") - return "", "" + return "", "", err } // Access and print the .block.header.time field blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string) if !ok { log.Error("Unable to access .block.header.time") - return "", "" + return "", "", err } log.Info("Block ID Hash: ", blockIDHash) log.Info("Block Time: ", blockTime) log.Info("Full output: ", bodyString) - return blockIDHash, blockTime + return blockIDHash, blockTime, nil }