Skip to content

Commit

Permalink
feat(Elasticsearch): Fixed elasticsearch drain nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
dfradehubs committed Sep 17, 2024
1 parent ef015a1 commit dfc9c42
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 82 deletions.
8 changes: 4 additions & 4 deletions examples/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ services:
- ZONE=europe-west1-d
- MIG_NAME=test
- SLACK_WEBHOOK_URL="test"
- ELASTIC_URL="http://elasticsearch:9200"
- ELASTIC_USER="elastic"
- ELASTIC_PASSWORD="test"
- ELASTIC_URL=https://elasticsearch:9200
- ELASTIC_USER=elastic
- ELASTIC_PASSWORD=test
- COOLDOWN_PERIOD_SEC=10
- RETRY_INTERVAL_SEC=5
- GOOGLE_APPLICATION_CREDENTIALS=/tmp/credentials.json
- MIN_SIZE=1
- MAX_SIZE=2
- X-SCOPE-ORGID_HEADER=test
#- X-SCOPE-ORGID_HEADER=test
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.23.1

require (
cloud.google.com/go/compute v1.28.0
github.com/elastic/go-elasticsearch/v8 v8.15.0
github.com/prometheus/client_golang v1.20.3
github.com/prometheus/common v0.59.1
github.com/slack-go/slack v0.14.0
Expand All @@ -14,6 +15,7 @@ require (
cloud.google.com/go/auth v0.9.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.15.0 h1:IZyJhe7t7WI3NEFdcHnf6IJXqpRf+8S8QWLtZYYyBYk=
github.com/elastic/go-elasticsearch/v8 v8.15.0/go.mod h1:HCON3zj4btpqs2N1jjsAy4a/fiAul+YBP00mBH4xik8=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
202 changes: 130 additions & 72 deletions internal/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,69 @@ package elasticsearch

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"os/exec"
"strings"
"time"

"github.com/elastic/go-elasticsearch/v8"
)

// nodeInfo struct for elasticsearch nodes
type NodeInfo struct {
IP string `json:"ip"`
HeapPercent string `json:"heap.percent"`
RAMPercent string `json:"ram.percent"`
CPU string `json:"cpu"`
Load1m string `json:"load_1m"`
Load5m string `json:"load_5m"`
Load15m string `json:"load_15m"`
NodeRole string `json:"node.role"`
Master string `json:"master"`
Name string `json:"name"`
}

// DrainElasticsearchNode drains an Elasticsearch node and performs a controlled shutdown.
// elasticURL: The URL of the Elasticsearch cluster.
// nodeName: The name of the node to shut down.
// username: The username for basic authentication.
// password: The password for basic authentication.
func DrainElasticsearchNode(elasticURL, nodeName, username, password string) error {

// Configurar http.Transport para desactivar la verificación del certificado
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

cfg := elasticsearch.Config{
Addresses: []string{elasticURL},
Username: username,
Password: password,
Transport: tr,
}

es, err := elasticsearch.NewClient(cfg)
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

// Get Elasticsearch node IP
nodeIP, err := getNodeIP(elasticURL, username, password)
nodeIP, err := getNodeIP(es, nodeName)
if err != nil {
return fmt.Errorf("failed to get node IP: %w", err)
}

// Exclude the node from routing allocations
err = updateClusterSettings(elasticURL, username, password, nodeIP)
err = updateClusterSettings(es, nodeIP)
if err != nil {
return fmt.Errorf("failed to update cluster settings: %w", err)
}

// Wait until the node is removed from the cluster
err = waitForNodeRemoval(elasticURL, username, password)
err = waitForNodeRemoval(es, nodeName)
if err != nil {
return fmt.Errorf("failed while waiting for node removal: %w", err)
}
Expand All @@ -38,120 +73,143 @@ func DrainElasticsearchNode(elasticURL, nodeName, username, password string) err
}

// getNodeIP retrieves the IP address of the Elasticsearch node.
func getNodeIP(elasticURL, username, password string) (string, error) {
cmd := exec.Command("curl", "-s", "-k", "-u", fmt.Sprintf("%s:%s", username, password), fmt.Sprintf("%s/_cat/nodes?v&h=ip,name", elasticURL))
output, err := cmd.Output()
func getNodeIP(es *elasticsearch.Client, nodeName string) (string, error) {

// Request to get the nodes information
res, err := es.Cat.Nodes(
es.Cat.Nodes.WithFormat("json"),
es.Cat.Nodes.WithV(true),
)
if err != nil {
return "", fmt.Errorf("failed to execute curl command: %w", err)
return "", fmt.Errorf("failed to get nodes information: %w", err)
}
defer res.Body.Close()

lines := strings.Split(string(output), "\n")
for _, line := range lines {
if strings.Contains(line, getHostname()) {
fields := strings.Fields(line)
if len(fields) > 0 {
return fields[0], nil
}
body, err := io.ReadAll(res.Body)
if err != nil {
return "", fmt.Errorf("error reading response body: %w", err)
}

var nodes []NodeInfo
err = json.Unmarshal([]byte(string(body)), &nodes)
if err != nil {
return "", fmt.Errorf("error deserializing JSON: %w", err)
}

// Find the IP address for the node with the hostname
for _, node := range nodes {
if node.Name == nodeName {
return node.IP, nil
}
}

return "", fmt.Errorf("node IP not found")
}

// updateClusterSettings updates the cluster settings to exclude a specific node IP.
func updateClusterSettings(elasticURL, username, password, nodeIP string) error {
data := fmt.Sprintf(`{
func updateClusterSettings(es *elasticsearch.Client, nodeIP string) error {

settings := map[string]map[string]string{
"persistent": {
"cluster.routing.allocation.exclude._ip": "%s"
}
}`, nodeIP)
req, err := http.NewRequest("PUT", fmt.Sprintf("%s/_cluster/settings", elasticURL), bytes.NewBuffer([]byte(data)))
"cluster.routing.allocation.exclude._ip": nodeIP,
},
}

data, err := json.Marshal(settings)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
return fmt.Errorf("failed to marshal settings to JSON: %w", err)
}
req.SetBasicAuth(username, password)
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
req := bytes.NewReader(data)
res, err := es.Cluster.PutSettings(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
return fmt.Errorf("failed to update cluster settings: %w", err)
}
defer resp.Body.Close()
defer res.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("error updating cluster settings: %s", string(body))
if res.IsError() {
return fmt.Errorf("error updating cluster settings: %s", res.String())
}

return nil
}

// waitForNodeRemoval waits for the node to be removed from the cluster.
func waitForNodeRemoval(elasticURL, username, password string) error {
func waitForNodeRemoval(es *elasticsearch.Client, nodeName string) error {

for {
cmd := exec.Command("curl", "-s", "-k", "-u", fmt.Sprintf("%s:%s", username, password), fmt.Sprintf("%s/_cat/shards", elasticURL))
output, err := cmd.Output()
res, err := es.Cat.Shards(
es.Cat.Shards.WithFormat("json"),
)
if err != nil {
return fmt.Errorf("failed to execute curl command: %w", err)
return fmt.Errorf("failed to get shards information: %w", err)
}
defer res.Body.Close()

var shards []map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&shards); err != nil {
return fmt.Errorf("failed to decode shards information: %w", err)
}

nodeFound := false
for _, shard := range shards {
// Assuming `node` field contains the node name
if node, ok := shard["node"].(string); ok && strings.Contains(node, nodeName) {
nodeFound = true
break
}
}

if !strings.Contains(string(output), getHostname()) {
if !nodeFound {
break
}

time.Sleep(10 * time.Second)
}

return nil
}

// shutdownServices stops Docker and Nomad services.
func shutdownServices() error {
cmd := exec.Command("sudo", "systemctl", "stop", "docker")
err := cmd.Run()
if err != nil {
return fmt.Errorf("failed to stop Docker: %w", err)
// clearClusterSettings removes the node exclusion from cluster settings.
func ClearElasticsearchClusterSettings(elasticURL, username, password string) error {
// Configurar http.Transport para desactivar la verificación del certificado
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

cmd = exec.Command("sudo", "systemctl", "stop", "nomad")
err = cmd.Run()
if err != nil {
return fmt.Errorf("failed to stop Nomad: %w", err)
cfg := elasticsearch.Config{
Addresses: []string{elasticURL},
Username: username,
Password: password,
Transport: tr,
}

time.Sleep(10 * time.Second)
return nil
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

// clearClusterSettings removes the node exclusion from cluster settings.
func ClearElasticsearchClusterSettings(elasticURL, username, password string) error {
data := `{
settings := map[string]map[string]string{
"persistent": {
"cluster.routing.allocation.exclude._ip": null
}
}`
req, err := http.NewRequest("PUT", fmt.Sprintf("%s/_cluster/settings", elasticURL), bytes.NewBuffer([]byte(data)))
"cluster.routing.allocation.exclude._ip": "",
},
}

data, err := json.Marshal(settings)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
return fmt.Errorf("failed to marshal settings to JSON: %w", err)
}
req.SetBasicAuth(username, password)
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
req := bytes.NewReader(data)
res, err := es.Cluster.PutSettings(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
return fmt.Errorf("failed to update cluster settings: %w", err)
}
defer resp.Body.Close()
defer res.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("error clearing cluster settings: %s", string(body))
if res.IsError() {
return fmt.Errorf("error updating cluster settings: %s", res.String())
}

return nil
}

// getHostname retrieves the current hostname of the node.
func getHostname() string {
hostname, _ := exec.Command("hostname").Output()
return strings.TrimSpace(string(hostname))
}
25 changes: 19 additions & 6 deletions internal/google/mig.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"math/rand"
"strconv"
"strings"

"elasticsearch-vm-autoscaler/internal/elasticsearch"
"elasticsearch-vm-autoscaler/internal/globals"
Expand Down Expand Up @@ -98,6 +99,7 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti

// If not in debug mode, drain the node from Elasticsearch before removal
if !debugMode {
log.Printf("Instance to remove: %s", instanceToRemove)
err = elasticsearch.DrainElasticsearchNode(elasticURL, instanceToRemove, elasticUser, elasticPassword)
if err != nil {
log.Printf("Error draining Elasticsearch node: %v", err)
Expand All @@ -116,6 +118,12 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti
},
}

// Abandon the instance if not in debug mode
if !debugMode {
_, err = client.AbandonInstances(ctx, abandonReq)
return err
}

// If not in debug mode, remove the elasticsearch node from cluster settings
if !debugMode {
err = elasticsearch.ClearElasticsearchClusterSettings(elasticURL, elasticUser, elasticPassword)
Expand All @@ -125,11 +133,6 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti
}
}

// Abandon the instance if not in debug mode
if !debugMode {
_, err = client.AbandonInstances(ctx, abandonReq)
return err
}
return nil
}

Expand Down Expand Up @@ -161,6 +164,16 @@ func getMIGTargetSize(ctx context.Context, client *compute.InstanceGroupManagers
return mig.GetTargetSize(), nil
}

// getInstanceNameFromURL parses the Google Cloud instance name to get just the hostname
// and not the full path
func getInstanceNameFromURL(instanceURL string) string {
parts := strings.Split(instanceURL, "/")
if len(parts) > 0 {
return parts[len(parts)-1]
}
return ""
}

// GetInstanceToRemove retrieves a random instance from the MIG to be removed.
func GetInstanceToRemove(ctx context.Context, client *compute.InstanceGroupManagersClient, projectID, zone, migName string) (string, error) {
// Get the list of instances in the MIG
Expand All @@ -173,7 +186,7 @@ func GetInstanceToRemove(ctx context.Context, client *compute.InstanceGroupManag
}

// Randomly select an instance to remove
return instanceNames[rand.Intn(len(instanceNames))], nil
return getInstanceNameFromURL(instanceNames[rand.Intn(len(instanceNames))]), nil
}

// getMIGInstanceNames retrieves the list of instance names in a Managed Instance Group (MIG).
Expand Down

0 comments on commit dfc9c42

Please sign in to comment.