Skip to content

Commit

Permalink
Add lease controller
Browse files Browse the repository at this point in the history
  • Loading branch information
timebertt committed Nov 3, 2023
1 parent 3933420 commit e64c5d7
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 4 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ modules: ## Runs go mod to ensure modules are up to date.

.PHONY: generate
generate: $(CONTROLLER_GEN) modules ## Run all code generators
$(CONTROLLER_GEN) rbac:roleName=sharder paths="./pkg/..." output:rbac:artifacts:config=config/rbac
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./pkg/..."
hack/update-codegen.sh

Expand Down
30 changes: 27 additions & 3 deletions cmd/sharder/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ import (
"net/http"
"os"
goruntime "runtime"
"strconv"

"github.com/spf13/pflag"
"go.uber.org/zap/zapcore"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -41,6 +45,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1"
shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/utils/routes"
)

Expand Down Expand Up @@ -110,14 +115,13 @@ func (o *options) complete() error {
Scheme: scheme,
// allows us to quickly handover leadership on restarts
LeaderElectionReleaseOnCancel: true,
Cache: cache.Options{
DefaultTransform: dropUnwantedMetadata,
},
Controller: controllerconfig.Controller{
RecoverPanic: ptr.To(true),
},
}
o.applyConfigToManagerOptions()
o.applyCacheOptions()

if err := o.applyOptionsOverrides(); err != nil {
return err
}
Expand Down Expand Up @@ -169,6 +173,26 @@ func (o *options) applyConfigToManagerOptions() {
o.managerOptions.GracefulShutdownTimeout = ptr.To(o.config.GracefulShutdownTimeout.Duration)
}

func (o *options) applyCacheOptions() {
// filter lease cache for shard leases to avoid watching all leases in cluster
leaseSelector := labels.NewSelector()
{
ringRequirement, err := labels.NewRequirement(shardingv1alpha1.LabelClusterRing, selection.Exists, nil)
utilruntime.Must(err)
leaseSelector.Add(*ringRequirement)
}

o.managerOptions.Cache = cache.Options{
DefaultTransform: dropUnwantedMetadata,

ByObject: map[client.Object]cache.ByObject{
&coordinationv1.Lease{}: {
Label: leaseSelector,
},
},
}
}

func (o *options) applyOptionsOverrides() error {
var err error

Expand Down
12 changes: 12 additions & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,15 @@ resources:
- serviceaccount.yaml
- leader_election.yaml
- metrics_auth.yaml
- role.yaml
- rolebinding.yaml

patches:
# This is a workaround for controller-gen not being able to handle colons in the role name option.
- target:
kind: ClusterRole
name: sharder
patch: |
- op: replace
path: /metadata/name
value: sharding:sharder
17 changes: 17 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: sharder
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- delete
- get
- list
- patch
- update
- watch
13 changes: 13 additions & 0 deletions config/rbac/rolebinding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: sharding:sharder
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: sharding:sharder
subjects:
- kind: ServiceAccount
name: sharder
namespace: sharding-system
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/spf13/pflag v1.0.5
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.26.0
k8s.io/api v0.28.3
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
k8s.io/code-generator v0.28.3
Expand Down Expand Up @@ -102,7 +103,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.28.3 // indirect
k8s.io/apiextensions-apiserver v0.28.3 // indirect
k8s.io/apiserver v0.28.3 // indirect
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
k8s.io/kms v0.28.3 // indirect
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ package controller

import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/manager"

configv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/config/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/controller/lease"
)

func AddToManager(ctx context.Context, mgr manager.Manager, cfg *configv1alpha1.SharderConfig) error {
if err := (&lease.Reconciler{}).AddToManager(mgr); err != nil {
return fmt.Errorf("failed adding lease controller: %w", err)
}

return nil
}
79 changes: 79 additions & 0 deletions pkg/controller/lease/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2023 Tim Ebert.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lease

import (
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
)

// ControllerName is the name of this controller.
const ControllerName = "lease"

// AddToManager adds Reconciler to the given manager.
func (r *Reconciler) AddToManager(mgr manager.Manager) error {
if r.Client == nil {
r.Client = mgr.GetClient()
}
if r.Clock == nil {
r.Clock = clock.RealClock{}
}

return builder.ControllerManagedBy(mgr).
Named(ControllerName).
For(&coordinationv1.Lease{}, builder.WithPredicates(r.LeasePredicate())).
WithOptions(controller.Options{
MaxConcurrentReconciles: 5,
}).
Complete(r)
}

