From 6330f277c7fe534194d110276156b0f1ca125dba Mon Sep 17 00:00:00 2001
From: Abhishek Dasgupta
Date: Thu, 23 Nov 2023 15:35:26 +0530
Subject: [PATCH] Adds e2e test for compaction (#723)

* Added e2e tests for compaction.
* Fetches options for ETCD Druid through skaffold.
* Fixes compaction for distroless image of etcd backup-restore.
* Address Sesha's second review.
---
 .../templates/controller-deployment.yaml |  10 +
 charts/druid/values.yaml                 |   3 +
 controllers/compaction/reconciler.go     |  20 +-
 skaffold.yaml                            |  10 +-
 test/e2e/etcd_backup_test.go             |   9 +-
 test/e2e/etcd_compaction_test.go         | 190 ++++++++++++++++++
 test/e2e/suite_test.go                   |   4 +-
 7 files changed, 231 insertions(+), 15 deletions(-)
 create mode 100644 test/e2e/etcd_compaction_test.go

diff --git a/charts/druid/templates/controller-deployment.yaml b/charts/druid/templates/controller-deployment.yaml
index 1ff871d79..dd33b0d11 100644
--- a/charts/druid/templates/controller-deployment.yaml
+++ b/charts/druid/templates/controller-deployment.yaml
@@ -26,6 +26,16 @@ spec:
         - --ignore-operation-annotation={{ .Values.ignoreOperationAnnotation }}
         - --workers=3
         - --custodian-sync-period=15s
+        {{- if .Values.enableBackupCompaction }}
+        - --enable-backup-compaction={{ .Values.enableBackupCompaction }}
+        {{- end }}
+        {{- if .Values.eventsThreshold }}
+        - --etcd-events-threshold={{ .Values.eventsThreshold }}
+        {{- end }}
+        {{- if .Values.metricsScrapeWaitDuration }}
+        - --metrics-scrape-wait-duration={{ .Values.metricsScrapeWaitDuration }}
+        {{- end }}
+
         {{- if .Values.featureGates }}
         {{- $featuregates := "" }}
         {{- range $feature, $value := $.Values.featureGates }}
diff --git a/charts/druid/values.yaml b/charts/druid/values.yaml
index a5cfebf88..22d4321b3 100644
--- a/charts/druid/values.yaml
+++ b/charts/druid/values.yaml
@@ -6,5 +6,8 @@ image:
 imagePullPolicy: IfNotPresent
 replicas: 1
 ignoreOperationAnnotation: false
+# enableBackupCompaction: true
+# eventsThreshold: 15
+# metricsScrapeWaitDuration: "10s"
 # featureGates:
 #   UseEtcdWrapper: true
diff --git a/controllers/compaction/reconciler.go b/controllers/compaction/reconciler.go
index 966a179c3..09ba68024 100644
--- a/controllers/compaction/reconciler.go
+++ b/controllers/compaction/reconciler.go
@@ -35,6 +35,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/component-base/featuregate"
 	"k8s.io/utils/pointer"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -309,7 +310,7 @@ func (r *Reconciler) createCompactionJob(ctx context.Context, logger logr.Logger
 		},
 	}
 
