From abaea218a7bb60bff79babaf3ea6a5af0d9697b6 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Mon, 8 Apr 2024 18:04:27 +0000 Subject: [PATCH] Improve etcd load-balancer startup behavior Prefer the address of the etcd member being joined, and seed the full address list immediately on startup. Signed-off-by: Brad Davidson (cherry picked from commit 7d9abc9f07cb5e9f4485b994b1e0f156250664e5) --- pkg/cluster/cluster.go | 18 ++++++++++-- pkg/cluster/managed.go | 36 ++++++++++++------------ pkg/etcd/etcd.go | 62 ++++++++++++++++++++++++++++++------------ 3 files changed, 78 insertions(+), 38 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index e970b49eb6ea..66f98ca165d0 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -43,23 +43,37 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { ready := make(chan struct{}) defer close(ready) - // try to get /db/info urls first before attempting to use join url + // try to get /db/info urls first, for a current list of etcd cluster member client URLs clientURLs, _, err := etcd.ClientURLs(ctx, c.clientAccessInfo, c.config.PrivateIP) if err != nil { return nil, err } - if len(clientURLs) < 1 { + // If we somehow got no error but also no client URLs, just use the address of the server we're joining + if len(clientURLs) == 0 { clientURL, err := url.Parse(c.config.JoinURL) if err != nil { return nil, err } clientURL.Host = clientURL.Hostname() + ":2379" clientURLs = append(clientURLs, clientURL.String()) + logrus.Warnf("Got empty etcd ClientURL list; using server URL %s", clientURL) } etcdProxy, err := etcd.NewETCDProxy(ctx, c.config.SupervisorPort, c.config.DataDir, clientURLs[0], utilsnet.IsIPv6CIDR(c.config.ServiceIPRanges[0])) if err != nil { return nil, err } + // immediately update the load balancer with all etcd addresses + // client URLs are a full URI, but the proxy only wants host:port + for i, c := range clientURLs { + u, err := url.Parse(c) + if err != nil { + return nil, errors.Wrap(err, "failed to parse etcd ClientURL") + } + clientURLs[i] = u.Host + } + etcdProxy.Update(clientURLs) + + // start periodic endpoint sync goroutine c.setupEtcdProxy(ctx, etcdProxy) // remove etcd member if it exists diff --git a/pkg/cluster/managed.go b/pkg/cluster/managed.go index dcf140e57758..d6c668998a8a 100644 --- a/pkg/cluster/managed.go +++ b/pkg/cluster/managed.go @@ -17,6 +17,7 @@ import ( "github.com/k3s-io/k3s/pkg/version" "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/wait" ) // testClusterDB returns a channel that will be closed when the datastore connection is available. @@ -132,28 +133,25 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) { if c.managedDB == nil { return } - go func() { - t := time.NewTicker(30 * time.Second) - defer t.Stop() - for range t.C { - newAddresses, err := c.managedDB.GetMembersClientURLs(ctx) + // We use Poll here instead of Until because we want to wait the interval before running the function. + go wait.PollUntilWithContext(ctx, 30*time.Second, func(ctx context.Context) (bool, error) { + clientURLs, err := c.managedDB.GetMembersClientURLs(ctx) + if err != nil { + logrus.Warnf("Failed to get etcd ClientURLs: %v", err) + return false, nil + } + // client URLs are a full URI, but the proxy only wants host:port + for i, c := range clientURLs { + u, err := url.Parse(c) if err != nil { - logrus.Warnf("Failed to get etcd client URLs: %v", err) - continue + logrus.Warnf("Failed to parse etcd ClientURL: %v", err) + return false, nil } - // client URLs are a full URI, but the proxy only wants host:port - var hosts []string - for _, address := range newAddresses { - u, err := url.Parse(address) - if err != nil { - logrus.Warnf("Failed to parse etcd client URL: %v", err) - continue - } - hosts = append(hosts, u.Host) - } - etcdProxy.Update(hosts) + clientURLs[i] = u.Host } - }() + etcdProxy.Update(clientURLs) + return false, nil + }) } // deleteNodePasswdSecret wipes out the node password secret after restoration diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 603828a94b83..cd29de505473 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -725,7 +725,6 @@ func getClientConfig(ctx context.Context, control *config.Control, endpoints ... DialTimeout: defaultDialTimeout, DialKeepAliveTime: defaultKeepAliveTime, DialKeepAliveTimeout: defaultKeepAliveTimeout, - AutoSyncInterval: defaultKeepAliveTimeout, PermitWithoutStream: true, } @@ -1387,35 +1386,50 @@ func (e *ETCD) defragment(ctx context.Context) error { // The list is retrieved from the remote server that is being joined. func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info, selfIP string) ([]string, Members, error) { var memberList Members - resp, err := clientAccessInfo.Get("/db/info") + + // find the address advertised for our own client URL, so that we don't connect to ourselves + ip, err := getAdvertiseAddress(selfIP) if err != nil { - return nil, memberList, &MemberListError{Err: err} + return nil, memberList, err } - if err := json.Unmarshal(resp, &memberList); err != nil { + // find the client URL of the server we're joining, so we can prioritize it + joinURL, err := url.Parse(clientAccessInfo.BaseURL) + if err != nil { return nil, memberList, err } - ip, err := getAdvertiseAddress(selfIP) + + // get the full list from the server we're joining + resp, err := clientAccessInfo.Get("/db/info") if err != nil { + return nil, memberList, &MemberListError{Err: err} + } + if err := json.Unmarshal(resp, &memberList); err != nil { return nil, memberList, err } + + // Build a list of client URLs. Learners and the current node are excluded; + // the server we're joining is listed first if found. var clientURLs []string -members: for _, member := range memberList.Members { - // excluding learner member from the client list - if member.IsLearner { - continue - } + var isSelf, isPreferred bool for _, clientURL := range member.ClientURLs { - u, err := url.Parse(clientURL) - if err != nil { - continue + if u, err := url.Parse(clientURL); err == nil { + switch u.Hostname() { + case ip: + isSelf = true + case joinURL.Hostname(): + isPreferred = true + } } - if u.Hostname() == ip { - continue members + } + if !member.IsLearner && !isSelf { + if isPreferred { + clientURLs = append(member.ClientURLs, clientURLs...) + } else { + clientURLs = append(clientURLs, member.ClientURLs...) } } - clientURLs = append(clientURLs, member.ClientURLs...) } return clientURLs, memberList, nil } @@ -1545,7 +1559,21 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]strin // GetMembersClientURLs will list through the member lists in etcd and return // back a combined list of client urls for each member in the cluster func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { - return e.client.Endpoints(), nil + ctx, cancel := context.WithTimeout(ctx, testTimeout) + defer cancel() + + members, err := e.client.MemberList(ctx) + if err != nil { + return nil, err + } + + var clientURLs []string + for _, member := range members.Members { + if !member.IsLearner { + clientURLs = append(clientURLs, member.ClientURLs...) + } + } + return clientURLs, nil } // GetMembersNames will list through the member lists in etcd and return