From 59785b55cf480c2e3845459ed21490a783694c7a Mon Sep 17 00:00:00 2001
From: dfradehubs
Date: Wed, 18 Sep 2024 09:46:17 +0200
Subject: [PATCH] feat(Clean-up): Cleaned up code, documented and added logs

---
 .gitignore                              |  1 +
 README.md                               | 10 +++---
 cmd/main.go                             | 15 ++++---
 examples/docker-compose.yml             |  2 +-
 internal/elasticsearch/elasticsearch.go | 50 +++++++++++++++++------
 internal/google/mig.go                  | 24 +++++++++---
 internal/prometheus/prometheus.go       | 17 ++++++--
 7 files changed, 91 insertions(+), 28 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6a41cb1..e7a79d3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 examples/credentials.json
+examples/.env
 bin/
\ No newline at end of file

diff --git a/README.md b/README.md
index 377ba73..fdf40f2 100644
--- a/README.md
+++ b/README.md
@@ -21,8 +21,9 @@ This project is designed to automate the scaling of Google Cloud Managed Instanc
 1. **Environment Variables**: You can configure the following environment variables:
 
-    * PROMETHEUS_URL: Prometheus to query about metrics for scaling (Default `http://localhost:9200`)
-    * PROMETHEUS_UP_CONDITION: Prometheus query that must met to scale up the nodegroup (`Required`)
-    * PROMETHEUS_DOWN_CONDITION: Prometheus query that must met to scale down the nodegroup (`Required`)
+    * PROMETHEUS_URL: Prometheus to query about metrics for scaling (Default `http://localhost:9090`)
+    * PROMETHEUS_UP_CONDITION: Prometheus query that must be met to scale up the nodegroup. The program only checks whether the condition is true or false; it does not inspect the values (`Required`)
+    * PROMETHEUS_DOWN_CONDITION: Prometheus query that must be met to scale down the nodegroup. The program only checks whether the condition is true or false; it does not inspect the values (`Required`)
+    * PROMETHEUS_HEADER_*: HTTP headers to send with Prometheus queries. For example, the PROMETHEUS_HEADER_X_Scope_OrgID environment variable adds an HTTP header called X-Scope-OrgID to the request (`Optional`)
     * GCP_PROJECT_ID: Google Cloud project id (Default `example`)
     * ZONE: Google Cloud project zone (Default `europe-west1-d`)
     * MIG_NAME: Google Cloud MIG to scale (Default `example`)
@@ -31,8 +32,9 @@ This project is designed to automate the scaling of Google Cloud Managed Instanc
     * ELASTIC_URL: Elasticsearch URL to drain nodes (Default `http://elasticsearch:9200`)
     * ELASTIC_USER: Elasticsearch user for authentication (Default `elastic`)
     * ELASTIC_PASSWORD: Elasticsearch password for authentication (Default `password`)
+    * ELASTIC_SSL_INSECURE_SKIP_VERIFY: Skip SSL certificate validation for Elasticsearch (Default `false`)
     * COOLDOWN_PERIOD_SEC: Cooldown seconds to wait between scale checks (Default `60`)
-    * RETRY_INTERVAL_SEC: Retry timeout when an error is reached during the loop (Default `15`)
+    * RETRY_INTERVAL_SEC: Seconds to wait before retrying after an error during the loop (Default `60`)
     * DEBUG_MODE: Does not execute scalations, just log and send slack messages (Default `false`)
     * MIN_SIZE: Minimum size for the nodegroup (Default `1`)
     * MAX_SIZE: Maximum size for the nodegroup (Default `1`)
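The defaults documented above are resolved by the `globals.GetEnv` helper that the hunks below call everywhere. Its implementation lives in `internal/globals` and is not shown in this patch, so the following is only a sketch of its presumed lookup-with-fallback shape:

```go
// Presumed shape of globals.GetEnv; the real code is in
// internal/globals and is not part of this patch.
package globals

import "os"

// GetEnv returns the value of key, or fallback if key is unset.
func GetEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}
```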
diff --git a/cmd/main.go b/cmd/main.go
index 23fadeb..025f007 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -5,6 +5,7 @@ import (
     "elasticsearch-vm-autoscaler/internal/google"
     "elasticsearch-vm-autoscaler/internal/prometheus"
     "elasticsearch-vm-autoscaler/internal/slack"
+    "log"
     "os"
     "strconv"
 
@@ -13,6 +14,7 @@ import (
 )
 
 func main() {
+    // Prometheus variables to hold configuration for the scaling conditions
     prometheusURL := globals.GetEnv("PROMETHEUS_URL", "http://localhost:9090")
 
     // Conditions for scaling up or down the MIG
@@ -40,7 +42,7 @@ func main() {
     // Cooldown and retry intervals in seconds, parsed from environment variables
     cooldownPeriodSeconds, _ := strconv.ParseInt(globals.GetEnv("COOLDOWN_PERIOD_SEC", "60"), 10, 64)
-    retryIntervalSeconds, _ := strconv.ParseInt(globals.GetEnv("RETRY_INTERVAL_SEC", "15"), 10, 64)
+    retryIntervalSeconds, _ := strconv.ParseInt(globals.GetEnv("RETRY_INTERVAL_SEC", "60"), 10, 64)
 
     // Debug mode flag, enabled if "DEBUG_MODE" is set to "true"
     debugModeStr := globals.GetEnv("DEBUG_MODE", "false")
 
@@ -53,19 +55,20 @@ func main() {
 
     // Main loop to monitor scaling conditions and manage the MIG
     for {
-        // Check if the MIG is at its minimum size
+        // Check that the MIG is at least at its minimum size. If not, scale it up to minSize
         err := google.CheckMIGMinimumSize(projectID, zone, migName, debugMode)
         if err != nil {
             log.Printf("Error checking minimum size for MIG nodes: %v", err)
         }
 
-        // Fetch the up and down conditions from Prometheus
+        // Fetch the scale-up and scale-down conditions from Prometheus
         upCondition, err := prometheus.GetPrometheusCondition(prometheusURL, prometheusUpCondition)
         if err != nil {
             log.Printf("Error querying Prometheus: %v", err)
             time.Sleep(time.Duration(retryIntervalSeconds) * time.Second)
             continue
         }
+
         downCondition, err := prometheus.GetPrometheusCondition(prometheusURL, prometheusDownCondition)
         if err != nil {
             log.Printf("Error querying Prometheus: %v", err)
@@ -75,7 +78,7 @@ func main() {
 
         // If the up condition is met, add a node to the MIG
         if upCondition {
-            log.Printf("Condition %s met: Creating new node!", prometheusUpCondition)
+            log.Printf("Up condition %s met: trying to create a new node!", prometheusUpCondition)
             err = google.AddNodeToMIG(projectID, zone, migName, debugMode)
             if err != nil {
                 log.Printf("Error adding node to MIG: %v", err)
@@ -84,10 +87,10 @@ func main() {
             }
             // Notify via Slack that a node has been added
             if slackWebhookURL != "" {
-                slack.NotifySlack("Added node to MIG", slackWebhookURL)
+                slack.NotifySlack("New node created successfully in MIG", slackWebhookURL)
             }
         } else if downCondition { // If the down condition is met, remove a node from the MIG
-            log.Printf("Condition %s met. Removing one node!", prometheusDownCondition)
+            log.Printf("Down condition %s met: trying to remove one node!", prometheusDownCondition)
             err = google.RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasticPassword, debugMode)
             if err != nil {
                 log.Printf("Error draining node from MIG: %v", err)

diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml
index a669239..e9c1778 100644
--- a/examples/docker-compose.yml
+++ b/examples/docker-compose.yml
@@ -43,4 +43,4 @@ services:
       - GOOGLE_APPLICATION_CREDENTIALS=/tmp/credentials.json
       - MIN_SIZE=1
       - MAX_SIZE=2
-      #- X-SCOPE-ORGID_HEADER=test
+      - PROMETHEUS_HEADER_X_Scope_OrgID=test
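Before the Elasticsearch changes, it helps to see what the drain flow below actually sends to the cluster: it excludes the departing node's IP so shards relocate away, then lifts the exclusion once the node is gone. A minimal standalone sketch of the two `_cluster/settings` payloads (the IP is a placeholder, not a value from this patch):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Payload sent by DrainElasticsearchNode: exclude the node's IP
	// so Elasticsearch relocates its shards elsewhere.
	exclude := map[string]map[string]any{
		"persistent": {"cluster.routing.allocation.exclude._ip": "10.0.0.12"},
	}
	// Payload sent by ClearElasticsearchClusterSettings once the node
	// is gone: a nil value removes the exclusion again.
	clearExclude := map[string]map[string]any{
		"persistent": {"cluster.routing.allocation.exclude._ip": nil},
	}
	for _, settings := range []map[string]map[string]any{exclude, clearExclude} {
		data, _ := json.Marshal(settings)
		fmt.Println(string(data))
	}
}
```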
diff --git a/internal/elasticsearch/elasticsearch.go b/internal/elasticsearch/elasticsearch.go
index c42c045..dc906aa 100644
--- a/internal/elasticsearch/elasticsearch.go
+++ b/internal/elasticsearch/elasticsearch.go
@@ -3,13 +3,14 @@ package elasticsearch
 import (
     "bytes"
     "crypto/tls"
+    "elasticsearch-vm-autoscaler/internal/globals"
     "encoding/json"
     "fmt"
     "io"
     "log"
     "net/http"
     "regexp"
     "time"
 
     "github.com/elastic/go-elasticsearch/v8"
 )
@@ -48,11 +49,19 @@ type ShardInfo struct {
 // password: The password for basic authentication.
 func DrainElasticsearchNode(elasticURL, nodeName, username, password string) error {
-    // Configurar http.Transport para desactivar la verificación del certificado
-    tr := &http.Transport{
-        TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+    // Check the ELASTIC_SSL_INSECURE_SKIP_VERIFY environment variable to decide whether to skip
+    // SSL certificate validation for Elasticsearch
+    insecureSkipVerify := globals.GetEnv("ELASTIC_SSL_INSECURE_SKIP_VERIFY", "false")
+    var tr http.RoundTripper
+    if insecureSkipVerify == "true" {
+        tr = &http.Transport{
+            TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+        }
+    } else {
+        tr = http.DefaultTransport
     }
 
+    // Create the Elasticsearch connection config
     cfg := elasticsearch.Config{
         Addresses: []string{elasticURL},
         Username:  username,
         Password:  password,
@@ -60,18 +69,19 @@ func DrainElasticsearchNode(elasticURL, nodeName, username, password string) error {
         Transport: tr,
     }
 
+    // Create a new Elasticsearch client
     es, err := elasticsearch.NewClient(cfg)
     if err != nil {
         return fmt.Errorf("failed to create Elasticsearch client: %w", err)
     }
 
-    // Get Elasticsearch node IP
+    // Get the IP of the Elasticsearch node to delete, looked up by its name
     nodeIP, err := getNodeIP(es, nodeName)
     if err != nil {
         return fmt.Errorf("failed to get node IP: %w", err)
     }
 
-    // Exclude the node from routing allocations
+    // Exclude the node IP from routing allocations
     err = updateClusterSettings(es, nodeIP)
     if err != nil {
         return fmt.Errorf("failed to update cluster settings: %w", err)
@@ -99,11 +109,13 @@ func getNodeIP(es *elasticsearch.Client, nodeName string) (string, error) {
     }
     defer res.Body.Close()
 
+    // Read the response body
     body, err := io.ReadAll(res.Body)
     if err != nil {
         return "", fmt.Errorf("error reading response body: %w", err)
     }
 
+    // Parse the JSON response
     var nodes []NodeInfo
     err = json.Unmarshal([]byte(string(body)), &nodes)
     if err != nil {
@@ -125,17 +137,20 @@
 
 // updateClusterSettings updates the cluster settings to exclude a specific node IP.
 func updateClusterSettings(es *elasticsearch.Client, nodeIP string) error {
+    // _cluster/settings payload to apply
     settings := map[string]map[string]string{
         "persistent": {
             "cluster.routing.allocation.exclude._ip": nodeIP,
         },
     }
 
+    // Serialize the settings to JSON
     data, err := json.Marshal(settings)
     if err != nil {
         return fmt.Errorf("failed to marshal settings to JSON: %w", err)
     }
 
+    // Execute the PUT _cluster/settings request
     req := bytes.NewReader(data)
     res, err := es.Cluster.PutSettings(req)
     if err != nil {
@@ -160,6 +175,7 @@ func waitForNodeRemoval(es *elasticsearch.Client, nodeName string) error {
     }
 
     for {
+        // Query _cat/shards to check whether nodeName still holds any shards
         res, err := es.Cat.Shards(
             es.Cat.Shards.WithFormat("json"),
             es.Cat.Shards.WithV(true),
@@ -169,31 +185,34 @@ func waitForNodeRemoval(es *elasticsearch.Client, nodeName string) error {
         }
         defer res.Body.Close()
 
+        // Read the response body
         body, err := io.ReadAll(res.Body)
         if err != nil || string(body) == "" {
             return fmt.Errorf("error reading response body: %w", err)
         }
 
+        // Parse the JSON response
         var shards []ShardInfo
         err = json.Unmarshal([]byte(string(body)), &shards)
         if err != nil {
             return fmt.Errorf("error deserializing JSON: %w", err)
         }
 
+        // Check whether nodeName still has any shards on it
        nodeFound := false
         for _, shard := range shards {
-            // Assuming `node` field contains the node name
             if re.MatchString(shard.Node) {
                 nodeFound = true
             }
         }
 
+        // If nodeFound is false, the node no longer holds any shards and is ready to delete
         if !nodeFound {
             log.Printf("node %s is fully empty and ready to delete", nodeName)
             break
         }
         time.Sleep(10 * time.Second)
     }
 
     return nil
@@ -201,11 +220,20 @@ func waitForNodeRemoval(es *elasticsearch.Client, nodeName string) error {
 
 // clearClusterSettings removes the node exclusion from cluster settings.
 func ClearElasticsearchClusterSettings(elasticURL, username, password string) error {
-    // Configurar http.Transport para desactivar la verificación del certificado
-    tr := &http.Transport{
-        TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+
+    // Check the ELASTIC_SSL_INSECURE_SKIP_VERIFY environment variable to decide whether to skip
+    // SSL certificate validation for Elasticsearch
+    insecureSkipVerify := globals.GetEnv("ELASTIC_SSL_INSECURE_SKIP_VERIFY", "false")
+    var tr http.RoundTripper
+    if insecureSkipVerify == "true" {
+        tr = &http.Transport{
+            TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+        }
+    } else {
+        tr = http.DefaultTransport
     }
 
+    // Configure the Elasticsearch connection
     cfg := elasticsearch.Config{
         Addresses: []string{elasticURL},
         Username:  username,
@@ -213,22 +241,26 @@ func ClearElasticsearchClusterSettings(elasticURL, username, password string) error {
         Transport: tr,
     }
 
+    // Create the Elasticsearch client
     es, err := elasticsearch.NewClient(cfg)
     if err != nil {
         return fmt.Errorf("failed to create Elasticsearch client: %w", err)
     }
 
+    // _cluster/settings payload to apply after the node deletion
     settings := map[string]map[string]any{
         "persistent": {
             "cluster.routing.allocation.exclude._ip": nil,
         },
     }
 
+    // Serialize the settings to JSON
     data, err := json.Marshal(settings)
     if err != nil {
         return fmt.Errorf("failed to marshal settings to JSON: %w", err)
     }
 
+    // Execute the PUT _cluster/settings request
     req := bytes.NewReader(data)
     res, err := es.Cluster.PutSettings(req)
     if err != nil {
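The transport-selection block introduced above now appears verbatim in both DrainElasticsearchNode and ClearElasticsearchClusterSettings. A possible follow-up would be to factor it into a shared helper; the sketch below is hypothetical (the name newElasticTransport is not in the patch):

```go
package elasticsearch

import (
	"crypto/tls"
	"net/http"

	"elasticsearch-vm-autoscaler/internal/globals"
)

// newElasticTransport is a hypothetical shared helper for the
// duplicated block: it skips certificate verification only when
// ELASTIC_SSL_INSECURE_SKIP_VERIFY is explicitly set to "true".
func newElasticTransport() http.RoundTripper {
	if globals.GetEnv("ELASTIC_SSL_INSECURE_SKIP_VERIFY", "false") == "true" {
		return &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		}
	}
	return http.DefaultTransport
}
```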
- return fmt.Errorf("MIG has reached the minimum size (%d/%d), no further scaling down is possible.\n", targetSize, minSize) + return fmt.Errorf("MIG has reached its minimum size (%d/%d), no further scaling down is possible", targetSize, minSize) } // Get a random instance from the MIG to remove @@ -97,11 +103,12 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti // If not in debug mode, drain the node from Elasticsearch before removal if !debugMode { - log.Printf("Instance to remove: %s", instanceToRemove) + log.Printf("Instance to remove: %s. Draining from elasticsearch cluster", instanceToRemove) err = elasticsearch.DrainElasticsearchNode(elasticURL, instanceToRemove, elasticUser, elasticPassword) if err != nil { return fmt.Errorf("error draining Elasticsearch node: %v", err) } + log.Printf("Instance drained successfully from elasticsearch cluster") } // Create a request to delete the selected instance and reduce the MIG size @@ -120,6 +127,8 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti _, err = client.DeleteInstances(ctx, deleteReq) if err != nil { return fmt.Errorf("error deleting instance: %v", err) + } else { + log.Printf("Scaled down MIG successfully %d/%d", targetSize-1, minSize) } // Wait 90 seconds until instance is fully deleted // Google Cloud has a deletion timeout of 90 seconds max @@ -132,6 +141,7 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti if err != nil { return fmt.Errorf("error clearing Elasticsearch cluster settings: %v", err) } + log.Printf("Cleared up elasticsearch settings for draining node") } return nil @@ -258,7 +268,11 @@ func CheckMIGMinimumSize(projectID, zone, migName string, debugMode bool) error // Resize the MIG if not in debug mode if !debugMode { _, err = client.Resize(ctx, req) - return err + if err != nil { + return err + } else { + log.Printf("Scaled up MIG to its minimum size %d/%d", minSize, minSize) + } } } return nil diff --git a/internal/prometheus/prometheus.go b/internal/prometheus/prometheus.go index 5a1b6bb..8b16c6c 100644 --- a/internal/prometheus/prometheus.go +++ b/internal/prometheus/prometheus.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "os" + "strings" "time" "github.com/prometheus/client_golang/api" @@ -20,10 +21,20 @@ type customTransport struct { // RoundTrip executes a single HTTP transaction and adds custom headers. func (t *customTransport) RoundTrip(req *http.Request) (*http.Response, error) { - // Add custom headers from environment variables - if orgIdHeader := os.Getenv("X_SCOPE_ORGID_HEADER"); orgIdHeader != "" { - req.Header.Set("X-Scope-OrgID", orgIdHeader) + // Get all environment vars + for _, env := range os.Environ() { + // Split in key value + pair := strings.SplitN(env, "=", 2) + key := pair[0] + value := pair[1] + + // If the variable has the prefix PROMETHEUS_HEADER_, add it as a header + if strings.HasPrefix(key, "PROMETHEUS_HEADER_") { + headerName := strings.ReplaceAll(key[len("PROMETHEUS_HEADER_"):], "_", "-") + req.Header.Set(headerName, value) + } } + return t.Transport.RoundTrip(req) }