-	if vms, err := getCompactionJobVolumeMounts(etcd); err != nil {
+	if vms, err := getCompactionJobVolumeMounts(etcd, r.config.FeatureGates); err != nil {
 		return nil, fmt.Errorf("error while creating compaction job in %v for %v : %v",
 			etcd.Namespace,
 			etcd.Name,
@@ -360,7 +361,7 @@ func getLabels(etcd *druidv1alpha1.Etcd) map[string]string {
 		"networking.gardener.cloud/to-public-networks": "allowed",
 	}
 }
-func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd) ([]v1.VolumeMount, error) {
+func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd, featureMap map[featuregate.Feature]bool) ([]v1.VolumeMount, error) {
 	vms := []v1.VolumeMount{
 		{
 			Name:      "etcd-workspace-dir",
@@ -374,10 +375,17 @@ func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd, e
 	}
 	switch provider {
 	case utils.Local:
-		vms = append(vms, v1.VolumeMount{
-			Name:      "host-storage",
-			MountPath: pointer.StringDeref(etcd.Spec.Backup.Store.Container, ""),
-		})
+		if featureMap[features.UseEtcdWrapper] {
+			vms = append(vms, v1.VolumeMount{
+				Name:      "host-storage",
+				MountPath: "/home/nonroot/" + pointer.StringDeref(etcd.Spec.Backup.Store.Container, ""),
+			})
+		} else {
+			vms = append(vms, v1.VolumeMount{
+				Name:      "host-storage",
+				MountPath: pointer.StringDeref(etcd.Spec.Backup.Store.Container, ""),
+			})
+		}
 	case utils.GCS:
 		vms = append(vms, v1.VolumeMount{
 			Name:      "etcd-backup",
diff --git a/skaffold.yaml b/skaffold.yaml
index 5dbcb0620..5d3238ba2 100644
--- a/skaffold.yaml
+++ b/skaffold.yaml
@@ -21,18 +21,20 @@ deploy:
         image: eu.gcr.io/gardener-project/gardener/etcd-druid
       imageStrategy:
         helm: {}
-      # Dependency builds create new dep archives and thus circumvent Docker's build cache at the next run.
       skipBuildDependencies: true
+      setValues:
+        enableBackupCompaction: "true"
+        eventsThreshold: 15
+        metricsScrapeWaitDuration: "30s"
 profiles:
   - name: use-feature-gates
     activation:
       - env: "USE_ETCD_DRUID_FEATURE_GATES=true"
     patches:
      - op: add
-       path: /deploy/helm/releases/0/setValues
+       path: /deploy/helm/releases/0/setValues/featureGates
        value:
-         featureGates:
-           UseEtcdWrapper: true
+         UseEtcdWrapper: true
 ---
 apiVersion: skaffold/v2beta25
 kind: Config
diff --git a/test/e2e/etcd_backup_test.go b/test/e2e/etcd_backup_test.go
index 1eceab9f0..0de133eb0 100644
--- a/test/e2e/etcd_backup_test.go
+++ b/test/e2e/etcd_backup_test.go
@@ -108,12 +108,15 @@ var _ = Describe("Etcd Backup", func() {
 			Expect(err).ShouldNot(HaveOccurred())
 
 			By("Check snapshot after putting data into etcd")
-			// allow 5 second buffer to upload full/delta snapshot
-			time.Sleep(time.Second * 5)
-
 			latestSnapshotsAfterPopulate, err := getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
 			Expect(err).ShouldNot(HaveOccurred())
 
+			Eventually(func() int {
+				latestSnapshotsAfterPopulate, err = getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
+				Expect(err).NotTo(HaveOccurred())
+				return len(latestSnapshotsAfterPopulate.DeltaSnapshots)
+			}, singleNodeEtcdTimeout, pollingInterval).Should(BeNumerically(">", len(latestSnapshotsBeforePopulate.DeltaSnapshots)))
+
 			latestSnapshotAfterPopulate := latestSnapshotsAfterPopulate.FullSnapshot
 			if numDeltas := len(latestSnapshotsAfterPopulate.DeltaSnapshots); numDeltas > 0 {
 				latestSnapshotAfterPopulate = latestSnapshotsAfterPopulate.DeltaSnapshots[numDeltas-1]
diff --git a/test/e2e/etcd_compaction_test.go b/test/e2e/etcd_compaction_test.go
new file mode 100644
index 000000000..e69211252
--- /dev/null
+++ b/test/e2e/etcd_compaction_test.go
@@ -0,0 +1,190 @@
+// Copyright (c) 2023 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
+	batchv1 "k8s.io/api/batch/v1"
+	"k8s.io/apimachinery/pkg/types"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var _ = Describe("Etcd Compaction", func() {
+	var (
+		parentCtx context.Context
+	)
+
+	BeforeEach(func() {
+		parentCtx = context.Background()
+	})
+
+	Context("when compaction is enabled for single-node etcd", func() {
+		providers, err := getProviders()
+		Expect(err).ToNot(HaveOccurred())
+
+		var (
+			cl               client.Client
+			etcdName         string
+			storageContainer string
+		)
+
+		for _, p := range providers {
+			provider := p
+			Context(fmt.Sprintf("with provider %s", provider.Name), func() {
+				BeforeEach(func() {
+					cl, err = getKubernetesClient(kubeconfigPath)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					etcdName = fmt.Sprintf("etcd-%s", provider.Name)
+
+					storageContainer = getEnvAndExpectNoError(envStorageContainer)
+
+					snapstoreProvider := provider.Storage.Provider
+					store, err := getSnapstore(string(snapstoreProvider), storageContainer, storePrefix)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					// purge any existing backups in bucket
+					Expect(purgeSnapstore(store)).To(Succeed())
+
+					Expect(deployBackupSecret(parentCtx, cl, logger, provider, etcdNamespace, storageContainer)).To(Succeed())
+				})
+
+				It("should test compaction on backup", func() {
+					ctx, cancelFunc := context.WithTimeout(parentCtx, 10*time.Minute)
+					defer cancelFunc()
+
+					etcd := getDefaultEtcd(etcdName, namespace, storageContainer, storePrefix, provider)
+					objLogger := logger.WithValues("etcd", client.ObjectKeyFromObject(etcd))
+
+					By("Create etcd")
+					createAndCheckEtcd(ctx, cl, objLogger, etcd, singleNodeEtcdTimeout)
+
+					By("Create debug pod")
+					debugPod := createDebugPod(ctx, etcd)
+
+					By("Check initial snapshot is available")
+
+					latestSnapshotsBeforePopulate, err := getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
+					Expect(err).ShouldNot(HaveOccurred())
+					// We don't expect any delta snapshot yet, as the cluster has only just been created and no data has been written to it.
+					Expect(latestSnapshotsBeforePopulate.DeltaSnapshots).To(HaveLen(0))
+
+					latestSnapshotBeforePopulate := latestSnapshotsBeforePopulate.FullSnapshot
+					Expect(latestSnapshotBeforePopulate).To(Not(BeNil()))
+
+					By("Put keys into etcd")
+					logger.Info("populating etcd with sequential key-value pairs",
+						"fromKey", fmt.Sprintf("%s-1", etcdKeyPrefix), "fromValue", fmt.Sprintf("%s-1", etcdValuePrefix),
+						"toKey", fmt.Sprintf("%s-10", etcdKeyPrefix), "toValue", fmt.Sprintf("%s-10", etcdValuePrefix))
+
+					// populate 10 keys in etcd, finishing in 10 seconds
+					err = populateEtcd(logger, kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, etcdKeyPrefix, etcdValuePrefix, 1, 10, time.Second*1)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					By("Check snapshot after putting data into etcd")
+
+					latestSnapshotsAfterPopulate, err := getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					Eventually(func() int {
+						latestSnapshotsAfterPopulate, err = getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
+						Expect(err).NotTo(HaveOccurred())
+						return len(latestSnapshotsAfterPopulate.DeltaSnapshots)
+					}, singleNodeEtcdTimeout, pollingInterval).Should(BeNumerically(">", len(latestSnapshotsBeforePopulate.DeltaSnapshots)))
+
+					latestSnapshotAfterPopulate := latestSnapshotsAfterPopulate.FullSnapshot
+					if numDeltas := len(latestSnapshotsAfterPopulate.DeltaSnapshots); numDeltas > 0 {
+						latestSnapshotAfterPopulate = latestSnapshotsAfterPopulate.DeltaSnapshots[numDeltas-1]
+					}
+
+					Expect(latestSnapshotsAfterPopulate).To(Not(BeNil()))
+					Expect(latestSnapshotAfterPopulate.CreatedOn.After(latestSnapshotBeforePopulate.CreatedOn)).To(BeTrue())
+
+					By("Put additional data into etcd")
+					logger.Info("populating etcd with sequential key-value pairs",
+						"fromKey", fmt.Sprintf("%s-11", etcdKeyPrefix), "fromValue", fmt.Sprintf("%s-11", etcdValuePrefix),
+						"toKey", fmt.Sprintf("%s-15", etcdKeyPrefix), "toValue", fmt.Sprintf("%s-15", etcdValuePrefix))
+					// populate 5 keys in etcd, finishing in 5 seconds
+					err = populateEtcd(logger, kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, etcdKeyPrefix, etcdValuePrefix, 11, 15, time.Second*1)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					By("Trigger on-demand delta snapshot")
+					_, err = triggerOnDemandSnapshot(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080, brtypes.SnapshotKindDelta)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					logger.Info("waiting for compaction job to become successful")
+					Eventually(func() error {
+						ctx, cancelFunc := context.WithTimeout(context.Background(), singleNodeEtcdTimeout)
+						defer cancelFunc()
+
+						req := types.NamespacedName{
+							Name:      etcd.GetCompactionJobName(),
+							Namespace: etcd.Namespace,
+						}
+
+						j := &batchv1.Job{}
+						if err := cl.Get(ctx, req, j); err != nil {
+							return err
+						}
+
+						if j.Status.Succeeded < 1 {
+							return fmt.Errorf("compaction job started but not yet successful")
+						}
+
+						return nil
+					}, singleNodeEtcdTimeout, pollingInterval).Should(BeNil())
+					logger.Info("compaction job is successful")
+
+					By("Verify that all the delta snapshots have been compacted into a full snapshot by the compaction job triggered at the 15th revision")
+					latestSnapshotsAfterPopulate, err = getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					Expect(len(latestSnapshotsAfterPopulate.DeltaSnapshots)).Should(BeNumerically("==", 0))
+
+					By("Put additional data into etcd")
+					logger.Info("populating etcd with sequential key-value pairs",
+						"fromKey", fmt.Sprintf("%s-16", etcdKeyPrefix), "fromValue", fmt.Sprintf("%s-16", etcdValuePrefix),
+						"toKey", fmt.Sprintf("%s-20", etcdKeyPrefix), "toValue", fmt.Sprintf("%s-20", etcdValuePrefix))
+					// populate 5 keys in etcd, finishing in 5 seconds
+					err = populateEtcd(logger, kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, etcdKeyPrefix, etcdValuePrefix, 16, 20, time.Second*1)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					By("Trigger on-demand delta snapshot")
+					_, err = triggerOnDemandSnapshot(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080, brtypes.SnapshotKindDelta)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					By("Verify that new delta snapshots are present, as compaction is not triggered again until the delta events reach the next 15-revision threshold")
+					latestSnapshotsAfterPopulate, err = getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
+					Expect(err).ShouldNot(HaveOccurred())
+
+					Expect(len(latestSnapshotsAfterPopulate.DeltaSnapshots)).Should(BeNumerically(">", 0))
+
+					By("Delete debug pod")
+					Expect(cl.Delete(ctx, debugPod)).ToNot(HaveOccurred())
+
+					By("Delete etcd")
+					deleteAndCheckEtcd(ctx, cl, objLogger, etcd, singleNodeEtcdTimeout)
+				})
+			})
+		}
+	})
+})
diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go
index 45ec8263c..31ac58f24 100644
--- a/test/e2e/suite_test.go
+++ b/test/e2e/suite_test.go
@@ -21,8 +21,8 @@ import (
 	"time"
 
 	"github.com/gardener/etcd-druid/api/v1alpha1"
-	"github.com/gardener/gardener/pkg/utils/test/matchers"
+
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
@@ -35,7 +35,7 @@ import (
 )
 
 const (
-	singleNodeEtcdTimeout = time.Minute
+	singleNodeEtcdTimeout = time.Minute * 3
 	multiNodeEtcdTimeout  = time.Minute * 5
 	pollingInterval       = time.Second * 2