Skip to content

Commit

Permalink
feat: support headless service without selector (#216)
Browse files Browse the repository at this point in the history
* feat: support headless service without selector

Signed-off-by: Lin Yang <[email protected]>

* fix: golang lint

Signed-off-by: Lin Yang <[email protected]>

---------

Signed-off-by: Lin Yang <[email protected]>
  • Loading branch information
reaver-flomesh authored Mar 23, 2024
1 parent afbf40b commit 9d944ad
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 49 deletions.
141 changes: 98 additions & 43 deletions pkg/gateway/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cache
import (
"fmt"

"k8s.io/apimachinery/pkg/util/sets"

gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

"k8s.io/utils/pointer"
Expand Down Expand Up @@ -293,60 +295,23 @@ func (c *GatewayCache) serviceConfigs(services map[string]serviceInfo) map[strin
continue
}

selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{
constants.KubernetesEndpointSliceServiceNameLabel: svc.Name,
},
})
if err != nil {
log.Error().Msgf("Failed to convert LabelSelector to Selector: %s", err)
continue
}

endpointSliceList, err := c.informers.GetListers().EndpointSlice.EndpointSlices(svc.Namespace).List(selector)
if err != nil {
log.Error().Msgf("Failed to list EndpointSlice of Service %s: %s", svcKey, err)
continue
}

