feat(Clean-up): Cleaned up code, documented and added logs
dfradehubs committed Sep 18, 2024
1 parent 280ed0f commit 59785b5
Showing 7 changed files with 90 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
examples/credentials.json
examples/.env
bin/
8 changes: 5 additions & 3 deletions README.md
@@ -21,8 +21,9 @@ This project is designed to automate the scaling of Google Cloud Managed Instanc
1. **Environment Variables**:
You can configure the following environment variables:
* PROMETHEUS_URL: Prometheus URL to query for scaling metrics (Default `http://localhost:9090`)
* PROMETHEUS_UP_CONDITION: Prometheus query that must be met to scale up the nodegroup (`Required`)
* PROMETHEUS_DOWN_CONDITION: Prometheus query that must be met to scale down the nodegroup (`Required`)
* PROMETHEUS_UP_CONDITION: Prometheus query that must be met to scale up the nodegroup. The program only checks whether the condition is true or false; it does not compare values (`Required`)
* PROMETHEUS_DOWN_CONDITION: Prometheus query that must be met to scale down the nodegroup. The program only checks whether the condition is true or false; it does not compare values (`Required`)
* PROMETHEUS_HEADER_*: Prometheus HTTP headers for queries. For example, the PROMETHEUS_HEADER_X_Scope_OrgID environment variable adds an HTTP header called X-Scope-OrgID to the HTTP request (`Optional`). See the sketch after this section.
* GCP_PROJECT_ID: Google Cloud project id (Default `example`)
* ZONE: Google Cloud project zone (Default `europe-west1-d`)
* MIG_NAME: Google Cloud MIG to scale (Default `example`)
@@ -31,8 +32,9 @@ This project is designed to automate the scaling of Google Cloud Managed Instanc
* ELASTIC_URL: Elasticsearch URL to drain nodes (Default `http://elasticsearch:9200`)
* ELASTIC_USER: Elasticsearch user for authentication (Default `elastic`)
* ELASTIC_PASSWORD: Elasticsearch password for authentication (Default `password`)
* ELASTIC_SSL_INSECURE_SKIP_VERIFY: Skip SSL certificate verification for Elasticsearch connections (Default `false`)
* COOLDOWN_PERIOD_SEC: Cooldown seconds to wait between scale checks (Default `60`)
* RETRY_INTERVAL_SEC: Seconds to wait before retrying when an error occurs in the loop (Default `15`)
* RETRY_INTERVAL_SEC: Seconds to wait before retrying when an error occurs in the loop (Default `60`)
* DEBUG_MODE: Does not execute scaling operations; only logs them and sends Slack messages (Default `false`)
* MIN_SIZE: Minimum size for the nodegroup (Default `1`)
* MAX_SIZE: Maximum size for the nodegroup (Default `1`)
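The PROMETHEUS_HEADER_* convention maps environment variables onto HTTP request headers. The real implementation lives in internal/prometheus and is not part of this commit; the sketch below is a hypothetical illustration, assuming the header name is derived by stripping the prefix and replacing underscores with dashes (headersFromEnv is an invented name):

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// headersFromEnv collects PROMETHEUS_HEADER_* variables and turns the
// underscore-separated suffix into a dash-separated header name, e.g.
// PROMETHEUS_HEADER_X_Scope_OrgID -> X-Scope-OrgID.
func headersFromEnv() map[string]string {
	headers := make(map[string]string)
	for _, kv := range os.Environ() {
		key, value, ok := strings.Cut(kv, "=")
		if !ok || !strings.HasPrefix(key, "PROMETHEUS_HEADER_") {
			continue
		}
		name := strings.ReplaceAll(strings.TrimPrefix(key, "PROMETHEUS_HEADER_"), "_", "-")
		headers[name] = value
	}
	return headers
}

func main() {
	os.Setenv("PROMETHEUS_HEADER_X_Scope_OrgID", "test")
	fmt.Println(headersFromEnv()) // map[X-Scope-OrgID:test]
}
```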
15 changes: 9 additions & 6 deletions cmd/main.go
@@ -5,6 +5,7 @@ import (
"elasticsearch-vm-autoscaler/internal/google"
"elasticsearch-vm-autoscaler/internal/prometheus"
"elasticsearch-vm-autoscaler/internal/slack"

"log"
"os"
"strconv"
@@ -13,6 +14,7 @@ import (
)

func main() {

// Prometheus variables to hold configuration for scaling conditions
prometheusURL := globals.GetEnv("PROMETHEUS_URL", "http://localhost:9090")
// Conditions for scaling up or down the MIG
@@ -40,7 +42,7 @@ func main() {

// Cooldown and retry intervals in seconds, parsed from environment variables
cooldownPeriodSeconds, _ := strconv.ParseInt(globals.GetEnv("COOLDOWN_PERIOD_SEC", "60"), 10, 64)
retryIntervalSeconds, _ := strconv.ParseInt(globals.GetEnv("RETRY_INTERVAL_SEC", "15"), 10, 64)
retryIntervalSeconds, _ := strconv.ParseInt(globals.GetEnv("RETRY_INTERVAL_SEC", "60"), 10, 64)

// Debug mode flag, enabled if "DEBUG_MODE" is set to "true"
debugModeStr := globals.GetEnv("DEBUG_MODE", "false")
@@ -53,19 +55,20 @@

// Main loop to monitor scaling conditions and manage the MIG
for {
// Check if the MIG is at its minimum size
// Check that the MIG is at least at its minimum size. If not, scale it up to minSize
err := google.CheckMIGMinimumSize(projectID, zone, migName, debugMode)
if err != nil {
log.Printf("Error checking minimum size for MIG nodes: %v", err)
}

// Fetch the up and down conditions from Prometheus
// Fetch the scale up and down conditions from Prometheus
upCondition, err := prometheus.GetPrometheusCondition(prometheusURL, prometheusUpCondition)
if err != nil {
log.Printf("Error querying Prometheus: %v", err)
time.Sleep(time.Duration(retryIntervalSeconds) * time.Second)
continue
}

downCondition, err := prometheus.GetPrometheusCondition(prometheusURL, prometheusDownCondition)
if err != nil {
log.Printf("Error querying Prometheus: %v", err)
@@ -75,7 +78,7 @@

// If the up condition is met, add a node to the MIG
if upCondition {
log.Printf("Condition %s met: Creating new node!", prometheusUpCondition)
log.Printf("Up condition %s met: Trying to create a new node!", prometheusUpCondition)
err = google.AddNodeToMIG(projectID, zone, migName, debugMode)
if err != nil {
log.Printf("Error adding node to MIG: %v", err)
@@ -84,10 +87,10 @@
}
// Notify via Slack that a node has been added
if slackWebhookURL != "" {
slack.NotifySlack("Added node to MIG", slackWebhookURL)
slack.NotifySlack("New node created succesfully in MIG", slackWebhookURL)
}
} else if downCondition { // If the down condition is met, remove a node from the MIG
log.Printf("Condition %s met. Removing one node!", prometheusDownCondition)
log.Printf("Down condition %s met. Trying to remove one node!", prometheusDownCondition)
err = google.RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasticPassword, debugMode)
if err != nil {
log.Printf("Error draining node from MIG: %v", err)
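The loop above treats each Prometheus condition as a plain boolean. GetPrometheusCondition itself is not touched by this commit; the sketch below shows one plausible implementation, assuming the standard Prometheus instant-query API and treating a non-empty result vector as "condition met" (consistent with the README note that values are not compared):

```go
package prometheus

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// queryResponse mirrors only the parts of the Prometheus instant-query
// response that matter here.
type queryResponse struct {
	Status string `json:"status"`
	Data   struct {
		Result []json.RawMessage `json:"result"`
	} `json:"data"`
}

// GetPrometheusCondition runs the query as an instant query and reports
// whether it returned at least one series; the value itself is not inspected.
func GetPrometheusCondition(prometheusURL, query string) (bool, error) {
	resp, err := http.Get(fmt.Sprintf("%s/api/v1/query?query=%s", prometheusURL, url.QueryEscape(query)))
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	var qr queryResponse
	if err := json.NewDecoder(resp.Body).Decode(&qr); err != nil {
		return false, err
	}
	if qr.Status != "success" {
		return false, fmt.Errorf("prometheus query returned status %q", qr.Status)
	}
	return len(qr.Data.Result) > 0, nil
}
```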
2 changes: 1 addition & 1 deletion examples/docker-compose.yml
@@ -43,4 +43,4 @@ services:
- GOOGLE_APPLICATION_CREDENTIALS=/tmp/credentials.json
- MIN_SIZE=1
- MAX_SIZE=2
#- X-SCOPE-ORGID_HEADER=test
- PROMETHEUS_HEADER_X_Scope_OrgID=test
52 changes: 41 additions & 11 deletions internal/elasticsearch/elasticsearch.go
@@ -3,13 +3,13 @@ package elasticsearch
import (
"bytes"
"crypto/tls"
"elasticsearch-vm-autoscaler/internal/globals"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"regexp"
"time"

"github.com/elastic/go-elasticsearch/v8"
)
@@ -48,30 +48,39 @@ type ShardInfo struct {
// password: The password for basic authentication.
func DrainElasticsearchNode(elasticURL, nodeName, username, password string) error {

// Configure http.Transport to disable certificate verification
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
// Check the ELASTIC_SSL_INSECURE_SKIP_VERIFY environment variable to decide whether to skip
// SSL certificate verification for Elasticsearch
insecureSkipVerify := globals.GetEnv("ELASTIC_SSL_INSECURE_SKIP_VERIFY", "false")
var tr http.RoundTripper
if insecureSkipVerify == "true" {
tr = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
} else {
tr = http.DefaultTransport
}

// Create elasticsearch config for connection
cfg := elasticsearch.Config{
Addresses: []string{elasticURL},
Username: username,
Password: password,
Transport: tr,
}

// Create a new client
es, err := elasticsearch.NewClient(cfg)
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

// Get Elasticsearch node IP
// Get the IP of the Elasticsearch node to remove, looked up by nodeName
nodeIP, err := getNodeIP(es, nodeName)
if err != nil {
return fmt.Errorf("failed to get node IP: %w", err)
}

// Exclude the node from routing allocations
// Exclude the node IP from routing allocations
err = updateClusterSettings(es, nodeIP)
if err != nil {
return fmt.Errorf("failed to update cluster settings: %w", err)
@@ -99,11 +108,13 @@ func getNodeIP(es *elasticsearch.Client, nodeName string) (string, error) {
}
defer res.Body.Close()

// Read the response body
body, err := io.ReadAll(res.Body)
if err != nil {
return "", fmt.Errorf("error reading response body: %w", err)
}

// Parse the JSON response
var nodes []NodeInfo
err = json.Unmarshal([]byte(string(body)), &nodes)
if err != nil {
@@ -125,17 +136,20 @@ func getNodeIP(es *elasticsearch.Client, nodeName string) (string, error) {
// updateClusterSettings updates the cluster settings to exclude a specific node IP.
func updateClusterSettings(es *elasticsearch.Client, nodeIP string) error {

// _cluster/settings to set
settings := map[string]map[string]string{
"persistent": {
"cluster.routing.allocation.exclude._ip": nodeIP,
},
}
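// For illustration, the marshalled body sent with PUT _cluster/settings is:
// {"persistent":{"cluster.routing.allocation.exclude._ip":"10.132.0.7"}}
// (the IP shown is hypothetical)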

// Marshal the settings to JSON
data, err := json.Marshal(settings)
if err != nil {
return fmt.Errorf("failed to marshal settings to JSON: %w", err)
}

// Execute PUT _cluster/settings command
req := bytes.NewReader(data)
res, err := es.Cluster.PutSettings(req)
if err != nil {
@@ -160,6 +174,7 @@ func waitForNodeRemoval(es *elasticsearch.Client, nodeName string) error {
}

for {
// Query _cat/shards to check whether nodeName still holds any shards
res, err := es.Cat.Shards(
es.Cat.Shards.WithFormat("json"),
es.Cat.Shards.WithV(true),
@@ -169,66 +184,81 @@ func waitForNodeRemoval(es *elasticsearch.Client, nodeName string) error {
}
defer res.Body.Close()

// Read the response body
body, err := io.ReadAll(res.Body)
if err != nil || string(body) == "" {
return fmt.Errorf("error reading response body: %w", err)
}

// Parse the JSON response
var shards []ShardInfo
err = json.Unmarshal([]byte(string(body)), &shards)
if err != nil {
return fmt.Errorf("error deserializing JSON: %w", err)
}

// Check if nodeName has any shards inside it
nodeFound := false
for _, shard := range shards {
// Assuming `node` field contains the node name
if re.MatchString(shard.Node) {
nodeFound = true
}
}

// If nodeFound is false, the node no longer holds any shards and is ready to be deleted
if !nodeFound {
log.Printf("node %s is fully empty and ready to delete", nodeName)
break
}

time.Sleep(10 * time.Second)
}

return nil
}

// clearClusterSettings removes the node exclusion from cluster settings.
func ClearElasticsearchClusterSettings(elasticURL, username, password string) error {
// Configure http.Transport to disable certificate verification
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},

// Check the ELASTIC_SSL_INSECURE_SKIP_VERIFY environment variable to decide whether to skip
// SSL certificate verification for Elasticsearch
insecureSkipVerify := globals.GetEnv("ELASTIC_SSL_INSECURE_SKIP_VERIFY", "false")
var tr http.RoundTripper
if insecureSkipVerify == "true" {
tr = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
} else {
tr = http.DefaultTransport
}

// Configure the Elasticsearch connection
cfg := elasticsearch.Config{
Addresses: []string{elasticURL},
Username: username,
Password: password,
Transport: tr,
}

// Create the Elasticsearch client
es, err := elasticsearch.NewClient(cfg)
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

// _cluster/settings to set after the node deletion
settings := map[string]map[string]any{
"persistent": {
"cluster.routing.allocation.exclude._ip": nil,
},
}
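// For illustration, the marshalled body here is:
// {"persistent":{"cluster.routing.allocation.exclude._ip":null}}
// which clears the exclusion so the IP can be reused by future nodes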

// Marshal the settings to JSON
data, err := json.Marshal(settings)
if err != nil {
return fmt.Errorf("failed to marshal settings to JSON: %w", err)
}

// Execute PUT _cluster/settings
req := bytes.NewReader(data)
res, err := es.Cluster.PutSettings(req)
if err != nil {
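The two exported helpers above are driven from internal/google/mig.go during scale-down. A hypothetical usage sketch follows (node name, credentials, and URL are illustrative):

```go
package main

import (
	"log"

	"elasticsearch-vm-autoscaler/internal/elasticsearch"
)

func main() {
	elasticURL := "https://elasticsearch:9200" // illustrative endpoint

	// Exclude the node's IP from allocation and wait until it holds no shards.
	if err := elasticsearch.DrainElasticsearchNode(elasticURL, "es-data-0", "elastic", "password"); err != nil {
		log.Fatalf("drain failed: %v", err)
	}

	// Once the VM is deleted, drop the IP exclusion so future nodes are not
	// accidentally kept empty.
	if err := elasticsearch.ClearElasticsearchClusterSettings(elasticURL, "elastic", "password"); err != nil {
		log.Fatalf("clearing cluster settings failed: %v", err)
	}
}
```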
24 changes: 19 additions & 5 deletions internal/google/mig.go
@@ -33,6 +33,7 @@ func AddNodeToMIG(projectID, zone, migName string, debugMode bool) error {
if err != nil {
return fmt.Errorf("failed to get MIG target size: %v", err)
}
log.Printf("Current size of MIG is %d nodes", targetSize)

// Get the scaling limits (minimum and maximum)
_, maxSize, err := getMIGScalingLimits()
@@ -42,7 +43,7 @@

// Check if the MIG has reached its maximum size
if targetSize >= maxSize {
return fmt.Errorf("MIG has reached its maximum size (%d), no further scaling is possible.\n", maxSize)
return fmt.Errorf("MIG has reached its maximum size (%d/%d), no further scaling is possible", targetSize, maxSize)
}

// Create a request to resize the MIG by increasing the target size by 1
@@ -56,7 +57,11 @@
// Resize the MIG if not in debug mode
if !debugMode {
_, err = client.Resize(ctx, req)
return err
if err != nil {
return err
} else {
log.Printf("Scaled up MIG successfully %d/%d", targetSize+1, maxSize)
}
}
return nil
}
@@ -77,6 +82,7 @@
if err != nil {
return fmt.Errorf("failed to get MIG target size: %v", err)
}
log.Printf("Current size of MIG is %d nodes", targetSize)

// Get the scaling limits (minimum and maximum)
minSize, _, err := getMIGScalingLimits()
@@ -86,7 +92,7 @@

// Check if the MIG has reached its minimum size
if targetSize <= minSize {
return fmt.Errorf("MIG has reached the minimum size (%d/%d), no further scaling down is possible.\n", targetSize, minSize)
return fmt.Errorf("MIG has reached its minimum size (%d/%d), no further scaling down is possible", targetSize, minSize)
}

// Get a random instance from the MIG to remove
Expand All @@ -97,11 +103,12 @@ func RemoveNodeFromMIG(projectID, zone, migName, elasticURL, elasticUser, elasti

// If not in debug mode, drain the node from Elasticsearch before removal
if !debugMode {
log.Printf("Instance to remove: %s", instanceToRemove)
log.Printf("Instance to remove: %s. Draining from elasticsearch cluster", instanceToRemove)
err = elasticsearch.DrainElasticsearchNode(elasticURL, instanceToRemove, elasticUser, elasticPassword)
if err != nil {
return fmt.Errorf("error draining Elasticsearch node: %v", err)
}
log.Printf("Instance drained successfully from elasticsearch cluster")
}

// Create a request to delete the selected instance and reduce the MIG size
@@ -120,6 +127,8 @@
_, err = client.DeleteInstances(ctx, deleteReq)
if err != nil {
return fmt.Errorf("error deleting instance: %v", err)
} else {
log.Printf("Scaled down MIG successfully %d/%d", targetSize-1, minSize)
}
// Wait 90 seconds until instance is fully deleted
// Google Cloud has a deletion timeout of 90 seconds max
@@ -132,6 +141,7 @@
if err != nil {
return fmt.Errorf("error clearing Elasticsearch cluster settings: %v", err)
}
log.Printf("Cleared up elasticsearch settings for draining node")
}

return nil
@@ -258,7 +268,11 @@ func CheckMIGMinimumSize(projectID, zone, migName string, debugMode bool) error
// Resize the MIG if not in debug mode
if !debugMode {
_, err = client.Resize(ctx, req)
return err
if err != nil {
return err
} else {
log.Printf("Scaled up MIG to its minimum size %d/%d", minSize, minSize)
}
}
}
return nil
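getMIGScalingLimits is called throughout this file but not modified by the commit. The sketch below is an assumption about what it plausibly does, reading the MIN_SIZE/MAX_SIZE environment variables documented in the README (the signature and int32 return types are inferred from the call sites):

```go
package google

import (
	"strconv"

	"elasticsearch-vm-autoscaler/internal/globals"
)

// getMIGScalingLimits returns the minimum and maximum node counts for the MIG,
// read from the MIN_SIZE and MAX_SIZE environment variables (both default to 1).
func getMIGScalingLimits() (int32, int32, error) {
	minSize, err := strconv.ParseInt(globals.GetEnv("MIN_SIZE", "1"), 10, 32)
	if err != nil {
		return 0, 0, err
	}
	maxSize, err := strconv.ParseInt(globals.GetEnv("MAX_SIZE", "1"), 10, 32)
	if err != nil {
		return 0, 0, err
	}
	return int32(minSize), int32(maxSize), nil
}
```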