Skip to content

Commit

Permalink
Adds e2e test for compaction (#723)
Browse files Browse the repository at this point in the history
* Added e2e tests for compaction.

* Fetches options for ETCD Druid through skaffold.

* Fixes compaction for distroless image of etcd backup-restore.

* Address Sesha's second review.
  • Loading branch information
abdasgupta authored Nov 23, 2023
1 parent 07c38a9 commit 6330f27
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 15 deletions.
10 changes: 10 additions & 0 deletions charts/druid/templates/controller-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ spec:
- --ignore-operation-annotation={{ .Values.ignoreOperationAnnotation }}
- --workers=3
- --custodian-sync-period=15s
{{- if .Values.enableBackupCompaction }}
- --enable-backup-compaction={{ .Values.enableBackupCompaction }}
{{- end }}
{{- if .Values.eventsThreshold }}
- --etcd-events-threshold={{ .Values.eventsThreshold }}
{{- end }}
{{- if .Values.metricsScrapeWaitDuration }}
- --metrics-scrape-wait-duration={{ .Values.metricsScrapeWaitDuration }}
{{- end }}

{{- if .Values.featureGates }}
{{- $featuregates := "" }}
{{- range $feature, $value := $.Values.featureGates }}
Expand Down
3 changes: 3 additions & 0 deletions charts/druid/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ image:
imagePullPolicy: IfNotPresent
replicas: 1
ignoreOperationAnnotation: false
# enableBackupCompaction: true
# eventsThreshold: 15
# metricsScrapeWaitDuration: "10s"
# featureGates:
# UseEtcdWrapper: true
20 changes: 14 additions & 6 deletions controllers/compaction/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/component-base/featuregate"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -309,7 +310,7 @@ func (r *Reconciler) createCompactionJob(ctx context.Context, logger logr.Logger
},
}

