Skip to content

Commit

Permalink
move high level logic to reconciler methods
Browse files Browse the repository at this point in the history
  • Loading branch information
lllamnyp committed Nov 11, 2024
1 parent 9c64f22 commit 33d925a
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 43 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ RUN go mod download
# Copy the go source
COPY cmd/ ./cmd/
COPY api/ ./api/
COPY pkg/ ./pkg/
COPY internal/ ./internal/

# Build
Expand Down
103 changes: 60 additions & 43 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type EtcdClusterReconciler struct {
// Reconcile checks CR and current cluster state and performs actions to transform current state to desired.
func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debug(ctx, "reconciling object")
state := observables{}
state := &observables{}
state.instance = &etcdaenixiov1alpha1.EtcdCluster{}
err := r.Get(ctx, req.NamespacedName, state.instance)
if err != nil {
Expand All @@ -92,7 +92,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// create two services and the pdb
err = r.ensureUnconditionalObjects(ctx, state.instance)
err = r.ensureUnconditionalObjects(ctx, state)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)

if !state.endpointsFound {
if !state.stsExists {
return r.createClusterFromScratch(ctx, &state) // TODO: needs implementing
return r.createClusterFromScratch(ctx, state) // TODO: needs implementing
}

// update sts pod template (and only pod template) if it doesn't match desired state
Expand All @@ -146,7 +146,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

return r.scaleUpFromZero(ctx, &state) // TODO: needs implementing
return ctrl.Result{}, r.scaleUpFromZero(ctx) // TODO: needs implementing
}

// get status of every endpoint and member list from every endpoint
Expand Down Expand Up @@ -174,7 +174,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

if !memberReached {
return r.createOrUpdateStatefulSet(ctx, &state, state.instance)
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
}

state.setClusterID()
Expand All @@ -189,7 +189,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Message: string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage),
},
)
return r.updateStatus(ctx, state.instance)
return r.updateStatus(ctx, state)
}

if !state.clusterHasQuorum() {
Expand All @@ -198,19 +198,19 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

if state.hasLearners() {
return ctrl.Result{}, r.promoteLearners(ctx, &state)
return ctrl.Result{}, r.promoteLearners(ctx)
}

if err := r.createOrUpdateClusterStateConfigMap(ctx, &state); err != nil {
if err := r.createOrUpdateClusterStateConfigMap(ctx); err != nil {
return ctrl.Result{}, err
}

if !state.statefulSetPodSpecCorrect() {
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, &state)
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
}

// if size is different we have to remove statefulset it will be recreated in the next step
if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, &state, state.instance); err != nil {
if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, state); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -226,17 +226,17 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
},
)
*/
return r.updateStatus(ctx, state.instance)
return r.updateStatus(ctx, state)
}

// checkAndDeleteStatefulSetIfNecessary deletes the StatefulSet if the specified storage size has changed.
func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables, instance *etcdaenixiov1alpha1.EtcdCluster) error {
func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables) error {
for _, volumeClaimTemplate := range state.statefulSet.Spec.VolumeClaimTemplates {
if volumeClaimTemplate.Name != "data" {
continue
}
currentStorage := volumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
desiredStorage := instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
desiredStorage := state.instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
if desiredStorage.Cmp(currentStorage) != 0 {
deletePolicy := metav1.DeletePropagationOrphan
log.Info(ctx, "Deleting StatefulSet due to storage change", "statefulSet", state.statefulSet.Name)
Expand All @@ -252,21 +252,20 @@ func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context
}

// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(ctx context.Context, state *observables) error {

if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, state.instance, r.Client); err != nil {
log.Error(ctx, err, "reconcile cluster state configmap failed")
return err
}
log.Debug(ctx, "cluster state configmap reconciled")

if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
if err := factory.CreateOrUpdateStatefulSet(ctx, state.instance, r.Client); err != nil {
log.Error(ctx, err, "reconcile statefulset failed")
return err
}

if err := factory.UpdatePersistentVolumeClaims(ctx, cluster, r.Client); err != nil {
if err := factory.UpdatePersistentVolumeClaims(ctx, state.instance, r.Client); err != nil {
log.Error(ctx, err, "reconcile persistentVolumeClaims failed")
return err
}
Expand All @@ -276,20 +275,20 @@ func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
}

// updateStatusOnErr wraps error and updates EtcdCluster status
func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) {
func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, state *observables, err error) (ctrl.Result, error) {
// The function 'updateStatusOnErr' will always return non-nil error. Hence, the ctrl.Result will always be ignored.
// Therefore, the ctrl.Result returned by 'updateStatus' function can be discarded.
// REF: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/[email protected]#Reconciler
_, statusErr := r.updateStatus(ctx, cluster)
_, statusErr := r.updateStatus(ctx, state)
if statusErr != nil {
return ctrl.Result{}, goerrors.Join(statusErr, err)
}
return ctrl.Result{}, err
}

// updateStatus updates EtcdCluster status and returns error and requeue in case status could not be updated due to conflict
func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (ctrl.Result, error) {
err := r.Status().Update(ctx, cluster)
func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, state *observables) (ctrl.Result, error) {
err := r.Status().Update(ctx, state.instance)
if err == nil {
return ctrl.Result{}, nil
}
Expand All @@ -302,9 +301,9 @@ func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcda
}