if len(endpointSliceList) == 0 {
if svc.Spec.Type == corev1.ServiceTypeExternalName {
log.Warn().Msgf("Type of Service %s is %s, will be ignored", svcKey, corev1.ServiceTypeExternalName)
continue
}

svcPort, err := getServicePort(svc, svcInfo.svcPortName.Port)
if err != nil {
log.Error().Msgf("Failed to get ServicePort: %s", err)
continue
}
upstreams := c.upstreams(svcInfo, svc)

filteredSlices := filterEndpointSliceList(endpointSliceList, svcPort)
if len(filteredSlices) == 0 {
log.Error().Msgf("no valid endpoints found for Service %s and port %+v", svcKey, svcPort)
if len(upstreams) == 0 {
continue
}

endpointSet := make(map[endpointInfo]struct{})
for _, eps := range filteredSlices {
for _, endpoint := range eps.Endpoints {
if !isEndpointReady(endpoint) {
continue
}
endpointPort := findPort(eps.Ports, svcPort)

for _, address := range endpoint.Addresses {
ep := endpointInfo{address: address, port: endpointPort}
endpointSet[ep] = struct{}{}
}
}
}

svcCfg := &fgw.ServiceConfig{
//Filters: svcInfo.filters,
//Filters: referredSvcInfo.filters,
Endpoints: make(map[string]fgw.Endpoint),
}

for ep := range endpointSet {
hostport := fmt.Sprintf("%s:%d", ep.address, ep.port)
for _, hostport := range upstreams {
svcCfg.Endpoints[hostport] = fgw.Endpoint{
Weight: 1,
}
Expand All @@ -362,6 +327,96 @@ func (c *GatewayCache) serviceConfigs(services map[string]serviceInfo) map[strin
return configs
}

func (c *GatewayCache) upstreams(referredSvcInfo serviceInfo, svc *corev1.Service) []string {
if svc.Spec.ClusterIP == corev1.ClusterIPNone && len(svc.Spec.Selector) == 0 {
return c.getEndpointsOfHeadlessServiceWithoutSelector(referredSvcInfo, svc)
}

return c.upstreamsBySelector(referredSvcInfo, svc)
}

func (c *GatewayCache) getEndpointsOfHeadlessServiceWithoutSelector(referredSvcInfo serviceInfo, svc *corev1.Service) []string {
endpoints, err := c.informers.GetListers().Endpoints.Endpoints(svc.Namespace).Get(svc.Name)
if err != nil {
log.Error().Msgf("Failed to get Endpoints of Service %s: %s", svc.Name, err)
return nil
}

if len(endpoints.Subsets) == 0 {
return nil
}

svcPort, err := getServicePort(svc, referredSvcInfo.svcPortName.Port)
if err != nil {
log.Error().Msgf("Failed to get ServicePort: %s", err)
return nil
}

endpointSet := sets.New[string]()
for _, subset := range endpoints.Subsets {
if endpointPort := findEndpointPort(subset.Ports, svcPort); endpointPort > 0 && endpointPort <= 65535 {
for _, address := range subset.Addresses {
endpointSet.Insert(fmt.Sprintf("%s:%d", address.IP, endpointPort))
}
}
}

return sets.List(endpointSet)
}

func (c *GatewayCache) upstreamsBySelector(referredSvcInfo serviceInfo, svc *corev1.Service) []string {
svcKey := referredSvcInfo.svcPortName.NamespacedName

selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{
constants.KubernetesEndpointSliceServiceNameLabel: svc.Name,
},
})
if err != nil {
log.Error().Msgf("Failed to convert LabelSelector to Selector: %s", err)
return nil
}

endpointSliceList, err := c.informers.GetListers().EndpointSlice.EndpointSlices(svc.Namespace).List(selector)
if err != nil {
log.Error().Msgf("Failed to list EndpointSlice of Service %s: %s", svcKey, err)
return nil
}

if len(endpointSliceList) == 0 {
return nil
}

svcPort, err := getServicePort(svc, referredSvcInfo.svcPortName.Port)
if err != nil {
log.Error().Msgf("Failed to get ServicePort: %s", err)
return nil
}

filteredSlices := filterEndpointSliceList(endpointSliceList, svcPort)
if len(filteredSlices) == 0 {
log.Error().Msgf("no valid endpoints found for Service %s and port %+v", svcKey, svcPort)
return nil
}

endpointSet := sets.New[string]()
for _, eps := range filteredSlices {
for _, endpoint := range eps.Endpoints {
if !isEndpointReady(endpoint) {
continue
}

if endpointPort := findEndpointSlicePort(eps.Ports, svcPort); endpointPort > 0 && endpointPort <= 65535 {
for _, address := range endpoint.Addresses {
endpointSet.Insert(fmt.Sprintf("%s:%d", address, endpointPort))
}
}
}
}

return sets.List(endpointSet)
}

func (c *GatewayCache) chains() fgw.Chains {
if c.cfg.GetFeatureFlags().EnableGatewayAgentService {
return fgw.Chains{
Expand Down
8 changes: 4 additions & 4 deletions pkg/gateway/cache/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ type serviceInfo struct {
//filters []routecfg.Filter
}

type endpointInfo struct {
address string
port int32
}
//type endpointInfo struct {
// address string
// port int32
//}

type globalPolicyAttachments struct {
rateLimits map[gwpkg.PolicyMatchType][]gwpav1alpha1.RateLimitPolicy
Expand Down
23 changes: 21 additions & 2 deletions pkg/gateway/cache/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,10 @@ func ignoreEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, port corev1.S
}

// ignore endpoint slices that don't have a matching port.
return findPort(endpointSlice.Ports, port) == 0
return findEndpointSlicePort(endpointSlice.Ports, port) == 0
}

func findPort(ports []discoveryv1.EndpointPort, svcPort corev1.ServicePort) int32 {
func findEndpointSlicePort(ports []discoveryv1.EndpointPort, svcPort corev1.ServicePort) int32 {
portName := svcPort.Name
for _, p := range ports {
if p.Port == nil {
Expand All @@ -525,6 +525,25 @@ func findPort(ports []discoveryv1.EndpointPort, svcPort corev1.ServicePort) int3
return 0
}

func findEndpointPort(ports []corev1.EndpointPort, svcPort corev1.ServicePort) int32 {
for i, epPort := range ports {
if svcPort.Name == "" {
// port.Name is optional if there is only one port
return epPort.Port
}

if svcPort.Name == epPort.Name {
return epPort.Port
}

if i == len(ports)-1 && svcPort.TargetPort.Type == intstr.Int {
return svcPort.TargetPort.IntVal
}
}

return 0
}

func getDefaultPort(svcPort corev1.ServicePort) int32 {
switch svcPort.TargetPort.Type {
case intstr.Int:
Expand Down

0 comments on commit 9d944ad

Please sign in to comment.