Skip to content

Commit

Permalink
Support MixedProtocolLBService and clean up Daemonsets on type change.
Browse files Browse the repository at this point in the history
Also add event support to increase visibility of change events.

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Mar 3, 2022
1 parent c07b33a commit 1f990f3
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions pkg/servicelb/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"strings"

util "github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/rancher/wrangler/pkg/objectset"
"github.com/rancher/wrangler/pkg/relatedresource"
"github.com/rancher/wrangler/pkg/slice"
"github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
v1getter "k8s.io/client-go/kubernetes/typed/apps/v1"
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
utilsnet "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
)
Expand All @@ -40,7 +41,8 @@ var (
)

const (
Ready = condition.Cond("Ready")
Ready = condition.Cond("Ready")
ControllerName = "svccontroller"
)

var (
Expand All @@ -63,17 +65,17 @@ func Register(ctx context.Context,
nodeCache: nodes.Cache(),
podCache: pods.Cache(),
deploymentCache: deployments.Cache(),
processor: apply.WithSetID("svccontroller").
WithCacheTypes(daemonSetController),
serviceCache: services.Cache(),
services: kubernetes.CoreV1(),
daemonsets: kubernetes.AppsV1(),
deployments: kubernetes.AppsV1(),
processor: apply.WithSetID(ControllerName).WithCacheTypes(daemonSetController),
serviceCache: services.Cache(),
services: kubernetes.CoreV1(),
daemonsets: kubernetes.AppsV1(),
deployments: kubernetes.AppsV1(),
recorder: util.BuildControllerEventRecorder(kubernetes, ControllerName),
}

services.OnChange(ctx, "svccontroller", h.onChangeService)
nodes.OnChange(ctx, "svccontroller", h.onChangeNode)
relatedresource.Watch(ctx, "svccontroller-watcher",
services.OnChange(ctx, ControllerName, h.onChangeService)
nodes.OnChange(ctx, ControllerName, h.onChangeNode)
relatedresource.Watch(ctx, ControllerName+"-watcher",
h.onResourceChange,
services,
pods,
Expand All @@ -93,6 +95,7 @@ type handler struct {
services coregetter.ServicesGetter
daemonsets v1getter.DaemonSetsGetter
deployments v1getter.DeploymentsGetter
recorder record.EventRecorder
}

func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]relatedresource.Key, error) {
Expand Down Expand Up @@ -133,9 +136,10 @@ func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service,
return nil, nil
}

if svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" ||
svc.Spec.ClusterIP == "None" {
return svc, nil
if svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
// If the Service type changes from LoadBalancer to something else, make sure we clean up old deployments
err := h.deletePod(svc)
return svc, err
}

if err := h.deployPod(svc); err != nil {
Expand Down Expand Up @@ -200,7 +204,7 @@ func (h *handler) updateService(svc *core.Service) (runtime.Object, error) {
})
}

logrus.Debugf("Setting service loadbalancer %s/%s to IPs %v", svc.Namespace, svc.Name, expectedIPs)
h.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedIngressIP", "LoadBalancer Ingress IP addresses updated: %s", strings.Join(expectedIPs, ", "))
return h.services.Services(svc.Namespace).UpdateStatus(context.TODO(), svc, meta.UpdateOptions{})
}

Expand Down Expand Up @@ -325,7 +329,22 @@ func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) {
return nil, errors.New("unhandled ipFamilyPolicy")
}

// deployPod ensures that there is a DaemonSet for each service.
// deletePod ensures that there is not a DaemonSet for the service.
func (h *handler) deletePod(svc *core.Service) error {
name := fmt.Sprintf("svclb-%s", svc.Name)
ds, err := h.daemonsets.DaemonSets(svc.Namespace).Get(context.TODO(), name, meta.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
h.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleteting DaemonSet %s/%s", ds.Namespace, ds.Name)
objs := objectset.NewObjectSet()
return h.processor.WithOwner(svc).Apply(objs)
}

// deployPod ensures that there is a DaemonSet for the service.
// It also ensures that any legacy Deployments from older versions of ServiceLB are deleted.
func (h *handler) deployPod(svc *core.Service) error {
if err := h.deleteOldDeployments(svc); err != nil {
Expand All @@ -342,6 +361,7 @@ func (h *handler) deployPod(svc *core.Service) error {
}
if ds != nil {
objs.Add(ds)
h.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applying LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
}
return h.processor.WithOwner(svc).Apply(objs)
}
Expand Down Expand Up @@ -421,7 +441,7 @@ func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
}

for _, port := range svc.Spec.Ports {
portName := fmt.Sprintf("lb-port-%d", port.Port)
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
container := core.Container{
Name: portName,
Image: DefaultLBImage,
Expand Down

0 comments on commit 1f990f3

Please sign in to comment.