Skip to content

Commit

Permalink
Add lease controller
Browse files Browse the repository at this point in the history
  • Loading branch information
timebertt committed Nov 3, 2023
1 parent 1c7f789 commit fa9d722
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 4 deletions.
30 changes: 27 additions & 3 deletions cmd/sharder/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ import (
"net/http"
"os"
goruntime "runtime"
"strconv"

"github.com/spf13/pflag"
"go.uber.org/zap/zapcore"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -41,6 +45,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1"
shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/utils/routes"
)

Expand Down Expand Up @@ -110,14 +115,13 @@ func (o *options) complete() error {
Scheme: scheme,
// allows us to quickly handover leadership on restarts
LeaderElectionReleaseOnCancel: true,
Cache: cache.Options{
DefaultTransform: dropUnwantedMetadata,
},
Controller: controllerconfig.Controller{
RecoverPanic: ptr.To(true),
},
}
o.applyConfigToManagerOptions()
o.applyCacheOptions()

if err := o.applyOptionsOverrides(); err != nil {
return err
}
Expand Down Expand Up @@ -169,6 +173,26 @@ func (o *options) applyConfigToManagerOptions() {
o.managerOptions.GracefulShutdownTimeout = ptr.To(o.config.GracefulShutdownTimeout.Duration)
}

func (o *options) applyCacheOptions() {
// filter lease cache for shard leases to avoid watching all leases in cluster
leaseSelector := labels.NewSelector()
{
ringRequirement, err := labels.NewRequirement(shardingv1alpha1.LabelRing, selection.Exists, nil)
utilruntime.Must(err)
leaseSelector.Add(*ringRequirement)
}

o.managerOptions.Cache = cache.Options{
DefaultTransform: dropUnwantedMetadata,

ByObject: map[client.Object]cache.ByObject{
&coordinationv1.Lease{}: {
Label: leaseSelector,
},
},
}
}

func (o *options) applyOptionsOverrides() error {
var err error

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/spf13/pflag v1.0.5
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.26.0
k8s.io/api v0.28.3
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
k8s.io/code-generator v0.28.3
Expand Down Expand Up @@ -102,7 +103,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.28.3 // indirect
k8s.io/apiextensions-apiserver v0.28.3 // indirect
k8s.io/apiserver v0.28.3 // indirect
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
k8s.io/kms v0.28.3 // indirect
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ package controller

import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/manager"

configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/controller/lease"
)

func AddToManager(ctx context.Context, mgr manager.Manager, cfg *configv1alpha1.SharderConfig) error {
if err := (&lease.Reconciler{}).AddToManager(mgr); err != nil {
return fmt.Errorf("failed adding lease controller: %w", err)
}

return nil
}
83 changes: 83 additions & 0 deletions pkg/controller/lease/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2023 Tim Ebert.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lease

import (
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
)

// ControllerName is the name of this controller.
const ControllerName = "lease"

// AddToManager adds Reconciler to the given manager.
func (r *Reconciler) AddToManager(mgr manager.Manager) error {
if r.Client == nil {
r.Client = mgr.GetClient()
}
if r.Clock == nil {
r.Clock = clock.RealClock{}
}

return builder.ControllerManagedBy(mgr).
Named(ControllerName).
For(&coordinationv1.Lease{}, builder.WithPredicates(r.LeasePredicate())).
WithOptions(controller.Options{
MaxConcurrentReconciles: 5,
}).
Complete(r)
}

func (r *Reconciler) LeasePredicate() predicate.Predicate {
// ignore deletion of shard leases
return predicate.And(
predicate.NewPredicateFuncs(func(obj client.Object) bool {
return isShardLease(obj)
}),
predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldLease, ok := e.ObjectOld.(*coordinationv1.Lease)
if !ok {
return false
}
newLease, ok := e.ObjectNew.(*coordinationv1.Lease)
if !ok {
return false
}

return leases.ToState(oldLease, r.Clock) != leases.ToState(newLease, r.Clock)
},
// TODO(timebertt): we need to handle removal of the ring label which is received as a DELETE event because of the
// label selector on the lease cache.
DeleteFunc: func(_ event.DeleteEvent) bool { return false },
},
)
}

func isShardLease(obj client.Object) bool {
return obj.GetLabels()[shardingv1alpha1.LabelRing] != ""
}
124 changes: 124 additions & 0 deletions pkg/controller/lease/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2023 Tim Ebert.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lease

import (
"context"
"fmt"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
)

const (
sharderIdentity = "sharder"
stateLabel = "state"
)

// Reconciler reconciles shard leases.
type Reconciler struct {
Client client.Client
Clock clock.Clock
}

// Reconcile reconciles a Lease object.
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := logf.FromContext(ctx)

lease := &coordinationv1.Lease{}
if err := r.Client.Get(ctx, req.NamespacedName, lease); err != nil {
if apierrors.IsNotFound(err) {
log.V(1).Info("Object is gone, stop reconciling")
return reconcile.Result{}, nil
}
return reconcile.Result{}, fmt.Errorf("error retrieving object from store: %w", err)
}

if isShardLease(lease) {
log.V(1).Info("Ignoring non-shard lease")
return reconcile.Result{}, nil
}

var (
previousState = leases.StateFromString(lease.Labels[stateLabel])
shard = leases.ToShard(lease, r.Clock)
)
log = log.WithValues("state", shard.State, "expirationTime", shard.Times.Expiration, "leaseDuration", shard.Times.LeaseDuration)

// maintain state label
if previousState != shard.State {
patch := client.MergeFromWithOptions(lease.DeepCopy(), client.MergeFromWithOptimisticLock{})
metav1.SetMetaDataLabel(&lease.ObjectMeta, stateLabel, shard.State.String())
if err := r.Client.Patch(ctx, lease, patch); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update state label on lease: %w", err)
}
}

// act on state and determine when to check again
var requeueAfter time.Duration
switch shard.State {
case leases.Ready:
if previousState != leases.Ready {
log.Info("Shard got ready")
}
requeueAfter = shard.Times.ToExpired
case leases.Expired:
log.Info("Shard lease has expired")
requeueAfter = shard.Times.ToUncertain
case leases.Uncertain:
log.Info("Shard lease has expired more than leaseDuration ago, trying to acquire shard lease")

now := metav1.NewMicroTime(r.Clock.Now())
transitions := int32(0)
if lease.Spec.LeaseTransitions != nil {
transitions = *lease.Spec.LeaseTransitions
}

lease.Spec.HolderIdentity = ptr.To(sharderIdentity)
lease.Spec.LeaseDurationSeconds = ptr.To(2 * int32(shard.Times.LeaseDuration.Round(time.Second).Seconds()))
lease.Spec.AcquireTime = &now
lease.Spec.RenewTime = &now
lease.Spec.LeaseTransitions = ptr.To(transitions + 1)
if err := r.Client.Update(ctx, lease); err != nil {
return reconcile.Result{}, fmt.Errorf("error acquiring shard lease: %w", err)
}

// lease will be enqueued once we observe our previous update via watch
// requeue with leaseDuration just to be sure
requeueAfter = shard.Times.LeaseDuration
case leases.Dead:
// garbage collect later
requeueAfter = shard.Times.ToOrphaned
case leases.Orphaned:
// garbage collect and forget orphaned leases
return reconcile.Result{}, r.Client.Delete(ctx, lease)
default:
// Unknown, forget lease
return reconcile.Result{}, nil
}

return reconcile.Result{RequeueAfter: requeueAfter}, nil
}

0 comments on commit fa9d722

Please sign in to comment.