diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 089bed614..9c1b23712 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -555,6 +555,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability // CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. CleanupThreads int `json:"cleanupThreads,omitempty"` + // CleanLocalPVC is a flag which allows operator to delete local PVC in case of kubernetes node (drain or delete) + CleanLocalPVC bool `json:"cleanLocalPVC,omitempty"` + + // LocalStorageClasses contains list of storage classes which provisions local volumes. + LocalStorageClasses []string `json:"localStorageClasses,omitempty"` + // Volumes list to attach to created pods. // +patchMergeKey=name // +patchStrategy=merge diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index a68fa7d32..9fd51cfb7 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -652,6 +652,11 @@ func (in *AerospikeStorageSpec) DeepCopyInto(out *AerospikeStorageSpec) { *out = *in in.FileSystemVolumePolicy.DeepCopyInto(&out.FileSystemVolumePolicy) in.BlockVolumePolicy.DeepCopyInto(&out.BlockVolumePolicy) + if in.LocalStorageClasses != nil { + in, out := &in.LocalStorageClasses, &out.LocalStorageClasses + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.Volumes != nil { in, out := &in.Volumes, &out.Volumes *out = make([]VolumeSpec, len(*in)) diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 65a697cac..9229ad628 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -5653,6 +5653,11 @@ spec: - deleteFiles type: string type: object + cleanLocalPVC: + description: CleanLocalPVC is a flag which allows operator + to delete local PVC in case of kubernetes node (drain + or delete) + type: boolean cleanupThreads: description: CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. @@ -5709,6 +5714,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains list of storage + classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -7341,6 +7352,11 @@ spec: - deleteFiles type: string type: object + cleanLocalPVC: + description: CleanLocalPVC is a flag which allows operator + to delete local PVC in case of kubernetes node (drain + or delete) + type: boolean cleanupThreads: description: CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. @@ -7397,6 +7413,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains list of storage + classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -8045,6 +8067,10 @@ spec: - deleteFiles type: string type: object + cleanLocalPVC: + description: CleanLocalPVC is a flag which allows operator to + delete local PVC in case of kubernetes node (drain or delete) + type: boolean cleanupThreads: description: CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. @@ -8101,6 +8127,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains list of storage classes + which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -14330,6 +14362,11 @@ spec: - deleteFiles type: string type: object + cleanLocalPVC: + description: CleanLocalPVC is a flag which allows operator + to delete local PVC in case of kubernetes node (drain + or delete) + type: boolean cleanupThreads: description: CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. @@ -14386,6 +14423,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains list of storage + classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -16018,6 +16061,11 @@ spec: - deleteFiles type: string type: object + cleanLocalPVC: + description: CleanLocalPVC is a flag which allows operator + to delete local PVC in case of kubernetes node (drain + or delete) + type: boolean cleanupThreads: description: CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. @@ -16074,6 +16122,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains list of storage + classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -16771,6 +16825,10 @@ spec: - deleteFiles type: string type: object + cleanLocalPVC: + description: CleanLocalPVC is a flag which allows operator to + delete local PVC in case of kubernetes node (drain or delete) + type: boolean cleanupThreads: description: CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. @@ -16827,6 +16885,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains list of storage classes + which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: diff --git a/controllers/aerospikecluster_controller.go b/controllers/aerospikecluster_controller.go index 2c2f8f441..c0606cb41 100644 --- a/controllers/aerospikecluster_controller.go +++ b/controllers/aerospikecluster_controller.go @@ -47,6 +47,38 @@ type AerospikeClusterReconciler struct { Log logr.Logger } +func stsPodPendingPredicate(e event.UpdateEvent) bool { + if e.ObjectOld == nil || e.ObjectNew == nil { + return false + } + + oldSTS, ok := e.ObjectOld.(*appsv1.StatefulSet) + if !ok { + return false + } + + newSTS, ok := e.ObjectNew.(*appsv1.StatefulSet) + if !ok { + return false + } + + if oldSTS.Status.ObservedGeneration == newSTS.Status.ObservedGeneration && + newSTS.Status.AvailableReplicas < oldSTS.Status.AvailableReplicas { + return true + } + + return false +} + +type statusPendingPredicate struct { + predicate.Funcs +} + +// Update implements default UpdateEvent filter for statefulSets. +func (statusPendingPredicate) Update(e event.UpdateEvent) bool { + return stsPodPendingPredicate(e) +} + // SetupWithManager sets up the controller with the Manager func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). @@ -57,9 +89,7 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { CreateFunc: func(e event.CreateEvent) bool { return false }, - UpdateFunc: func(e event.UpdateEvent) bool { - return false - }, + UpdateFunc: stsPodPendingPredicate, }, ), ). @@ -68,7 +98,8 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { MaxConcurrentReconciles: maxConcurrentReconciles, }, ). - WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{})). + WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{}, + statusPendingPredicate{})). Complete(r) } diff --git a/controllers/pod.go b/controllers/pod.go index 8574b3c1c..0b2229c46 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -261,6 +261,12 @@ func (r *SingleClusterReconciler) restartPods( } } + if rackState.Rack.Storage.CleanLocalPVC { + if err := r.deleteLocalPVCs(pod, rackState.Rack.Storage.LocalStorageClasses); err != nil { + return reconcileError(err) + } + } + if err := r.Client.Delete(context.TODO(), pod); err != nil { r.Log.Error(err, "Failed to delete pod") return reconcileError(err) diff --git a/controllers/pvc.go b/controllers/pvc.go index f354ce77e..f1b15da13 100644 --- a/controllers/pvc.go +++ b/controllers/pvc.go @@ -103,6 +103,39 @@ func (r *SingleClusterReconciler) removePVCsAsync( return deletedPVCs, nil } +func (r *SingleClusterReconciler) deleteLocalPVCs(pod *corev1.Pod, localStorageClasses []string) error { + if pod.Status.Phase == corev1.PodPending && utils.IsPodNeedsToMigrate(pod) { + rackID, err := utils.GetRackIDFromPodName(pod.Name) + if err != nil { + return err + } + + pvcItems, err := r.getPodsPVCList([]string{pod.Name}, *rackID) + if err != nil { + return fmt.Errorf("could not find pvc for pod %v: %v", pod.Name, err) + } + + for idx := range pvcItems { + pvcStorageClass := pvcItems[idx].Spec.StorageClassName + if pvcStorageClass == nil { + r.Log.Info("PVC does not have storageClass set, no need to delete PVC", "pvcName", pvcItems[idx].Name) + + continue + } + + if utils.ContainsString(localStorageClasses, *pvcStorageClass) { + if err := r.Client.Delete(context.TODO(), &pvcItems[idx]); err != nil { + return fmt.Errorf( + "could not delete pvc %s: %v", pvcItems[idx].Name, err, + ) + } + } + } + } + + return nil +} + func (r *SingleClusterReconciler) waitForPVCTermination(deletedPVCs []corev1.PersistentVolumeClaim) error { if len(deletedPVCs) == 0 { return nil diff --git a/controllers/statefulset.go b/controllers/statefulset.go index b4247c9b8..3d3cc62d2 100644 --- a/controllers/statefulset.go +++ b/controllers/statefulset.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -597,10 +598,16 @@ func (r *SingleClusterReconciler) updateSTS( // TODO: Add validation. device, file, both should not exist in same storage class r.updateSTSStorage(statefulSet, rackState) - // Save the updated stateful set. - // Can we optimize this? Update stateful set only if there is any change - // in it. - err := r.Client.Update(context.TODO(), statefulSet, updateOption) + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + found, err := r.getSTS(rackState) + if err != nil { + return err + } + + // Save the updated stateful set. + found.Spec = statefulSet.Spec + return r.Client.Update(context.TODO(), found, updateOption) + }) if err != nil { return fmt.Errorf( "failed to update StatefulSet %s: %v", @@ -1458,8 +1465,18 @@ func getSTSContainerPort( multiPodPerHost bool, aeroConf *asdbv1.AerospikeConfigSpec, ) []corev1.ContainerPort { ports := make([]corev1.ContainerPort, 0, len(defaultContainerPorts)) + portNames := make([]string, 0, len(defaultContainerPorts)) + + // Sorting defaultContainerPorts to fetch map in ordered manner. + // Helps reduce unnecessary sts object updates. + for portName := range defaultContainerPorts { + portNames = append(portNames, portName) + } + + sort.Strings(portNames) - for portName, portInfo := range defaultContainerPorts { + for _, portName := range portNames { + portInfo := defaultContainerPorts[portName] configPort := asdbv1.GetPortFromConfig( aeroConf, portInfo.connectionType, portInfo.configParam, ) diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index 75399f02b..b2c8e45f6 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -32,7 +32,7 @@ func CheckPodFailed(pod *corev1.Pod) error { return fmt.Errorf("pod %s has failed status", pod.Name) } - if pod.Status.Phase == corev1.PodPending && isPodReasonUnschedulable(pod) { + if pod.Status.Phase == corev1.PodPending && IsPodReasonUnschedulable(pod) { return fmt.Errorf("pod %s is in unschedulable state", pod.Name) } @@ -210,9 +210,22 @@ func isPodError(reason string) bool { return strings.HasSuffix(reason, "Error") } -func isPodReasonUnschedulable(pod *corev1.Pod) bool { +func IsPodReasonUnschedulable(pod *corev1.Pod) bool { for _, condition := range pod.Status.Conditions { - if condition.Type == corev1.PodScheduled && condition.Reason == corev1.PodReasonUnschedulable { + if condition.Type == corev1.PodScheduled && (condition.Reason == corev1.PodReasonUnschedulable || + condition.Reason == corev1.PodReasonSchedulerError) { + return true + } + } + + return false +} + +func IsPodNeedsToMigrate(pod *corev1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodScheduled && (condition.Reason == corev1.PodReasonUnschedulable || + (condition.Reason == corev1.PodReasonSchedulerError && + strings.Contains(condition.Message, "nodeinfo not found for node name"))) { return true } } diff --git a/test/drain_test.go b/test/drain_test.go new file mode 100644 index 000000000..904b58bb0 --- /dev/null +++ b/test/drain_test.go @@ -0,0 +1,118 @@ +package test + +import ( + goctx "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "golang.org/x/net/context" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" +) + +var _ = Describe( + "NodeDrain", func() { + ctx := goctx.TODO() + Context( + "Clean local PVC", func() { + clusterName := "fake-local-volume-cluster" + clusterNamespacedName := getNamespacedName( + clusterName, namespace, + ) + + AfterEach( + func() { + aeroCluster := &asdbv1.AerospikeCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterNamespacedName.Name, + Namespace: clusterNamespacedName.Namespace, + }, + } + err := deleteCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + + "Should delete local PVC if pod is unschedulable", func() { + aeroCluster := createDummyAerospikeCluster( + clusterNamespacedName, 2, + ) + aeroCluster.Spec.Storage.LocalStorageClasses = []string{storageClass} + aeroCluster.Spec.Storage.CleanLocalPVC = true + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.PodSpec.AerospikeContainerSpec.Resources = unschedulableResource() + + err = updateClusterWithTO(k8sClient, ctx, aeroCluster, 1*time.Minute) + Expect(err).To(HaveOccurred()) + + pvcDeleted := false + pvcDeleted, err = isPVCDeleted(ctx, "ns-"+clusterName+"-0-1") + Expect(err).ToNot(HaveOccurred()) + Expect(pvcDeleted).To(Equal(true)) + }, + ) + It( + "Should not delete local PVC if cleanLocalPVC flag is not set", func() { + aeroCluster := createDummyAerospikeCluster( + clusterNamespacedName, 2, + ) + aeroCluster.Spec.Storage.LocalStorageClasses = []string{storageClass} + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.PodSpec.AerospikeContainerSpec.Resources = unschedulableResource() + + err = updateClusterWithTO(k8sClient, ctx, aeroCluster, 1*time.Minute) + Expect(err).To(HaveOccurred()) + + pvcDeleted := false + pvcDeleted, err = isPVCDeleted(ctx, "ns-"+clusterName+"-0-1") + Expect(err).ToNot(HaveOccurred()) + Expect(pvcDeleted).To(Equal(false)) + }, + ) + }, + ) + }, +) + +func isPVCDeleted(ctx context.Context, pvcName string) (bool, error) { + pvc := &corev1.PersistentVolumeClaim{} + pvcNamespacesName := getNamespacedName( + pvcName, namespace, + ) + + err := k8sClient.Get(ctx, pvcNamespacesName, pvc) + if err != nil { + if errors.IsNotFound(err) { + return true, nil + } + + return false, err + } + + //nolint:exhaustive // rest of the cases handled by default + switch pvc.Status.Phase { + case corev1.ClaimPending: + return true, nil + case corev1.ClaimBound: + return false, nil + default: + return false, fmt.Errorf("PVC status is not expected %s", pvc.Name) + } +}