Skip to content

Commit

Permalink
feat: Support high performance ingress with no label selector
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Mendoza committed Aug 8, 2024
1 parent 0c600bb commit 7bc2eea
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 32 deletions.
61 changes: 34 additions & 27 deletions cloud/linode/cilium_loadbalancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package linode
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"slices"
Expand Down Expand Up @@ -63,7 +62,6 @@ var (
"ap-southeast": 16, // Sydney (Australia)
"ap-northeast": 11, // Tokyo (Japan)
}
errNoBGPSelector = errors.New("no BGP node selector set to configure IP sharing")
)

// getExistingSharedIPsInCluster determines the list of addresses to share on nodes by checking the
Expand Down Expand Up @@ -123,6 +121,9 @@ func (l *loadbalancers) shareIPs(ctx context.Context, addrs []string, node *v1.N
if err != nil {
return err
}
if node.Labels == nil {
node.Labels = make(map[string]string)
}
node.Labels[annotations.AnnLinodeNodeIPSharingUpdated] = "true"
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err := l.kubeClient.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
Expand All @@ -148,16 +149,15 @@ func (l *loadbalancers) handleIPSharing(ctx context.Context, node *v1.Node) erro
klog.Info("skipping IP while providerID is unset")
return nil
}
if Options.BGPNodeSelector == "" {
return errNoBGPSelector
}
// If performing Service load-balancing via IP sharing + BGP, check for a special annotation
// added by the CCM gets set when load-balancer IPs have been successfully shared on the node
kv := strings.Split(Options.BGPNodeSelector, "=")
// Check if node should be participating in IP sharing via the given selector
if val, ok := node.Labels[kv[0]]; !ok || len(kv) != 2 || val != kv[1] {
// not a selected Node
return nil
if Options.BGPNodeSelector != "" {
kv := strings.Split(Options.BGPNodeSelector, "=")
// Check if node should be participating in IP sharing via the given selector
if val, ok := node.Labels[kv[0]]; !ok || len(kv) != 2 || val != kv[1] {
// not a selected Node
return nil
}
}
// check if node has been updated with IPs to share
if _, foundIpSharingUpdatedLabel := node.Labels[annotations.AnnLinodeNodeIPSharingUpdated]; foundIpSharingUpdatedLabel {
Expand Down Expand Up @@ -200,10 +200,6 @@ func (l *loadbalancers) handleIPSharing(ctx context.Context, node *v1.Node) erro
// createSharedIP requests an additional IP that can be shared on Nodes to support
// loadbalancing via Cilium LB IPAM + BGP Control Plane.
func (l *loadbalancers) createSharedIP(ctx context.Context, nodes []*v1.Node) (string, error) {
if Options.BGPNodeSelector == "" {
return "", errNoBGPSelector
}

ipHolder, err := l.ensureIPHolder(ctx)
if err != nil {
return "", err
Expand Down Expand Up @@ -237,13 +233,21 @@ func (l *loadbalancers) createSharedIP(ctx context.Context, nodes []*v1.Node) (s
}

// share the IPs with nodes participating in Cilium BGP peering
kv := strings.Split(Options.BGPNodeSelector, "=")
for _, node := range nodes {
if val, ok := node.Labels[kv[0]]; ok && len(kv) == 2 && val == kv[1] {
if Options.BGPNodeSelector == "" {
for _, node := range nodes {
if err = l.shareIPs(ctx, addrs, node); err != nil {
return "", err
}
}
} else {
kv := strings.Split(Options.BGPNodeSelector, "=")
for _, node := range nodes {
if val, ok := node.Labels[kv[0]]; ok && len(kv) == 2 && val == kv[1] {
if err = l.shareIPs(ctx, addrs, node); err != nil {
return "", err
}
}
}
}

return newSharedIP.Address, nil
Expand All @@ -252,9 +256,6 @@ func (l *loadbalancers) createSharedIP(ctx context.Context, nodes []*v1.Node) (s
// deleteSharedIP cleans up the shared IP for a LoadBalancer Service if it was assigned
// by Cilium LB IPAM, removing it from the ip-holder
func (l *loadbalancers) deleteSharedIP(ctx context.Context, service *v1.Service) error {
if Options.BGPNodeSelector == "" {
return errNoBGPSelector
}
err := l.retrieveKubeClient()
if err != nil {
return err
Expand Down Expand Up @@ -421,9 +422,6 @@ func (l *loadbalancers) getCiliumLBIPPool(ctx context.Context, service *v1.Servi

// NOTE: Cilium CRDs must be installed for this to work
func (l *loadbalancers) ensureCiliumBGPPeeringPolicy(ctx context.Context) error {
if Options.BGPNodeSelector == "" {
return errNoBGPSelector
}
regionID, ok := regionIDMap[l.zone]
if !ok {
return fmt.Errorf("unsupported region for BGP: %s", l.zone)
Expand All @@ -443,16 +441,25 @@ func (l *loadbalancers) ensureCiliumBGPPeeringPolicy(ctx context.Context) error
}

// otherwise create it
kv := strings.Split(Options.BGPNodeSelector, "=")
if len(kv) != 2 {
return fmt.Errorf("invalid node selector %s", Options.BGPNodeSelector)
var nodeSelector slimv1.LabelSelector
// If no BGPNodeSelector is specified, select all nodes by default.
if Options.BGPNodeSelector == "" {
nodeSelector = slimv1.LabelSelector{}
} else {
kv := strings.Split(Options.BGPNodeSelector, "=")
if len(kv) != 2 {
return fmt.Errorf("invalid node selector %s", Options.BGPNodeSelector)
}

nodeSelector = slimv1.LabelSelector{MatchLabels: map[string]string{kv[0]: kv[1]}}
}

ciliumBGPPeeringPolicy := &v2alpha1.CiliumBGPPeeringPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: ciliumBGPPeeringPolicyName,
},
Spec: v2alpha1.CiliumBGPPeeringPolicySpec{
NodeSelector: &slimv1.LabelSelector{MatchLabels: map[string]string{kv[0]: kv[1]}},
NodeSelector: &nodeSelector,
VirtualRouters: []v2alpha1.CiliumBGPVirtualRouter{{
LocalASN: 65001,
ExportPodCIDR: ptr.To(true),
Expand Down
35 changes: 30 additions & 5 deletions cloud/linode/cilium_loadbalancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package linode
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"testing"
Expand Down Expand Up @@ -139,19 +138,45 @@ func addNodes(t *testing.T, kubeClient kubernetes.Interface, nodes []*v1.Node) {
}

func testNoBGPNodeLabel(t *testing.T, mc *mocks.MockClient) {
Options.BGPNodeSelector = ""
svc := createTestService()

kubeClient, _ := k8sClient.NewFakeClientset()
ciliumClient := &fakev2alpha1.FakeCiliumV2alpha1{Fake: &kubeClient.CiliumFakeClientset.Fake}
addService(t, kubeClient, svc)
addNodes(t, kubeClient, nodes)
lb := &loadbalancers{mc, zone, kubeClient, ciliumClient, ciliumLBType}

filter := map[string]string{"label": fmt.Sprintf("%s-%s", ipHolderLabelPrefix, zone)}
rawFilter, _ := json.Marshal(filter)
mc.EXPECT().ListInstances(gomock.Any(), linodego.NewListOptions(1, string(rawFilter))).Times(1).Return([]linodego.Instance{}, nil)
dummySharedIP := "45.76.101.26"
mc.EXPECT().CreateInstance(gomock.Any(), gomock.Any()).Times(1).Return(&ipHolderInstance, nil)
mc.EXPECT().GetInstanceIPAddresses(gomock.Any(), ipHolderInstance.ID).Times(1).Return(&linodego.InstanceIPAddressResponse{
IPv4: &linodego.InstanceIPv4Response{
Shared: []*linodego.InstanceIP{{Address: dummySharedIP}},
},
}, nil)
mc.EXPECT().AddInstanceIPAddress(gomock.Any(), ipHolderInstance.ID, true).Times(1).Return(&linodego.InstanceIP{Address: dummySharedIP}, nil)
mc.EXPECT().ShareIPAddresses(gomock.Any(), linodego.IPAddressesShareOptions{
IPs: []string{dummySharedIP},
LinodeID: 11111,
}).Times(1)
mc.EXPECT().ShareIPAddresses(gomock.Any(), linodego.IPAddressesShareOptions{
IPs: []string{dummySharedIP},
LinodeID: 22222,
}).Times(1)
mc.EXPECT().ShareIPAddresses(gomock.Any(), linodego.IPAddressesShareOptions{
IPs: []string{dummySharedIP},
LinodeID: 33333,
}).Times(1)

lbStatus, err := lb.EnsureLoadBalancer(context.TODO(), "linodelb", svc, nodes)
if !errors.Is(err, errNoBGPSelector) {
t.Fatalf("expected %v, got %v... %s", errNoBGPSelector, err, Options.BGPNodeSelector)
if err != nil {
t.Fatalf("expected a nil error, got %v", err)
}
if lbStatus != nil {
t.Fatalf("expected a nil lbStatus, got %v", lbStatus)
if lbStatus == nil {
t.Fatal("expected non-nil lbStatus")
}
}

Expand Down

0 comments on commit 7bc2eea

Please sign in to comment.