Add loadbalancer service controller
With this new controller and OVN-K, MicroShift is able to support
Kubernetes Services of type LoadBalancer. The controller is responsible
for updating status.loadBalancer with the node IP of the local host,
and OVN-K is responsible for adding the iptables rules accordingly.
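
As a rough usage sketch (not part of this commit), a client could create a
Service of type LoadBalancer with client-go and later read back the ingress IP
that this controller fills in. The kubeconfig path, namespace, service name,
selector, and port below are illustrative assumptions, not values taken from
MicroShift.

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Hypothetical kubeconfig path; use whatever admin kubeconfig MicroShift writes on the host.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// A minimal LoadBalancer Service; name, namespace, selector, and port are made up.
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-lb", Namespace: "default"},
		Spec: corev1.ServiceSpec{
			Type:     corev1.ServiceTypeLoadBalancer,
			Selector: map[string]string{"app": "demo"},
			Ports:    []corev1.ServicePort{{Port: 8080}},
		},
	}
	if _, err := client.CoreV1().Services("default").Create(context.TODO(), svc, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	// Once the controller has reconciled the Service, a fresh Get should show
	// the node IP under status.loadBalancer.ingress.
	got, err := client.CoreV1().Services("default").Get(context.TODO(), "demo-lb", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Status.LoadBalancer.Ingress)
}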
pliurh committed Jan 12, 2023
1 parent 574d379 commit a2b80f8
Showing 3 changed files with 206 additions and 1 deletion.
go.mod: 2 changes (1 addition, 1 deletion)
@@ -213,7 +213,7 @@ require (
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/warnings.v0 v0.1.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
-k8s.io/cloud-provider v0.0.0 // indirect
+k8s.io/cloud-provider v0.0.0
k8s.io/cluster-bootstrap v0.0.0 // indirect
k8s.io/component-helpers v0.25.2 // indirect
k8s.io/cri-api v0.0.0 // indirect
pkg/cmd/run.go: 2 changes (2 additions, 0 deletions)
@@ -11,6 +11,7 @@ import (
	"github.com/openshift/microshift/pkg/config"
	"github.com/openshift/microshift/pkg/controllers"
	"github.com/openshift/microshift/pkg/kustomize"
	"github.com/openshift/microshift/pkg/loadbalancerservice"
	"github.com/openshift/microshift/pkg/mdns"
	"github.com/openshift/microshift/pkg/node"
	"github.com/openshift/microshift/pkg/servicemanager"
@@ -107,6 +108,7 @@ func RunMicroshift(cfg *config.MicroshiftConfig, flags *pflag.FlagSet) error {
	util.Must(m.AddService(controllers.NewVersionManager(cfg)))
	util.Must(m.AddService(kustomize.NewKustomizer(cfg)))
	util.Must(m.AddService(node.NewKubeletServer(cfg)))
	util.Must(m.AddService(loadbalancerservice.NewLoadbalancerServiceController(cfg)))

	// Storing and clearing the env, so other components don't send the READY=1 until MicroShift is fully ready
	notifySocket := os.Getenv("NOTIFY_SOCKET")
pkg/loadbalancerservice/controller.go: 203 changes (203 additions, 0 deletions)
@@ -0,0 +1,203 @@
package loadbalancerservice

import (
	"context"
	"fmt"
	"time"

	"github.com/pkg/errors"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/cloud-provider/service/helpers"
	"k8s.io/klog/v2"

	"github.com/openshift/microshift/pkg/config"
)

const defaultInformerResyncPeriod = 10 * time.Minute

type LoadbalancerServiceController struct {
	NodeIP     string
	KubeConfig string
	client     *kubernetes.Clientset
	indexer    cache.Indexer
	queue      workqueue.RateLimitingInterface
	informer   cache.SharedIndexInformer
}

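// NewLoadbalancerServiceController creates the controller from the MicroShift
// configuration, using the node IP and the kubeadmin kubeconfig path.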
func NewLoadbalancerServiceController(cfg *config.MicroshiftConfig) *LoadbalancerServiceController {
	return &LoadbalancerServiceController{
		NodeIP:     cfg.NodeIP,
		KubeConfig: cfg.KubeConfigPath(config.KubeAdmin),
	}
}

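// Name and Dependencies identify this controller to the MicroShift service
// manager; it declares no dependencies on other services.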
func (c *LoadbalancerServiceController) Name() string {
	return "microshift-loadbalancer-service-controller"
}

func (c *LoadbalancerServiceController) Dependencies() []string {
	return []string{}
}

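// Run builds a clientset from the kubeconfig, starts a shared informer on
// Services, funnels add/update/delete events into a rate-limited work queue,
// and runs a worker until the context is cancelled.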
func (c *LoadbalancerServiceController) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error {
	defer close(stopped)
	stopCh := make(chan struct{})
	defer close(stopCh)

	config, err := c.restConfig()
	if err != nil {
		return errors.Wrap(err, "error creating rest config for service controller")
	}
	c.client, err = kubernetes.NewForConfig(config)
	if err != nil {
		return errors.Wrap(err, "failed to create clientset for service controller")
	}

	klog.Infof("Starting service controller")

	factory := informers.NewSharedInformerFactory(c.client, defaultInformerResyncPeriod)
	serviceInformer := factory.Core().V1().Services()
	c.informer = serviceInformer.Informer()
	c.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	c.indexer = c.informer.GetIndexer()
	c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				c.queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				c.queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				c.queue.Add(key)
			}
		},
	})

	factory.Start(stopCh)

	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		return fmt.Errorf("timed out waiting for caches to sync")
	}

	go wait.Until(c.runWorker, time.Second, stopCh)

	close(ready)

	<-ctx.Done()

	return ctx.Err()
}

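// runWorker processes queue items until the queue is shut down.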
func (c *LoadbalancerServiceController) runWorker() {
	for c.processNextItem() {
	}
}

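// processNextItem syncs a single key from the queue and reports whether the
// worker loop should continue.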
func (c *LoadbalancerServiceController) processNextItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	err := c.updateServiceStatus(key.(string))
	c.handleErr(err, key)
	return true
}

// handleErr checks if an error happened and makes sure we will retry later.
func (c *LoadbalancerServiceController) handleErr(err error, key interface{}) {
	if err == nil {
		c.queue.Forget(key)
		return
	}

	klog.Infof("Error syncing service %v: %v", key, err)

	// Re-enqueue the key rate limited. Based on the rate limiter on the
	// queue and the re-enqueue history, the key will be processed later again.
	c.queue.AddRateLimited(key)
}

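// updateServiceStatus recomputes and patches status.loadBalancer for the
// Service identified by key. Services that are not of type LoadBalancer are
// ignored; an error is returned if no ingress IP could be assigned, so the
// key is retried.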
func (c *LoadbalancerServiceController) updateServiceStatus(key string) error {
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching service object with key %s from store failed with %v", key, err)
		return err
	}

	if !exists {
		klog.Infof("Service %s does not exist anymore", key)
	} else {
		svc := obj.(*corev1.Service)
		if svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
			return nil
		}
		klog.Infof("Process service %s/%s", svc.Namespace, svc.Name)
		newStatus := c.getNewStatus(svc)
		err := c.patchStatus(svc, newStatus)
		if err != nil {
			return err
		}
		if len(newStatus.Ingress) == 0 {
			return fmt.Errorf("failed to get new service status: %s", key)
		}
	}
	return nil
}

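// restConfig builds a client-go rest.Config from the controller's kubeconfig path.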
func (c *LoadbalancerServiceController) restConfig() (*rest.Config, error) {
	return clientcmd.BuildConfigFromFlags("", c.KubeConfig)
}

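// getNewStatus returns a LoadBalancerStatus carrying the node IP, or an empty
// status if another Service that already has a load balancer ingress is using
// one of the requested ports.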
func (c *LoadbalancerServiceController) getNewStatus(svc *corev1.Service) *corev1.LoadBalancerStatus {
	newStatus := &corev1.LoadBalancerStatus{}
	objs := c.indexer.List()
	for _, obj := range objs {
		s := obj.(*corev1.Service)
		if (s.Name == svc.Name && s.Namespace == svc.Namespace) || len(s.Status.LoadBalancer.Ingress) == 0 {
			continue
		}
		for _, ep := range s.Spec.Ports {
			for _, np := range svc.Spec.Ports {
				if ep.Port == np.Port {
					klog.Infof("Node port %d occupied", ep.Port)
					return newStatus
				}
			}
		}
	}

	newStatus.Ingress = append(newStatus.Ingress, corev1.LoadBalancerIngress{
		IP: c.NodeIP,
	})
	return newStatus
}

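// patchStatus patches the Service status if the desired load balancer status
// differs from the current one.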
func (c *LoadbalancerServiceController) patchStatus(svc *corev1.Service, newStatus *corev1.LoadBalancerStatus) error {
	if helpers.LoadBalancerStatusEqual(&svc.Status.LoadBalancer, newStatus) {
		return nil
	}
	updated := svc.DeepCopy()
	updated.Status.LoadBalancer = *newStatus
	_, err := helpers.PatchService(c.client.CoreV1(), svc, updated)

	return err
}
