diff --git a/pkg/servicelb/controller.go b/pkg/servicelb/controller.go index 81b6d0a93d41..4edae0f65766 100644 --- a/pkg/servicelb/controller.go +++ b/pkg/servicelb/controller.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" + util "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" "github.com/rancher/wrangler/pkg/apply" "github.com/rancher/wrangler/pkg/condition" @@ -16,7 +17,6 @@ import ( "github.com/rancher/wrangler/pkg/objectset" "github.com/rancher/wrangler/pkg/relatedresource" "github.com/rancher/wrangler/pkg/slice" - "github.com/sirupsen/logrus" apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/kubernetes" v1getter "k8s.io/client-go/kubernetes/typed/apps/v1" coregetter "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" utilsnet "k8s.io/utils/net" utilpointer "k8s.io/utils/pointer" ) @@ -40,7 +41,8 @@ var ( ) const ( - Ready = condition.Cond("Ready") + Ready = condition.Cond("Ready") + ControllerName = "svccontroller" ) var ( @@ -63,17 +65,17 @@ func Register(ctx context.Context, nodeCache: nodes.Cache(), podCache: pods.Cache(), deploymentCache: deployments.Cache(), - processor: apply.WithSetID("svccontroller"). - WithCacheTypes(daemonSetController), - serviceCache: services.Cache(), - services: kubernetes.CoreV1(), - daemonsets: kubernetes.AppsV1(), - deployments: kubernetes.AppsV1(), + processor: apply.WithSetID(ControllerName).WithCacheTypes(daemonSetController), + serviceCache: services.Cache(), + services: kubernetes.CoreV1(), + daemonsets: kubernetes.AppsV1(), + deployments: kubernetes.AppsV1(), + recorder: util.BuildControllerEventRecorder(kubernetes, ControllerName), } - services.OnChange(ctx, "svccontroller", h.onChangeService) - nodes.OnChange(ctx, "svccontroller", h.onChangeNode) - relatedresource.Watch(ctx, "svccontroller-watcher", + services.OnChange(ctx, ControllerName, h.onChangeService) + nodes.OnChange(ctx, ControllerName, h.onChangeNode) + relatedresource.Watch(ctx, ControllerName+"-watcher", h.onResourceChange, services, pods, @@ -93,6 +95,7 @@ type handler struct { services coregetter.ServicesGetter daemonsets v1getter.DaemonSetsGetter deployments v1getter.DeploymentsGetter + recorder record.EventRecorder } func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]relatedresource.Key, error) { @@ -133,9 +136,10 @@ func (h *handler) onChangeService(key string, svc *core.Service) (*core.Service, return nil, nil } - if svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || - svc.Spec.ClusterIP == "None" { - return svc, nil + if svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { + // If the Service type changes from LoadBalancer to something else, make sure we clean up old deployments + err := h.deletePod(svc) + return svc, err } if err := h.deployPod(svc); err != nil { @@ -200,7 +204,7 @@ func (h *handler) updateService(svc *core.Service) (runtime.Object, error) { }) } - logrus.Debugf("Setting service loadbalancer %s/%s to IPs %v", svc.Namespace, svc.Name, expectedIPs) + h.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedIngressIP", "LoadBalancer Ingress IP addresses updated: %s", strings.Join(expectedIPs, ", ")) return h.services.Services(svc.Namespace).UpdateStatus(context.TODO(), svc, meta.UpdateOptions{}) } @@ -325,7 +329,22 @@ func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) { return nil, errors.New("unhandled ipFamilyPolicy") } -// deployPod ensures that there is a DaemonSet for each service. +// deletePod ensures that there is not a DaemonSet for the service. +func (h *handler) deletePod(svc *core.Service) error { + name := fmt.Sprintf("svclb-%s", svc.Name) + ds, err := h.daemonsets.DaemonSets(svc.Namespace).Get(context.TODO(), name, meta.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + h.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleteting DaemonSet %s/%s", ds.Namespace, ds.Name) + objs := objectset.NewObjectSet() + return h.processor.WithOwner(svc).Apply(objs) +} + +// deployPod ensures that there is a DaemonSet for the service. // It also ensures that any legacy Deployments from older versions of ServiceLB are deleted. func (h *handler) deployPod(svc *core.Service) error { if err := h.deleteOldDeployments(svc); err != nil { @@ -342,6 +361,7 @@ func (h *handler) deployPod(svc *core.Service) error { } if ds != nil { objs.Add(ds) + h.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applying LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name) } return h.processor.WithOwner(svc).Apply(objs) } @@ -421,7 +441,7 @@ func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { } for _, port := range svc.Spec.Ports { - portName := fmt.Sprintf("lb-port-%d", port.Port) + portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port) container := core.Container{ Name: portName, Image: DefaultLBImage,