Skip to content

Commit

Permalink
Merge pull request #258 from timuthy/enhancements.etcd-druid-v070
Browse files Browse the repository at this point in the history
Several improvements for compaction feature
  • Loading branch information
timuthy authored Nov 16, 2021
2 parents 1c2d8ab + b3f7428 commit 153fe29
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 146 deletions.
5 changes: 5 additions & 0 deletions charts/etcd/templates/etcd-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ spec:
command:
- etcdbrctl
- server
{{- if and .Values.backup.fullSnapLeaseName .Values.backup.deltaSnapLeaseName }}
- --enable-snapshot-lease-renewal=true
- --delta-snapshot-lease-name={{ .Values.backup.deltaSnapLeaseName }}
- --full-snapshot-lease-name={{ .Values.backup.fullSnapLeaseName }}
{{- end }}
{{- if .Values.etcd.defragmentationSchedule }}
- --defragmentation-schedule={{ .Values.etcd.defragmentationSchedule }}
{{- end }}
Expand Down
102 changes: 52 additions & 50 deletions controllers/compaction_lease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ import (
kutil "github.com/gardener/gardener/pkg/utils/kubernetes"
)

const DefaultETCDQuota = 8 * 1024 * 1024 * 1024 // 8Gi
const (
// DefaultETCDQuota is the default etcd quota.
DefaultETCDQuota = 8 * 1024 * 1024 * 1024 // 8Gi
)

// CompactionLeaseController reconciles compaction job
type CompactionLeaseController struct {
client.Client
Expand Down Expand Up @@ -104,68 +108,61 @@ func (lc *CompactionLeaseController) Reconcile(ctx context.Context, req ctrl.Req
return lc.delete(ctx, lc.logger, etcd)
}

logger := lc.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String())

// Get delta snapshot lease to check the HolderIdentity value to take decision on compaction job
nsName := types.NamespacedName{
Name: getFullSnapshotLeaseName(etcd),
Namespace: etcd.Namespace,
if etcd.Spec.Backup.Store == nil {
return ctrl.Result{}, nil
}

logger := lc.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String())

// Get full and delta snapshot lease to check the HolderIdentity value to take decision on compaction job
fullLease := &coordinationv1.Lease{}
err := lc.Get(ctx, nsName, fullLease)
if err != nil {
if err := lc.Get(ctx, kutil.Key(etcd.Namespace, getFullSnapshotLeaseName(etcd)), fullLease); err != nil {
logger.Info("Couldn't fetch full snap lease because: " + err.Error())

return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, err
}

nsName = types.NamespacedName{
Name: getDeltaSnapshotLeaseName(etcd),
Namespace: etcd.Namespace,
}

deltaLease := &coordinationv1.Lease{}
err = lc.Get(ctx, nsName, deltaLease)
if err != nil {
if err := lc.Get(ctx, kutil.Key(etcd.Namespace, getDeltaSnapshotLeaseName(etcd)), deltaLease); err != nil {
logger.Info("Couldn't fetch delta snap lease because: " + err.Error())

return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, err
}

// Run compaction job
if etcd.Spec.Backup.Store != nil {
full, err := strconv.ParseInt(*fullLease.Spec.HolderIdentity, 10, 64)
if err != nil {
logger.Error(err, "Can't convert holder identity of full snap lease to integer")
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, err
}
// Revisions have not been set yet by etcd-back-restore container.
// Skip further processing as we cannot calculate a revision delta.
if fullLease.Spec.HolderIdentity == nil || deltaLease.Spec.HolderIdentity == nil {
return ctrl.Result{}, nil
}

delta, err := strconv.ParseInt(*deltaLease.Spec.HolderIdentity, 10, 64)
if err != nil {
logger.Error(err, "Can't convert holder identity of delta snap lease to integer")
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, err
}
full, err := strconv.ParseInt(*fullLease.Spec.HolderIdentity, 10, 64)
if err != nil {
logger.Error(err, "Can't convert holder identity of full snap lease to integer")
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, err
}

delta, err := strconv.ParseInt(*deltaLease.Spec.HolderIdentity, 10, 64)
if err != nil {
logger.Error(err, "Can't convert holder identity of delta snap lease to integer")
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, err
}

diff := delta - full
diff := delta - full

// Reconcile job only when number of accumulated revisions over the last full snapshot is more than the configured threshold value via 'events-threshold' flag
if diff >= lc.config.EventsThreshold {
return lc.reconcileJob(ctx, logger, etcd)
}
// Reconcile job only when number of accumulated revisions over the last full snapshot is more than the configured threshold value via 'events-threshold' flag
if diff >= lc.config.EventsThreshold {
return lc.reconcileJob(ctx, logger, etcd)
}

return ctrl.Result{
Requeue: false,
}, nil
return ctrl.Result{}, nil
}

func (lc *CompactionLeaseController) reconcileJob(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (ctrl.Result, error) {
Expand All @@ -181,13 +178,16 @@ func (lc *CompactionLeaseController) reconcileJob(ctx context.Context, logger lo
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("error while fetching compaction job: %v", err)
}
// Required job doesn't exist. Create new
job, err = lc.createCompactJob(ctx, logger, etcd)
logger.Info("Job Creation")
if err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("error during compaction job creation: %v", err)

if lc.config.CompactionEnabled {
// Required job doesn't exist. Create new
job, err = lc.createCompactJob(ctx, logger, etcd)
logger.Info("Job Creation")
if err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("error during compaction job creation: %v", err)
}
}
}

Expand Down Expand Up @@ -536,9 +536,11 @@ func (lc *CompactionLeaseController) SetupWithManager(mgr ctrl.Manager, workers
MaxConcurrentReconciles: workers,
})

builder = builder.WithEventFilter(buildPredicateForLC()).For(&druidv1alpha1.Etcd{})
builder = builder.Owns(&coordinationv1.Lease{})
return builder.Complete(lc)
return builder.
For(&druidv1alpha1.Etcd{}).
Owns(&coordinationv1.Lease{}).
WithEventFilter(buildPredicateForLC()).
Complete(lc)
}

