diff --git a/charts/etcd/templates/etcd-statefulset.yaml b/charts/etcd/templates/etcd-statefulset.yaml
index f9e28b69e..b2906e279 100644
--- a/charts/etcd/templates/etcd-statefulset.yaml
+++ b/charts/etcd/templates/etcd-statefulset.yaml
@@ -116,6 +116,11 @@ spec:
         command:
         - etcdbrctl
         - server
+{{- if and .Values.backup.fullSnapLeaseName .Values.backup.deltaSnapLeaseName }}
+        - --enable-snapshot-lease-renewal=true
+        - --delta-snapshot-lease-name={{ .Values.backup.deltaSnapLeaseName }}
+        - --full-snapshot-lease-name={{ .Values.backup.fullSnapLeaseName }}
+{{- end }}
 {{- if .Values.etcd.defragmentationSchedule }}
         - --defragmentation-schedule={{ .Values.etcd.defragmentationSchedule }}
 {{- end }}
diff --git a/controllers/compaction_lease_controller.go b/controllers/compaction_lease_controller.go
index d28a2827a..6626f936c 100644
--- a/controllers/compaction_lease_controller.go
+++ b/controllers/compaction_lease_controller.go
@@ -44,7 +44,11 @@ import (
 	kutil "github.com/gardener/gardener/pkg/utils/kubernetes"
 )
 
-const DefaultETCDQuota = 8 * 1024 * 1024 * 1024 // 8Gi
+const (
+	// DefaultETCDQuota is the default etcd quota.
+	DefaultETCDQuota = 8 * 1024 * 1024 * 1024 // 8Gi
+)
+
 // CompactionLeaseController reconciles compaction job
 type CompactionLeaseController struct {
 	client.Client
@@ -104,17 +108,15 @@ func (lc *CompactionLeaseController) Reconcile(ctx context.Context, req ctrl.Req
 		return lc.delete(ctx, lc.logger, etcd)
 	}
 
-	logger := lc.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String())
-
-	// Get delta snapshot lease to check the HolderIdentity value to take decision on compaction job
-	nsName := types.NamespacedName{
-		Name:      getFullSnapshotLeaseName(etcd),
-		Namespace: etcd.Namespace,
+	if etcd.Spec.Backup.Store == nil {
+		return ctrl.Result{}, nil
 	}
 
+	logger := lc.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String())
+
+	// Get full and delta snapshot lease to check the HolderIdentity value to take decision on compaction job
 	fullLease := &coordinationv1.Lease{}
-	err := lc.Get(ctx, nsName, fullLease)
-	if err != nil {
+	if err := lc.Get(ctx, kutil.Key(etcd.Namespace, getFullSnapshotLeaseName(etcd)), fullLease); err != nil {
 		logger.Info("Couldn't fetch full snap lease because: " + err.Error())
 
 		return ctrl.Result{
@@ -122,14 +124,8 @@ func (lc *CompactionLeaseController) Reconcile(ctx context.Context, req ctrl.Req
 		}, err
 	}
 
-	nsName = types.NamespacedName{
-		Name:      getDeltaSnapshotLeaseName(etcd),
-		Namespace: etcd.Namespace,
-	}
-
 	deltaLease := &coordinationv1.Lease{}
-	err = lc.Get(ctx, nsName, deltaLease)
-	if err != nil {
+	if err := lc.Get(ctx, kutil.Key(etcd.Namespace, getDeltaSnapshotLeaseName(etcd)), deltaLease); err != nil {
 		logger.Info("Couldn't fetch delta snap lease because: " + err.Error())
 
 		return ctrl.Result{
@@ -137,35 +133,36 @@ func (lc *CompactionLeaseController) Reconcile(ctx context.Context, req ctrl.Req
 		}, err
 	}
 
-	// Run compaction job
-	if etcd.Spec.Backup.Store != nil {
-		full, err := strconv.ParseInt(*fullLease.Spec.HolderIdentity, 10, 64)
-		if err != nil {
-			logger.Error(err, "Can't convert holder identity of full snap lease to integer")
-			return ctrl.Result{
-				RequeueAfter: 10 * time.Second,
-			}, err
-		}
+	// Revisions have not been set yet by the etcd-backup-restore container.
+	// Skip further processing as we cannot calculate a revision delta.
+	if fullLease.Spec.HolderIdentity == nil || deltaLease.Spec.HolderIdentity == nil {
+		return ctrl.Result{}, nil
+	}
 
-		delta, err := strconv.ParseInt(*deltaLease.Spec.HolderIdentity, 10, 64)
-		if err != nil {
-			logger.Error(err, "Can't convert holder identity of delta snap lease to integer")
-			return ctrl.Result{
-				RequeueAfter: 10 * time.Second,
-			}, err
-		}
+	full, err := strconv.ParseInt(*fullLease.Spec.HolderIdentity, 10, 64)
+	if err != nil {
+		logger.Error(err, "Can't convert holder identity of full snap lease to integer")
+		return ctrl.Result{
+			RequeueAfter: 10 * time.Second,
+		}, err
+	}
+
+	delta, err := strconv.ParseInt(*deltaLease.Spec.HolderIdentity, 10, 64)
+	if err != nil {
+		logger.Error(err, "Can't convert holder identity of delta snap lease to integer")
+		return ctrl.Result{
+			RequeueAfter: 10 * time.Second,
+		}, err
+	}
 
-		diff := delta - full
+	diff := delta - full
 
-		// Reconcile job only when number of accumulated revisions over the last full snapshot is more than the configured threshold value via 'events-threshold' flag
-		if diff >= lc.config.EventsThreshold {
-			return lc.reconcileJob(ctx, logger, etcd)
-		}
+	// Reconcile job only when number of accumulated revisions over the last full snapshot is more than the configured threshold value via 'events-threshold' flag
+	if diff >= lc.config.EventsThreshold {
+		return lc.reconcileJob(ctx, logger, etcd)
 	}
 
-	return ctrl.Result{
-		Requeue: false,
-	}, nil
+	return ctrl.Result{}, nil
 }
 
 func (lc *CompactionLeaseController) reconcileJob(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (ctrl.Result, error) {
@@ -181,13 +178,16 @@ func (lc *CompactionLeaseController) reconcileJob(ctx context.Context, logger lo
 			RequeueAfter: 10 * time.Second,
 		}, fmt.Errorf("error while fetching compaction job: %v", err)
 	}
-	// Required job doesn't exist. Create new
-	job, err = lc.createCompactJob(ctx, logger, etcd)
-	logger.Info("Job Creation")
-	if err != nil {
-		return ctrl.Result{
-			RequeueAfter: 10 * time.Second,
-		}, fmt.Errorf("error during compaction job creation: %v", err)
+
+	if lc.config.CompactionEnabled {
+		// Required job doesn't exist. Create new
+		job, err = lc.createCompactJob(ctx, logger, etcd)
+		logger.Info("Job Creation")
+		if err != nil {
+			return ctrl.Result{
+				RequeueAfter: 10 * time.Second,
+			}, fmt.Errorf("error during compaction job creation: %v", err)
+		}
 	}
 }
@@ -536,9 +536,11 @@ func (lc *CompactionLeaseController) SetupWithManager(mgr ctrl.Manager, workers
 		MaxConcurrentReconciles: workers,
 	})
 
-	builder = builder.WithEventFilter(buildPredicateForLC()).For(&druidv1alpha1.Etcd{})
-	builder = builder.Owns(&coordinationv1.Lease{})
-	return builder.Complete(lc)
+	return builder.
+		For(&druidv1alpha1.Etcd{}).
+		Owns(&coordinationv1.Lease{}).
+		WithEventFilter(buildPredicateForLC()).
+		Complete(lc)
 }
 
 func buildPredicateForLC() predicate.Predicate {
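An aside on the mechanism above: the controller assumes that etcd-backup-restore publishes the revision of its latest full and delta snapshots through the `HolderIdentity` field of the two leases, encoded as decimal strings (hence the `strconv.ParseInt` calls). A minimal sketch of what the producer side of that contract could look like; the package, function name, and client wiring here are illustrative assumptions, not code from this patch:

```go
// Sketch only: publish a snapshot revision in a Lease, using the
// decimal-string encoding the compaction controller parses with ParseInt.
package snapshotter

import (
	"context"
	"strconv"
	"time"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// renewSnapshotLease is a hypothetical helper, not part of this PR.
func renewSnapshotLease(ctx context.Context, c client.Client, namespace, name string, revision int64) error {
	lease := &coordinationv1.Lease{}
	if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, lease); err != nil {
		return err
	}
	// HolderIdentity carries the revision; RenewTime shows liveness.
	lease.Spec.HolderIdentity = pointer.StringPtr(strconv.FormatInt(revision, 10))
	lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
	return c.Update(ctx, lease)
}
```

A plausible rationale for piggybacking on `Lease` objects is that they are cheap to watch and update frequently; the predicate changes at the end of this patch build on exactly that watch.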
diff --git a/controllers/compaction_lease_controller_test.go b/controllers/compaction_lease_controller_test.go
index f4062d005..50dfae60b 100644
--- a/controllers/compaction_lease_controller_test.go
+++ b/controllers/compaction_lease_controller_test.go
@@ -70,29 +70,7 @@ var _ = Describe("Lease Controller", func() {
 			svc = &corev1.Service{}
 			Eventually(func() error { return serviceIsCorrectlyReconciled(c, instance, svc) }, timeout, pollingInterval).Should(BeNil())
 		})
-		It("no jobs will be scheduled because no store details are provided", func() {
-			// Verufy if the statefulset updated the specs
-			validateEtcdWithDefaults(instance, s, cm, svc)
-
-			setStatefulSetReady(s)
-			err = c.Status().Update(context.TODO(), s)
-			Expect(err).NotTo(HaveOccurred())
-
-			// Verify if that the job is not created even if the holder identity in delta-snapshot-revision is greater than 1M
-			deltaLease := &coordinationv1.Lease{}
-			Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil())
-			err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, deltaLease, func() error {
-				deltaLease.Spec.HolderIdentity = pointer.StringPtr("1000000")
-				renewedTime := time.Now()
-				deltaLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
-				return nil
-			})
-			Expect(err).NotTo(HaveOccurred())
-
-			j := &batchv1.Job{}
-			Eventually(func() error { return jobIsCorrectlyReconciled(c, instance, j) }, time.Duration(30*time.Second), pollingInterval).ShouldNot(BeNil())
-
-		})
 		AfterEach(func() {
 			Expect(c.Delete(context.TODO(), instance)).To(Succeed())
 			Eventually(func() error { return statefulSetRemoved(c, s) }, timeout, pollingInterval).Should(BeNil())
@@ -218,6 +196,16 @@ var _ = Describe("Lease Controller", func() {
 			})
 			Expect(err).NotTo(HaveOccurred())
 
+			// Deliberately update the full lease
+			fullLease := &coordinationv1.Lease{}
+			Eventually(func() error { return fullLeaseIsCorrectlyReconciled(c, instance, fullLease) }, timeout, pollingInterval).Should(BeNil())
+			err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, fullLease, func() error {
+				fullLease.Spec.HolderIdentity = pointer.StringPtr("0")
+				renewedTime := time.Now()
+				fullLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
+				return nil
+			})
+
 			// Deliberately update the delta lease
 			deltaLease := &coordinationv1.Lease{}
 			Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil())
@@ -284,6 +272,26 @@ var _ = Describe("Lease Controller", func() {
 			})
 			Expect(err).NotTo(HaveOccurred())
 
+			// Deliberately update the full lease
+			fullLease := &coordinationv1.Lease{}
+			Eventually(func() error { return fullLeaseIsCorrectlyReconciled(c, instance, fullLease) }, timeout, pollingInterval).Should(BeNil())
+			err = kutil.TryUpdate(context.TODO(), retry.DefaultBackoff, c, fullLease, func() error {
+				fullLease.Spec.HolderIdentity = pointer.StringPtr("0")
+				renewedTime := time.Now()
+				fullLease.Spec.RenewTime = &metav1.MicroTime{Time: renewedTime}
+				return nil
+			})
+
+			Eventually(func() error {
+				if err := c.Get(ctx, client.ObjectKeyFromObject(fullLease), fullLease); err != nil {
+					return err
+				}
+				if fullLease.Spec.HolderIdentity != nil {
+					return nil
+				}
+				return fmt.Errorf("no HolderIdentity")
+			}, 10*time.Second, pollingInterval)
+
 			// Deliberately update the delta lease
 			deltaLease := &coordinationv1.Lease{}
 			Eventually(func() error { return deltaLeaseIsCorrectlyReconciled(c, instance, deltaLease) }, timeout, pollingInterval).Should(BeNil())
diff --git a/controllers/config/compaction.go b/controllers/config/compaction.go
index 9c89a7c6b..69c6eb1d4 100644
--- a/controllers/config/compaction.go
+++ b/controllers/config/compaction.go
@@ -18,6 +18,8 @@ import "time"
 
 // CompactionLeaseConfig contains configuration for the compaction controller.
 type CompactionLeaseConfig struct {
+	// CompactionEnabled defines whether compaction jobs should be created.
+	CompactionEnabled bool
 	// ActiveDeadlineDuration is the duration after which a running compaction job will be killed (Ex: "300ms", "20s", "-1.5h" or "2h45m")
 	ActiveDeadlineDuration time.Duration
 	// EventsThreshold is total number of etcd events that can be allowed before a backup compaction job is triggered
diff --git a/controllers/controllers_suite_test.go b/controllers/controllers_suite_test.go
index 106cd3b0e..0632280fb 100644
--- a/controllers/controllers_suite_test.go
+++ b/controllers/controllers_suite_test.go
@@ -127,6 +127,7 @@ var _ = BeforeSuite(func(done Done) {
 	Expect(err).NotTo(HaveOccurred())
 
 	lc, err := NewCompactionLeaseControllerWithImageVector(mgr, controllersconfig.CompactionLeaseConfig{
+		CompactionEnabled:      true,
 		EventsThreshold:        1000000,
 		ActiveDeadlineDuration: activeDeadlineDuration,
 	})
diff --git a/controllers/etcd_controller.go b/controllers/etcd_controller.go
index 0c019ede4..d771c7ab1 100644
--- a/controllers/etcd_controller.go
+++ b/controllers/etcd_controller.go
@@ -261,22 +261,25 @@ func (r *EtcdReconciler) reconcile(ctx context.Context, etcd *druidv1alpha1.Etcd
 		}, err
 	}
 
-	// TODO: (abdasgupta) Use the full snapshot lease available from the reconcile operation for setting backupready condition
-	fl, err := r.reconcileFullLease(ctx, logger, etcd)
-	if err != nil {
-		return ctrl.Result{
-			Requeue: true,
-		}, err
-	}
-	logger.Info("Available Full Snapshot Lease: " + fl.Name)
+	// It isn't necessary to reconcile delta/full leases if no store configuration is given because potential compaction
+	// jobs need access to the store where backups are stored.
+	if etcd.Spec.Backup.Store != nil {
+		fl, err := r.reconcileFullLease(ctx, logger, etcd)
+		if err != nil {
+			return ctrl.Result{
+				Requeue: true,
+			}, err
+		}
+		logger.Info("Available Full Snapshot Lease: " + fl.Name)
 
-	dl, err := r.reconcileDeltaLease(ctx, logger, etcd)
-	if err != nil {
-		return ctrl.Result{
-			Requeue: true,
-		}, err
+		dl, err := r.reconcileDeltaLease(ctx, logger, etcd)
+		if err != nil {
+			return ctrl.Result{
+				Requeue: true,
+			}, err
+		}
+		logger.Info("Available Delta Snapshot Lease: " + dl.Name)
 	}
-	logger.Info("Available Delta Snapshot Lease: " + dl.Name)
 
 	// Delete any existing cronjob if required.
 	// TODO(abdasgupta) : This is for backward compatibility towards ETCD-Druid 0.6.0. Remove it.
@@ -879,27 +882,21 @@ func (r *EtcdReconciler) getStatefulSetFromEtcd(etcd *druidv1alpha1.Etcd, values
 func (r *EtcdReconciler) reconcileFullLease(ctx context.Context, logger logr.Logger, etcd *druidv1alpha1.Etcd) (*coordinationv1.Lease, error) {
 	// Get or Create fullSnapshotRevisions lease object that will help to set BackupReady condition
 	fullSnapshotRevisions := getFullSnapshotLeaseName(etcd)
-	nsName := types.NamespacedName{
-		Name:      fullSnapshotRevisions,
-		Namespace: etcd.Namespace,
-	}
 
 	fullLease := &coordinationv1.Lease{}
-	err1 := r.Get(ctx, nsName, fullLease)
-	if err1 != nil {
-		logger.Info("Couldn't fetch full snap lease " + fullSnapshotRevisions + ":" + err1.Error())
-
-		if apierrors.IsNotFound(err1) {
-			logger.Info("Creating the full snap lease " + fullSnapshotRevisions)
-
-			fullLease = createSnapshotLease(etcd, fullSnapshotRevisions)
-			err2 := r.Create(ctx, fullLease)
-			if err2 != nil {
-				logger.Error(err2, "Full snap lease "+fullSnapshotRevisions+" couldn't be created")
-				return nil, err2
-			}
-		} else {
-			return nil, err1
+	if err := r.Get(ctx, kutil.Key(etcd.Namespace, fullSnapshotRevisions), fullLease); err != nil {
+		logger.Info("Couldn't fetch full snap lease " + fullSnapshotRevisions + ":" + err.Error())
+
+		if !apierrors.IsNotFound(err) {
+			return nil, err
+		}
+
+		logger.Info("Creating the full snap lease " + fullSnapshotRevisions)
+
+		fullLease = createSnapshotLease(etcd, fullSnapshotRevisions)
+		if err := r.Create(ctx, fullLease); err != nil {
+			logger.Error(err, "Full snap lease "+fullSnapshotRevisions+" couldn't be created")
+			return nil, err
 		}
 	}
@@ -914,27 +911,21 @@ func (r *EtcdReconciler) reconcileDeltaLease(ctx context.Context, logger logr.Lo
 	// Get or Create delta_snapshot_revisions lease object that will keep track of delta snapshot revisions based on which
 	// compaction job will be scheduled
 	deltaSnapshotRevisions := getDeltaSnapshotLeaseName(etcd)
-	nsName := types.NamespacedName{
-		Name:      deltaSnapshotRevisions,
-		Namespace: etcd.Namespace,
-	}
 
 	deltaLease := &coordinationv1.Lease{}
-	err1 := r.Get(ctx, nsName, deltaLease)
-	if err1 != nil {
-		logger.Info("Couldn't fetch delta snap lease " + deltaSnapshotRevisions + " because: " + err1.Error())
-
-		if apierrors.IsNotFound(err1) {
-			logger.Info("Creating the delta snap lease " + deltaSnapshotRevisions)
-
-			deltaLease = createSnapshotLease(etcd, deltaSnapshotRevisions)
-			err2 := r.Create(ctx, deltaLease)
-			if err2 != nil {
-				logger.Error(err2, "Delta snap lease "+deltaSnapshotRevisions+" couldn't be created")
-				return nil, err2
-			}
-		} else {
-			return nil, err1
+	if err := r.Get(ctx, kutil.Key(etcd.Namespace, deltaSnapshotRevisions), deltaLease); err != nil {
+		logger.Info("Couldn't fetch delta snap lease " + deltaSnapshotRevisions + " because: " + err.Error())
+
+		if !apierrors.IsNotFound(err) {
+			return nil, err
+		}
+
+		logger.Info("Creating the delta snap lease " + deltaSnapshotRevisions)
+
+		deltaLease = createSnapshotLease(etcd, deltaSnapshotRevisions)
+		if err := r.Create(ctx, deltaLease); err != nil {
+			logger.Error(err, "Delta snap lease "+deltaSnapshotRevisions+" couldn't be created")
+			return nil, err
 		}
 	}
@@ -946,7 +937,6 @@ func getDeltaSnapshotLeaseName(etcd *druidv1alpha1.Etcd) string {
 }
 
 func createSnapshotLease(etcd *druidv1alpha1.Etcd, snapshotLeaseName string) *coordinationv1.Lease {
-	renewTime := metav1.NewMicroTime(time.Now())
 	return &coordinationv1.Lease{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      snapshotLeaseName,
@@ -962,10 +952,6 @@ func createSnapshotLease(etcd *druidv1alpha1.Etcd, snapshotLeaseName string) *co
 				},
 			},
 		},
-		Spec: coordinationv1.LeaseSpec{
-			HolderIdentity: pointer.StringPtr("0"),
-			RenewTime:      &renewTime,
-		},
 	}
 }
 func decodeObject(renderedChart *chartrenderer.RenderedChart, path string, object interface{}) error {
@@ -1347,13 +1333,11 @@ func getMapFromEtcd(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd) (map[s
 		"statefulsetReplicas":     statefulsetReplicas,
 		"serviceName":             fmt.Sprintf("%s-client", etcd.Name),
 		"configMapName":           fmt.Sprintf("etcd-bootstrap-%s", string(etcd.UID[:6])),
-		"fullSnapLeaseName":       getFullSnapshotLeaseName(etcd),
-		"deltaSnapLeaseName":      getDeltaSnapshotLeaseName(etcd),
 		"jobName":                 getJobName(etcd),
 		"volumeClaimTemplateName": volumeClaimTemplateName,
 		"serviceAccountName":      getServiceAccountName(etcd),
-		"roleName":                fmt.Sprintf("%s-br-role", etcd.Name),
-		"roleBindingName":         fmt.Sprintf("%s-br-rolebinding", etcd.Name),
+		"roleName":                fmt.Sprintf("druid.gardener.cloud:etcd:%s", etcd.Name),
+		"roleBindingName":         fmt.Sprintf("druid.gardener.cloud:etcd:%s", etcd.Name),
 	}
 
 	if etcd.Spec.StorageCapacity != nil {
@@ -1378,13 +1362,16 @@ func getMapFromEtcd(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd) (map[s
 		if values["store"], err = utils.GetStoreValues(etcd.Spec.Backup.Store); err != nil {
 			return nil, err
 		}
+
+		backupValues["fullSnapLeaseName"] = getFullSnapshotLeaseName(etcd)
+		backupValues["deltaSnapLeaseName"] = getDeltaSnapshotLeaseName(etcd)
 	}
 
 	return values, nil
 }
 
 func getServiceAccountName(etcd *druidv1alpha1.Etcd) string {
-	return fmt.Sprintf("%s-br-serviceaccount", etcd.Name)
+	return etcd.Name
 }
 
 func getEtcdImages(im imagevector.ImageVector, etcd *druidv1alpha1.Etcd) (string, string, error) {
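Worth flagging for reviewers: `createSnapshotLease` above no longer seeds `Spec.HolderIdentity` with `"0"`, so a freshly created lease carries a nil holder identity until etcd-backup-restore renews it. Consumers must treat nil as "no revision reported yet" rather than as revision zero, which is exactly what the nil check added to the compaction controller earlier in this patch does. A small sketch of that consumer-side contract, with a hypothetical package and helper name:

```go
// Sketch only: distinguish "no revision reported yet" (nil HolderIdentity)
// from an actual revision value. Not part of this PR.
package leases

import (
	"strconv"

	coordinationv1 "k8s.io/api/coordination/v1"
)

func revisionFromLease(lease *coordinationv1.Lease) (int64, bool, error) {
	if lease.Spec.HolderIdentity == nil {
		// Lease was created by druid but never renewed by etcd-backup-restore.
		return 0, false, nil
	}
	rev, err := strconv.ParseInt(*lease.Spec.HolderIdentity, 10, 64)
	if err != nil {
		return 0, false, err
	}
	return rev, true, nil
}
```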
diff --git a/controllers/etcd_controller_test.go b/controllers/etcd_controller_test.go
index c03d2766d..fa8bce181 100644
--- a/controllers/etcd_controller_test.go
+++ b/controllers/etcd_controller_test.go
@@ -165,15 +165,15 @@ func accessModeIterator(element interface{}) string {
 }
 
 func cmdIterator(element interface{}) string {
-	return string(element.(string))
+	return element.(string)
 }
 
 func ruleIterator(element interface{}) string {
-	return string(element.(rbac.PolicyRule).APIGroups[0])
+	return element.(rbac.PolicyRule).APIGroups[0]
 }
 
 func stringArrayIterator(element interface{}) string {
-	return string(element.(string))
+	return element.(string)
 }
 
 var _ = Describe("Druid", func() {
@@ -767,7 +767,7 @@ var _ = Describe("Cron Job", func() {
 func validateRole(instance *druidv1alpha1.Etcd, role *rbac.Role) {
 	Expect(*role).To(MatchFields(IgnoreExtras, Fields{
 		"ObjectMeta": MatchFields(IgnoreExtras, Fields{
-			"Name":      Equal(fmt.Sprintf("%s-br-role", instance.Name)),
+			"Name":      Equal(fmt.Sprintf("druid.gardener.cloud:etcd:%s", instance.Name)),
 			"Namespace": Equal(instance.Namespace),
 			"Labels": MatchKeys(IgnoreExtras, Keys{
 				"name": Equal("etcd"),
@@ -1450,6 +1450,7 @@ func validateEtcd(instance *druidv1alpha1.Etcd, s *appsv1.StatefulSet, cm *corev
 						"--snapstore-temp-directory=/var/etcd/data/temp": Equal("--snapstore-temp-directory=/var/etcd/data/temp"),
 						"--etcd-process-name=etcd":                       Equal("--etcd-process-name=etcd"),
 						"--etcd-connection-timeout=5m":                   Equal("--etcd-connection-timeout=5m"),
+						"--enable-snapshot-lease-renewal=true":           Equal("--enable-snapshot-lease-renewal=true"),
 						fmt.Sprintf("--defragmentation-schedule=%s", *instance.Spec.Etcd.DefragmentationSchedule): Equal(fmt.Sprintf("--defragmentation-schedule=%s", *instance.Spec.Etcd.DefragmentationSchedule)),
 						fmt.Sprintf("--schedule=%s", *instance.Spec.Backup.FullSnapshotSchedule):                  Equal(fmt.Sprintf("--schedule=%s", *instance.Spec.Backup.FullSnapshotSchedule)),
 						fmt.Sprintf("%s=%s", "--garbage-collection-policy", *instance.Spec.Backup.GarbageCollectionPolicy): Equal(fmt.Sprintf("%s=%s", "--garbage-collection-policy", *instance.Spec.Backup.GarbageCollectionPolicy)),
@@ -1470,6 +1471,8 @@ func validateEtcd(instance *druidv1alpha1.Etcd, s *appsv1.StatefulSet, cm *corev
 						fmt.Sprintf("%s=%s", "--owner-check-interval", instance.Spec.Backup.OwnerCheck.Interval.Duration.String()):         Equal(fmt.Sprintf("%s=%s", "--owner-check-interval", instance.Spec.Backup.OwnerCheck.Interval.Duration.String())),
 						fmt.Sprintf("%s=%s", "--owner-check-timeout", instance.Spec.Backup.OwnerCheck.Timeout.Duration.String()):           Equal(fmt.Sprintf("%s=%s", "--owner-check-timeout", instance.Spec.Backup.OwnerCheck.Timeout.Duration.String())),
 						fmt.Sprintf("%s=%s", "--owner-check-dns-cache-ttl", instance.Spec.Backup.OwnerCheck.DNSCacheTTL.Duration.String()): Equal(fmt.Sprintf("%s=%s", "--owner-check-dns-cache-ttl", instance.Spec.Backup.OwnerCheck.DNSCacheTTL.Duration.String())),
+						fmt.Sprintf("%s=%s", "--delta-snapshot-lease-name", getDeltaSnapshotLeaseName(instance)):                           Equal(fmt.Sprintf("%s=%s", "--delta-snapshot-lease-name", getDeltaSnapshotLeaseName(instance))),
+						fmt.Sprintf("%s=%s", "--full-snapshot-lease-name", getFullSnapshotLeaseName(instance)):                             Equal(fmt.Sprintf("%s=%s", "--full-snapshot-lease-name", getFullSnapshotLeaseName(instance))),
 					}),
 					"Ports": ConsistOf([]corev1.ContainerPort{
 						corev1.ContainerPort{
@@ -2117,7 +2120,7 @@ func serviceAccountIsCorrectlyReconciled(c client.Client, instance *druidv1alpha
 	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
 	defer cancel()
 	req := types.NamespacedName{
-		Name:      fmt.Sprintf("%s-br-serviceaccount", instance.Name),
+		Name:      instance.Name,
 		Namespace: instance.Namespace,
 	}
@@ -2131,7 +2134,7 @@ func roleIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Etcd, ro
 	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
 	defer cancel()
 	req := types.NamespacedName{
-		Name:      fmt.Sprintf("%s-br-role", instance.Name),
+		Name:      fmt.Sprintf("druid.gardener.cloud:etcd:%s", instance.Name),
 		Namespace: instance.Namespace,
 	}
@@ -2145,7 +2148,7 @@ func roleBindingIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.E
 	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
 	defer cancel()
 	req := types.NamespacedName{
-		Name:      fmt.Sprintf("%s-br-rolebinding", instance.Name),
+		Name:      fmt.Sprintf("druid.gardener.cloud:etcd:%s", instance.Name),
 		Namespace: instance.Namespace,
 	}
diff --git a/docs/compactor.md b/docs/compactor.md
index bf65f1018..4b2dfa4d3 100644
--- a/docs/compactor.md
+++ b/docs/compactor.md
@@ -33,10 +33,11 @@ To help with the problem mentioned earlier, our proposal is to introduce `compac
 The newly introduced compact command does not disturb the running ETCD while compacting the backup snapshots. The command is designed to run potentially separately (from the main ETCD process/container/pod). ETCD Druid can be configured to run the newly introduced compact command as a separate job (scheduled periodically) based on total number of ETCD events accumulated after the most recent full snapshot.
 
 ### Druid flags:
-ETCD druid introduced following three flags to configure the compaction job:
-`compaction-workers` : If this flag is set to zero, no compaction job will be running. If it's set to any value greater than zero, druid controller will have that many threads to kickstart the compaction job.
-`etcd-events-threshold`: Set this flag with the value which will signify the number of ETCD events allowed after the most recent full snapshot. Once the number of ETCD events crosses the value mentioned in this flag, compaction job will be kickstarted.
-`active-deadline-duration`: This flag signifies the maximum duration till which a compaction job won't be garbage-collected.
+ETCD druid introduces the following flags to configure the compaction job:
+- `--enable-backup-compaction` (default `false`): Set this flag to `true` to enable the automatic compaction of etcd backups when `etcd-events-threshold` is exceeded.
+- `--compaction-workers` (default `3`): If this flag is set to zero, no compaction jobs will run. If it is set to any value greater than zero, the druid controller uses that many worker threads to kick off compaction jobs.
+- `--etcd-events-threshold` (default `1000000`): The number of ETCD events allowed after the most recent full snapshot. Once the number of ETCD events crosses this value, a compaction job is kicked off.
+- `--active-deadline-duration` (default `3h`): The maximum duration for which a running compaction job is allowed to live before it is garbage-collected.
 
 ### **Points to take care while saving the compacted snapshot:**
 As compacted snapshot and the existing periodic full snapshots are taken by different processes running in different pods but accessing same store to save the snapshots, some problems may arise:
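Putting the documented flags together, an operator enabling compaction with the defaults spelled out would start druid roughly as follows (illustrative invocation only; the binary name and deployment mechanism depend on your setup):

```
etcd-druid \
  --enable-backup-compaction=true \
  --compaction-workers=3 \
  --etcd-events-threshold=1000000 \
  --active-deadline-duration=3h
```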
diff --git a/main.go b/main.go
index d2a7e2de9..785f94298 100644
--- a/main.go
+++ b/main.go
@@ -52,6 +52,7 @@ func main() {
 	var (
 		metricsAddr                string
 		enableLeaderElection       bool
+		enableBackupCompaction     bool
 		leaderElectionID           string
 		leaderElectionResourceLock string
 		etcdWorkers                int
@@ -75,6 +76,8 @@ func main() {
 	flag.IntVar(&custodianWorkers, "custodian-workers", 3, "Number of worker threads of the custodian controller.")
 	flag.IntVar(&etcdCopyBackupsTaskWorkers, "etcd-copy-backups-task-workers", 3, "Number of worker threads of the EtcdCopyBackupsTask controller.")
 	flag.DurationVar(&custodianSyncPeriod, "custodian-sync-period", 30*time.Second, "Sync period of the custodian controller.")
+	flag.BoolVar(&enableBackupCompaction, "enable-backup-compaction", false,
+		"Enable automatic compaction of etcd backups.")
 	flag.IntVar(&compactionWorkers, "compaction-workers", 3, "Number of worker threads of the CompactionJob controller. The controller creates a backup compaction job if a certain etcd event threshold is reached. Setting this flag to 0 disabled the controller.")
 	flag.Int64Var(&eventsThreshold, "etcd-events-threshold", 1000000, "Total number of etcd events that can be allowed before a backup compaction job is triggered.")
 	flag.DurationVar(&activeDeadlineDuration, "active-deadline-duration", 3*time.Hour, "Duration after which a running backup compaction job will be killed (Ex: \"300ms\", \"20s\", \"-1.5h\" or \"2h45m\").")
@@ -148,6 +151,7 @@ func main() {
 	}
 
 	lc, err := controllers.NewCompactionLeaseControllerWithImageVector(mgr, controllersconfig.CompactionLeaseConfig{
+		CompactionEnabled:      enableBackupCompaction,
 		EventsThreshold:        eventsThreshold,
 		ActiveDeadlineDuration: activeDeadlineDuration,
 	})
diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go
index b4de0ec4d..13f08b8ea 100644
--- a/pkg/predicate/predicate.go
+++ b/pkg/predicate/predicate.go
@@ -124,7 +124,8 @@ func LeaseHolderIdentityChange() predicate.Predicate {
 		if !ok {
 			return false
 		}
-		return *leaseOld.Spec.HolderIdentity != *leaseNew.Spec.HolderIdentity
+
+		return !reflect.DeepEqual(leaseOld.Spec.HolderIdentity, leaseNew.Spec.HolderIdentity)
 	}
 
 	return predicate.Funcs{
diff --git a/pkg/predicate/predicate_test.go b/pkg/predicate/predicate_test.go
index 924a823d1..1c403c0ed 100644
--- a/pkg/predicate/predicate_test.go
+++ b/pkg/predicate/predicate_test.go
@@ -116,6 +116,20 @@ var _ = Describe("Druid Predicate", func() {
 			pred = LeaseHolderIdentityChange()
 		})
 
+		Context("when holder identity is nil", func() {
+			BeforeEach(func() {
+				obj = &coordinationv1.Lease{}
+				oldObj = &coordinationv1.Lease{}
+			})
+
+			It("should return false only for update events", func() {
+				gomega.Expect(pred.Create(createEvent)).To(gomega.BeTrue())
+				gomega.Expect(pred.Update(updateEvent)).To(gomega.BeFalse())
+				gomega.Expect(pred.Delete(deleteEvent)).To(gomega.BeTrue())
+				gomega.Expect(pred.Generic(genericEvent)).To(gomega.BeTrue())
+			})
+		})
+
 		Context("when holder identity matches", func() {
 			BeforeEach(func() {
 				obj = &coordinationv1.Lease{