Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KO-245] Deleting PVC of local volumes during node maintenance. #256

Closed
wants to merge 10 commits into from
2 changes: 2 additions & 0 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
SeedsFinderServices SeedsFinderServices `json:"seedsFinderServices,omitempty"`
// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
// CleanLocalPVC is a flag which allows operator to delete local PVC in case of kubernetes node crash
CleanLocalPVC bool `json:"cleanLocalPVC,omitempty"`
}

type SeedsFinderServices struct {
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ spec:
- customInterface
type: string
type: object
cleanLocalPVC:
description: CleanLocalPVC is a flag which allows operator to delete
local PVC in case of kubernetes node crash
type: boolean
image:
description: Aerospike server image
type: string
Expand Down
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ rules:
- ""
resources:
- secrets
- persistentvolumes
abhishekdwivedi3060 marked this conversation as resolved.
Show resolved Hide resolved
verbs:
- get
- apiGroups:
Expand Down
40 changes: 36 additions & 4 deletions controllers/aerospikecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,38 @@ type AerospikeClusterReconciler struct {
Log logr.Logger
}

// stsPodPendingPredicate decides whether a StatefulSet update event passes
// the filter. It returns true only when both the old and the new object are
// StatefulSets, their observed generation is unchanged, and the new object
// reports fewer available replicas than the old one. Any other event,
// including one with a missing or non-StatefulSet object, is rejected.
func stsPodPendingPredicate(e event.UpdateEvent) bool {
	if e.ObjectOld == nil || e.ObjectNew == nil {
		return false
	}

	before, isSTS := e.ObjectOld.(*appsv1.StatefulSet)
	if !isSTS {
		return false
	}

	after, isSTS := e.ObjectNew.(*appsv1.StatefulSet)
	if !isSTS {
		return false
	}

	// The spec was not re-observed, so a change here is a pure status change.
	sameGeneration := before.Status.ObservedGeneration == after.Status.ObservedGeneration
	// Fewer pods are available than before the update.
	lostReplicas := after.Status.AvailableReplicas < before.Status.AvailableReplicas

	return sameGeneration && lostReplicas
}

type StatusPendingPredicate struct {
abhishekdwivedi3060 marked this conversation as resolved.
Show resolved Hide resolved
predicate.Funcs
}

// Update filters update events by delegating to stsPodPendingPredicate.
// It returns whatever that function returns for the given event.
func (StatusPendingPredicate) Update(e event.UpdateEvent) bool {
return stsPodPendingPredicate(e)
}

// SetupWithManager sets up the controller with the Manager
func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand All @@ -57,9 +89,7 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
CreateFunc: func(e event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
return false
},
UpdateFunc: stsPodPendingPredicate,
},
),
).
Expand All @@ -68,7 +98,8 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
MaxConcurrentReconciles: maxConcurrentReconciles,
},
).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{})).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{},
StatusPendingPredicate{})).
Complete(r)
}

Expand All @@ -82,6 +113,7 @@ type RackState struct {
// +kubebuilder:rbac:groups=core,resources=pods/exec,verbs=create
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumes,verbs=get
// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get
Expand Down
12 changes: 12 additions & 0 deletions controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ func (r *SingleClusterReconciler) restartPods(
}
}

if r.aeroCluster.Spec.CleanLocalPVC {
if err := r.deleteLocalPVCs(pod); err != nil {
return reconcileError(err)
}
}