func buildPredicateForLC() predicate.Predicate {
Expand Down
52 changes: 30 additions & 22 deletions controllers/compaction_lease_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,7 @@ var _ = Describe("Lease Controller", func() {
svc = &corev1.Service{}
Eventually(func() error { return serviceIsCorrectlyReconciled(c, instance, svc) }, timeout, pollingInterval).Should(BeNil())
})
It("no jobs will be scheduled because no store details are provided", func() {
// Verufy if the statefulset updated the specs
validateEtcdWithDefaults(instance, s, cm, svc)

setStatefulSetReady(s)
err = c.Status().Update(context.TODO(), s)
Expect(err).NotTo(HaveOccurred())

// Verify if that the job is not created even if the holder identity in delta-snapshot-revision is greater than 1M
deltaLease := &coordinationv1.Lease{}
Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil())
err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, deltaLease, func() error {
deltaLease.Spec.HolderIdentity = pointer.StringPtr("1000000")
renewedTime := time.Now()
deltaLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
return nil
})
Expect(err).NotTo(HaveOccurred())

j := &batchv1.Job{}
Eventually(func() error { return jobIsCorrectlyReconciled(c, instance, j) }, time.Duration(30*time.Second), pollingInterval).ShouldNot(BeNil())

})
AfterEach(func() {
Expect(c.Delete(context.TODO(), instance)).To(Succeed())
Eventually(func() error { return statefulSetRemoved(c, s) }, timeout, pollingInterval).Should(BeNil())
Expand Down Expand Up @@ -218,6 +196,16 @@ var _ = Describe("Lease Controller", func() {
})
Expect(err).NotTo(HaveOccurred())

// Deliberately update the full lease
fullLease := &coordinationv1.Lease{}
Eventually(func() error { return fullLeaseIsCorrectlyReconciled(c, instance, fullLease) }, timeout, pollingInterval).Should(BeNil())
err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, fullLease, func() error {
fullLease.Spec.HolderIdentity = pointer.StringPtr("0")
renewedTime := time.Now()
fullLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
return nil
})

// Deliberately update the delta lease
deltaLease := &coordinationv1.Lease{}
Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil())
Expand Down Expand Up @@ -284,6 +272,26 @@ var _ = Describe("Lease Controller", func() {
})
Expect(err).NotTo(HaveOccurred())

// Deliberately update the full lease
fullLease := &coordinationv1.Lease{}
Eventually(func() error { return fullLeaseIsCorrectlyReconciled(c, instance, fullLease) }, timeout, pollingInterval).Should(BeNil())
err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, fullLease, func() error {
fullLease.Spec.HolderIdentity = pointer.StringPtr("0")
renewedTime := time.Now()
fullLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
return nil
})

Eventually(func() error {
if err := c.Get(ctx, client.ObjectKeyFromObject(fullLease), fullLease); err != nil {
return err
}
if fullLease.Spec.HolderIdentity != nil {
return nil
}
return fmt.Errorf("no HolderIdentity")
}, 10*time.Second, pollingInterval)

// Deliberately update the delta lease
deltaLease := &coordinationv1.Lease{}
Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil())
Expand Down
2 changes: 2 additions & 0 deletions controllers/config/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import "time"

// CompactionLeaseConfig contains configuration for the compaction controller.
type CompactionLeaseConfig struct {
// CompactionEnabled defines of compaction jobs should be created.
CompactionEnabled bool
// ActiveDeadlineDuration is the duration after which a running compaction job will be killed (Ex: "300ms", "20s", "-1.5h" or "2h45m")
ActiveDeadlineDuration time.Duration
// EventsThreshold is total number of etcd events that can be allowed before a backup compaction job is triggered
Expand Down
1 change: 1 addition & 0 deletions controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ var _ = BeforeSuite(func(done Done) {
Expect(err).NotTo(HaveOccurred())

lc, err := NewCompactionLeaseControllerWithImageVector(mgr, controllersconfig.CompactionLeaseConfig{
CompactionEnabled: true,
EventsThreshold: 1000000,
ActiveDeadlineDuration: activeDeadlineDuration,
})
Expand Down
Loading

0 comments on commit 153fe29

Please sign in to comment.