Skip to content

Commit

Permalink
Implementing ExternalTrafficPolicy: local in winkernel kube-proxy via…
Browse files Browse the repository at this point in the history
… DSR
  • Loading branch information
elweb9858 committed Oct 30, 2020
1 parent 1968e96 commit 1bcddb0
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 6 deletions.
21 changes: 15 additions & 6 deletions pkg/proxy/winkernel/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type serviceInfo struct {
remoteEndpoint *endpointsInfo
hns HostNetworkService
preserveDIP bool
localTrafficDSR bool
}

type hnsNetworkInfo struct {
Expand Down Expand Up @@ -350,9 +351,11 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16
func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
info := &serviceInfo{BaseServiceInfo: baseInfo}
preserveDIP := service.Annotations["preserve-destination"] == "true"
localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal
err := hcn.DSRSupported()
if err != nil {
preserveDIP = false
localTrafficDSR = false
}
// targetPort is zero if it is specified as a name in port.TargetPort.
// Its real value would be got later from endpoints.
Expand All @@ -364,6 +367,7 @@ func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service
info.preserveDIP = preserveDIP
info.targetPort = targetPort
info.hns = proxier.hns
info.localTrafficDSR = localTrafficDSR

for _, eip := range service.Spec.ExternalIPs {
info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
Expand Down Expand Up @@ -1157,12 +1161,12 @@ func (proxier *Proxier) syncProxyRules() {
// If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
// This means that health services can use Node Port without falsely getting results from a different node.
nodePortEndpoints := hnsEndpoints
if svcInfo.preserveDIP {
if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
nodePortEndpoints = hnsLocalEndpoints
}
hnsLoadBalancer, err := hns.getLoadBalancer(
nodePortEndpoints,
loadBalancerFlags{localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
sourceVip,
"",
Enum(svcInfo.Protocol()),
Expand All @@ -1180,10 +1184,15 @@ func (proxier *Proxier) syncProxyRules() {

// Create a Load Balancer Policy for each external IP
for _, externalIP := range svcInfo.externalIPs {
// Disable routing mesh if ExternalTrafficPolicy is set to local
externalIPEndpoints := hnsEndpoints
if svcInfo.localTrafficDSR {
externalIPEndpoints = hnsLocalEndpoints
}
// Try loading existing policies, if already available
hnsLoadBalancer, err = hns.getLoadBalancer(
hnsEndpoints,
loadBalancerFlags{sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
externalIPEndpoints,
loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
sourceVip,
externalIP.ip,
Enum(svcInfo.Protocol()),
Expand All @@ -1201,12 +1210,12 @@ func (proxier *Proxier) syncProxyRules() {
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
// Try loading existing policies, if already available
lbIngressEndpoints := hnsEndpoints
if svcInfo.preserveDIP {
if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
lbIngressEndpoints = hnsLocalEndpoints
}
hnsLoadBalancer, err := hns.getLoadBalancer(
lbIngressEndpoints,
loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
sourceVip,
lbIngressIP.ip,
Enum(svcInfo.Protocol()),
Expand Down
76 changes: 76 additions & 0 deletions pkg/proxy/winkernel/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type fakeHNS struct{}
func newFakeHNS() *fakeHNS {
return &fakeHNS{}
}

func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) {
var remoteSubnets []*remoteSubnetInfo
rs := &remoteSubnetInfo{
Expand All @@ -63,9 +64,11 @@ func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) {
remoteSubnets: remoteSubnets,
}, nil
}

func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) {
return nil, nil
}

func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
_, ipNet, _ := net.ParseCIDR(destinationPrefix)

Expand All @@ -81,6 +84,7 @@ func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpo
return nil, nil

}

func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
return &endpointsInfo{
ip: ep.ip,
Expand All @@ -90,17 +94,21 @@ func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpo
hns: hns,
}, nil
}

func (hns fakeHNS) deleteEndpoint(hnsID string) error {
return nil
}

func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
return &loadBalancerInfo{
hnsID: guid,
}, nil
}

func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
return nil
}

func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string, endpointSliceEnabled bool) *Proxier {
sourceVip := "192.168.1.2"
hnsNetworkInfo := &hnsNetworkInfo{
Expand Down Expand Up @@ -187,6 +195,7 @@ func TestCreateServiceVip(t *testing.T) {
}
}
}

func TestCreateRemoteEndpointOverlay(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
Expand Down Expand Up @@ -251,6 +260,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
}
}

func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
Expand Down Expand Up @@ -695,6 +705,69 @@ func TestCreateLoadBalancer(t *testing.T) {
}

}

func TestCreateDsrLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
if proxier == nil {
t.Error()
}

svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}

makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}},
}}
}),
)

proxier.setInitialized(true)
proxier.syncProxyRules()

svc := proxier.serviceMap[svcPortName]
svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())

} else {
if svcInfo.hnsID != guid {
t.Errorf("%v does not match %v", svcInfo.hnsID, guid)
}
if svcInfo.localTrafficDSR != true {
t.Errorf("Failed to create DSR loadbalancer with local traffic policy")
}
}
}

func TestEndpointSlice(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", true)
Expand Down Expand Up @@ -767,6 +840,7 @@ func TestEndpointSlice(t *testing.T) {
}
}
}

func TestNoopEndpointSlice(t *testing.T) {
p := Proxier{}
p.OnEndpointSliceAdd(&discovery.EndpointSlice{})
Expand Down Expand Up @@ -799,6 +873,7 @@ func TestFindRemoteSubnetProviderAddress(t *testing.T) {
func makeNSN(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name}
}

func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
for i := range allServices {
proxier.OnServiceAdd(allServices[i])
Expand All @@ -817,6 +892,7 @@ func deleteServices(proxier *Proxier, allServices ...*v1.Service) {
defer proxier.mu.Unlock()
proxier.servicesSynced = true
}

func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit 1bcddb0

Please sign in to comment.