if err := r.Client.Delete(context.TODO(), pod); err != nil {
r.Log.Error(err, "Failed to delete pod")
return reconcileError(err)
Expand Down Expand Up @@ -399,6 +405,12 @@ func (r *SingleClusterReconciler) deletePodAndEnsureImageUpdated(

// Delete pods
for _, p := range podsToUpdate {
if r.aeroCluster.Spec.CleanLocalPVC {
if err := r.deleteLocalPVCs(p); err != nil {
return reconcileError(err)
}
}

abhishekdwivedi3060 marked this conversation as resolved.
Show resolved Hide resolved
if err := r.Client.Delete(context.TODO(), p); err != nil {
return reconcileError(err)
}
Expand Down
35 changes: 35 additions & 0 deletions controllers/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package controllers
import (
"context"
"fmt"
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
Expand Down Expand Up @@ -103,6 +105,39 @@ func (r *SingleClusterReconciler) removePVCsAsync(
return deletedPVCs, nil
}

func (r *SingleClusterReconciler) deleteLocalPVCs(pod *corev1.Pod) error {
if pod.Status.Phase == corev1.PodPending && utils.IsPodNeedsToMigrate(pod) {
rackID, err := strconv.Atoi(pod.Labels[asdbv1.AerospikeRackIDLabel])
if err != nil {
return err
}

abhishekdwivedi3060 marked this conversation as resolved.
Show resolved Hide resolved
pvcItems, err := r.getPodsPVCList([]string{pod.Name}, rackID)
if err != nil {
return fmt.Errorf("could not find pvc for pod %v: %v", pod.Name, err)
}

for idx := range pvcItems {
pv := &corev1.PersistentVolume{}
pvName := types.NamespacedName{Name: pvcItems[idx].Spec.VolumeName}

if err := r.Client.Get(context.TODO(), pvName, pv); err != nil {
return err
}

abhishekdwivedi3060 marked this conversation as resolved.
Show resolved Hide resolved
if pv.Spec.Local != nil {
if err := r.Client.Delete(context.TODO(), &pvcItems[idx]); err != nil {
return fmt.Errorf(
"could not delete pvc %s: %v", pvcItems[idx].Name, err,
)
}
}
}
}

return nil
}

func (r *SingleClusterReconciler) waitForPVCTermination(deletedPVCs []corev1.PersistentVolumeClaim) error {
if len(deletedPVCs) == 0 {
return nil
Expand Down
36 changes: 27 additions & 9 deletions controllers/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

Expand Down Expand Up @@ -597,16 +598,21 @@ func (r *SingleClusterReconciler) updateSTS(
// TODO: Add validation. device, file, both should not exist in same storage class
r.updateSTSStorage(statefulSet, rackState)

// Save the updated stateful set.
// Can we optimize this? Update stateful set only if there is any change
// in it.
err := r.Client.Update(context.TODO(), statefulSet, updateOption)
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
found, err := r.getSTS(rackState)
if err != nil {
return err
}

if reflect.DeepEqual(found.Spec, statefulSet.Spec) {
abhishekdwivedi3060 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
// Save the updated stateful set.
found.Spec = statefulSet.Spec
return r.Client.Update(context.TODO(), found, updateOption)
})
if err != nil {
return fmt.Errorf(
"failed to update StatefulSet %s: %v",
statefulSet.Name,
err,
)
return err
}

r.Log.V(1).Info(
Expand Down Expand Up @@ -775,7 +781,16 @@ func (r *SingleClusterReconciler) updateSTSNonPVStorage(
)

// Add volume in statefulSet template
perm := corev1.SecretVolumeSourceDefaultMode
k8sVolume := createVolumeForVolumeAttachment(volume)

switch {
case k8sVolume.Secret != nil:
k8sVolume.Secret.DefaultMode = &perm
case k8sVolume.ConfigMap != nil:
k8sVolume.ConfigMap.DefaultMode = &perm
}

tanmayja marked this conversation as resolved.
Show resolved Hide resolved
st.Spec.Template.Spec.Volumes = append(
st.Spec.Template.Spec.Volumes, k8sVolume,
)
Expand Down Expand Up @@ -1190,6 +1205,8 @@ func getDefaultAerospikeInitContainerVolumeMounts() []corev1.VolumeMount {
func getDefaultSTSVolumes(
aeroCluster *asdbv1.AerospikeCluster, rackState *RackState,
) []corev1.Volume {
defaultMode := corev1.SecretVolumeSourceDefaultMode

return []corev1.Volume{
{
Name: confDirName,
Expand All @@ -1206,6 +1223,7 @@ func getDefaultSTSVolumes(
aeroCluster, rackState.Rack.ID,
).Name,
},
DefaultMode: &defaultMode,
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ rules:
- ""
resources:
- secrets
- persistentvolumes
verbs:
- get
- apiGroups:
Expand Down
19 changes: 16 additions & 3 deletions pkg/utils/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func CheckPodFailed(pod *corev1.Pod) error {
return fmt.Errorf("pod %s has failed status", pod.Name)
}

if pod.Status.Phase == corev1.PodPending && isPodReasonUnschedulable(pod) {
if pod.Status.Phase == corev1.PodPending && IsPodReasonUnschedulable(pod) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving this pod.Status.Phase == corev1.PodPending inside IsPodReasonUnschedulable and IsPodNeedsToMigrate func ?

Copy link
Contributor Author

@tanmayja tanmayja Oct 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anything is fine. Both functions are just on the conditions, even we can just pass conditions also in these functions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, anything is fine. My thinking was to keep every un-schedulable condition under a func so that we don't have to call multiple conditions ANDed together at multiple places if required.

return fmt.Errorf("pod %s is in unschedulable state", pod.Name)
}

Expand Down Expand Up @@ -210,9 +210,22 @@ func isPodError(reason string) bool {
return strings.HasSuffix(reason, "Error")
}

func isPodReasonUnschedulable(pod *corev1.Pod) bool {
func IsPodReasonUnschedulable(pod *corev1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodScheduled && condition.Reason == corev1.PodReasonUnschedulable {
if condition.Type == corev1.PodScheduled && (condition.Reason == corev1.PodReasonUnschedulable ||
condition.Reason == corev1.PodReasonSchedulerError) {
return true
}
}

return false
}

func IsPodNeedsToMigrate(pod *corev1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodScheduled && (condition.Reason == corev1.PodReasonUnschedulable ||
condition.Reason == corev1.PodReasonSchedulerError &&
strings.Contains(condition.Message, "nodeinfo not found for node name")) {
return true
}
}
Expand Down
5 changes: 4 additions & 1 deletion test/cluster_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,11 +1373,14 @@ func getStorageVolumeForAerospike(name, path string) asdbv1.VolumeSpec {
}

func getStorageVolumeForSecret() asdbv1.VolumeSpec {
perm := corev1.SecretVolumeSourceDefaultMode

return asdbv1.VolumeSpec{
Name: aerospikeConfigSecret,
Source: asdbv1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: tlsSecretName,
SecretName: tlsSecretName,
DefaultMode: &perm,
},
},
Aerospike: &asdbv1.AerospikeServerVolumeAttachment{
Expand Down