From e64c5d719217ff7f3b9461a166c9a40a5d556e28 Mon Sep 17 00:00:00 2001 From: Tim Ebert Date: Fri, 3 Nov 2023 11:44:52 +0100 Subject: [PATCH] Add lease controller --- Makefile | 1 + cmd/sharder/app/options.go | 30 ++++++- config/rbac/kustomization.yaml | 12 +++ config/rbac/role.yaml | 17 ++++ config/rbac/rolebinding.yaml | 13 +++ go.mod | 3 +- pkg/controller/add.go | 6 ++ pkg/controller/lease/add.go | 79 +++++++++++++++++ pkg/controller/lease/reconciler.go | 132 +++++++++++++++++++++++++++++ 9 files changed, 289 insertions(+), 4 deletions(-) create mode 100644 config/rbac/role.yaml create mode 100644 config/rbac/rolebinding.yaml create mode 100644 pkg/controller/lease/add.go create mode 100644 pkg/controller/lease/reconciler.go diff --git a/Makefile b/Makefile index f0ecbedb..8bb01275 100644 --- a/Makefile +++ b/Makefile @@ -49,6 +49,7 @@ modules: ## Runs go mod to ensure modules are up to date. .PHONY: generate generate: $(CONTROLLER_GEN) modules ## Run all code generators + $(CONTROLLER_GEN) rbac:roleName=sharder paths="./pkg/..." output:rbac:artifacts:config=config/rbac $(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./pkg/..." 
hack/update-codegen.sh diff --git a/cmd/sharder/app/options.go b/cmd/sharder/app/options.go index 2ea1beaa..d4b54fe5 100644 --- a/cmd/sharder/app/options.go +++ b/cmd/sharder/app/options.go @@ -22,11 +22,15 @@ import ( "net/http" "os" goruntime "runtime" + "strconv" "github.com/spf13/pflag" "go.uber.org/zap/zapcore" + coordinationv1 "k8s.io/api/coordination/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/selection" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -41,6 +45,7 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1" + shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" "github.com/timebertt/kubernetes-controller-sharding/pkg/utils/routes" ) @@ -110,14 +115,13 @@ func (o *options) complete() error { Scheme: scheme, // allows us to quickly handover leadership on restarts LeaderElectionReleaseOnCancel: true, - Cache: cache.Options{ - DefaultTransform: dropUnwantedMetadata, - }, Controller: controllerconfig.Controller{ RecoverPanic: ptr.To(true), }, } o.applyConfigToManagerOptions() + o.applyCacheOptions() + if err := o.applyOptionsOverrides(); err != nil { return err } @@ -169,6 +173,26 @@ func (o *options) applyConfigToManagerOptions() { o.managerOptions.GracefulShutdownTimeout = ptr.To(o.config.GracefulShutdownTimeout.Duration) } +func (o *options) applyCacheOptions() { + // filter lease cache for shard leases to avoid watching all leases in cluster + leaseSelector := labels.NewSelector() + { + ringRequirement, err := labels.NewRequirement(shardingv1alpha1.LabelClusterRing, selection.Exists, nil) + utilruntime.Must(err) + leaseSelector = leaseSelector.Add(*ringRequirement) + } + + o.managerOptions.Cache = cache.Options{ + 
DefaultTransform: dropUnwantedMetadata, + + ByObject: map[client.Object]cache.ByObject{ + &coordinationv1.Lease{}: { + Label: leaseSelector, + }, + }, + } +} + func (o *options) applyOptionsOverrides() error { var err error diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml index c2d2d6ae..6159a950 100644 --- a/config/rbac/kustomization.yaml +++ b/config/rbac/kustomization.yaml @@ -5,3 +5,15 @@ resources: - serviceaccount.yaml - leader_election.yaml - metrics_auth.yaml +- role.yaml +- rolebinding.yaml + +patches: +# This is a workaround for controller-gen not being able to handle colons in the role name option. +- target: + kind: ClusterRole + name: sharder + patch: | + - op: replace + path: /metadata/name + value: sharding:sharder diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml new file mode 100644 index 00000000..1cd40c71 --- /dev/null +++ b/config/rbac/role.yaml @@ -0,0 +1,17 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: sharder +rules: +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - delete + - get + - list + - patch + - update + - watch diff --git a/config/rbac/rolebinding.yaml b/config/rbac/rolebinding.yaml new file mode 100644 index 00000000..29d9159d --- /dev/null +++ b/config/rbac/rolebinding.yaml @@ -0,0 +1,13 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: sharding:sharder +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: sharding:sharder +subjects: +- kind: ServiceAccount + name: sharder + namespace: sharding-system diff --git a/go.mod b/go.mod index 7d3da75b..4a0d847f 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/spf13/pflag v1.0.5 go.uber.org/automaxprocs v1.5.3 go.uber.org/zap v1.26.0 + k8s.io/api v0.28.3 k8s.io/apimachinery v0.28.3 k8s.io/client-go v0.28.3 k8s.io/code-generator v0.28.3 @@ -102,7 +103,7 @@ require ( gopkg.in/natefinch/lumberjack.v2 
v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.28.3 // indirect + k8s.io/apiextensions-apiserver v0.28.3 // indirect k8s.io/apiserver v0.28.3 // indirect k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect k8s.io/kms v0.28.3 // indirect diff --git a/pkg/controller/add.go b/pkg/controller/add.go index 094bbcca..347db3f5 100644 --- a/pkg/controller/add.go +++ b/pkg/controller/add.go @@ -18,12 +18,18 @@ package controller import ( "context" + "fmt" "sigs.k8s.io/controller-runtime/pkg/manager" configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1" + "github.com/timebertt/kubernetes-controller-sharding/pkg/controller/lease" ) func AddToManager(ctx context.Context, mgr manager.Manager, cfg *configv1alpha1.SharderConfig) error { + if err := (&lease.Reconciler{}).AddToManager(mgr); err != nil { + return fmt.Errorf("failed adding lease controller: %w", err) + } + return nil } diff --git a/pkg/controller/lease/add.go b/pkg/controller/lease/add.go new file mode 100644 index 00000000..63795dee --- /dev/null +++ b/pkg/controller/lease/add.go @@ -0,0 +1,79 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package lease + +import ( + coordinationv1 "k8s.io/api/coordination/v1" + "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" + "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases" +) + +// ControllerName is the name of this controller. +const ControllerName = "lease" + +// AddToManager adds Reconciler to the given manager. +func (r *Reconciler) AddToManager(mgr manager.Manager) error { + if r.Client == nil { + r.Client = mgr.GetClient() + } + if r.Clock == nil { + r.Clock = clock.RealClock{} + } + + return builder.ControllerManagedBy(mgr). + Named(ControllerName). + For(&coordinationv1.Lease{}, builder.WithPredicates(r.LeasePredicate())). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 5, + }). 
+ Complete(r) +} + +func (r *Reconciler) LeasePredicate() predicate.Predicate { + // ignore deletion of shard leases + return predicate.And( + predicate.NewPredicateFuncs(isShardLease), + predicate.Funcs{ + CreateFunc: func(_ event.CreateEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldLease, ok := e.ObjectOld.(*coordinationv1.Lease) + if !ok { + return false + } + newLease, ok := e.ObjectNew.(*coordinationv1.Lease) + if !ok { + return false + } + + return leases.ToState(oldLease, r.Clock) != leases.ToState(newLease, r.Clock) + }, + DeleteFunc: func(_ event.DeleteEvent) bool { return false }, + }, + ) +} + +func isShardLease(obj client.Object) bool { + return obj.GetLabels()[shardingv1alpha1.LabelClusterRing] != "" +} diff --git a/pkg/controller/lease/reconciler.go b/pkg/controller/lease/reconciler.go new file mode 100644 index 00000000..407bf9dc --- /dev/null +++ b/pkg/controller/lease/reconciler.go @@ -0,0 +1,132 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package lease + +import ( + "context" + "fmt" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/clock" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1" + "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases" +) + +//+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;update;patch;delete + +// Reconciler reconciles shard leases. +type Reconciler struct { + Client client.Client + Clock clock.PassiveClock +} + +// Reconcile reconciles a Lease object. +func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := logf.FromContext(ctx) + + lease := &coordinationv1.Lease{} + if err := r.Client.Get(ctx, req.NamespacedName, lease); err != nil { + if apierrors.IsNotFound(err) { + log.V(1).Info("Object is gone, stop reconciling") + return reconcile.Result{}, nil + } + return reconcile.Result{}, fmt.Errorf("error retrieving object from store: %w", err) + } + + clusterRingName := lease.Labels[shardingv1alpha1.LabelClusterRing] + if clusterRingName == "" { + log.V(1).Info("Ignoring non-shard lease") + return reconcile.Result{}, nil + } + + if err := r.Client.Get(ctx, client.ObjectKey{Name: clusterRingName}, &shardingv1alpha1.ClusterRing{}); err != nil { + if !apierrors.IsNotFound(err) { + return reconcile.Result{}, fmt.Errorf("error checking for existence of ClusterRing: %w", err) + } + + log.V(1).Info("Ignoring shard lease without a corresponding ClusterRing") + return reconcile.Result{}, nil + } + + var ( + previousState = leases.StateFromString(lease.Labels[shardingv1alpha1.LabelState]) + shard = leases.ToShard(lease, r.Clock) + ) + log = 
log.WithValues("state", shard.State, "expirationTime", shard.Times.Expiration, "leaseDuration", shard.Times.LeaseDuration) + + // maintain state label + if previousState != shard.State { + patch := client.MergeFromWithOptions(lease.DeepCopy(), client.MergeFromWithOptimisticLock{}) + metav1.SetMetaDataLabel(&lease.ObjectMeta, shardingv1alpha1.LabelState, shard.State.String()) + if err := r.Client.Patch(ctx, lease, patch); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to update state label on lease: %w", err) + } + } + + // act on state and determine when to check again + var requeueAfter time.Duration + switch shard.State { + case leases.Ready: + if previousState != leases.Ready { + log.Info("Shard got ready") + } + requeueAfter = shard.Times.ToExpired + case leases.Expired: + log.Info("Shard lease has expired") + requeueAfter = shard.Times.ToUncertain + case leases.Uncertain: + log.Info("Shard lease has expired more than leaseDuration ago, trying to acquire shard lease") + + now := metav1.NewMicroTime(r.Clock.Now()) + transitions := int32(0) + if lease.Spec.LeaseTransitions != nil { + transitions = *lease.Spec.LeaseTransitions + } + + lease.Spec.HolderIdentity = ptr.To(shardingv1alpha1.IdentityShardLeaseController) + lease.Spec.LeaseDurationSeconds = ptr.To(2 * int32(shard.Times.LeaseDuration.Round(time.Second).Seconds())) + lease.Spec.AcquireTime = &now + lease.Spec.RenewTime = &now + lease.Spec.LeaseTransitions = ptr.To(transitions + 1) + if err := r.Client.Update(ctx, lease); err != nil { + return reconcile.Result{}, fmt.Errorf("error acquiring shard lease: %w", err) + } + + // lease will be enqueued once we observe our previous update via watch + // requeue with leaseDuration just to be sure + requeueAfter = shard.Times.LeaseDuration + case leases.Dead: + // garbage collect later + requeueAfter = shard.Times.ToOrphaned + case leases.Orphaned: + // garbage collect and forget orphaned leases + return reconcile.Result{}, r.Client.Delete(ctx, lease) 
+ default: + // Unknown, forget lease + return reconcile.Result{}, nil + } + + return reconcile.Result{RequeueAfter: requeueAfter}, nil +}