diff --git a/Makefile b/Makefile
index b2d0c06..41532c4 100644
--- a/Makefile
+++ b/Makefile
@@ -17,7 +17,7 @@ test:
 	./pkg/internal/compare

 integration:
-	K8TEST_PATH=${PWD}/k8test go test -count=1 -tags=integration ./pkg/internal/integration -v
+	K8TEST_PATH=${PWD}/k8test go test -count=1 -tags=integration ./pkg/internal/integration -v -timeout 30m

 coverage: test
 	go tool cover -html=cover.out
diff --git a/README.md b/README.md
index dd5e21e..6f20929 100644
--- a/README.md
+++ b/README.md
@@ -211,6 +211,7 @@ kind: Service
 metadata:
   annotations:
     k8s.cloudscale.ch/loadbalancer-listener-allowed-cidrs: '["1.2.3.0/24"]'
+    k8s.cloudscale.ch/loadbalancer-floating-ips: '["1.2.3.4/32"]'
 ```

 The full set of configuration toggles can be found in the [`pkg/cloudscale_ccm/loadbalancer.go`](pkg/cloudscale_ccm/loadbalancer.go) file.
diff --git a/examples/nginx-hello.yml b/examples/nginx-hello.yml
new file mode 100644
index 0000000..345feea
--- /dev/null
+++ b/examples/nginx-hello.yml
@@ -0,0 +1,57 @@
+# Deploys the nginxdemos/hello:plain-text container and creates a
+# loadbalancer service for it:
+#
+#   export KUBECONFIG=path/to/kubeconfig
+#   kubectl apply -f nginx-hello.yml
+#
+# Wait for `kubectl describe service hello` to show "Ensured load balancer",
+# then use the IP address found under "LoadBalancer Ingress" to connect to
+# the service.
+#
+# You can also use the following shortcut:
+#
+#   curl http://$(kubectl get service hello -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
+#
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: hello
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      app: hello
+  template:
+    metadata:
+      labels:
+        app: hello
+    spec:
+      containers:
+        - name: hello
+          image: nginxdemos/hello:plain-text
+
+      # Spread the containers across nodes
+      topologySpreadConstraints:
+        - maxSkew: 1
+          topologyKey: kubernetes.io/hostname
+          whenUnsatisfiable: DoNotSchedule
+          labelSelector:
+            matchLabels:
+              app: hello
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: hello
+  name: hello
+spec:
+  ports:
+    - port: 80
+      protocol: TCP
+      targetPort: 80
+      name: primary
+  selector:
+    app: hello
+  type: LoadBalancer
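The annotation value added to the README example above is a JSON list of CIDR strings. As a sketch of the expected format, the following self-contained program parses and validates such a value; the `parseFloatingIPs` helper is purely illustrative and not part of the CCM:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/netip"
)

// parseFloatingIPs parses an annotation value such as
// `["5.102.150.123/32", "2a06:c01::123/128"]` into prefixes, rejecting
// anything that is not valid CIDR notation.
func parseFloatingIPs(value string) ([]netip.Prefix, error) {
	var raw []string
	if err := json.Unmarshal([]byte(value), &raw); err != nil {
		return nil, fmt.Errorf("not a JSON list: %w", err)
	}

	prefixes := make([]netip.Prefix, 0, len(raw))
	for _, r := range raw {
		p, err := netip.ParsePrefix(r)
		if err != nil {
			return nil, fmt.Errorf("invalid CIDR %q: %w", r, err)
		}
		prefixes = append(prefixes, p)
	}
	return prefixes, nil
}

func main() {
	ips, err := parseFloatingIPs(`["1.2.3.4/32"]`)
	fmt.Println(ips, err) // [1.2.3.4/32] <nil>
}
```

Using `netip.ParsePrefix` rejects bare addresses without a prefix length, matching the CIDR requirement documented on the annotation below.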
diff --git a/pkg/cloudscale_ccm/loadbalancer.go b/pkg/cloudscale_ccm/loadbalancer.go
index 0e927d9..20bb182 100644
--- a/pkg/cloudscale_ccm/loadbalancer.go
+++ b/pkg/cloudscale_ccm/loadbalancer.go
@@ -3,10 +3,13 @@ package cloudscale_ccm
 import (
 	"context"
 	"fmt"
+	"strings"

 	"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil"
 	"github.com/cloudscale-ch/cloudscale-go-sdk/v4"
+	"golang.org/x/exp/slices"
 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 )
@@ -73,6 +76,33 @@ const (
 	// resources instead.
 	LoadBalancerVIPAddresses = "k8s.cloudscale.ch/loadbalancer-vip-addresses"

+	// LoadBalancerFloatingIPs assigns the given Floating IPs to the
+	// load balancer. The expected value is a list of addresses of the
+	// Floating IPs in CIDR notation. For example:
+	//
+	//   ["5.102.150.123/32", "2a06:c01::123/128"]
+	//
+	// If any Floating IP address is assigned to multiple services via this
+	// annotation, the CCM will refuse to update the associated services, as
+	// this is considered a serious configuration issue that has to first be
+	// resolved by the operator.
+	//
+	// While the service being handled needs to have a parseable Floating IP
+	// config, the services it is compared to for conflict detection do not.
+	//
+	// Such services are skipped during conflict detection, with the goal
+	// of limiting the impact of config parse errors to the service being
+	// processed.
+	//
+	// Floating IPs already assigned to the loadbalancer, but no longer
+	// present in the annotations, stay on the loadbalancer until another
+	// service requests them. This is because a Floating IP cannot be
+	// unassigned; it always has to point somewhere.
+	//
+	// The Floating IPs are only assigned to the LoadBalancer once it has
+	// been fully created.
+	LoadBalancerFloatingIPs = "k8s.cloudscale.ch/loadbalancer-floating-ips"
+
 	// LoadBalancerPoolAlgorithm defines the load balancing algorithm used
 	// by the loadbalancer. See the API documentation for more information:
 	//
@@ -279,9 +309,9 @@ func (l *loadbalancer) EnsureLoadBalancer(
 	nodes []*v1.Node,
 ) (*v1.LoadBalancerStatus, error) {

-	// Skip if the service is not supported by this CCM
+	// Detect configuration issues and abort if they are found
 	serviceInfo := newServiceInfo(service, clusterName)
-	if supported, err := serviceInfo.isSupported(); !supported {
+	if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
 		return nil, err
 	}

@@ -347,9 +377,9 @@ func (l *loadbalancer) UpdateLoadBalancer(
 	nodes []*v1.Node,
 ) error {

-	// Skip if the service is not supported by this CCM
+	// Detect configuration issues and abort if they are found
 	serviceInfo := newServiceInfo(service, clusterName)
-	if supported, err := serviceInfo.isSupported(); !supported {
+	if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
 		return err
 	}

@@ -388,9 +418,9 @@ func (l *loadbalancer) EnsureLoadBalancerDeleted(
 	service *v1.Service,
 ) error {

-	// Skip if the service is not supported by this CCM
+	// Detect configuration issues and abort if they are found
 	serviceInfo := newServiceInfo(service, clusterName)
-	if supported, err := serviceInfo.isSupported(); !supported {
+	if err := l.ensureValidConfig(ctx, serviceInfo); err != nil {
 		return err
 	}

@@ -402,6 +432,120 @@ func (l *loadbalancer) EnsureLoadBalancerDeleted(
 	})
 }

+// ensureValidConfig ensures that the configuration can be applied at all,
+// acting as a gate that checks certain invariants before any changes are
+// made.
+//
+// The general idea is that it's better to not make any changes if the
+// config is bad, rather than throwing errors later, when some changes have
+// already been made.
+func (l *loadbalancer) ensureValidConfig(
+	ctx context.Context, serviceInfo *serviceInfo) error {
+
+	// Skip if the service is not supported by this CCM
+	if supported, err := serviceInfo.isSupported(); !supported {
+		return err
+	}
+
+	// If Floating IPs are used, make sure there are no conflicting
+	// assignments across services.
+	ips, err := l.findIPsAssignedElsewhere(ctx, serviceInfo)
+	if err != nil {
+		return fmt.Errorf("could not parse %s: %w", LoadBalancerFloatingIPs, err)
+	}
+
+	if len(ips) > 0 {
+
+		info := make([]string, 0, len(ips))
+		for ip, service := range ips {
+			info = append(info, fmt.Sprintf("%s->%s", ip, service))
+		}
+
+		return fmt.Errorf(
+			"at least one Floating IP assigned to service %s is also "+
+				"assigned to another service. Refusing to continue to avoid "+
+				"flapping: %s",
+			serviceInfo.Service.Name,
+			strings.Join(info, ", "),
+		)
+	}
+
+	return nil
+}
+
+// findIPsAssignedElsewhere lists other services and compares their Floating
+// IPs with the ones found on the given service. If an IP is found to be
+// assigned to two services, the IP and the name of the other service are
+// returned.
+func (l *loadbalancer) findIPsAssignedElsewhere(
+	ctx context.Context, serviceInfo *serviceInfo) (map[string]string, error) {
+
+	ips, err := serviceInfo.annotationList(LoadBalancerFloatingIPs)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(ips) == 0 {
+		return nil, nil
+	}
+
+	conflicts := make(map[string]string)
+
+	// Unfortunately, there's no way to filter for the services that matter
+	// here. The only available field selectors for services are
+	// `metadata.name` and `metadata.namespace`.
+	//
+	// To support larger clusters, ensure to not load all services in a
+	// single call.
+	opts := metav1.ListOptions{
+		Continue: "",
+		Limit:    250,
+	}
+
+	svcs := l.k8s.CoreV1().Services("")
+	for {
+		services, err := svcs.List(ctx, opts)
+		if err != nil {
+			return nil, fmt.Errorf("failed to retrieve services: %w", err)
+		}
+
+		for _, service := range services.Items {
+			if service.Spec.Type != "LoadBalancer" {
+				continue
+			}
+			if service.UID == serviceInfo.Service.UID {
+				continue
+			}
+
+			otherInfo := newServiceInfo(&service, serviceInfo.clusterName)
+			other, err := otherInfo.annotationList(LoadBalancerFloatingIPs)
+
+			// Ignore errors loading the IPs of other services, as such
+			// services would not be configured either. If the current
+			// service is otherwise okay, it should be able to continue.
+			//
+			// If this is not done, a single configuration error on a service
+			// causes this function to err on all other services.
+			if err != nil {
+				continue
+			}
+
+			for _, ip := range other {
+				if slices.Contains(ips, ip) {
+					conflicts[ip] = service.Name
+				}
+			}
+		}
+
+		if services.Continue == "" {
+			break
+		}
+
+		opts.Continue = services.Continue
+	}
+
+	return conflicts, nil
+}
+
 // loadBalancerStatus generates the v1.LoadBalancerStatus for the given
 // loadbalancer, as required by Kubernetes.
 func loadBalancerStatus(lb *cloudscale.LoadBalancer) *v1.LoadBalancerStatus {
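The service listing in `findIPsAssignedElsewhere` deliberately pages through services rather than loading them all in one call. Shown standalone with client-go, under the assumption of a kubeconfig at the default location (the `listAllServices` helper is hypothetical), the same `Limit`/`Continue` pattern looks like this:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// listAllServices pages through every service in the cluster, 250 at a
// time, following the list continue token instead of issuing one
// unbounded call.
func listAllServices(ctx context.Context, k8s kubernetes.Interface) (int, error) {
	opts := metav1.ListOptions{Limit: 250}
	total := 0

	for {
		page, err := k8s.CoreV1().Services("").List(ctx, opts)
		if err != nil {
			return total, err
		}
		total += len(page.Items)

		if page.Continue == "" {
			return total, nil
		}
		opts.Continue = page.Continue
	}
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	k8s := kubernetes.NewForConfigOrDie(config)

	n, err := listAllServices(context.Background(), k8s)
	fmt.Println(n, err)
}
```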
diff --git a/pkg/cloudscale_ccm/reconcile.go b/pkg/cloudscale_ccm/reconcile.go
index 97c210b..e13dd3f 100644
--- a/pkg/cloudscale_ccm/reconcile.go
+++ b/pkg/cloudscale_ccm/reconcile.go
@@ -32,6 +32,9 @@ type lbState struct {
 	// necessarily bound to any given pool.
 	listeners map[*cloudscale.LoadBalancerPool][]cloudscale.
 		LoadBalancerListener
+
+	// The assigned Floating IPs
+	floatingIPs []string
 }

 func newLbState(lb *cloudscale.LoadBalancer) *lbState {
@@ -44,6 +47,7 @@ func newLbState(lb *cloudscale.LoadBalancer) *lbState {
 			map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerHealthMonitor),
 		listeners: make(
 			map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerListener),
+		floatingIPs: make([]string, 0),
 	}
 }

@@ -104,6 +108,14 @@ func desiredLbState(
 		},
 	})

+	// Get the list of Floating IPs, if possible
+	ips, err := serviceInfo.annotationList(LoadBalancerFloatingIPs)
+	if err != nil {
+		return nil, fmt.Errorf("could not parse %s: %w", LoadBalancerFloatingIPs, err)
+	}
+
+	s.floatingIPs = ips
+
 	// Each service port gets its own pool
 	algorithm := serviceInfo.annotation(LoadBalancerPoolAlgorithm)
 	protocol := serviceInfo.annotation(LoadBalancerPoolProtocol)
@@ -297,6 +309,19 @@ func actualLbState(
 		s.listeners[nil] = append(s.listeners[nil], l)
 	}

+	// Find all Floating IPs assigned to the loadbalancer
+	ips, err := l.client.FloatingIPs.List(ctx)
+	if err != nil {
+		return nil, fmt.Errorf(
+			"lbstate: failed to load floating ips: %w", err)
+	}
+
+	for _, ip := range ips {
+		if ip.LoadBalancer != nil && ip.LoadBalancer.UUID == lb.UUID {
+			s.floatingIPs = append(s.floatingIPs, ip.Network)
+		}
+	}
+
 	return s, nil
 }

@@ -673,6 +698,20 @@ func nextLbActions(
 		}
 	}

+	// Find the Floating IPs that need to be assigned
+	_, assign := compare.Diff(
+		desired.floatingIPs, actual.floatingIPs, func(ip string) string {
+			return ip
+		},
+	)
+
+	for _, ip := range assign {
+		next = append(next, actions.AssignFloatingIP(
+			ip,
+			actual.lb.UUID,
+		))
+	}
+
 	return next, nil
 }
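The reconciliation step relies on a set difference between the desired and the actual Floating IPs. The exact semantics of the internal `compare.Diff` helper are not visible in this diff, so the sketch below illustrates the underlying idea with a hypothetical `diffByKey` function: anything desired but not yet assigned becomes an assignment action.

```go
package main

import "fmt"

// diffByKey is an illustrative stand-in for the internal compare.Diff
// helper: it returns the elements only present in a (missing from b) and
// the elements only present in b (missing from a), compared by key().
func diffByKey[T any](a, b []T, key func(T) string) (onlyA, onlyB []T) {
	inA := make(map[string]bool, len(a))
	for _, v := range a {
		inA[key(v)] = true
	}
	inB := make(map[string]bool, len(b))
	for _, v := range b {
		inB[key(v)] = true
	}

	for _, v := range a {
		if !inB[key(v)] {
			onlyA = append(onlyA, v)
		}
	}
	for _, v := range b {
		if !inA[key(v)] {
			onlyB = append(onlyB, v)
		}
	}
	return onlyA, onlyB
}

func main() {
	desired := []string{"5.102.150.123/32", "2a06:c01::123/128"}
	actual := []string{"5.102.150.123/32"}

	toAssign, _ := diffByKey(desired, actual, func(ip string) string { return ip })
	fmt.Println(toAssign) // [2a06:c01::123/128]
}
```

Note that only assignments are derived: per the annotation's documentation, IPs removed from the annotation stay on the load balancer until another service claims them, so no unassign action exists.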
diff --git a/pkg/cloudscale_ccm/reconcile_test.go b/pkg/cloudscale_ccm/reconcile_test.go
index 952b3b6..aaef175 100644
--- a/pkg/cloudscale_ccm/reconcile_test.go
+++ b/pkg/cloudscale_ccm/reconcile_test.go
@@ -231,6 +231,9 @@ func TestActualState(t *testing.T) {
 			},
 		},
 	)
+	server.On("/v1/floating-ips", 200,
+		[]cloudscale.FloatingIP{},
+	)
 	server.Start()
 	defer server.Close()

diff --git a/pkg/cloudscale_ccm/service_info.go b/pkg/cloudscale_ccm/service_info.go
index 7e31935..10f0603 100644
--- a/pkg/cloudscale_ccm/service_info.go
+++ b/pkg/cloudscale_ccm/service_info.go
@@ -86,6 +86,8 @@ func (s serviceInfo) annotation(key string) string {
 		return s.annotationOrDefault(key, "lb-standard")
 	case LoadBalancerVIPAddresses:
 		return s.annotationOrDefault(key, "[]")
+	case LoadBalancerFloatingIPs:
+		return s.annotationOrDefault(key, "[]")
 	case LoadBalancerPoolAlgorithm:
 		return s.annotationOrDefault(key, "round_robin")
 	case LoadBalancerHealthMonitorDelayS:
diff --git a/pkg/internal/actions/actions.go b/pkg/internal/actions/actions.go
index c959e7a..c831ed3 100644
--- a/pkg/internal/actions/actions.go
+++ b/pkg/internal/actions/actions.go
@@ -562,3 +562,31 @@ func (a *UpdateMonitorNumberAction) Run(
 	return ProceedOnSuccess(
 		client.LoadBalancerHealthMonitors.Update(ctx, a.monitorUUID, &req))
 }
+
+// AssignFloatingIPAction assigns a Floating IP to the load balancer with
+// the given UUID.
+type AssignFloatingIPAction struct {
+	ip     string
+	lbUUID string
+}
+
+func AssignFloatingIP(ip string, lbUUID string) Action {
+	return &AssignFloatingIPAction{ip: ip, lbUUID: lbUUID}
+}
+
+func (a *AssignFloatingIPAction) Label() string {
+	return fmt.Sprintf("assign-floating-ip(%s -> %s)", a.ip, a.lbUUID)
+}
+
+func (a *AssignFloatingIPAction) Run(
+	ctx context.Context, client *cloudscale.Client) (Control, error) {

+	// The API addresses a Floating IP by its bare address, so strip the
+	// prefix length from the CIDR notation.
+	ip := strings.SplitN(a.ip, "/", 2)[0]
+
+	err := client.FloatingIPs.Update(
+		ctx, ip,
+		&cloudscale.FloatingIPUpdateRequest{
+			LoadBalancer: a.lbUUID,
+		},
+	)
+
+	return ProceedOnSuccess(err)
+}
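`AssignFloatingIPAction.Run` strips the prefix length with `strings.SplitN`. A stricter, validating alternative (a sketch only, not what the code above uses) would lean on `net/netip`, which also rejects malformed input instead of passing it through:

```go
package main

import (
	"fmt"
	"net/netip"
)

// addressOf extracts the bare address from CIDR notation, validating the
// input in the process. A hypothetical alternative to splitting on "/".
func addressOf(cidr string) (string, error) {
	prefix, err := netip.ParsePrefix(cidr)
	if err != nil {
		return "", fmt.Errorf("invalid CIDR %q: %w", cidr, err)
	}
	return prefix.Addr().String(), nil
}

func main() {
	addr, err := addressOf("5.102.150.123/32")
	fmt.Println(addr, err) // 5.102.150.123 <nil>
}
```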
diff --git a/pkg/internal/actions/actions_test.go b/pkg/internal/actions/actions_test.go
index d0608a8..32e17ea 100644
--- a/pkg/internal/actions/actions_test.go
+++ b/pkg/internal/actions/actions_test.go
@@ -514,3 +514,24 @@ func TestUpdateMonitorNumberAction(t *testing.T) {
 	assert.Error(t, err)
 	assert.Equal(t, Errored, v)
 }
+
+func TestAssignFloatingIPAction(t *testing.T) {
+	server := testkit.NewMockAPIServer()
+	server.On("/v1/floating-ips/100.1.1.1", 201, "{}")
+	server.Start()
+	defer server.Close()
+
+	action := AssignFloatingIP(
+		"100.1.1.1/24", "00000000-0000-0000-0000-000000000000")
+
+	assert.NotEmpty(t, action.Label())
+	v, err := action.Run(context.Background(), server.Client())
+
+	assert.NoError(t, err)
+	assert.Equal(t, Proceed, v)
+
+	var sent cloudscale.FloatingIPUpdateRequest
+	server.LastSent(&sent)
+
+	assert.Equal(t, "00000000-0000-0000-0000-000000000000", sent.LoadBalancer)
+}
diff --git a/pkg/internal/integration/main_test.go b/pkg/internal/integration/main_test.go
index bbd4dde..07bd6f5 100644
--- a/pkg/internal/integration/main_test.go
+++ b/pkg/internal/integration/main_test.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"log"
 	"math/rand"
+	"net/http"
 	"os"
 	"testing"
 	"time"
@@ -103,7 +104,72 @@ func (s *IntegrationTestSuite) SetupTest() {
 	}
 }

+func (s *IntegrationTestSuite) Region() string {
+	return s.Nodes()[0].Labels["topology.kubernetes.io/region"]
+}
+
+func (s *IntegrationTestSuite) CreateGlobalFloatingIP() (
+	*cloudscale.FloatingIP, error) {
+
+	ip, err := s.api.FloatingIPs.Create(
+		context.Background(), &cloudscale.FloatingIPCreateRequest{
+			IPVersion: 4,
+			Type:      "global",
+		},
+	)
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Floating IP: %w", err)
+	}
+
+	s.resources = append(s.resources, ip.HREF)
+
+	return ip, nil
+}
+
+func (s *IntegrationTestSuite) CreateRegionalFloatingIP(region string) (
+	*cloudscale.FloatingIP, error) {
+
+	ip, err := s.api.FloatingIPs.Create(
+		context.Background(), &cloudscale.FloatingIPCreateRequest{
+			IPVersion: 4,
+			Type:      "regional",
+			RegionalResourceRequest: cloudscale.RegionalResourceRequest{
+				Region: region,
+			},
+		},
+	)
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Floating IP: %w", err)
+	}
+
+	s.resources = append(s.resources, ip.HREF)
+
+	return ip, nil
+}
+
 func (s *IntegrationTestSuite) TearDownTest() {
+	errors := 0
+
+	if s.resources != nil {
+		for _, url := range s.resources {
+			req, err := s.api.NewRequest(
+				context.Background(), http.MethodDelete, url, nil)
+			if err != nil {
+				s.T().Logf("preparing to delete %s failed: %s", url, err)
+				errors++
+				continue
+			}
+
+			err = s.api.Do(context.Background(), req, nil)
+			if err != nil {
+				s.T().Logf("deleting %s failed: %s", url, err)
+				errors++
+			}
+		}
+	}
+	s.resources = nil
+
 	err := s.k8s.CoreV1().Namespaces().Delete(
 		context.Background(),
 		s.ns,
@@ -111,7 +177,12 @@ func (s *IntegrationTestSuite) TearDownTest() {
 	)

 	if err != nil {
-		panic(fmt.Sprintf("could not delete namespace %s: %s", s.ns, err))
+		s.T().Logf("could not delete namespace %s: %s", s.ns, err)
+		errors++
+	}
+
+	if errors > 0 {
+		panic(fmt.Sprintf("test cleanup failed: %d errors", errors))
 	}

 	s.ns = ""
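The action test above relies on the project's internal `testkit` mock server. As a rough stand-in using only the standard library, the same record-and-assert pattern can be built on `httptest`; the `load_balancer` field name and the PATCH method are assumptions about the API here, not facts established by this diff:

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

func main() {
	// Record the last request body so it can be asserted afterwards,
	// similar to what testkit's LastSent appears to do.
	var lastBody []byte

	server := httptest.NewServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			lastBody, _ = io.ReadAll(r.Body)
			w.WriteHeader(http.StatusNoContent)
		}))
	defer server.Close()

	// Simulate the update a Floating IP assignment might send.
	body := `{"load_balancer": "00000000-0000-0000-0000-000000000000"}`
	req, _ := http.NewRequest(http.MethodPatch,
		server.URL+"/v1/floating-ips/100.1.1.1", strings.NewReader(body))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	var sent struct {
		LoadBalancer string `json:"load_balancer"`
	}
	_ = json.Unmarshal(lastBody, &sent)
	fmt.Println(sent.LoadBalancer)
}
```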
"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/cloudscale_ccm" "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil" "github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/testkit" + cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v4" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -154,9 +155,13 @@ func (s *IntegrationTestSuite) AwaitServiceReady( service = s.ServiceNamed(name) s.Require().NotNil(service) - if len(service.Status.LoadBalancer.Ingress) >= 1 { - return service + if service.Annotations != nil { + uuid := service.Annotations["k8s.cloudscale.ch/loadbalancer-uuid"] + if uuid != "" { + return service + } } + time.Sleep(1 * time.Second) } @@ -169,14 +174,14 @@ func (s *IntegrationTestSuite) TestServiceEndToEnd() { start := time.Now() // Deploy a TCP server that returns the hostname - s.T().Log("Creating hostname deployment") + s.T().Log("Creating nginx deployment") s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, 80) // Expose the deployment using a LoadBalancer service s.ExposeDeployment("nginx", 80, 80, nil) // Wait for the service to be ready - s.T().Log("Waiting for hostname service to be ready") + s.T().Log("Waiting for nginx service to be ready") service := s.AwaitServiceReady("nginx", 180*time.Second) s.Require().NotNil(service) @@ -380,3 +385,111 @@ func (s *IntegrationTestSuite) TestServiceTrafficPolicyLocal() { assertPrefix(addr, &cluster_policy_prefix) assertFastResponses(addr, &cluster_policy_prefix) } + +func (s *IntegrationTestSuite) TestServiceWithGlobalFloatingIP() { + global, err := s.CreateGlobalFloatingIP() + s.Require().NoError(err) + s.RunTestServiceWithFloatingIP(global) +} + +func (s *IntegrationTestSuite) TestServiceWithRegionalFloatingIP() { + regional, err := s.CreateRegionalFloatingIP(s.Region()) + s.Require().NoError(err) + s.RunTestServiceWithFloatingIP(regional) +} + +func (s *IntegrationTestSuite) RunTestServiceWithFloatingIP( + fip *cloudscale.FloatingIP) { + + // Deploy a TCP server that returns the hostname + s.T().Log("Creating nginx deployment") + s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, 80) + + // Expose the deployment using a LoadBalancer service with Floating IP + s.ExposeDeployment("nginx", 80, 80, map[string]string{ + "k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf( + `["%s"]`, fip.Network), + }) + + // Wait for the service to be ready + s.T().Log("Waiting for nginx service to be ready") + service := s.AwaitServiceReady("nginx", 180*time.Second) + s.Require().NotNil(service) + + // Ensure that we get responses from two different pods (round-robin) + s.T().Log("Verifying hostname service responses") + addr := strings.SplitN(fip.Network, "/", 2)[0] + responses := make(map[string]int) + errors := 0 + bound := false + + for i := 0; i < 100; i++ { + response, err := testkit.HelloNginx(addr, 80) + + // The first 25 requests may err, as the Floating IP has to propagate + if err != nil && !bound { + continue + } + + if err == nil && !bound { + s.Assert().LessOrEqual(errors, 25) + bound = true + errors = 0 + } + + if err != nil { + s.T().Logf("Request %d failed: %s", i, err) + errors++ + } + + if response != nil { + s.Assert().NotEmpty(response.ServerName) + responses[response.ServerName]++ + } + + time.Sleep(5 * time.Millisecond) + } + + // Allow for errors, which occurs maybe once in the first 100 requests + // to a service, and which does not occur anymore later (even when + // 
+
+func (s *IntegrationTestSuite) TestFloatingIPConflicts() {
+
+	// Create a regional Floating IP
+	regional, err := s.CreateRegionalFloatingIP(s.Region())
+	s.Require().NoError(err)
+
+	// Deploy an HTTP server that returns the hostname
+	s.T().Log("Creating nginx deployment")
+	s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, 80)
+
+	// Expose the deployment using a LoadBalancer service with Floating IP
+	s.ExposeDeployment("nginx", 80, 80, map[string]string{
+		"k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf(
+			`["%s"]`, regional.Network),
+	})
+
+	// Wait for the service to be ready
+	s.T().Log("Waiting for nginx service to be ready")
+	service := s.AwaitServiceReady("nginx", 180*time.Second)
+	s.Require().NotNil(service)
+
+	// Configure a second service with the same Floating IP
+	start := time.Now()
+
+	s.ExposeDeployment("service-2", 80, 80, map[string]string{
+		"k8s.cloudscale.ch/loadbalancer-floating-ips": fmt.Sprintf(
+			`["%s"]`, regional.Network),
+	})
+
+	// Wait for a moment before checking the log
+	time.Sleep(5 * time.Second)
+
+	// Ensure the conflict was detected
+	lines := s.CCMLogs(start)
+	s.Assert().Contains(lines, "assigned to another service")
+}
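`AwaitServiceReady` and the propagation loop above both follow the same poll-until-ready pattern. Reduced to its essentials (all names hypothetical, no client-go involved), it looks like this:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// await polls check until it returns true, or the timeout elapses. This
// mirrors the structure of AwaitServiceReady above: re-check, sleep,
// repeat, and give up once the deadline has passed.
func await(timeout, interval time.Duration, check func() bool) error {
	deadline := time.Now().Add(timeout)

	for time.Now().Before(deadline) {
		if check() {
			return nil
		}
		time.Sleep(interval)
	}

	return errors.New("timed out waiting for condition")
}

func main() {
	start := time.Now()

	// Toy condition: becomes true after two seconds.
	err := await(10*time.Second, 250*time.Millisecond, func() bool {
		return time.Since(start) > 2*time.Second
	})

	fmt.Println(err) // <nil>
}
```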