From 8511634a0166e2c499fda16e1ecce02ccfc2f9d0 Mon Sep 17 00:00:00 2001 From: Lin Yang Date: Mon, 8 Apr 2024 11:30:54 +0800 Subject: [PATCH] fix: watch Endpoints changes for headless service (#217) Signed-off-by: Lin Yang --- pkg/gateway/cache/cache.go | 6 +++--- pkg/gateway/cache/endpoints_trigger.go | 4 ++-- pkg/gateway/cache/methods.go | 10 ++++++++++ pkg/gateway/client.go | 9 +++++++++ 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/gateway/cache/cache.go b/pkg/gateway/cache/cache.go index 666659c6f..f4959c9ef 100644 --- a/pkg/gateway/cache/cache.go +++ b/pkg/gateway/cache/cache.go @@ -41,7 +41,7 @@ func NewGatewayCache(informerCollection *informers.InformerCollection, kubeClien cfg: cfg, triggers: map[ResourceType]Trigger{ - //EndpointsResourceType: &EndpointsTrigger{}, + EndpointsResourceType: &EndpointsTrigger{}, ServicesResourceType: &ServicesTrigger{}, ServiceImportsResourceType: &ServiceImportsTrigger{}, EndpointSlicesResourceType: &EndpointSlicesTrigger{}, @@ -91,8 +91,8 @@ func (c *GatewayCache) Delete(obj interface{}) bool { func (c *GatewayCache) getTrigger(obj interface{}) Trigger { switch obj.(type) { - //case *corev1.Endpoints: - // return c.triggers[EndpointsResourceType] + case *corev1.Endpoints: + return c.triggers[EndpointsResourceType] case *corev1.Service: return c.triggers[ServicesResourceType] case *mcsv1alpha1.ServiceImport: diff --git a/pkg/gateway/cache/endpoints_trigger.go b/pkg/gateway/cache/endpoints_trigger.go index 3022e63ba..502bb541d 100644 --- a/pkg/gateway/cache/endpoints_trigger.go +++ b/pkg/gateway/cache/endpoints_trigger.go @@ -23,7 +23,7 @@ func (p *EndpointsTrigger) Insert(obj interface{}, cache *GatewayCache) bool { key := utils.ObjectKey(ep) //cache.endpoints[key] = struct{}{} - return cache.isRoutableService(key) + return cache.isHeadlessServiceWithoutSelector(key) && cache.isRoutableService(key) } // Delete removes the Endpoints object from the cache and returns true if the cache was modified @@ -43,5 +43,5 @@ func (p *EndpointsTrigger) Delete(obj interface{}, cache *GatewayCache) bool { // //return found - return cache.isRoutableService(key) + return cache.isHeadlessServiceWithoutSelector(key) && cache.isRoutableService(key) } diff --git a/pkg/gateway/cache/methods.go b/pkg/gateway/cache/methods.go index 5ea922653..653912ac2 100644 --- a/pkg/gateway/cache/methods.go +++ b/pkg/gateway/cache/methods.go @@ -430,3 +430,13 @@ func (c *GatewayCache) getServiceFromCache(key client.ObjectKey) (*corev1.Servic return obj, nil } + +func (c *GatewayCache) isHeadlessServiceWithoutSelector(key client.ObjectKey) bool { + service, err := c.getServiceFromCache(key) + if err != nil { + log.Error().Msgf("failed to get service from cache: %v", err) + return false + } + + return service.Spec.ClusterIP == corev1.ClusterIPNone && len(service.Spec.Selector) == 0 +} diff --git a/pkg/gateway/client.go b/pkg/gateway/client.go index 30ee71083..779ad2581 100644 --- a/pkg/gateway/client.go +++ b/pkg/gateway/client.go @@ -75,6 +75,7 @@ func newClient(informerCollection *informers.InformerCollection, kubeClient kube for _, informerKey := range []fsminformers.InformerKey{ fsminformers.InformerKeyService, fsminformers.InformerKeyServiceImport, + fsminformers.InformerKeyEndpoints, fsminformers.InformerKeyEndpointSlices, fsminformers.InformerKeySecret, fsminformers.InformerKeyGatewayAPIGatewayClass, @@ -200,6 +201,8 @@ func getEventTypesByObjectType(obj interface{}) *k8s.EventTypes { return getEventTypesByInformerKey(fsminformers.InformerKeyService) case *mcsv1alpha1.ServiceImport: return getEventTypesByInformerKey(fsminformers.InformerKeyServiceImport) + case *corev1.Endpoints: + return getEventTypesByInformerKey(fsminformers.InformerKeyEndpoints) case *discoveryv1.EndpointSlice: return getEventTypesByInformerKey(fsminformers.InformerKeyEndpointSlices) case *corev1.Secret: @@ -263,6 +266,12 @@ func getEventTypesByInformerKey(informerKey fsminformers.InformerKey) *k8s.Event Update: announcements.EndpointSlicesUpdated, Delete: announcements.EndpointSlicesDeleted, } + case fsminformers.InformerKeyEndpoints: + return &k8s.EventTypes{ + Add: announcements.EndpointAdded, + Update: announcements.EndpointUpdated, + Delete: announcements.EndpointDeleted, + } case fsminformers.InformerKeySecret: return &k8s.EventTypes{ Add: announcements.SecretAdded,