Skip to content

Commit

Permalink
Merge pull request #10 from cloudscale-ch/denis/floating-ips
Browse files Browse the repository at this point in the history
Add support for Floating IPs
  • Loading branch information
href authored Jan 19, 2024
2 parents 13c5782 + 6968c48 commit 761dfe8
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ test:
./pkg/internal/compare

integration:
K8TEST_PATH=${PWD}/k8test go test -count=1 -tags=integration ./pkg/internal/integration -v
K8TEST_PATH=${PWD}/k8test go test -count=1 -tags=integration ./pkg/internal/integration -v -timeout 30m

coverage: test
go tool cover -html=cover.out
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ kind: Service
metadata:
annotations:
k8s.cloudscale.ch/loadbalancer-listener-allowed-cidrs: '["1.2.3.0/24"]'
k8s.cloudscale.ch/loadbalancer-floating-ips: '["1.2.3.4/32"]'
```

The full set of configuration toggles can be found in the [`pkg/cloudscale_ccm/loadbalancer.go`](pkg/cloudscale/ccm/loadbalancer.go) file.
Expand Down
57 changes: 57 additions & 0 deletions examples/nginx-hello.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Deploys the nginxdemos/hello:plain-text container and creates a
# loadbalancer service for it:
#
# export KUBECONFIG=path/to/kubeconfig
# kubectl apply -f nginx-hello.yml
#
# Wait for `kubectl describe service hello` to show "Loadbalancer Ensured",
# then use the IP address found under "LoadBalancer Ingress" to connect to the
# service.
#
# You can also use the following shortcut:
#
# curl http://$(kubectl get service hello -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
#
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: hello
spec:
replicas: 2
selector:
matchLabels:
app: hello
template:
metadata:
labels:
app: hello
spec:
containers:
- name: hello
image: nginxdemos/hello:plain-text

# Spread the containers across nodes
topologySpreadConstraints:
- maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
app: hello
---
apiVersion: v1
kind: Service
metadata:
labels:
app: hello
name: hello
spec:
ports:
- port: 80
protocol: TCP
targetPort: 80
name: primary
selector:
app: hello
type: LoadBalancer
156 changes: 150 additions & 6 deletions pkg/cloudscale_ccm/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package cloudscale_ccm
import (
"context"
"fmt"
"strings"

"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil"
"github.com/cloudscale-ch/cloudscale-go-sdk/v4"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -73,6 +76,33 @@ const (
// resources instead.
LoadBalancerVIPAddresses = "k8s.cloudscale.ch/loadbalancer-vip-addresses"

// LoadBalancerFloatingIPs assigns the given Floating IPs to the
// load balancer. The expected value is a list of addresses of the
// Floating IPs in CIDR notation. For example:
//
// ["5.102.150.123/32", "2a06:c01::123/128"]
//
// If any Floating IP address is assigned to multiple services via this
// annotation, the CCM will refuse to update the associated services, as
// this is considered a serious configuration issue that has to first be
// resolved by the operator.
//
// While the service being handled needs to have a parseable Floating IP
// config, the services it is compared to for conflict detection do not.
//
// Such services are skipped during conflict detection with the goal
// of limiting the impact of config parse errors to the service being
// processed.
//
// Floating IPs already assigned to the loadbalancer, but no longer
// present in the annotations, stay on the loadbalancer until another
// service requests them. This is due to the fact that it is not possible
// to unassign Floating IPs to point to nowhere.
//
// The Floating IPs are only assigned to the LoadBalancer once it has
// been fully created.
LoadBalancerFloatingIPs = "k8s.cloudscale.ch/loadbalancer-floating-ips"

// LoadBalancerPoolAlgorithm defines the load balancing algorithm used
// by the loadbalancer. See the API documentation for more information:
//
Expand Down Expand Up @@ -279,9 +309,9 @@ func (l *loadbalancer) EnsureLoadBalancer(
nodes []*v1.Node,
) (*v1.LoadBalancerStatus, error) {

// Skip if the service is not supported by this CCM
// Detect configuration issues and abort if they are found
serviceInfo := newServiceInfo(service, clusterName)
if supported, err := serviceInfo.isSupported(); !supported {
if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
return nil, err
}

Expand Down Expand Up @@ -347,9 +377,9 @@ func (l *loadbalancer) UpdateLoadBalancer(
nodes []*v1.Node,
) error {

// Skip if the service is not supported by this CCM
// Detect configuration issues and abort if they are found
serviceInfo := newServiceInfo(service, clusterName)
if supported, err := serviceInfo.isSupported(); !supported {
if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
return err
}

Expand Down Expand Up @@ -388,9 +418,9 @@ func (l *loadbalancer) EnsureLoadBalancerDeleted(
service *v1.Service,
) error {

// Skip if the service is not supported by this CCM
// Detect configuration issues and abort if they are found
serviceInfo := newServiceInfo(service, clusterName)
if supported, err := serviceInfo.isSupported(); !supported {
if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
return err
}

Expand All @@ -402,6 +432,120 @@ func (l *loadbalancer) EnsureLoadBalancerDeleted(
})
}

// ensureValidConfig ensures that the configuration can be applied at all,
// acting as a gate that ensures certain invariants before any changes are
// made.
//
// The general idea is that it's better to not make any chanages if the config
// is bad, rather than throwing errors later when some changes have already
// been made.
func (l *loadbalancer) ensureValidConfig(
ctx context.Context, serviceInfo *serviceInfo) error {

// Skip if the service is not supported by this CCM
if supported, err := serviceInfo.isSupported(); !supported {
return err
}

// If Floating IPs are used, make sure there are no conflicting
// assignment across services.
ips, err := l.findIPsAssignedElsewhere(ctx, serviceInfo)
if err != nil {
return fmt.Errorf("could not parse %s", LoadBalancerFloatingIPs)
}

if len(ips) > 0 {

info := make([]string, 0, len(ips))
for ip, service := range ips {
info = append(info, fmt.Sprintf("%s->%s", ip, service))
}

return fmt.Errorf(
"at least one Floating IP assigned to service %s is also "+
"assigned to another service. Refusing to continue to avoid "+
"flapping: %s",
serviceInfo.Service.Name,
strings.Join(info, ", "),
)
}

return nil
}

// findIPsAssignedElsewhere lists other services and compares their Floating
// IPs with the ones found on the given service. If an IP is found to be
// assigned to two services, the IP and the name of the service are returned.
func (l *loadbalancer) findIPsAssignedElsewhere(
ctx context.Context, serviceInfo *serviceInfo) (map[string]string, error) {

ips, err := serviceInfo.annotationList(LoadBalancerFloatingIPs)
if err != nil {
return nil, err
}

if len(ips) == 0 {
return nil, nil
}

conflicts := make(map[string]string, 0)

// Unfortuantely, there's no way to filter for the services that matter
// here. The only available field selectors for services are
// `metadata.name` and `metadata.namespace`.
//
// To support larger clusters, ensure to not load all services in a
// single call.
opts := metav1.ListOptions{
Continue: "",
Limit: 250,
}

svcs := l.k8s.CoreV1().Services("")
for {
services, err := svcs.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to retrieve services: %w", err)
}

for _, service := range services.Items {
if service.Spec.Type != "LoadBalancer" {
continue
}
if service.UID == serviceInfo.Service.UID {
continue
}

otherInfo := newServiceInfo(&service, serviceInfo.clusterName)
other, err := otherInfo.annotationList(LoadBalancerFloatingIPs)

// Ignore errors loading the IPs of other services, as they would
// not be configured either, if the current service is otherwise
// okay, it should be able to continue.
//
// If this is not done, a single configuration error on a service
// causes this function to err on all other services.
if err != nil {
continue
}

for _, ip := range other {
if slices.Contains(ips, ip) {
conflicts[ip] = service.Name
}
}
}

if services.Continue == "" {
break
}

opts.Continue = services.Continue
}

return conflicts, nil
}

// loadBalancerStatus generates the v1.LoadBalancerStatus for the given
// loadbalancer, as required by Kubernetes.
func loadBalancerStatus(lb *cloudscale.LoadBalancer) *v1.LoadBalancerStatus {
Expand Down
39 changes: 39 additions & 0 deletions pkg/cloudscale_ccm/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type lbState struct {
// necessarily bound to any given pool.
listeners map[*cloudscale.LoadBalancerPool][]cloudscale.
LoadBalancerListener

// The assigned floating IPs
floatingIPs []string
}

func newLbState(lb *cloudscale.LoadBalancer) *lbState {
Expand All @@ -44,6 +47,7 @@ func newLbState(lb *cloudscale.LoadBalancer) *lbState {
map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerHealthMonitor),
listeners: make(
map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerListener),
floatingIPs: make([]string, 0),
}
}

Expand Down Expand Up @@ -104,6 +108,14 @@ func desiredLbState(
},
})

// Get list of floating IPs if possible
ips, err := serviceInfo.annotationList(LoadBalancerFloatingIPs)
if err != nil {
return nil, fmt.Errorf("could not parse %s", LoadBalancerFloatingIPs)
}

s.floatingIPs = ips

// Each service port gets its own pool
algorithm := serviceInfo.annotation(LoadBalancerPoolAlgorithm)
protocol := serviceInfo.annotation(LoadBalancerPoolProtocol)
Expand Down Expand Up @@ -297,6 +309,19 @@ func actualLbState(
s.listeners[nil] = append(s.listeners[nil], l)
}

// Find all floating IPs assigned to the loadbalancer
ips, err := l.client.FloatingIPs.List(ctx)
if err != nil {
return nil, fmt.Errorf(
"lbstate: failed to load floating ips: %w", err)
}

for _, ip := range ips {
if ip.LoadBalancer != nil && ip.LoadBalancer.UUID == lb.UUID {
s.floatingIPs = append(s.floatingIPs, ip.Network)
}
}

return s, nil
}

Expand Down Expand Up @@ -673,6 +698,20 @@ func nextLbActions(
}
}

// Find the Floating IPs that need to be changed
_, assign := compare.Diff(
desired.floatingIPs, actual.floatingIPs, func(ip string) string {
return ip
},
)

for _, ip := range assign {
next = append(next, actions.AssignFloatingIP(
ip,
actual.lb.UUID,
))
}

return next, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/cloudscale_ccm/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func TestActualState(t *testing.T) {
},
},
)
server.On("/v1/floating-ips", 200,
[]cloudscale.FloatingIP{},
)
server.Start()
defer server.Close()

Expand Down
2 changes: 2 additions & 0 deletions pkg/cloudscale_ccm/service_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (s serviceInfo) annotation(key string) string {
return s.annotationOrDefault(key, "lb-standard")
case LoadBalancerVIPAddresses:
return s.annotationOrDefault(key, "[]")
case LoadBalancerFloatingIPs:
return s.annotationOrDefault(key, "[]")
case LoadBalancerPoolAlgorithm:
return s.annotationOrDefault(key, "round_robin")
case LoadBalancerHealthMonitorDelayS:
Expand Down
Loading

0 comments on commit 761dfe8

Please sign in to comment.