add etcd reconcile logic (#103)
add machine controller to reconcile node etcd removal



Add comment and small fix



slightly fixed the comment

Signed-off-by: nasusoba <[email protected]>
Co-authored-by: qliang <[email protected]>
nasusoba and qliang authored May 7, 2024
1 parent 286bede commit 6b8c220
Showing 9 changed files with 459 additions and 64 deletions.
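
For context, a minimal sketch (not part of this commit) of the mechanism the change relies on: instead of removing the etcd member inline during scale-down or remediation, the control plane controller now marks the Machine with the Cluster API pre-terminate delete hook annotation. Cluster API drains the node and detaches volumes but holds the final deletion while the hook is present; the new Machine controller then removes the etcd member and clears the hook so deletion can complete. The helper names below are hypothetical; the annotation key and the "k3s" hook value come from the diff.

package example

import (
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/annotations"
)

const k3sHookName = "k3s" // mirrors controlplane/controllers/const.go

// markForEtcdRemoval adds the pre-terminate hook so Cluster API waits for the
// k3s Machine controller before completing the Machine deletion.
func markForEtcdRemoval(m *clusterv1.Machine) {
	a := m.GetAnnotations()
	if a == nil {
		a = map[string]string{}
	}
	a[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = k3sHookName
	m.SetAnnotations(a)
}

// hasK3sPreTerminateHook reports whether the Machine is still gated on the k3s
// etcd-removal hook, matching the check in the new Machine controller.
func hasK3sPreTerminateHook(m *clusterv1.Machine) bool {
	return annotations.HasWithPrefix(clusterv1.PreTerminateDeleteHookAnnotationPrefix, m.GetAnnotations()) &&
		m.GetAnnotations()[clusterv1.PreTerminateDeleteHookAnnotationPrefix] == k3sHookName
}
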
2 changes: 2 additions & 0 deletions controlplane/controllers/const.go
@@ -30,4 +30,6 @@ const (
// dependentCertRequeueAfter is how long to wait before checking again to see if
// dependent certificates have been created.
dependentCertRequeueAfter = 30 * time.Second

k3sHookName = "k3s"
)
59 changes: 55 additions & 4 deletions controlplane/controllers/kthreescontrolplane_controller.go
@@ -18,12 +18,12 @@ package controllers

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -480,9 +480,9 @@ func (r *KThreesControlPlaneReconciler) reconcile(ctx context.Context, cluster *

// Ensures the number of etcd members is in sync with the number of machines/nodes.
// NOTE: This is usually required after a machine deletion.
// if result, err := r.reconcileEtcdMembers(ctx, controlPlane); err != nil || !result.IsZero() {
// return result, err
// }
if err := r.reconcileEtcdMembers(ctx, controlPlane); err != nil {
return reconcile.Result{}, err
}

// Reconcile unhealthy machines by triggering deletion and requeue if it is considered safe to remediate,
// otherwise continue with the other KCP operations.
@@ -655,6 +655,57 @@ func (r *KThreesControlPlaneReconciler) reconcileControlPlaneConditions(ctx cont
return nil
}

// reconcileEtcdMembers ensures the number of etcd members is in sync with the number of machines/nodes.
// This is usually required after a machine deletion.
//
// NOTE: this func uses KCP conditions, so reconcileControlPlaneConditions must be called before this.
func (r *KThreesControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, controlPlane *k3s.ControlPlane) error {
log := ctrl.LoggerFrom(ctx)

// If etcd is not managed by KCP this is a no-op.
if !controlPlane.IsEtcdManaged() {
return nil
}

// If there are no KCP-owned control-plane machines, then the control plane has not been initialized yet.
if controlPlane.Machines.Len() == 0 {
return nil
}

// Collect all the node names.
nodeNames := []string{}
for _, machine := range controlPlane.Machines {
if machine.Status.NodeRef == nil {
// If there are provisioning machines (machines without a node yet), return.
return nil
}
nodeNames = append(nodeNames, machine.Status.NodeRef.Name)
}

// Potential inconsistencies between the list of members and the list of machines/nodes are
// surfaced using the EtcdClusterHealthyCondition; if this condition is true, meaning no inconsistencies exist, return early.
if conditions.IsTrue(controlPlane.KCP, controlplanev1.EtcdClusterHealthyCondition) {
return nil
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster))
if err != nil {
// Failing to connect to the workload cluster can mean the workload cluster is unhealthy for a variety of reasons, such as etcd quorum loss.
return errors.Wrap(err, "cannot get remote client to workload cluster")
}

removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx, nodeNames)
if err != nil {
return errors.Wrap(err, "failed attempt to reconcile etcd members")
}

if len(removedMembers) > 0 {
log.Info("Etcd members without nodes removed from the cluster", "members", removedMembers)
}

return nil
}

func (r *KThreesControlPlaneReconciler) upgradeControlPlane(
ctx context.Context,
cluster *clusterv1.Cluster,
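
For reference, an illustrative sketch of the comparison that ReconcileEtcdMembers on the workload cluster is expected to perform: identify etcd members whose node no longer exists. The helper below is an assumption for illustration only; the real logic lives in pkg/k3s and also performs the actual member removal against etcd.

// staleEtcdMembers returns etcd member names that have no backing control-plane
// node; these are the candidates reconcileEtcdMembers expects to be removed.
func staleEtcdMembers(memberNames, nodeNames []string) []string {
	nodes := make(map[string]struct{}, len(nodeNames))
	for _, n := range nodeNames {
		nodes[n] = struct{}{}
	}
	stale := []string{}
	for _, member := range memberNames {
		if _, ok := nodes[member]; !ok {
			// The member's node was deleted, so the member is stale.
			stale = append(stale, member)
		}
	}
	return stale
}
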
125 changes: 125 additions & 0 deletions controlplane/controllers/machine_controller.go
@@ -0,0 +1,125 @@
package controllers

import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

k3s "github.com/k3s-io/cluster-api-k3s/pkg/k3s"
)

// MachineReconciler watches Machine objects to reconcile etcd member removal during machine deletion.
type MachineReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration

managementCluster k3s.ManagementCluster
managementClusterUncached k3s.ManagementCluster
}

func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, log *logr.Logger) error {
_, err := ctrl.NewControllerManagedBy(mgr).
For(&clusterv1.Machine{}).
Build(r)

if r.managementCluster == nil {
r.managementCluster = &k3s.Management{
Client: r.Client,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
}
}

if r.managementClusterUncached == nil {
r.managementClusterUncached = &k3s.Management{
Client: mgr.GetAPIReader(),
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
}
}

return err
}

// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status,verbs=get;list;watch
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch;create;update;patch;delete
func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log.WithValues("namespace", req.Namespace, "machine", req.Name)

m := &clusterv1.Machine{}
if err := r.Client.Get(ctx, req.NamespacedName, m); err != nil {
if apierrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return ctrl.Result{}, nil
}

// Error reading the object - requeue the request.
return ctrl.Result{}, err
}

if m.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}

// If the machine registered the PreTerminate hook, wait for CAPI to drain the node and detach volumes, then remove the etcd member.
if annotations.HasWithPrefix(clusterv1.PreTerminateDeleteHookAnnotationPrefix, m.ObjectMeta.Annotations) &&
m.ObjectMeta.Annotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] == k3sHookName {
if !conditions.IsTrue(m, clusterv1.DrainingSucceededCondition) || !conditions.IsTrue(m, clusterv1.VolumeDetachSucceededCondition) {
logger.Info("wait for machine drain and detech volume operation complete.")
return ctrl.Result{}, nil
}

cluster, err := util.GetClusterFromMetadata(ctx, r.Client, m.ObjectMeta)
if err != nil {
logger.Info("unable to get cluster.")
return ctrl.Result{}, errors.Wrapf(err, "unable to get cluster")
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to create client to workload cluster")
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
}

etcdRemoved, err := workloadCluster.RemoveEtcdMemberForMachine(ctx, m)
if err != nil {
logger.Error(err, "failed to remove etcd member for machine")
return ctrl.Result{}, err
}
if !etcdRemoved {
logger.Info("wait k3s embedded etcd controller to remove etcd")
return ctrl.Result{Requeue: true}, err
}
logger.Info("etcd remove etcd member succeeded", "node", m.Status.NodeRef.Name)

patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine")
}

mAnnotations := m.GetAnnotations()
delete(mAnnotations, clusterv1.PreTerminateDeleteHookAnnotationPrefix)
m.SetAnnotations(mAnnotations)
if err := patchHelper.Patch(ctx, m); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed patch machine")
}
}

return ctrl.Result{}, nil
}
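
A hypothetical debugging helper (not part of the commit) that lists Machines still blocked on the k3s pre-terminate hook; this can help when a scale-down or remediation appears stuck waiting for etcd member removal. It uses only types already imported by the new controller (context, clusterv1, controller-runtime client).

// machinesWaitingOnK3sHook returns the names of deleting Machines that still
// carry the k3s pre-terminate hook, i.e. their etcd member has not been
// removed yet.
func machinesWaitingOnK3sHook(ctx context.Context, c client.Client, namespace string) ([]string, error) {
	machineList := &clusterv1.MachineList{}
	if err := c.List(ctx, machineList, client.InNamespace(namespace)); err != nil {
		return nil, err
	}
	waiting := []string{}
	for i := range machineList.Items {
		m := &machineList.Items[i]
		if !m.DeletionTimestamp.IsZero() &&
			m.GetAnnotations()[clusterv1.PreTerminateDeleteHookAnnotationPrefix] == k3sHookName {
			waiting = append(waiting, m.Name)
		}
	}
	return waiting, nil
}
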
29 changes: 12 additions & 17 deletions controlplane/controllers/remediation.go
@@ -28,6 +28,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
@@ -167,13 +168,10 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
// Start remediating the unhealthy control plane machine by deleting it.
// A new machine will come up completing the operation as part of the regular reconcile.

// TODO figure out etcd complexities
// If the control plane is initialized, before deleting the machine:
// - if the machine hosts the etcd leader, forward etcd leadership to another machine.
// - delete the etcd member hosted on the machine being deleted.
// - remove the etcd member from the kubeadm config map (only for kubernetes version older than v1.22.0)
/**
workloadCluster, err := controlPlane.GetWorkloadCluster(ctx)
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster))
if err != nil {
log.Error(err, "Failed to create client to workload cluster")
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
@@ -193,23 +191,20 @@ func (r *KThreesControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.C
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error())
return ctrl.Result{}, err
}
if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToBeRemediated); err != nil {
log.Error(err, "Failed to remove etcd member for machine")
conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error())
return ctrl.Result{}, err

patchHelper, err := patch.NewHelper(machineToBeRemediated, r.Client)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine")
}
}

parsedVersion, err := semver.ParseTolerant(controlPlane.KCP.Spec.Version)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to parse kubernetes version %q", controlPlane.KCP.Spec.Version)
}
mAnnotations := machineToBeRemediated.GetAnnotations()
mAnnotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = k3sHookName
machineToBeRemediated.SetAnnotations(mAnnotations)

if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToBeRemediated, parsedVersion); err != nil {
log.Error(err, "Failed to remove machine from kubeadm ConfigMap")
return ctrl.Result{}, err
if err := patchHelper.Patch(ctx, machineToBeRemediated); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed patch machine for adding preTerminate hook")
}
}
**/
}

// Delete the machine
26 changes: 20 additions & 6 deletions controlplane/controllers/scale.go
@@ -19,10 +19,10 @@ package controllers
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,6 +33,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"

bootstrapv1 "github.com/k3s-io/cluster-api-k3s/bootstrap/api/v1beta1"
@@ -132,12 +133,19 @@ func (r *KThreesControlPlaneReconciler) scaleDownControlPlane(
logger.Error(err, "Failed to move leadership to candidate machine", "candidate", etcdLeaderCandidate.Name)
return ctrl.Result{}, err
}
logger.Info("etcd move etcd leader succeeded, node to delete %s", machineToDelete.Status.NodeRef.Name)
if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil {
logger.Error(err, "Failed to remove etcd member for machine")
return ctrl.Result{}, err

patchHelper, err := patch.NewHelper(machineToDelete, r.Client)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine")
}

mAnnotations := machineToDelete.GetAnnotations()
mAnnotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = k3sHookName
machineToDelete.SetAnnotations(mAnnotations)

if err := patchHelper.Patch(ctx, machineToDelete); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed patch machine for adding preTerminate hook")
}
logger.Info("etcd remove etcd member succeeded, node to delete %s", machineToDelete.Status.NodeRef.Name)
}

logger = logger.WithValues("machine", machineToDelete)
@@ -177,6 +185,12 @@ func (r *KThreesControlPlaneReconciler) preflightChecks(_ context.Context, contr

// Check machine health conditions; if there are conditions with False or Unknown, then wait.
allMachineHealthConditions := []clusterv1.ConditionType{controlplanev1.MachineAgentHealthyCondition}
if controlPlane.IsEtcdManaged() {
allMachineHealthConditions = append(allMachineHealthConditions,
controlplanev1.MachineEtcdMemberHealthyCondition,
)
}

machineErrors := []error{}

loopmachines:
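
An assumed sketch (not the commit's exact code) of how preflightChecks can evaluate the per-machine health conditions, which now include MachineEtcdMemberHealthyCondition when etcd is managed: any condition that is not True produces an error that blocks the scale operation.

// machineConditionErrors collects an error per condition that is not True;
// conditions.IsTrue treats both False and Unknown as unhealthy.
func machineConditionErrors(m *clusterv1.Machine, conditionTypes []clusterv1.ConditionType) []error {
	errs := []error{}
	for _, conditionType := range conditionTypes {
		if !conditions.IsTrue(m, conditionType) {
			errs = append(errs, fmt.Errorf("machine %s: condition %s is not true", m.Name, conditionType))
		}
	}
	return errs
}
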
12 changes: 12 additions & 0 deletions controlplane/main.go
@@ -110,6 +110,18 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "KThreesControlPlane")
os.Exit(1)
}

ctrMachineLogger := ctrl.Log.WithName("controllers").WithName("Machine")
if err = (&controllers.MachineReconciler{
Client: mgr.GetClient(),
Log: ctrMachineLogger,
Scheme: mgr.GetScheme(),
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
}).SetupWithManager(ctx, mgr, &ctrMachineLogger); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Machine")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

setupLog.Info("starting manager")