Skip to content

Commit

Permalink
fix: watch Endpoints changes for headless service (#217)
Browse files Browse the repository at this point in the history
Signed-off-by: Lin Yang <[email protected]>
  • Loading branch information
reaver-flomesh committed Apr 9, 2024
1 parent 2972609 commit 37bb922
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 5 deletions.
6 changes: 3 additions & 3 deletions pkg/gateway/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewGatewayCache(informerCollection *informers.InformerCollection, kubeClien
cfg: cfg,

triggers: map[ResourceType]Trigger{
//EndpointsResourceType: &EndpointsTrigger{},
EndpointsResourceType: &EndpointsTrigger{},
ServicesResourceType: &ServicesTrigger{},
ServiceImportsResourceType: &ServiceImportsTrigger{},
EndpointSlicesResourceType: &EndpointSlicesTrigger{},
Expand Down Expand Up @@ -91,8 +91,8 @@ func (c *GatewayCache) Delete(obj interface{}) bool {

func (c *GatewayCache) getTrigger(obj interface{}) Trigger {
switch obj.(type) {
//case *corev1.Endpoints:
// return c.triggers[EndpointsResourceType]
case *corev1.Endpoints:
return c.triggers[EndpointsResourceType]
case *corev1.Service:
return c.triggers[ServicesResourceType]
case *mcsv1alpha1.ServiceImport:
Expand Down
4 changes: 2 additions & 2 deletions pkg/gateway/cache/endpoints_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (p *EndpointsTrigger) Insert(obj interface{}, cache *GatewayCache) bool {
key := utils.ObjectKey(ep)
//cache.endpoints[key] = struct{}{}

return cache.isRoutableService(key)
return cache.isHeadlessServiceWithoutSelector(key) && cache.isRoutableService(key)
}

// Delete removes the Endpoints object from the cache and returns true if the cache was modified
Expand All @@ -43,5 +43,5 @@ func (p *EndpointsTrigger) Delete(obj interface{}, cache *GatewayCache) bool {
//
//return found

return cache.isRoutableService(key)
return cache.isHeadlessServiceWithoutSelector(key) && cache.isRoutableService(key)
}
10 changes: 10 additions & 0 deletions pkg/gateway/cache/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,3 +430,13 @@ func (c *GatewayCache) getServiceFromCache(key client.ObjectKey) (*corev1.Servic

return obj, nil
}

func (c *GatewayCache) isHeadlessServiceWithoutSelector(key client.ObjectKey) bool {
service, err := c.getServiceFromCache(key)
if err != nil {
log.Error().Msgf("failed to get service from cache: %v", err)
return false
}

return service.Spec.ClusterIP == corev1.ClusterIPNone && len(service.Spec.Selector) == 0
}
9 changes: 9 additions & 0 deletions pkg/gateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func newClient(informerCollection *informers.InformerCollection, kubeClient kube
for _, informerKey := range []fsminformers.InformerKey{
fsminformers.InformerKeyService,
fsminformers.InformerKeyServiceImport,
fsminformers.InformerKeyEndpoints,
fsminformers.InformerKeyEndpointSlices,
fsminformers.InformerKeySecret,
fsminformers.InformerKeyGatewayAPIGatewayClass,
Expand Down Expand Up @@ -200,6 +201,8 @@ func getEventTypesByObjectType(obj interface{}) *k8s.EventTypes {
return getEventTypesByInformerKey(fsminformers.InformerKeyService)
case *mcsv1alpha1.ServiceImport:
return getEventTypesByInformerKey(fsminformers.InformerKeyServiceImport)
case *corev1.Endpoints:
return getEventTypesByInformerKey(fsminformers.InformerKeyEndpoints)
case *discoveryv1.EndpointSlice:
return getEventTypesByInformerKey(fsminformers.InformerKeyEndpointSlices)
case *corev1.Secret:
Expand Down Expand Up @@ -263,6 +266,12 @@ func getEventTypesByInformerKey(informerKey fsminformers.InformerKey) *k8s.Event
Update: announcements.EndpointSlicesUpdated,
Delete: announcements.EndpointSlicesDeleted,
}
case fsminformers.InformerKeyEndpoints:
return &k8s.EventTypes{
Add: announcements.EndpointAdded,
Update: announcements.EndpointUpdated,
Delete: announcements.EndpointDeleted,
}
case fsminformers.InformerKeySecret:
return &k8s.EventTypes{
Add: announcements.SecretAdded,
Expand Down

0 comments on commit 37bb922

Please sign in to comment.