func (r *Reconciler) LeasePredicate() predicate.Predicate {
// ignore deletion of shard leases
return predicate.And(
predicate.NewPredicateFuncs(isShardLease),
predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldLease, ok := e.ObjectOld.(*coordinationv1.Lease)
if !ok {
return false
}
newLease, ok := e.ObjectNew.(*coordinationv1.Lease)
if !ok {
return false
}

return leases.ToState(oldLease, r.Clock) != leases.ToState(newLease, r.Clock)
},
DeleteFunc: func(_ event.DeleteEvent) bool { return false },
},
)
}

func isShardLease(obj client.Object) bool {
return obj.GetLabels()[shardingv1alpha1.LabelClusterRing] != ""
}
132 changes: 132 additions & 0 deletions pkg/controller/lease/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
Copyright 2023 Tim Ebert.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lease

import (
"context"
"fmt"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
)

//+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;update;patch;delete

// Reconciler reconciles shard leases.
type Reconciler struct {
Client client.Client
Clock clock.PassiveClock
}

// Reconcile reconciles a Lease object.
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := logf.FromContext(ctx)

lease := &coordinationv1.Lease{}
if err := r.Client.Get(ctx, req.NamespacedName, lease); err != nil {
if apierrors.IsNotFound(err) {
log.V(1).Info("Object is gone, stop reconciling")
return reconcile.Result{}, nil
}
return reconcile.Result{}, fmt.Errorf("error retrieving object from store: %w", err)
}

clusterRingName := lease.Labels[shardingv1alpha1.LabelClusterRing]
if clusterRingName == "" {
log.V(1).Info("Ignoring non-shard lease")
return reconcile.Result{}, nil
}

if err := r.Client.Get(ctx, client.ObjectKey{Name: clusterRingName}, &shardingv1alpha1.ClusterRing{}); err != nil {
if !apierrors.IsNotFound(err) {
return reconcile.Result{}, fmt.Errorf("error checking for existence of ClusterRing: %w", err)
}

log.V(1).Info("Ignoring shard lease without a corresponding ClusterRing")
return reconcile.Result{}, nil
}

var (
previousState = leases.StateFromString(lease.Labels[shardingv1alpha1.LabelState])
shard = leases.ToShard(lease, r.Clock)
)
log = log.WithValues("state", shard.State, "expirationTime", shard.Times.Expiration, "leaseDuration", shard.Times.LeaseDuration)

// maintain state label
if previousState != shard.State {
patch := client.MergeFromWithOptions(lease.DeepCopy(), client.MergeFromWithOptimisticLock{})
metav1.SetMetaDataLabel(&lease.ObjectMeta, shardingv1alpha1.LabelState, shard.State.String())
if err := r.Client.Patch(ctx, lease, patch); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update state label on lease: %w", err)
}
}

// act on state and determine when to check again
var requeueAfter time.Duration
switch shard.State {
case leases.Ready:
if previousState != leases.Ready {
log.Info("Shard got ready")
}
requeueAfter = shard.Times.ToExpired
case leases.Expired:
log.Info("Shard lease has expired")
requeueAfter = shard.Times.ToUncertain
case leases.Uncertain:
log.Info("Shard lease has expired more than leaseDuration ago, trying to acquire shard lease")

now := metav1.NewMicroTime(r.Clock.Now())
transitions := int32(0)
if lease.Spec.LeaseTransitions != nil {
transitions = *lease.Spec.LeaseTransitions
}

lease.Spec.HolderIdentity = ptr.To(shardingv1alpha1.IdentityShardLeaseController)
lease.Spec.LeaseDurationSeconds = ptr.To(2 * int32(shard.Times.LeaseDuration.Round(time.Second).Seconds()))
lease.Spec.AcquireTime = &now
lease.Spec.RenewTime = &now
lease.Spec.LeaseTransitions = ptr.To(transitions + 1)
if err := r.Client.Update(ctx, lease); err != nil {
return reconcile.Result{}, fmt.Errorf("error acquiring shard lease: %w", err)
}

// lease will be enqueued once we observe our previous update via watch
// requeue with leaseDuration just to be sure
requeueAfter = shard.Times.LeaseDuration
case leases.Dead:
// garbage collect later
requeueAfter = shard.Times.ToOrphaned
case leases.Orphaned:
// garbage collect and forget orphaned leases
return reconcile.Result{}, r.Client.Delete(ctx, lease)
default:
// Unknown, forget lease
return reconcile.Result{}, nil
}

return reconcile.Result{RequeueAfter: requeueAfter}, nil
}

0 comments on commit e64c5d7

Please sign in to comment.