if vms, err := getCompactionJobVolumeMounts(etcd); err != nil {
if vms, err := getCompactionJobVolumeMounts(etcd, r.config.FeatureGates); err != nil {
return nil, fmt.Errorf("error while creating compaction job in %v for %v : %v",
etcd.Namespace,
etcd.Name,
Expand Down Expand Up @@ -360,7 +361,7 @@ func getLabels(etcd *druidv1alpha1.Etcd) map[string]string {
"networking.gardener.cloud/to-public-networks": "allowed",
}
}
func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd) ([]v1.VolumeMount, error) {
func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd, featureMap map[featuregate.Feature]bool) ([]v1.VolumeMount, error) {
vms := []v1.VolumeMount{
{
Name: "etcd-workspace-dir",
Expand All @@ -374,10 +375,17 @@ func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd) ([]v1.VolumeMount, e
}
switch provider {
case utils.Local:
vms = append(vms, v1.VolumeMount{
Name: "host-storage",
MountPath: pointer.StringDeref(etcd.Spec.Backup.Store.Container, ""),
})
if featureMap[features.UseEtcdWrapper] {
vms = append(vms, v1.VolumeMount{
Name: "host-storage",
MountPath: "/home/nonroot/" + pointer.StringDeref(etcd.Spec.Backup.Store.Container, ""),
})
} else {
vms = append(vms, v1.VolumeMount{
Name: "host-storage",
MountPath: pointer.StringDeref(etcd.Spec.Backup.Store.Container, ""),
})
}
case utils.GCS:
vms = append(vms, v1.VolumeMount{
Name: "etcd-backup",
Expand Down
10 changes: 6 additions & 4 deletions skaffold.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ deploy:
image: eu.gcr.io/gardener-project/gardener/etcd-druid
imageStrategy:
helm: {}
# Dependency builds create new dep archives and thus circumvent Docker's build cache at the next run.
skipBuildDependencies: true
setValues:
enableBackupCompaction: "true"
eventsThreshold: 15
metricsScrapeWaitDuration: "30s"
profiles:
- name: use-feature-gates
activation:
- env: "USE_ETCD_DRUID_FEATURE_GATES=true"
patches:
- op: add
path: /deploy/helm/releases/0/setValues
path: /deploy/helm/releases/0/setValues/featureGates
value:
featureGates:
UseEtcdWrapper: true
UseEtcdWrapper: true
---
apiVersion: skaffold/v2beta25
kind: Config
Expand Down
9 changes: 6 additions & 3 deletions test/e2e/etcd_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,15 @@ var _ = Describe("Etcd Backup", func() {
Expect(err).ShouldNot(HaveOccurred())

By("Check snapshot after putting data into etcd")
// allow 5 second buffer to upload full/delta snapshot
time.Sleep(time.Second * 5)

latestSnapshotsAfterPopulate, err := getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
Expect(err).ShouldNot(HaveOccurred())

Eventually(func() int {
latestSnapshotsAfterPopulate, err = getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugPod.Spec.Containers[0].Name, 8080)
Expect(err).NotTo(HaveOccurred())
return len(latestSnapshotsAfterPopulate.DeltaSnapshots)
}, singleNodeEtcdTimeout, pollingInterval).Should(BeNumerically(">", len(latestSnapshotsBeforePopulate.DeltaSnapshots)))

latestSnapshotAfterPopulate := latestSnapshotsAfterPopulate.FullSnapshot
if numDeltas := len(latestSnapshotsAfterPopulate.DeltaSnapshots); numDeltas > 0 {
latestSnapshotAfterPopulate = latestSnapshotsAfterPopulate.DeltaSnapshots[numDeltas-1]
Expand Down
190 changes: 190 additions & 0 deletions test/e2e/etcd_compaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright (c) 2023 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"fmt"
"time"

brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/types"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Etcd Compaction e2e suite.
//
// For every configured storage provider the suite creates a single-node etcd,
// writes keys into it in batches, and verifies that
//   - delta snapshots appear after the first batch of writes,
//   - a compaction job completes successfully after further writes and an
//     on-demand delta snapshot, leaving no delta snapshots behind, and
//   - new delta snapshots show up again after yet another batch of writes.
var _ = Describe("Etcd Compaction", func() {
	var parentCtx context.Context

	BeforeEach(func() {
		parentCtx = context.Background()
	})

	Context("when compaction is enabled for single-node etcd", func() {
		providers, err := getProviders()
		Expect(err).ToNot(HaveOccurred())

		var (
			cl               client.Client
			etcdName         string
			storageContainer string
		)

		for _, p := range providers {
			// Copy the loop variable so every closure below captures its own provider.
			provider := p

			Context(fmt.Sprintf("with provider %s", provider.Name), func() {
				BeforeEach(func() {
					cl, err = getKubernetesClient(kubeconfigPath)
					Expect(err).ShouldNot(HaveOccurred())

					etcdName = fmt.Sprintf("etcd-%s", provider.Name)

					storageContainer = getEnvAndExpectNoError(envStorageContainer)

					snapstoreProvider := provider.Storage.Provider
					store, err := getSnapstore(string(snapstoreProvider), storageContainer, storePrefix)
					Expect(err).ShouldNot(HaveOccurred())

					// purge any existing backups in bucket
					Expect(purgeSnapstore(store)).To(Succeed())

					// An Expect without a matcher asserts nothing; make a failed
					// secret deployment fail the spec.
					// NOTE(review): assumes deployBackupSecret returns an error - confirm.
					Expect(deployBackupSecret(parentCtx, cl, logger, provider, etcdNamespace, storageContainer)).To(Succeed())
				})

				It("should test compaction on backup", func() {
					ctx, cancelFunc := context.WithTimeout(parentCtx, 10*time.Minute)
					defer cancelFunc()

					etcd := getDefaultEtcd(etcdName, namespace, storageContainer, storePrefix, provider)
					objLogger := logger.WithValues("etcd", client.ObjectKeyFromObject(etcd))

					By("Create etcd")
					createAndCheckEtcd(ctx, cl, objLogger, etcd, singleNodeEtcdTimeout)

					By("Create debug pod")
					debugPod := createDebugPod(ctx, etcd)
					debugContainer := debugPod.Spec.Containers[0].Name

					By("Check initial snapshot is available")

					latestSnapshotsBeforePopulate, err := getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, 8080)
					Expect(err).ShouldNot(HaveOccurred())
					// No delta snapshots are expected yet: nothing has been written
					// to the freshly created cluster.
					Expect(latestSnapshotsBeforePopulate.DeltaSnapshots).To(BeEmpty())

					latestSnapshotBeforePopulate := latestSnapshotsBeforePopulate.FullSnapshot
					Expect(latestSnapshotBeforePopulate).ToNot(BeNil())

					By("Put keys into etcd")
					logger.Info("populating etcd with sequential key-value pairs",
						"fromKey", fmt.Sprintf("%s-1", etcdKeyPrefix), "fromValue", fmt.Sprintf("%s-1", etcdValuePrefix),
						"toKey", fmt.Sprintf("%s-10", etcdKeyPrefix), "toValue", fmt.Sprintf("%s-10", etcdValuePrefix))

					// populate 10 keys in etcd, finishing in 10 seconds
					err = populateEtcd(logger, kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, etcdKeyPrefix, etcdValuePrefix, 1, 10, time.Second*1)
					Expect(err).ShouldNot(HaveOccurred())

					By("Check snapshot after putting data into etcd")

					latestSnapshotsAfterPopulate, err := getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, 8080)
					Expect(err).ShouldNot(HaveOccurred())

					// Poll until at least one new delta snapshot has been taken.
					// The error is returned instead of asserted so that a transient
					// failure is retried by Eventually rather than aborting the spec.
					Eventually(func() (int, error) {
						latestSnapshotsAfterPopulate, err = getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, 8080)
						if err != nil {
							return 0, err
						}
						return len(latestSnapshotsAfterPopulate.DeltaSnapshots), nil
					}, singleNodeEtcdTimeout, pollingInterval).Should(BeNumerically(">", len(latestSnapshotsBeforePopulate.DeltaSnapshots)))

					// The most recent snapshot is the last delta snapshot if there is
					// one, otherwise the full snapshot.
					latestSnapshotAfterPopulate := latestSnapshotsAfterPopulate.FullSnapshot
					if numDeltas := len(latestSnapshotsAfterPopulate.DeltaSnapshots); numDeltas > 0 {
						latestSnapshotAfterPopulate = latestSnapshotsAfterPopulate.DeltaSnapshots[numDeltas-1]
					}

					// Guard the snapshot that is dereferenced on the next line.
					Expect(latestSnapshotAfterPopulate).ToNot(BeNil())
					Expect(latestSnapshotAfterPopulate.CreatedOn.After(latestSnapshotBeforePopulate.CreatedOn)).To(BeTrue())

					By("Put additional data into etcd")
					logger.Info("populating etcd with sequential key-value pairs",
						"fromKey", fmt.Sprintf("%s-11", etcdKeyPrefix), "fromValue", fmt.Sprintf("%s-11", etcdValuePrefix),
						"toKey", fmt.Sprintf("%s-15", etcdKeyPrefix), "toValue", fmt.Sprintf("%s-15", etcdValuePrefix))
					// populate 5 keys in etcd, finishing in 5 seconds
					err = populateEtcd(logger, kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, etcdKeyPrefix, etcdValuePrefix, 11, 15, time.Second*1)
					Expect(err).ShouldNot(HaveOccurred())

					By("Trigger on-demand delta snapshot")
					_, err = triggerOnDemandSnapshot(kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, 8080, brtypes.SnapshotKindDelta)
					Expect(err).ShouldNot(HaveOccurred())

					// 15 keys have been written at this point.
					// NOTE(review): presumably this matches the events threshold that
					// etcd-druid is deployed with for e2e runs - confirm.
					logger.Info("waiting for compaction job to become successful")
					jobKey := types.NamespacedName{
						Name:      etcd.GetCompactionJobName(),
						Namespace: etcd.Namespace,
					}
					Eventually(func() error {
						// Derive from the spec context so the lookup also honours the
						// overall spec deadline.
						getCtx, cancel := context.WithTimeout(ctx, singleNodeEtcdTimeout)
						defer cancel()

						job := &batchv1.Job{}
						if err := cl.Get(getCtx, jobKey, job); err != nil {
							return err
						}

						if job.Status.Succeeded < 1 {
							return fmt.Errorf("compaction job started but not yet successful")
						}

						return nil
					}, singleNodeEtcdTimeout, pollingInterval).Should(Succeed())
					logger.Info("compaction job is successful")

					By("Verify that all the delta snapshots are compacted to full snapshots by compaction triggerred at first 15th revision")
					latestSnapshotsAfterPopulate, err = getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, 8080)
					Expect(err).ShouldNot(HaveOccurred())

					Expect(latestSnapshotsAfterPopulate.DeltaSnapshots).To(BeEmpty())

					By("Put additional data into etcd")
					logger.Info("populating etcd with sequential key-value pairs",
						"fromKey", fmt.Sprintf("%s-16", etcdKeyPrefix), "fromValue", fmt.Sprintf("%s-16", etcdValuePrefix),
						"toKey", fmt.Sprintf("%s-20", etcdKeyPrefix), "toValue", fmt.Sprintf("%s-20", etcdValuePrefix))
					// populate 5 keys in etcd, finishing in 5 seconds
					err = populateEtcd(logger, kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, etcdKeyPrefix, etcdValuePrefix, 16, 20, time.Second*1)
					Expect(err).ShouldNot(HaveOccurred())

					By("Trigger on-demand delta snapshot")
					_, err = triggerOnDemandSnapshot(kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, 8080, brtypes.SnapshotKindDelta)
					Expect(err).ShouldNot(HaveOccurred())

					By("Verify that there are new delta snapshots as compaction is not triggered yet because delta events have not reached next 15 revision")
					latestSnapshotsAfterPopulate, err = getLatestSnapshots(kubeconfigPath, namespace, etcdName, debugPod.Name, debugContainer, 8080)
					Expect(err).ShouldNot(HaveOccurred())

					Expect(latestSnapshotsAfterPopulate.DeltaSnapshots).ToNot(BeEmpty())

					By("Delete debug pod")
					Expect(cl.Delete(ctx, debugPod)).To(Succeed())

					By("Delete etcd")
					deleteAndCheckEtcd(ctx, cl, objLogger, etcd, singleNodeEtcdTimeout)
				})
			})
		}
	})
})
4 changes: 2 additions & 2 deletions test/e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"time"

"github.com/gardener/etcd-druid/api/v1alpha1"

"github.com/gardener/gardener/pkg/utils/test/matchers"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
Expand All @@ -35,7 +35,7 @@ import (
)

const (
singleNodeEtcdTimeout = time.Minute
singleNodeEtcdTimeout = time.Minute * 3
multiNodeEtcdTimeout = time.Minute * 5

pollingInterval = time.Second * 2
Expand Down

0 comments on commit 6330f27

Please sign in to comment.