Skip to content

Commit

Permalink
Add Floating IP support
Browse files Browse the repository at this point in the history
The integration tests are currently failing, there is an internal
ticket to track that issue.
  • Loading branch information
href committed Jan 19, 2024
1 parent ee103a6 commit 6968c48
Show file tree
Hide file tree
Showing 11 changed files with 437 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ test:
./pkg/internal/compare

integration:
K8TEST_PATH=${PWD}/k8test go test -count=1 -tags=integration ./pkg/internal/integration -v
K8TEST_PATH=${PWD}/k8test go test -count=1 -tags=integration ./pkg/internal/integration -v -timeout 30m

coverage: test
go tool cover -html=cover.out
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ kind: Service
metadata:
annotations:
k8s.cloudscale.ch/loadbalancer-listener-allowed-cidrs: '["1.2.3.0/24"]'
k8s.cloudscale.ch/loadbalancer-floating-ips: '["1.2.3.4/32"]'
```

The full set of configuration toggles can be found in the [`pkg/cloudscale_ccm/loadbalancer.go`](pkg/cloudscale/ccm/loadbalancer.go) file.
Expand Down
6 changes: 3 additions & 3 deletions examples/nginx-hello.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
# export KUBECONFIG=path/to/kubeconfig
# kubectl apply -f nginx-hello.yml
#
# Wait for `kubectl describe hello` to show "Loadbalancer Ensured", then
# use the IP address found under "LoadBalancer Ingress" to connect to the
# Wait for `kubectl describe service hello` to show "Loadbalancer Ensured",
# then use the IP address found under "LoadBalancer Ingress" to connect to the
# service.
#
# You can also use the following shortcut:
#
# curl http://$(kubectl get service hello -o json | jq -r '.status.loadBalancer.ingress[0].ip')
# curl http://$(kubectl get service hello -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
#
---
apiVersion: apps/v1
Expand Down
156 changes: 150 additions & 6 deletions pkg/cloudscale_ccm/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package cloudscale_ccm
import (
"context"
"fmt"
"strings"

"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil"
"github.com/cloudscale-ch/cloudscale-go-sdk/v4"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -73,6 +76,33 @@ const (
// resources instead.
LoadBalancerVIPAddresses = "k8s.cloudscale.ch/loadbalancer-vip-addresses"

// LoadBalancerFloatingIPs assigns the given Floating IPs to the
// load balancer. The expected value is a list of addresses of the
// Floating IPs in CIDR notation. For example:
//
// ["5.102.150.123/32", "2a06:c01::123/128"]
//
// If any Floating IP address is assigned to multiple services via this
// annotation, the CCM will refuse to update the associated services, as
// this is considered a serious configuration issue that has to first be
// resolved by the operator.
//
// While the service being handled needs to have a parseable Floating IP
// config, the services it is compared to for conflict detection do not.
//
// Such services are skipped during conflict detection with the goal
// of limiting the impact of config parse errors to the service being
// processed.
//
// Floating IPs already assigned to the loadbalancer, but no longer
// present in the annotations, stay on the loadbalancer until another
// service requests them. This is due to the fact that it is not possible
// to unassign Floating IPs to point to nowhere.
//
// The Floating IPs are only assigned to the LoadBalancer once it has
// been fully created.
LoadBalancerFloatingIPs = "k8s.cloudscale.ch/loadbalancer-floating-ips"

// LoadBalancerPoolAlgorithm defines the load balancing algorithm used
// by the loadbalancer. See the API documentation for more information:
//
Expand Down Expand Up @@ -279,9 +309,9 @@ func (l *loadbalancer) EnsureLoadBalancer(
nodes []*v1.Node,
) (*v1.LoadBalancerStatus, error) {

// Skip if the service is not supported by this CCM
// Detect configuration issues and abort if they are found
serviceInfo := newServiceInfo(service, clusterName)
if supported, err := serviceInfo.isSupported(); !supported {
if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
return nil, err
}

Expand Down Expand Up @@ -347,9 +377,9 @@ func (l *loadbalancer) UpdateLoadBalancer(
nodes []*v1.Node,
) error {

// Skip if the service is not supported by this CCM
// Detect configuration issues and abort if they are found
serviceInfo := newServiceInfo(service, clusterName)
if supported, err := serviceInfo.isSupported(); !supported {
if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
return err
}

Expand Down Expand Up @@ -388,9 +418,9 @@ func (l *loadbalancer) EnsureLoadBalancerDeleted(
service *v1.Service,
) error {

// Skip if the service is not supported by this CCM
// Detect configuration issues and abort if they are found
serviceInfo := newServiceInfo(service, clusterName)
if supported, err := serviceInfo.isSupported(); !supported {
if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
return err
}

Expand All @@ -402,6 +432,120 @@ func (l *loadbalancer) EnsureLoadBalancerDeleted(
})
}

// ensureValidConfig ensures that the configuration can be applied at all,
// acting as a gate that ensures certain invariants before any changes are
// made.
//
// The general idea is that it's better to not make any chanages if the config
// is bad, rather than throwing errors later when some changes have already
// been made.
func (l *loadbalancer) ensureValidConfig(
ctx context.Context, serviceInfo *serviceInfo) error {

// Skip if the service is not supported by this CCM
if supported, err := serviceInfo.isSupported(); !supported {
return err
}

// If Floating IPs are used, make sure there are no conflicting
// assignment across services.
ips, err := l.findIPsAssignedElsewhere(ctx, serviceInfo)
if err != nil {
return fmt.Errorf("could not parse %s", LoadBalancerFloatingIPs)
}

if len(ips) > 0 {

info := make([]string, 0, len(ips))
for ip, service := range ips {
info = append(info, fmt.Sprintf("%s->%s", ip, service))
}

return fmt.Errorf(
"at least one Floating IP assigned to service %s is also "+
"assigned to another service. Refusing to continue to avoid "+
"flapping: %s",
serviceInfo.Service.Name,
strings.Join(info, ", "),
)
}

return nil
}

// findIPsAssignedElsewhere lists other services and compares their Floating
// IPs with the ones found on the given service. If an IP is found to be
// assigned to two services, the IP and the name of the service are returned.
func (l *loadbalancer) findIPsAssignedElsewhere(
ctx context.Context, serviceInfo *serviceInfo) (map[string]string, error) {

ips, err := serviceInfo.annotationList(LoadBalancerFloatingIPs)
if err != nil {
return nil, err
}

if len(ips) == 0 {
return nil, nil
}

conflicts := make(map[string]string, 0)

// Unfortuantely, there's no way to filter for the services that matter
// here. The only available field selectors for services are
// `metadata.name` and `metadata.namespace`.
//
// To support larger clusters, ensure to not load all services in a
// single call.
opts := metav1.ListOptions{
Continue: "",
Limit: 250,
}

svcs := l.k8s.CoreV1().Services("")
for {
services, err := svcs.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to retrieve services: %w", err)
}

for _, service := range services.Items {
if service.Spec.Type != "LoadBalancer" {
continue
}
if service.UID == serviceInfo.Service.UID {
continue
}

otherInfo := newServiceInfo(&service, serviceInfo.clusterName)
other, err := otherInfo.annotationList(LoadBalancerFloatingIPs)

// Ignore errors loading the IPs of other services, as they would
// not be configured either, if the current service is otherwise
// okay, it should be able to continue.
//
// If this is not done, a single configuration error on a service
// causes this function to err on all other services.
if err != nil {
continue
}

for _, ip := range other {
if slices.Contains(ips, ip) {
conflicts[ip] = service.Name
}
}
}

if services.Continue == "" {
break
}

opts.Continue = services.Continue
}

return conflicts, nil
}

// loadBalancerStatus generates the v1.LoadBalancerStatus for the given
// loadbalancer, as required by Kubernetes.
func loadBalancerStatus(lb *cloudscale.LoadBalancer) *v1.LoadBalancerStatus {
Expand Down
39 changes: 39 additions & 0 deletions pkg/cloudscale_ccm/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type lbState struct {
// necessarily bound to any given pool.
listeners map[*cloudscale.LoadBalancerPool][]cloudscale.
LoadBalancerListener

// The assigned floating IPs
floatingIPs []string
}

func newLbState(lb *cloudscale.LoadBalancer) *lbState {
Expand All @@ -44,6 +47,7 @@ func newLbState(lb *cloudscale.LoadBalancer) *lbState {
map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerHealthMonitor),
listeners: make(
map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerListener),
floatingIPs: make([]string, 0),
}
}

Expand Down Expand Up @@ -104,6 +108,14 @@ func desiredLbState(
},
})

// Get list of floating IPs if possible
ips, err := serviceInfo.annotationList(LoadBalancerFloatingIPs)
if err != nil {
return nil, fmt.Errorf("could not parse %s", LoadBalancerFloatingIPs)
}

s.floatingIPs = ips

// Each service port gets its own pool
algorithm := serviceInfo.annotation(LoadBalancerPoolAlgorithm)
protocol := serviceInfo.annotation(LoadBalancerPoolProtocol)
Expand Down Expand Up @@ -297,6 +309,19 @@ func actualLbState(
s.listeners[nil] = append(s.listeners[nil], l)
}

// Find all floating IPs assigned to the loadbalancer
ips, err := l.client.FloatingIPs.List(ctx)
if err != nil {
return nil, fmt.Errorf(
"lbstate: failed to load floating ips: %w", err)
}

for _, ip := range ips {
if ip.LoadBalancer != nil && ip.LoadBalancer.UUID == lb.UUID {
s.floatingIPs = append(s.floatingIPs, ip.Network)
}
}

return s, nil
}

Expand Down Expand Up @@ -673,6 +698,20 @@ func nextLbActions(
}
}

// Find the Floating IPs that need to be changed
_, assign := compare.Diff(
desired.floatingIPs, actual.floatingIPs, func(ip string) string {
return ip
},
)

for _, ip := range assign {
next = append(next, actions.AssignFloatingIP(
ip,
actual.lb.UUID,
))
}

return next, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/cloudscale_ccm/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func TestActualState(t *testing.T) {
},
},
)
server.On("/v1/floating-ips", 200,
[]cloudscale.FloatingIP{},
)
server.Start()
defer server.Close()

Expand Down
2 changes: 2 additions & 0 deletions pkg/cloudscale_ccm/service_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (s serviceInfo) annotation(key string) string {
return s.annotationOrDefault(key, "lb-standard")
case LoadBalancerVIPAddresses:
return s.annotationOrDefault(key, "[]")
case LoadBalancerFloatingIPs:
return s.annotationOrDefault(key, "[]")
case LoadBalancerPoolAlgorithm:
return s.annotationOrDefault(key, "round_robin")
case LoadBalancerHealthMonitorDelayS:
Expand Down
28 changes: 28 additions & 0 deletions pkg/internal/actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,31 @@ func (a *UpdateMonitorNumberAction) Run(
return ProceedOnSuccess(
client.LoadBalancerHealthMonitors.Update(ctx, a.monitorUUID, &req))
}

// AssignFloatingIP assigns a Floating IP to the given LoadBalancer UUID
type AssignFloatingIPAction struct {
ip string
lbUUID string
}

func AssignFloatingIP(ip string, lbUUID string) Action {
return &AssignFloatingIPAction{ip: ip, lbUUID: lbUUID}
}

func (a *AssignFloatingIPAction) Label() string {
return fmt.Sprintf("assign-floating-ip(%s -> %s)", a.ip, a.lbUUID)
}

func (a *AssignFloatingIPAction) Run(
ctx context.Context, client *cloudscale.Client) (Control, error) {

ip := strings.SplitN(a.ip, "/", 2)[0]

err := client.FloatingIPs.Update(
ctx, ip, &cloudscale.FloatingIPUpdateRequest{
LoadBalancer: a.lbUUID,
},
)

return ProceedOnSuccess(err)
}
21 changes: 21 additions & 0 deletions pkg/internal/actions/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,3 +514,24 @@ func TestUpdateMonitorNumberAction(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, Errored, v)
}

func TestAssignFloatingIPAction(t *testing.T) {
server := testkit.NewMockAPIServer()
server.On("/v1/floating-ips/100.1.1.1", 201, "{}")
server.Start()
defer server.Close()

action := AssignFloatingIP(
"100.1.1.1/24", "00000000-0000-0000-0000-000000000000")

assert.NotEmpty(t, action.Label())
v, err := action.Run(context.Background(), server.Client())

assert.NoError(t, err)
assert.Equal(t, Proceed, v)

var sent cloudscale.FloatingIPUpdateRequest
server.LastSent(&sent)

assert.Equal(t, "00000000-0000-0000-0000-000000000000", sent.LoadBalancer)
}
Loading

0 comments on commit 6968c48

Please sign in to comment.