// isStatefulSetReady gets managed StatefulSet and checks its readiness.
func (r *EtcdClusterReconciler) isStatefulSetReady(ctx context.Context, c *etcdaenixiov1alpha1.EtcdCluster) (bool, error) {
func (r *EtcdClusterReconciler) isStatefulSetReady(ctx context.Context, state *observables) (bool, error) {
sts := &appsv1.StatefulSet{}
err := r.Get(ctx, client.ObjectKeyFromObject(c), sts)
err := r.Get(ctx, client.ObjectKeyFromObject(state.instance), sts)
if err == nil {
return sts.Status.ReadyReplicas == *sts.Spec.Replicas, nil
}
Expand All @@ -322,11 +321,11 @@ func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, state *observables) error {

var err error

cli, err := r.GetEtcdClient(ctx, cluster)
cli, err := r.GetEtcdClient(ctx, state.instance)
if err != nil {
return err
}
Expand All @@ -342,7 +341,7 @@ func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, cluster *etcd

auth := clientv3.NewAuth(cli)

if cluster.Spec.Security != nil && cluster.Spec.Security.EnableAuth {
if state.instance.Spec.Security != nil && state.instance.Spec.Security.EnableAuth {

if err := r.createRoleIfNotExists(ctx, auth, "root"); err != nil {
return err
Expand Down Expand Up @@ -393,12 +392,12 @@ func testMemberList(ctx context.Context, cli *clientv3.Client) error {
return err
}

func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*clientv3.Client, error) {
func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) (*clientv3.Client, error) {

endpoints := getEndpointsSlice(cluster)
endpoints := getEndpointsSlice(instance)
log.Debug(ctx, "endpoints built", "endpoints", endpoints)

tlsConfig, err := r.getTLSConfig(ctx, cluster)
tlsConfig, err := r.getTLSConfig(ctx, instance)
if err != nil {
log.Error(ctx, err, "failed to build tls config")
return nil, err
Expand All @@ -421,17 +420,17 @@ func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, cluster *etcd

}

func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*tls.Config, error) {
func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) (*tls.Config, error) {

var err error

caCertPool := &x509.CertPool{}

if cluster.IsServerTrustedCADefined() {
if instance.IsServerTrustedCADefined() {

serverCASecret := &corev1.Secret{}

if err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Spec.Security.TLS.ServerTrustedCASecret}, serverCASecret); err != nil {
if err = r.Get(ctx, client.ObjectKey{Namespace: instance.Namespace, Name: instance.Spec.Security.TLS.ServerTrustedCASecret}, serverCASecret); err != nil {
log.Error(ctx, err, "failed to get server trusted CA secret")
return nil, err
}
Expand All @@ -448,10 +447,10 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda

cert := tls.Certificate{}

if cluster.IsClientSecurityEnabled() {
if instance.IsClientSecurityEnabled() {

rootSecret := &corev1.Secret{}
if err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Spec.Security.TLS.ClientSecret}, rootSecret); err != nil {
if err = r.Get(ctx, client.ObjectKey{Namespace: instance.Namespace, Name: instance.Spec.Security.TLS.ClientSecret}, rootSecret); err != nil {
log.Error(ctx, err, "failed to get root client secret")
return nil, err
}
Expand All @@ -465,7 +464,7 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda
}

tlsConfig := &tls.Config{
InsecureSkipVerify: !cluster.IsServerTrustedCADefined(),
InsecureSkipVerify: !instance.IsServerTrustedCADefined(),
RootCAs: caCertPool,
Certificates: []tls.Certificate{
cert,
Expand Down Expand Up @@ -602,7 +601,7 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie
// ensureUnconditionalObjects creates the two services and the PDB
// which can be created at the start of the reconciliation loop
// without any risk of disrupting the etcd cluster
func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, state *observables) error {
const concurrentOperations = 3
c := make(chan error)
defer close(c)
Expand All @@ -620,23 +619,23 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context,
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, state.instance, r.Client),
"couldn't ensure client service"):
}
}(c)
go func(chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, state.instance, r.Client),
"couldn't ensure headless service"):
}
}(c)
go func(chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, state.instance, r.Client),
"couldn't ensure pod disruption budget"):
}
}(c)
Expand Down Expand Up @@ -698,14 +697,32 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st
)

// ensure managed resources
if err = r.ensureConditionalClusterObjects(ctx, state.instance); err != nil {
return r.updateStatusOnErr(ctx, state.instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
if err = r.ensureConditionalClusterObjects(ctx, state); err != nil {
return r.updateStatusOnErr(ctx, state, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
}
panic("not yet implemented")
}

// TODO!
// nolint:unused
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) (ctrl.Result, error) {
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context) error {
panic("not yet implemented")
}

// TODO!
// nolint:unused
func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context) error {
panic("not yet implemented")
}

// TODO!
// nolint:unused
func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context) error {
panic("not yet implemented")
}

// TODO!
// nolint:unused
func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context) error {
panic("not yet implemented")
}
10 changes: 10 additions & 0 deletions internal/controller/observables.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,13 @@ func (o *observables) statefulSetPodSpecCorrect() bool {
func (o *observables) statefulSetReady() bool {
return o.statefulSet.Status.ReadyReplicas == *o.statefulSet.Spec.Replicas
}

// TODO:
func (o *observables) clusterHasQuorum() bool {
return false
}

// TODO:
func (o *observables) hasLearners() bool {
return false
}

0 comments on commit 33d925a

Please sign in to comment.