Skip to content

Commit

Permalink
feat: handle Keda scaled object
Browse files Browse the repository at this point in the history
  • Loading branch information
Tchoupinax committed May 7, 2024
1 parent 00b4029 commit 61146f8
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 56 deletions.
26 changes: 17 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"flag"
"os"

utils "github.com/Tchoupinax/k8s-labels-migrator/utils"
istio "istio.io/client-go/pkg/clientset/versioned"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
Expand All @@ -20,7 +22,7 @@ func main() {

clientset, err := kubernetes.NewForConfig(config)
istioClient, err := istio.NewForConfig(config)
//crdClient, err := dynamic.NewForConfig(config)
crdClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err.Error())
}
Expand All @@ -30,6 +32,11 @@ func main() {
var labelToChangeKey = ""
var labelToChangeValue = ""
var goalOfOperationIsToRemoveLabel = false
var matcherLabels = []string{
"app.kubernetes.io/instance",
"app.kubernetes.io/name",
"app.kubernetes.io/version",
}

flag.StringVar(&deploymentName, "deployment", "", "Name of the deployment to edit label")
flag.StringVar(&namespace, "namespace", "", "Namespace of the deployment to edit label")
Expand All @@ -39,21 +46,21 @@ func main() {
flag.Parse()

if deploymentName == "" {
logError("Deployment name is mandatory")
utils.LogError("Deployment name is mandatory")
os.Exit(1)
}
if namespace == "" {
logError("Namespace is mandatory")
utils.LogError("Namespace is mandatory")
os.Exit(1)
}
if labelToChangeValue == "" && !goalOfOperationIsToRemoveLabel {
logError("label value is mandatory")
utils.LogError("label value is mandatory")
os.Exit(1)
}

logInfo("Analyzing your cluster...")
utils.LogInfo("Analyzing your cluster...")
resourcesAnalyze(clientset, istioClient, namespace, deploymentName, labelToChangeKey)
logSuccess("Cluster ready")
utils.LogSuccess("Cluster ready")
displaySummary(
namespace,
deploymentName,
Expand All @@ -64,26 +71,27 @@ func main() {

c := askForConfirmation("Do you validate these parameters?")
if !c {
logInfo("Operation aborted by the user")
utils.LogInfo("Operation aborted by the user")
os.Exit(0)
}
c2 := askForConfirmation("I confirm that I have no gitops tool overriding my config (e.g. ArgoCD auto-sync)")
if !c2 {
logInfo("Operation aborted by the user")
utils.LogInfo("Operation aborted by the user")
os.Exit(0)
}

MigrationWorkflow(
namespace,
clientset,
istioClient,
crdClient,
deploymentName,
labelToChangeKey,
labelToChangeValue,
goalOfOperationIsToRemoveLabel,
)

if labelToChangeKey == "app.kubernetes.io/name" {
if arrayContains(matcherLabels, labelToChangeKey) {
AddLabelToServiceSelector(
namespace,
clientset,
Expand Down
5 changes: 3 additions & 2 deletions pod-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"

utils "github.com/Tchoupinax/k8s-labels-migrator/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
Expand All @@ -14,9 +15,9 @@ func waitUntilAllPodAreReady(
) bool {
deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
logError(err.Error())
utils.LogError(err.Error())
return false
}
currentDeploymentHasAllPodReady := deployment.Status.Replicas-deployment.Status.ReadyReplicas == 0 && deployment.Status.Replicas > 1
currentDeploymentHasAllPodReady := deployment.Status.Replicas-deployment.Status.ReadyReplicas == 0 && deployment.Status.Replicas > 0
return currentDeploymentHasAllPodReady
}
69 changes: 69 additions & 0 deletions resources/keda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package keda

import (
"context"
"fmt"
"time"

utils "github.com/Tchoupinax/k8s-labels-migrator/utils"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

// https://keda.sh/docs/2.13/concepts/scaling-deployments

func PauseScaledObject(
crdClient *dynamic.DynamicClient,
clientset *kubernetes.Clientset,
deploymentName string,
namespace string,
) {
crdGVR := schema.GroupVersionResource{
Group: "keda.sh",
Version: "v1alpha1",
Resource: "scaledobjects",
}
kedaScaledObject, _ := crdClient.Resource(crdGVR).Namespace(namespace).Get(context.TODO(), deploymentName, v1.GetOptions{})
if kedaScaledObject != nil {
utils.LogInfo("Keda Scaled Object detected")
// Add the annotation "autoscaling.keda.sh/paused"
kedaScaledObject.Object["metadata"].(map[string]interface{})["annotations"].(map[string]interface{})["autoscaling.keda.sh/paused"] = "true"
crdClient.Resource(crdGVR).Namespace(namespace).Update(context.TODO(), kedaScaledObject, v1.UpdateOptions{})
utils.LogSuccess("Keda object paused ⏸️")
utils.LogBlocking("Waiting randomly 5 seconds to ensure keda controller registered the update")
for range 5 {

Check failure on line 36 in resources/keda.go

View workflow job for this annotation

GitHub Actions / lint

cannot range over 5 (untyped int constant) (typecheck)

Check failure on line 36 in resources/keda.go

View workflow job for this annotation

GitHub Actions / lint

cannot range over 5 (untyped int constant) (typecheck)
utils.LogBlockingDot()
time.Sleep(1 * time.Second)
}
fmt.Println()
} else {
utils.LogInfo("Any Keda Scaled Object detected")
}
}

func ResumeScaledObject(
crdClient *dynamic.DynamicClient,
clientset *kubernetes.Clientset,
deploymentName string,
namespace string,
) {
crdGVR := schema.GroupVersionResource{
Group: "keda.sh",
Version: "v1alpha1",
Resource: "scaledobjects",
}
kedaScaledObject, _ := crdClient.Resource(crdGVR).Namespace(namespace).Get(context.TODO(), deploymentName, v1.GetOptions{})
if kedaScaledObject != nil {
utils.LogInfo("Keda Scaled Object detected")
delete(
kedaScaledObject.Object["metadata"].(map[string]interface{})["annotations"].(map[string]interface{}),
"autoscaling.keda.sh/paused",
)
utils.LogSuccess("Keda object resumed ▶️")
crdClient.Resource(crdGVR).Namespace(namespace).Update(context.TODO(), kedaScaledObject, v1.UpdateOptions{})
} else {
utils.LogInfo("Any Keda Scaled Object detected")
}
}
59 changes: 33 additions & 26 deletions steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ import (
"strings"
"time"

keda "github.com/Tchoupinax/k8s-labels-migrator/resources"
utils "github.com/Tchoupinax/k8s-labels-migrator/utils"
istio "istio.io/client-go/pkg/clientset/versioned"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

func MigrationWorkflow(
namespace string,
clientset *kubernetes.Clientset,
istioClient *istio.Clientset,
crdClient *dynamic.DynamicClient,
deploymentName string,
changingLabelKey string,
changingLabelValue string,
Expand All @@ -26,11 +30,11 @@ func MigrationWorkflow(
currentDeployment, err := clientset.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
currentDestinationRule, _ := istioClient.NetworkingV1alpha3().DestinationRules(namespace).Get(context.TODO(), deploymentName, v1.GetOptions{})
if err != nil {
logError("No deployment found.")
utils.LogError("No deployment found.")
os.Exit(1)
}

logInfo("1. Creating the clone deployment")
utils.LogInfo("1. Creating the clone deployment")
var temporalDeployment = *currentDeployment
temporalDeployment.GenerateName = fmt.Sprintf("%s-%s", currentDeployment.Name, "changing-label-tmp")
temporalDeployment.Name = fmt.Sprintf("%s-%s", currentDeployment.Name, "changing-label-tmp")
Expand All @@ -40,44 +44,47 @@ func MigrationWorkflow(
_, err = clientset.AppsV1().Deployments(namespace).Create(context.TODO(), &temporalDeployment, metav1.CreateOptions{})
if err != nil {
if strings.Contains(err.Error(), "already exists, the server was not able to generate a unique name for the object") {
logWarning("1. Temporary deployment already created. Continue...")
utils.LogWarning("1. Temporary deployment already created. Continue...")
}
} else {
logSuccess("1. Deployment replicated")
utils.LogSuccess("1. Deployment replicated")
}

logInfo("2. Updating the service...")
utils.LogInfo("2. Updating the service...")
var temporalService = *currentService
delete(temporalService.Spec.Selector, changingLabelKey)
_, err = clientset.CoreV1().Services(namespace).Update(context.TODO(), &temporalService, metav1.UpdateOptions{})
check(err)
logSuccess("2. Service updated")
utils.LogSuccess("2. Service updated")

logInfo("2.1 Updating Istio destination rules...")
utils.LogInfo("2.1 Updating Istio destination rules...")
var temporalDestinationRule = *currentDestinationRule
delete(temporalDestinationRule.Spec.Subsets[0].Labels, changingLabelKey)
_, err = istioClient.NetworkingV1alpha3().DestinationRules(namespace).Update(context.TODO(), &temporalDestinationRule, metav1.UpdateOptions{})
check(err)
logSuccess("2.1 Istio destination rules updated")
utils.LogSuccess("2.1 Istio destination rules updated")

logBlocking("3. Waiting while pods are not totally ready to handle traffic")
utils.LogInfo("2.2 Keda")
keda.PauseScaledObject(crdClient, clientset, deploymentName, namespace)

utils.LogBlocking("3. Waiting while pods are not totally ready to handle traffic")
areAllPodReady := false
for !areAllPodReady {
logBlockingDot()
utils.LogBlockingDot()
time.Sleep(1 * time.Second)
areAllPodReady =
waitUntilAllPodAreReady(clientset, namespace, currentDeployment.Name) &&
waitUntilAllPodAreReady(clientset, namespace, fmt.Sprintf("%s-%s", currentDeployment.Name, "changing-label-tmp"))
}
fmt.Println("")

logInfo("4. Delete the old deployment...")
utils.LogInfo("4. Delete the old deployment...")
// Delete the old deployment
deleteError := clientset.AppsV1().Deployments(namespace).Delete(context.TODO(), currentDeployment.Name, *metav1.NewDeleteOptions(0))
check(deleteError)
logSuccess("4. Old deployment deleted")
utils.LogSuccess("4. Old deployment deleted")

logInfo("5. Creating the original deployment with modified label")
utils.LogInfo("5. Creating the original deployment with modified label")
var futureOfficialDeployment = *currentDeployment
futureOfficialDeployment.GenerateName = deploymentName
futureOfficialDeployment.Name = deploymentName
Expand Down Expand Up @@ -106,23 +113,23 @@ func MigrationWorkflow(
}
fmt.Println(err)
}
logSuccess("5. Deployment created")
utils.LogSuccess("5. Deployment created")

logBlocking("6. Waiting while pods are not totally ready to handle traffic")
utils.LogBlocking("6. Waiting while pods are not totally ready to handle traffic")
areAllPodReady = false
for !areAllPodReady {
logBlockingDot()
utils.LogBlockingDot()
time.Sleep(1 * time.Second)
areAllPodReady = waitUntilAllPodAreReady(clientset, namespace, currentDeployment.Name)
}
fmt.Println("")

logInfo("7. Deleting temporal deployment...")
utils.LogInfo("7. Deleting temporal deployment...")
time.Sleep(1 * time.Second)
// Delete the temporal deployment
errDeleteTmpDeploy := clientset.AppsV1().Deployments(namespace).Delete(context.TODO(), fmt.Sprintf("%s-%s", currentDeployment.Name, "changing-label-tmp"), metav1.DeleteOptions{})
check(errDeleteTmpDeploy)
logSuccess("7. Temporary deployment deleted")
utils.LogSuccess("7. Temporary deployment deleted")
}

func AddLabelToServiceSelector(
Expand All @@ -133,8 +140,8 @@ func AddLabelToServiceSelector(
changingLabelValue string,
removeLabel bool,
) {
logInfo("====== Additionnal step ====================================")
logInfo("8. Add the label as a selector in the service...")
utils.LogInfo("====== Additionnal step ====================================")
utils.LogInfo("8. Add the label as a selector in the service...")
// Get the current service
currentService, err := clientset.CoreV1().Services(namespace).Get(context.TODO(), applicationName, metav1.GetOptions{})
check(err)
Expand All @@ -152,8 +159,8 @@ func AddLabelToServiceSelector(
_, updateServiceError := clientset.CoreV1().Services(namespace).Update(context.TODO(), &futureService, metav1.UpdateOptions{})
check(updateServiceError)

logSuccess("8. Service configured")
logInfo("============================================================")
utils.LogSuccess("8. Service configured")
utils.LogInfo("============================================================")
}

func AddLabelToIstioDestinatonRulesSelector(
Expand All @@ -165,8 +172,8 @@ func AddLabelToIstioDestinatonRulesSelector(
changingLabelValue string,
removeLabel bool,
) {
logInfo("====== Additionnal step ====================================")
logInfo("9. Add the label as a selector in istio destination rules...")
utils.LogInfo("====== Additionnal step ====================================")
utils.LogInfo("9. Add the label as a selector in istio destination rules...")
currentDestinationRule, err := istioClient.NetworkingV1alpha3().DestinationRules(namespace).Get(context.TODO(), applicationName, v1.GetOptions{})
check(err)

Expand All @@ -183,6 +190,6 @@ func AddLabelToIstioDestinatonRulesSelector(
_, updateDestinationRuleError := istioClient.NetworkingV1alpha3().DestinationRules(namespace).Update(context.TODO(), &futureDestinationRule, metav1.UpdateOptions{})
check(updateDestinationRuleError)

logSuccess("9. Istio destination rules configured")
logInfo("============================================================")
utils.LogSuccess("9. Istio destination rules configured")
utils.LogInfo("============================================================")
}
Loading

0 comments on commit 61146f8

Please sign in to comment.