From 966c0c8192dd92bf98f65a82833e586f61ed353a Mon Sep 17 00:00:00 2001
From: Maciej Golaszewski
Date: Thu, 5 Sep 2024 20:41:56 +0200
Subject: [PATCH] Add context to retry; remove unnecessary functions

---
 integration/e2e_test.go | 320 +++++++++++++++++++---------------------
 1 file changed, 152 insertions(+), 168 deletions(-)

diff --git a/integration/e2e_test.go b/integration/e2e_test.go
index ed1e8e2..df83548 100644
--- a/integration/e2e_test.go
+++ b/integration/e2e_test.go
@@ -20,6 +20,8 @@ limitations under the License.
 package e2e_test
 
 import (
+	"context"
+	"errors"
 	"fmt"
 	"os"
 	"os/exec"
@@ -55,14 +57,15 @@ func init() {
 // kubectl should be setup so it uses the kubeconfig of the management cluster by default.
 func TestBasic(t *testing.T) {
 	t.Logf("Cluster to setup is in %s", BasicClusterPath)
+	ctx := context.Background()
 
-	setupCheck(t)
+	setupCheck(ctx, t)
 	t.Cleanup(teardownCluster)
 
-	t.Run("DeployCluster", func(t *testing.T) { deployCluster(t, BasicClusterPath) })
-	t.Run("VerifyCluster", func(t *testing.T) { verifyCluster(t) })
-	t.Run("DeployMicrobot", func(t *testing.T) { deployMicrobot(t) })
-	t.Run("UpgradeClusterRollout", func(t *testing.T) { upgradeCluster(t, "RollingUpgrade") })
+	t.Run("DeployCluster", func(t *testing.T) { deployCluster(ctx, t, BasicClusterPath) })
+	t.Run("VerifyCluster", func(t *testing.T) { verifyCluster(ctx, t) })
+	t.Run("DeployMicrobot", func(t *testing.T) { deployMicrobot(ctx, t) })
+	t.Run("UpgradeClusterRollout", func(t *testing.T) { upgradeCluster(ctx, t, "RollingUpgrade") })
 
 	// Important: the cluster is deleted in the Cleanup function
 	// which is called after all subtests are finished.
@@ -73,14 +76,15 @@ func TestBasic(t *testing.T) {
 // This cluster will be upgraded via an in-place upgrade.
 func TestInPlaceUpgrade(t *testing.T) {
 	t.Logf("Cluster for in-place upgrade test setup is in %s", InPlaceUpgradeClusterPath)
+	ctx := context.Background()
 
-	setupCheck(t)
+	setupCheck(ctx, t)
 	t.Cleanup(teardownCluster)
 
-	t.Run("DeployCluster", func(t *testing.T) { deployCluster(t, InPlaceUpgradeClusterPath) })
-	t.Run("VerifyCluster", func(t *testing.T) { verifyCluster(t) })
-	t.Run("DeployMicrobot", func(t *testing.T) { deployMicrobot(t) })
-	t.Run("UpgradeClusterInplace", func(t *testing.T) { upgradeCluster(t, "InPlaceUpgrade") })
+	t.Run("DeployCluster", func(t *testing.T) { deployCluster(ctx, t, InPlaceUpgradeClusterPath) })
+	t.Run("VerifyCluster", func(t *testing.T) { verifyCluster(ctx, t) })
+	t.Run("DeployMicrobot", func(t *testing.T) { deployMicrobot(ctx, t) })
+	t.Run("UpgradeClusterInplace", func(t *testing.T) { upgradeCluster(ctx, t, "InPlaceUpgrade") })
 
 	// Important: the cluster is deleted in the Cleanup function
 	// which is called after all subtests are finished.
@@ -92,15 +96,16 @@ func TestInPlaceUpgrade(t *testing.T) {
 // helm have to be available in the caller's path.
 func TestDisableDefaultCNI(t *testing.T) {
 	t.Logf("Cluster to setup is in %s", DisableDefaultCNIClusterPath)
+	ctx := context.Background()
 
-	setupCheck(t)
+	setupCheck(ctx, t)
 	t.Cleanup(teardownCluster)
 
-	t.Run("DeployCluster", func(t *testing.T) { deployCluster(t, DisableDefaultCNIClusterPath) })
-	t.Run("ValidateNoCalico", func(t *testing.T) { validateNoCalico(t) })
+	t.Run("DeployCluster", func(t *testing.T) { deployCluster(ctx, t, DisableDefaultCNIClusterPath) })
+	t.Run("ValidateNoCalico", func(t *testing.T) { validateNoCalico(ctx, t) })
 	t.Run("installCilium", func(t *testing.T) { installCilium(t) })
-	t.Run("validateCilium", func(t *testing.T) { validateCilium(t) })
-	t.Run("VerifyCluster", func(t *testing.T) { verifyCluster(t) })
+	t.Run("validateCilium", func(t *testing.T) { validateCilium(ctx, t) })
+	t.Run("VerifyCluster", func(t *testing.T) { verifyCluster(ctx, t) })
 
 	// Important: the cluster is deleted in the Cleanup function
 	// which is called after all subtests are finished.
@@ -116,21 +121,21 @@ func checkBinary(t testing.TB, binaryName string) {
 }
 
 // setupCheck checks that the environment is ready to run the tests.
-func setupCheck(t testing.TB) {
-
+func setupCheck(ctx context.Context, t testing.TB) {
 	for _, binaryName := range []string{"kubectl", "clusterctl", "helm"} {
 		checkBinary(t, binaryName)
 	}
 	t.Logf("Waiting for the MicroK8s providers to deploy on the management cluster.")
-	waitForPod(t, "capi-microk8s-bootstrap-controller-manager", "capi-microk8s-bootstrap-system")
-	waitForPod(t, "capi-microk8s-control-plane-controller-manager", "capi-microk8s-control-plane-system")
+	waitForPod(ctx, t, "capi-microk8s-bootstrap-controller-manager", "capi-microk8s-bootstrap-system")
+	waitForPod(ctx, t, "capi-microk8s-control-plane-controller-manager", "capi-microk8s-control-plane-system")
 }
 
 // waitForPod waits for a pod to be available.
-func waitForPod(t testing.TB, pod string, ns string) {
-
-	command := []string{"kubectl", "wait", "--timeout=15s", "--for=condition=available", "deploy/" + pod, "-n", ns}
-	if err := retryableCommand(t, command); err != nil {
+func waitForPod(ctx context.Context, t testing.TB, pod string, ns string) {
+	if err := retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		_, err := execCommand(t, "kubectl", "wait", "--timeout=15s", "--for=condition=available", "deploy/"+pod, "-n", ns)
+		return err
+	}); err != nil {
 		t.Fatal(err)
 	}
 }
@@ -152,119 +157,132 @@ func teardownCluster() {
 }
 
 // deployCluster deploys a cluster using the manifest in CLUSTER_MANIFEST_FILE.
-func deployCluster(t testing.TB, clusterManifestFile string) {
+func deployCluster(ctx context.Context, t testing.TB, clusterManifestFile string) {
 	t.Logf("Setting up the cluster using %s", clusterManifestFile)
-	command := []string{"kubectl", "apply", "-f", clusterManifestFile}
-	cmd := exec.Command(command[0], command[1:]...)
-	outputBytes, err := cmd.CombinedOutput()
-	if err != nil {
-		t.Error(string(outputBytes))
-		t.Fatalf("Failed to create the requested cluster. %s", err)
+	if _, err := execCommand(t, "kubectl", "apply", "-f", clusterManifestFile); err != nil {
+		t.Fatalf("Failed to create the requested cluster. %s", err)
 	}
 
 	time.Sleep(30 * time.Second)
-	command = []string{"kubectl", "get", "cluster", "--no-headers", "-o", "custom-columns=:metadata.name"}
-	cmd = exec.Command(command[0], command[1:]...)
-	outputBytes, err = cmd.CombinedOutput()
+	output, err := execCommand(t, "kubectl", "get", "cluster", "--no-headers", "-o", "custom-columns=:metadata.name")
 	if err != nil {
-		t.Error(string(outputBytes))
 		t.Fatalf("Failed to get the name of the cluster. %s", err)
 	}
-	cluster := strings.Trim(string(outputBytes), "\n")
+	cluster := strings.Trim(output, "\n")
 	t.Logf("Cluster name is %s", cluster)
 
-	command = []string{"clusterctl", "get", "kubeconfig", cluster}
-	testFn := func(output string) bool {
+	if err = retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		output, err = execCommand(t, "clusterctl", "get", "kubeconfig", cluster)
+		if err != nil {
+			return err
+		}
 		cfg := strings.Trim(output, "\n")
 		err = os.WriteFile(KUBECONFIG, []byte(cfg), 0644)
 		if err != nil {
 			t.Fatalf("Could not persist the targets kubeconfig file. %s", err)
 		}
 		t.Logf("Target's kubeconfig file is at %s", KUBECONFIG)
-		t.Log(cfg)
-		return true
-	}
-	if err = retryableCommandWithConditionFunction(t, command, testFn); err != nil {
+		return nil
+
+	}); err != nil {
 		t.Fatal(err)
 	}
 
 	// Wait until the cluster is provisioned
-	command = []string{"kubectl", "get", "cluster", cluster}
-	testFn = func(output string) bool {
-		return strings.Contains(output, "Provisioned")
-	}
-	if err = retryableCommandWithConditionFunction(t, command, testFn); err != nil {
+	if err = retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		output, err = execCommand(t, "kubectl", "get", "cluster", cluster)
+		if err != nil {
+			return err
+		}
+		if strings.Contains(output, "Provisioned") {
+			return nil
+		}
+		return errors.New("cluster not provisioned")
+	}); err != nil {
 		t.Fatal(err)
 	}
 }
 
 // verifyCluster check if cluster is functional
-func verifyCluster(t testing.TB) {
+func verifyCluster(ctx context.Context, t testing.TB) {
 	// Wait until all machines are running
+	t.Log("Verify cluster deployment")
 	machines := 0
-	command := []string{"kubectl", "get", "machine", "--no-headers"}
-	testFn := func(output string) bool {
+	if err := retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		output, err := execCommand(t, "kubectl", "get", "machine", "--no-headers")
+		if err != nil {
+			return err
+		}
 		machines = strings.Count(output, "\n")
 		running := strings.Count(output, "Running")
-		t.Logf("Machines %d out of which %d are Running", machines, running)
+		msg := fmt.Sprintf("Machines %d out of which %d are Running", machines, running)
+		t.Log(msg)
 		if machines == running {
-			return true
+			return nil
 		}
-		return false
-	}
-	if err := retryableCommandWithConditionFunction(t, command, testFn); err != nil {
+		return errors.New(msg)
+	}); err != nil {
 		t.Fatal(err)
 	}
 
 	// Make sure we have as many nodes as machines
-	command = []string{"kubectl", "--kubeconfig=" + KUBECONFIG, "get", "no", "--no-headers"}
-	testFn = func(output string) bool {
+	if err := retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		output, err := execCommand(t, "kubectl", "--kubeconfig="+KUBECONFIG, "get", "no", "--no-headers")
+		if err != nil {
+			return err
+		}
 		nodes := strings.Count(output, "\n")
 		ready := strings.Count(output, " Ready")
-		t.Logf("Machines are %d, Nodes are %d out of which %d are Ready", machines, nodes, ready)
+		msg := fmt.Sprintf("Machines are %d, Nodes are %d out of which %d are Ready", machines, nodes, ready)
+		t.Log(msg)
 		if machines == nodes && ready == nodes {
-			return true
+			return nil
 		}
-		return false
-	}
-	if err := retryableCommandWithConditionFunction(t, command, testFn); err != nil {
+		return errors.New(msg)
+	}); err != nil {
 		t.Fatal(err)
 	}
 }
 
 // deployMicrobot deploys a deployment of microbot.
-func deployMicrobot(t testing.TB) {
+func deployMicrobot(ctx context.Context, t testing.TB) {
 	t.Log("Deploying microbot")
 
-	command := []string{"kubectl", "--kubeconfig=" + KUBECONFIG, "create", "deploy", "--image=cdkbot/microbot:1", "--replicas=30", "bot"}
-	cmd := exec.Command(command[0], command[1:]...)
-	outputBytes, err := cmd.CombinedOutput()
-	if err != nil {
-		t.Error(string(outputBytes))
+	if output, err := execCommand(t, "kubectl", "--kubeconfig="+KUBECONFIG, "create", "deploy", "--image=cdkbot/microbot:1", "--replicas=30", "bot"); err != nil {
+		t.Error(output)
 		t.Fatalf("Failed to create the requested microbot deployment. %s", err)
 	}
 
 	// Make sure we have as many nodes as machines
 	t.Log("Waiting for the deployment to complete")
-	command = []string{"kubectl", "--kubeconfig=" + KUBECONFIG, "wait", "deploy/bot", "--for=jsonpath={.status.readyReplicas}=30"}
-	if err = retryableCommand(t, command); err != nil {
+	if err := retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		_, err := execCommand(t, "kubectl", "--kubeconfig="+KUBECONFIG, "wait", "deploy/bot", "--for=jsonpath={.status.readyReplicas}=30")
+		return err
+	}); err != nil {
		t.Fatal(err)
 	}
+
 }
 
 // validateNoCalico Checks if calico daemon set is not deployed on the cluster.
-func validateNoCalico(t testing.TB) {
+func validateNoCalico(ctx context.Context, t testing.TB) {
 	t.Log("Validate no Calico daemon set")
-	t.Log("Checking for calico daemon set")
-	command := []string{"kubectl", "--kubeconfig=" + KUBECONFIG, "-n", "kube-system", "get", "ds"}
-	testFn := func(output string) bool {
-		return !strings.Contains(output, "calico")
-	}
-	if err := retryableCommandWithConditionFunction(t, command, testFn); err != nil {
+	if err := retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		output, err := execCommand(t, "kubectl", "--kubeconfig="+KUBECONFIG, "-n", "kube-system", "get", "ds")
+		if err != nil {
+			return err
+		}
+		if strings.Contains(output, "calico") {
+			return errors.New("calico daemon set is present")
+
+		}
+		return nil
+	}); err != nil {
 		t.Fatal(err)
 	}
 	t.Log("No calico daemon set")
@@ -273,6 +291,7 @@
 // installCilium installs cilium from helm chart
 func installCilium(t testing.TB) {
 	t.Log("Deploy Cilium")
+
 	command := []string{"helm", "install", "cilium", "--repo", "https://helm.cilium.io/",
 		"cilium", "--namespace", "kube-system", "--set", "cni.confPath=/var/snap/microk8s/current/args/cni-network",
 		"--set", "cni.binPath=/var/snap/microk8s/current/opt/cni/bin",
@@ -281,7 +300,6 @@ func installCilium(t testing.TB) {
 		"--set", "ipam.operator.clusterPoolIPv4PodCIDRList=10.1.0.0/16",
 		"--set", "nodePort.enabled=true",
 	}
-	t.Logf("running command: %s", strings.Join(command, " "))
 	cmd := exec.Command(command[0], command[1:]...)
 	cmd.Env = append(cmd.Env, "KUBECONFIG="+KUBECONFIG)
 
@@ -293,132 +311,98 @@
 }
 
 // validateCilium checks a deployment of cilium daemon set.
-func validateCilium(t testing.TB) {
+func validateCilium(ctx context.Context, t testing.TB) {
 	t.Log("Validate Cilium")
-	command := []string{"kubectl", "get", "machine", "--no-headers"}
 	machines := 0
-	testFn := func(output string) bool {
+	if err := retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		output, err := execCommand(t, "kubectl", "get", "machine", "--no-headers")
+		if err != nil {
+			return err
+		}
 		machines = strings.Count(output, "\n")
-		return machines > 0
-	}
-	if err := retryableCommandWithConditionFunction(t, command, testFn); err != nil {
+		if machines == 0 {
+			return errors.New("machines haven't started yet")
+		}
+		return nil
+	}); err != nil {
 		t.Fatal(err)
 	}
 
 	t.Log("Checking Cilium daemon set")
-	command = []string{"kubectl", "--kubeconfig=" + KUBECONFIG, "-n", "kube-system", "wait", "ds/cilium", fmt.Sprintf("--for=jsonpath={.status.numberAvailable}=%d", machines)}
-	if err := retryableCommand(t, command); err != nil {
+	if err := retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		_, err := execCommand(t, "kubectl", "--kubeconfig="+KUBECONFIG, "-n", "kube-system", "wait", "ds/cilium", fmt.Sprintf("--for=jsonpath={.status.numberAvailable}=%d", machines))
+		return err
+	}); err != nil {
 		t.Fatal(err)
 	}
 }
 
-// retryableCommand runs command and retires if command fails.
-// Runs provided command, if commands return err retries it retryMaxAttempts and waits secondsBetweenAttempts between next execution.
-// If after all attempts error occurs error gets returned.
-func retryableCommand(t testing.TB, command []string) error {
-	var err error
-	var outputBytes []byte
-	for attempt := 0; attempt < retryMaxAttempts; attempt++ {
-		if attempt > 1 {
-			t.Logf("Retrying")
-			time.Sleep(secondsBetweenAttempts * time.Second)
-		}
-		t.Logf("running command: %s", strings.Join(command, " "))
-		cmd := exec.Command(command[0], command[1:]...)
-		outputBytes, err = cmd.CombinedOutput()
-		t.Log(string(outputBytes))
-		if err == nil {
-			break
-		}
-	}
-	return err
-}
-
-// retryableCommandWithConditionFunction runs command with retry.
-// Runs provided command, if commands return err retries it retryMaxAttempts and waits secondsBetweenAttempts between next execution.
-// Additionally, to finish correctly runs function fn func(string) bool, if passed function returns true
-// the retryableCommandWithConditionFunction returns nil
-func retryableCommandWithConditionFunction(t testing.TB, command []string, fn func(string) bool) error {
-	var err error
-	var outputBytes []byte
-	for attempt := 0; attempt < retryMaxAttempts; attempt++ {
-		if attempt > 1 {
-			t.Logf("Retrying")
-			time.Sleep(secondsBetweenAttempts * time.Second)
-		}
-		t.Logf("running command: %s", strings.Join(command, " "))
-		cmd := exec.Command(command[0], command[1:]...)
-		outputBytes, err = cmd.CombinedOutput()
-		output := string(outputBytes)
-		t.Log(output)
-		if err == nil && fn(output) {
-			break
-		}
-	}
-	return err
+// execCommand runs the given command, converts the combined output to a string, and returns it along with any error from exec.
+func execCommand(t testing.TB, command ...string) (string, error) {
+	t.Logf("running command: %s", strings.Join(command, " "))
+	cmd := exec.Command(command[0], command[1:]...)
+	outputBytes, err := cmd.CombinedOutput()
+	output := string(outputBytes)
+	t.Log(output)
+	return output, err
 }
 
 // upgradeCluster upgrades the cluster to a new version based on the upgrade strategy.
-func upgradeCluster(t testing.TB, upgradeStrategy string) {
-
+func upgradeCluster(ctx context.Context, t testing.TB, upgradeStrategy string) {
 	version := "v1.28.0"
-	controlPlaneName := "test-ci-cluster-control-plane"
-	controlPlaneType := "microk8scontrolplanes.controlplane.cluster.x-k8s.io"
-	workerDeploymentName := "test-ci-cluster-md-0"
-	workerDeploymentType := "machinedeployments.cluster.x-k8s.io"
-
 	t.Logf("Upgrading cluster to %s via %s", version, upgradeStrategy)
+
 	// Patch control plane machine upgrades based on type of upgrade strategy.
-	outputBytes, err := controlPlanePatch(controlPlaneName, controlPlaneType, version, upgradeStrategy)
-	if err != nil {
-		t.Error(string(outputBytes))
+	if _, err := execCommand(t, "kubectl", "patch", "--type=merge",
+		"microk8scontrolplanes.controlplane.cluster.x-k8s.io", "test-ci-cluster-control-plane", "--patch",
+		fmt.Sprintf(`{"spec":{"version":"%s","upgradeStrategy":"%s"}}`, version, upgradeStrategy)); err != nil {
 		t.Fatalf("Failed to merge the patch to control plane. %s", err)
 	}
 
 	// Patch worker machine upgrades.
-	outputBytes, err = workerPatch(workerDeploymentName, workerDeploymentType, version)
-	if err != nil {
-		t.Error(string(outputBytes))
+	if _, err := execCommand(t, "kubectl", "patch", "--type=merge",
+		"machinedeployments.cluster.x-k8s.io", "test-ci-cluster-md-0", "--patch",
+		fmt.Sprintf(`{"spec":{"template":{"spec":{"version":"%s"}}}}`, version)); err != nil {
 		t.Fatalf("Failed to merge the patch to the machine deployments. %s", err)
 	}
 
 	time.Sleep(30 * time.Second)
 
 	// Now all the machines should be upgraded to the new version.
-
-	command := []string{"kubectl", "get", "machine", "--no-headers"}
-	testFn := func(output string) bool {
+	if err := retryFor(ctx, retryMaxAttempts, secondsBetweenAttempts*time.Second, func() error {
+		output, err := execCommand(t, "kubectl", "get", "machine", "--no-headers")
+		if err != nil {
+			return err
+		}
 		totalMachines := strings.Count(output, "Running")
-
-		// We count all the "Running" machines with the new version.
 		re := regexp.MustCompile("Running .* " + version)
 		upgradedMachines := len(re.FindAllString(output, -1))
-		t.Logf("Total machines %d out of which %d are upgraded", totalMachines, upgradedMachines)
+		msg := fmt.Sprintf("Total machines %d out of which %d are upgraded", totalMachines, upgradedMachines)
+		t.Log(msg)
 		if totalMachines == upgradedMachines {
-			return true
+			return nil
 		}
-		return false
-	}
-	if err = retryableCommandWithConditionFunction(t, command, testFn); err != nil {
+		return errors.New(msg)
+	}); err != nil {
 		t.Fatal(err)
 	}
 }
 
-// controlPlanePatch patches the control plane machines based on the upgrade strategy and version.
-func controlPlanePatch(controlPlaneName, controlPlaneType, version, upgradeStrategy string) ([]byte, error) {
-	command := []string{"kubectl", "patch", "--type=merge", controlPlaneType, controlPlaneName, "--patch",
-		fmt.Sprintf(`{"spec":{"version":"%s","upgradeStrategy":"%s"}}`, version, upgradeStrategy)}
-	cmd := exec.Command(command[0], command[1:]...)
-
-	return cmd.CombinedOutput()
-}
-
-// workerPatch patches a given worker machines with the given version.
-func workerPatch(workerDeploymentName, workerDeploymentType, version string) ([]byte, error) {
-	command := []string{"kubectl", "patch", "--type=merge", workerDeploymentType, workerDeploymentName, "--patch",
-		fmt.Sprintf(`{"spec":{"template":{"spec":{"version":"%s"}}}}`, version)}
-	cmd := exec.Command(command[0], command[1:]...)
-
-	return cmd.CombinedOutput()
+// retryFor retries the given function up to retryCount times, waiting delayBetweenRetry between attempts.
+// It stops early and returns the context's error if ctx is done while waiting to retry.
+func retryFor(ctx context.Context, retryCount int, delayBetweenRetry time.Duration, retryFunc func() error) error {
+	var err error
+	for i := 0; i < retryCount; i++ {
+		if err = retryFunc(); err != nil {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-time.After(delayBetweenRetry):
+				continue
+			}
+		}
+		break
+	}
+	return err
 }