From dfc9c429c479f41d93aac29d28a7a32c5c654676 Mon Sep 17 00:00:00 2001 From: dfradehubs Date: Tue, 17 Sep 2024 12:30:41 +0200 Subject: [PATCH] feat(Elasticsearch): Fixed elasticsearch drain nodes --- examples/docker-compose.yml | 8 +- go.mod | 2 + go.sum | 4 + internal/elasticsearch/elasticsearch.go | 202 +++++++++++++++--------- internal/google/mig.go | 25 ++- 5 files changed, 159 insertions(+), 82 deletions(-) diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index b39a1cc..b525af4 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -35,12 +35,12 @@ services: - ZONE=europe-west1-d - MIG_NAME=test - SLACK_WEBHOOK_URL="test" - - ELASTIC_URL="http://elasticsearch:9200" - - ELASTIC_USER="elastic" - - ELASTIC_PASSWORD="test" + - ELASTIC_URL=https://elasticsearch:9200 + - ELASTIC_USER=elastic + - ELASTIC_PASSWORD=test - COOLDOWN_PERIOD_SEC=10 - RETRY_INTERVAL_SEC=5 - GOOGLE_APPLICATION_CREDENTIALS=/tmp/credentials.json - MIN_SIZE=1 - MAX_SIZE=2 - - X-SCOPE-ORGID_HEADER=test + #- X-SCOPE-ORGID_HEADER=test diff --git a/go.mod b/go.mod index 6347a23..62430de 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.23.1 require ( cloud.google.com/go/compute v1.28.0 + github.com/elastic/go-elasticsearch/v8 v8.15.0 github.com/prometheus/client_golang v1.20.3 github.com/prometheus/common v0.59.1 github.com/slack-go/slack v0.14.0 @@ -14,6 +15,7 @@ require ( cloud.google.com/go/auth v0.9.0 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect cloud.google.com/go/compute/metadata v0.5.0 // indirect + github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index 5b182fe..3b79eed 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,10 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= +github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk= +github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/internal/elasticsearch/elasticsearch.go b/internal/elasticsearch/elasticsearch.go index b2b55db..2b24d0b 100644 --- a/internal/elasticsearch/elasticsearch.go +++ b/internal/elasticsearch/elasticsearch.go @@ -2,34 +2,69 @@ package elasticsearch import ( "bytes" + "crypto/tls" + "encoding/json" "fmt" "io" "net/http" - "os/exec" "strings" "time" + + "github.com/elastic/go-elasticsearch/v8" ) +// nodeInfo struct for elasticsearch nodes +type NodeInfo struct { + IP string `json:"ip"` + HeapPercent string `json:"heap.percent"` + RAMPercent string `json:"ram.percent"` + CPU string `json:"cpu"` + Load1m string `json:"load_1m"` + Load5m string `json:"load_5m"` + Load15m string `json:"load_15m"` + NodeRole string `json:"node.role"` + Master string `json:"master"` + Name string `json:"name"` +} + // DrainElasticsearchNode drains an Elasticsearch node and performs a controlled shutdown. // elasticURL: The URL of the Elasticsearch cluster. // nodeName: The name of the node to shut down. // username: The username for basic authentication. // password: The password for basic authentication. func DrainElasticsearchNode(elasticURL, nodeName, username, password string) error { + + // Configurar http.Transport para desactivar la verificación del certificado + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + + cfg := elasticsearch.Config{ + Addresses: []string{elasticURL}, + Username: username, + Password: password, + Transport: tr, + } + + es, err := elasticsearch.NewClient(cfg) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + // Get Elasticsearch node IP - nodeIP, err := getNodeIP(elasticURL, username, password) + nodeIP, err := getNodeIP(es, nodeName) if err != nil { return fmt.Errorf("failed to get node IP: %w", err) } // Exclude the node from routing allocations - err = updateClusterSettings(elasticURL, username, password, nodeIP) + err = updateClusterSettings(es, nodeIP) if err != nil { return fmt.Errorf("failed to update cluster settings: %w", err) } // Wait until the node is removed from the cluster - err = waitForNodeRemoval(elasticURL, username, password) + err = waitForNodeRemoval(es, nodeName) if err != nil { return fmt.Errorf("failed while waiting for node removal: %w", err) } @@ -38,120 +73,143 @@ func DrainElasticsearchNode(elasticURL, nodeName, username, password string) err } // getNodeIP retrieves the IP address of the Elasticsearch node. -func getNodeIP(elasticURL, username, password string) (string, error) { - cmd := exec.Command("curl", "-s", "-k", "-u", fmt.Sprintf("%s:%s", username, password), fmt.Sprintf("%s/_cat/nodes?v&h=ip,name", elasticURL)) - output, err := cmd.Output() +func getNodeIP(es *elasticsearch.Client, nodeName string) (string, error) { + + // Request to get the nodes information + res, err := es.Cat.Nodes( + es.Cat.Nodes.WithFormat("json"), + es.Cat.Nodes.WithV(true), + ) if err != nil { - return "", fmt.Errorf("failed to execute curl command: %w", err) + return "", fmt.Errorf("failed to get nodes information: %w", err) } + defer res.Body.Close() - lines := strings.Split(string(output), "\n") - for _, line := range lines { - if strings.Contains(line, getHostname()) { - fields := strings.Fields(line) - if len(fields) > 0 { - return fields[0], nil - } + body, err := io.ReadAll(res.Body) + if err != nil { + return "", fmt.Errorf("error reading response body: %w", err) + } + + var nodes []NodeInfo + err = json.Unmarshal([]byte(string(body)), &nodes) + if err != nil { + return "", fmt.Errorf("error deserializing JSON: %w", err) + } + + // Find the IP address for the node with the hostname + for _, node := range nodes { + if node.Name == nodeName { + return node.IP, nil } } + return "", fmt.Errorf("node IP not found") } // updateClusterSettings updates the cluster settings to exclude a specific node IP. -func updateClusterSettings(elasticURL, username, password, nodeIP string) error { - data := fmt.Sprintf(`{ +func updateClusterSettings(es *elasticsearch.Client, nodeIP string) error { + + settings := map[string]map[string]string{ "persistent": { - "cluster.routing.allocation.exclude._ip": "%s" - } - }`, nodeIP) - req, err := http.NewRequest("PUT", fmt.Sprintf("%s/_cluster/settings", elasticURL), bytes.NewBuffer([]byte(data))) + "cluster.routing.allocation.exclude._ip": nodeIP, + }, + } + + data, err := json.Marshal(settings) if err != nil { - return fmt.Errorf("failed to create request: %w", err) + return fmt.Errorf("failed to marshal settings to JSON: %w", err) } - req.SetBasicAuth(username, password) - req.Header.Set("Content-Type", "application/json") - client := &http.Client{} - resp, err := client.Do(req) + req := bytes.NewReader(data) + res, err := es.Cluster.PutSettings(req) if err != nil { - return fmt.Errorf("failed to send request: %w", err) + return fmt.Errorf("failed to update cluster settings: %w", err) } - defer resp.Body.Close() + defer res.Body.Close() - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("error updating cluster settings: %s", string(body)) + if res.IsError() { + return fmt.Errorf("error updating cluster settings: %s", res.String()) } return nil } // waitForNodeRemoval waits for the node to be removed from the cluster. -func waitForNodeRemoval(elasticURL, username, password string) error { +func waitForNodeRemoval(es *elasticsearch.Client, nodeName string) error { + for { - cmd := exec.Command("curl", "-s", "-k", "-u", fmt.Sprintf("%s:%s", username, password), fmt.Sprintf("%s/_cat/shards", elasticURL)) - output, err := cmd.Output() + res, err := es.Cat.Shards( + es.Cat.Shards.WithFormat("json"), + ) if err != nil { - return fmt.Errorf("failed to execute curl command: %w", err) + return fmt.Errorf("failed to get shards information: %w", err) + } + defer res.Body.Close() + + var shards []map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&shards); err != nil { + return fmt.Errorf("failed to decode shards information: %w", err) + } + + nodeFound := false + for _, shard := range shards { + // Assuming `node` field contains the node name + if node, ok := shard["node"].(string); ok && strings.Contains(node, nodeName) { + nodeFound = true + break + } } - if !strings.Contains(string(output), getHostname()) { + if !nodeFound { break } + time.Sleep(10 * time.Second) } + return nil } -// shutdownServices stops Docker and Nomad services. -func shutdownServices() error { - cmd := exec.Command("sudo", "systemctl", "stop", "docker") - err := cmd.Run() - if err != nil { - return fmt.Errorf("failed to stop Docker: %w", err) +// clearClusterSettings removes the node exclusion from cluster settings. +func ClearElasticsearchClusterSettings(elasticURL, username, password string) error { + // Configurar http.Transport para desactivar la verificación del certificado + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } - cmd = exec.Command("sudo", "systemctl", "stop", "nomad") - err = cmd.Run() - if err != nil { - return fmt.Errorf("failed to stop Nomad: %w", err) + cfg := elasticsearch.Config{ + Addresses: []string{elasticURL}, + Username: username, + Password: password, + Transport: tr, } - time.Sleep(10 * time.Second) - return nil -} + es, err := elasticsearch.NewClient(cfg) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } -// clearClusterSettings removes the node exclusion from cluster settings. -func ClearElasticsearchClusterSettings(elasticURL, username, password string) error { - data := `{ + settings := map[string]map[string]string{ "persistent": { - "cluster.routing.allocation.exclude._ip": null - } - }` - req, err := http.NewRequest("PUT", fmt.Sprintf("%s/_cluster/settings", elasticURL), bytes.NewBuffer([]byte(data))) + "cluster.routing.allocation.exclude._ip": "", + }, + } + + data, err := json.Marshal(settings) if err != nil { - return fmt.Errorf("failed to create request: %w", err) + return fmt.Errorf("failed to marshal settings to JSON: %w", err) } - req.SetBasicAuth(username, password) - req.Header.Set("Content-Type", "application/json") - client := &http.Client{} - resp, err := client.Do(req) + req := bytes.NewReader(data) + res, err := es.Cluster.PutSettings(req) if err != nil { - return fmt.Errorf("failed to send request: %w", err) + return fmt.Errorf("failed to update cluster settings: %w", err) } - defer resp.Body.Close() + defer res.Body.Close() - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("error clearing cluster settings: %s", string(body)) + if res.IsError() { + return fmt.Errorf("error updating cluster settings: %s", res.String()) } return nil } - -// getHostname retrieves the current hostname of the node. -func getHostname() string { - hostname, _ := exec.Command("hostname").Output() - return strings.TrimSpace(string(hostname)) -} diff --git a/internal/google/mig.go b/internal/google/mig.go index c157f94..c45fcef 100644 --- a/internal/google/mig.go +++ b/internal/google/mig.go @@ -6,6 +6,7 @@ import ( "log" "math/rand" "strconv" + "strings" "elasticsearch-vm-autoscaler/internal/elasticsearch" "elasticsearch-vm-autoscaler/internal/globals" @@ -98,6 +99,7 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti // If not in debug mode, drain the node from Elasticsearch before removal if !debugMode { + log.Printf("Instance to remove: %s", instanceToRemove) err = elasticsearch.DrainElasticsearchNode(elasticURL, instanceToRemove, elasticUser, elasticPassword) if err != nil { log.Printf("Error draining Elasticsearch node: %v", err) @@ -116,6 +118,12 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti }, } + // Abandon the instance if not in debug mode + if !debugMode { + _, err = client.AbandonInstances(ctx, abandonReq) + return err + } + // If not in debug mode, remove the elasticsearch node from cluster settings if !debugMode { err = elasticsearch.ClearElasticsearchClusterSettings(elasticURL, elasticUser, elasticPassword) @@ -125,11 +133,6 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti } } - // Abandon the instance if not in debug mode - if !debugMode { - _, err = client.AbandonInstances(ctx, abandonReq) - return err - } return nil } @@ -161,6 +164,16 @@ func getMIGTargetSize(ctx context.Context, client *compute.InstanceGroupManagers return mig.GetTargetSize(), nil } +// getInstanceNameFromURL parses the Google Cloud instance name to get just the hostname +// and not the full path +func getInstanceNameFromURL(instanceURL string) string { + parts := strings.Split(instanceURL, "/") + if len(parts) > 0 { + return parts[len(parts)-1] + } + return "" +} + // GetInstanceToRemove retrieves a random instance from the MIG to be removed. func GetInstanceToRemove(ctx context.Context, client *compute.InstanceGroupManagersClient, projectID, zone, migName string) (string, error) { // Get the list of instances in the MIG @@ -173,7 +186,7 @@ func GetInstanceToRemove(ctx context.Context, client *compute.InstanceGroupManag } // Randomly select an instance to remove - return instanceNames[rand.Intn(len(instanceNames))], nil + return getInstanceNameFromURL(instanceNames[rand.Intn(len(instanceNames))]), nil } // getMIGInstanceNames retrieves the list of instance names in a Managed Instance Group (MIG).