From 21b2a6cbd866947e8b643eac176caf61f354c785 Mon Sep 17 00:00:00 2001 From: Tim Ebert Date: Fri, 3 Nov 2023 15:38:23 +0100 Subject: [PATCH] Add clusterring controller --- cmd/sharder/app/options.go | 1 + config/rbac/role.yaml | 15 ++++ pkg/controller/add.go | 5 ++ pkg/controller/clusterring/add.go | 105 +++++++++++++++++++++++ pkg/controller/clusterring/reconciler.go | 78 +++++++++++++++++ 5 files changed, 204 insertions(+) create mode 100644 pkg/controller/clusterring/add.go create mode 100644 pkg/controller/clusterring/reconciler.go diff --git a/cmd/sharder/app/options.go b/cmd/sharder/app/options.go index d4b54fe5..37f9ddde 100644 --- a/cmd/sharder/app/options.go +++ b/cmd/sharder/app/options.go @@ -54,6 +54,7 @@ var scheme = runtime.NewScheme() func init() { schemeBuilder := runtime.NewSchemeBuilder( clientgoscheme.AddToScheme, + shardingv1alpha1.AddToScheme, configv1alpha1.AddToScheme, ) utilruntime.Must(schemeBuilder.AddToScheme(scheme)) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 1cd40c71..3c5a2a74 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -15,3 +15,18 @@ rules: - patch - update - watch +- apiGroups: + - sharding.timebertt.dev + resources: + - clusterrings + verbs: + - get + - list + - watch +- apiGroups: + - sharding.timebertt.dev + resources: + - clusterrings/status + verbs: + - patch + - update diff --git a/pkg/controller/add.go b/pkg/controller/add.go index 61ce0f99..6eb4c696 100644 --- a/pkg/controller/add.go +++ b/pkg/controller/add.go @@ -23,10 +23,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1" + "github.com/timebertt/kubernetes-controller-sharding/pkg/controller/clusterring" "github.com/timebertt/kubernetes-controller-sharding/pkg/controller/shardlease" ) func AddToManager(ctx context.Context, mgr manager.Manager, cfg *configv1alpha1.SharderConfig) error { + if err := (&clusterring.Reconciler{}).AddToManager(mgr); err != nil { + return fmt.Errorf("failed adding clusterring controller: %w", err) + } + if err := (&shardlease.Reconciler{}).AddToManager(mgr); err != nil { return fmt.Errorf("failed adding lease controller: %w", err) } diff --git a/pkg/controller/clusterring/add.go b/pkg/controller/clusterring/add.go new file mode 100644 index 00000000..5160900e --- /dev/null +++ b/pkg/controller/clusterring/add.go @@ -0,0 +1,105 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterring + +import ( + "context" + + coordinationv1 "k8s.io/api/coordination/v1" + "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" + "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases" +) + +// ControllerName is the name of this controller. +const ControllerName = "clusterring" + +// AddToManager adds Reconciler to the given manager. +func (r *Reconciler) AddToManager(mgr manager.Manager) error { + if r.Client == nil { + r.Client = mgr.GetClient() + } + if r.Clock == nil { + r.Clock = clock.RealClock{} + } + + return builder.ControllerManagedBy(mgr). + Named(ControllerName). + For(&shardingv1alpha1.ClusterRing{}, builder.WithPredicates(r.ClusterRingPredicate())). + Watches(&coordinationv1.Lease{}, handler.EnqueueRequestsFromMapFunc(MapLeaseToClusterRing), builder.WithPredicates(r.LeasePredicate())). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 5, + }). + Complete(r) +} + +func (r *Reconciler) ClusterRingPredicate() predicate.Predicate { + return predicate.And( + predicate.GenerationChangedPredicate{}, + // ignore deletion of ClusterRings + predicate.Funcs{ + CreateFunc: func(_ event.CreateEvent) bool { return true }, + UpdateFunc: func(_ event.UpdateEvent) bool { return true }, + DeleteFunc: func(_ event.DeleteEvent) bool { return false }, + }, + ) +} + +func MapLeaseToClusterRing(ctx context.Context, obj client.Object) []reconcile.Request { + ring, ok := obj.GetLabels()[shardingv1alpha1.LabelClusterRing] + if !ok { + return nil + } + + return []reconcile.Request{{NamespacedName: client.ObjectKey{Name: ring}}} +} + +func (r *Reconciler) LeasePredicate() predicate.Predicate { + return predicate.And( + predicate.NewPredicateFuncs(isShardLease), + predicate.Funcs{ + CreateFunc: func(_ event.CreateEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldLease, ok := e.ObjectOld.(*coordinationv1.Lease) + if !ok { + return false + } + newLease, ok := e.ObjectNew.(*coordinationv1.Lease) + if !ok { + return false + } + + // only enqueue ring if the shard's state changed + return leases.ToState(oldLease, r.Clock) != leases.ToState(newLease, r.Clock) + }, + DeleteFunc: func(_ event.DeleteEvent) bool { return true }, + }, + ) +} + +func isShardLease(obj client.Object) bool { + return obj.GetLabels()[shardingv1alpha1.LabelClusterRing] != "" +} diff --git a/pkg/controller/clusterring/reconciler.go b/pkg/controller/clusterring/reconciler.go new file mode 100644 index 00000000..7e3875ab --- /dev/null +++ b/pkg/controller/clusterring/reconciler.go @@ -0,0 +1,78 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterring + +import ( + "context" + "fmt" + + coordinationv1 "k8s.io/api/coordination/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" + "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases" +) + +//+kubebuilder:rbac:groups=sharding.timebertt.dev,resources=clusterrings,verbs=get;list;watch +//+kubebuilder:rbac:groups=sharding.timebertt.dev,resources=clusterrings/status,verbs=update;patch + +// Reconciler reconciles ClusterRings. +type Reconciler struct { + Client client.Client + Clock clock.PassiveClock +} + +// Reconcile reconciles a ClusterRing object. +func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := logf.FromContext(ctx) + + clusterRing := &shardingv1alpha1.ClusterRing{} + if err := r.Client.Get(ctx, req.NamespacedName, clusterRing); err != nil { + if apierrors.IsNotFound(err) { + log.V(1).Info("Object is gone, stop reconciling") + return reconcile.Result{}, nil + } + return reconcile.Result{}, fmt.Errorf("error retrieving object from store: %w", err) + } + + clusterRingCopy := clusterRing.DeepCopy() + + // update status with the latest observed generation + clusterRing.Status.ObservedGeneration = clusterRing.Generation + + leaseList := &coordinationv1.LeaseList{} + if err := r.Client.List(ctx, leaseList, client.MatchingLabels{shardingv1alpha1.LabelClusterRing: clusterRing.Name}); err != nil { + return reconcile.Result{}, fmt.Errorf("error listing Leases for ClusterRing: %w", err) + } + + shards := leases.ToShards(leaseList.Items, r.Clock) + clusterRing.Status.Shards = int32(len(shards)) + clusterRing.Status.AvailableShards = int32(len(shards.AvailableShards())) + + if !apiequality.Semantic.DeepEqual(clusterRing.Status, clusterRingCopy.Status) { + if err := r.Client.Status().Update(ctx, clusterRing); err != nil { + return reconcile.Result{}, fmt.Errorf("error updating ClusterRing status: %w", err) + } + } + + return reconcile.Result{}, nil +}