Skip to content

Commit

Permalink
Improve etcd load-balancer startup behavior
Browse files Browse the repository at this point in the history
Prefer the address of the etcd member being joined, and seed the full address list immediately on startup.

Signed-off-by: Brad Davidson <[email protected]>
(cherry picked from commit 7d9abc9)
  • Loading branch information
brandond committed Apr 10, 2024
1 parent ddd9ab7 commit abaea21
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 38 deletions.
18 changes: 16 additions & 2 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,37 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
ready := make(chan struct{})
defer close(ready)

// try to get /db/info urls first before attempting to use join url
// try to get /db/info urls first, for a current list of etcd cluster member client URLs
clientURLs, _, err := etcd.ClientURLs(ctx, c.clientAccessInfo, c.config.PrivateIP)
if err != nil {
return nil, err
}
if len(clientURLs) < 1 {
// If we somehow got no error but also no client URLs, just use the address of the server we're joining
if len(clientURLs) == 0 {
clientURL, err := url.Parse(c.config.JoinURL)
if err != nil {
return nil, err
}
clientURL.Host = clientURL.Hostname() + ":2379"
clientURLs = append(clientURLs, clientURL.String())
logrus.Warnf("Got empty etcd ClientURL list; using server URL %s", clientURL)
}
etcdProxy, err := etcd.NewETCDProxy(ctx, c.config.SupervisorPort, c.config.DataDir, clientURLs[0], utilsnet.IsIPv6CIDR(c.config.ServiceIPRanges[0]))
if err != nil {
return nil, err
}
// immediately update the load balancer with all etcd addresses
// client URLs are a full URI, but the proxy only wants host:port
for i, c := range clientURLs {
u, err := url.Parse(c)
if err != nil {
return nil, errors.Wrap(err, "failed to parse etcd ClientURL")
}
clientURLs[i] = u.Host
}
etcdProxy.Update(clientURLs)

// start periodic endpoint sync goroutine
c.setupEtcdProxy(ctx, etcdProxy)

// remove etcd member if it exists
Expand Down
36 changes: 17 additions & 19 deletions pkg/cluster/managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/k3s-io/k3s/pkg/version"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
)

// testClusterDB returns a channel that will be closed when the datastore connection is available.
Expand Down Expand Up @@ -132,28 +133,25 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
if c.managedDB == nil {
return
}
go func() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for range t.C {
newAddresses, err := c.managedDB.GetMembersClientURLs(ctx)
// We use Poll here instead of Until because we want to wait the interval before running the function.
go wait.PollUntilWithContext(ctx, 30*time.Second, func(ctx context.Context) (bool, error) {
clientURLs, err := c.managedDB.GetMembersClientURLs(ctx)
if err != nil {
logrus.Warnf("Failed to get etcd ClientURLs: %v", err)
return false, nil
}
// client URLs are a full URI, but the proxy only wants host:port
for i, c := range clientURLs {
u, err := url.Parse(c)
if err != nil {
logrus.Warnf("Failed to get etcd client URLs: %v", err)
continue
logrus.Warnf("Failed to parse etcd ClientURL: %v", err)
return false, nil
}
// client URLs are a full URI, but the proxy only wants host:port
var hosts []string
for _, address := range newAddresses {
u, err := url.Parse(address)
if err != nil {
logrus.Warnf("Failed to parse etcd client URL: %v", err)
continue
}
hosts = append(hosts, u.Host)
}
etcdProxy.Update(hosts)
clientURLs[i] = u.Host
}
}()
etcdProxy.Update(clientURLs)
return false, nil
})
}

// deleteNodePasswdSecret wipes out the node password secret after restoration
Expand Down
62 changes: 45 additions & 17 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,6 @@ func getClientConfig(ctx context.Context, control *config.Control, endpoints ...
DialTimeout: defaultDialTimeout,
DialKeepAliveTime: defaultKeepAliveTime,
DialKeepAliveTimeout: defaultKeepAliveTimeout,
AutoSyncInterval: defaultKeepAliveTimeout,
PermitWithoutStream: true,
}

Expand Down Expand Up @@ -1387,35 +1386,50 @@ func (e *ETCD) defragment(ctx context.Context) error {
// The list is retrieved from the remote server that is being joined.
func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info, selfIP string) ([]string, Members, error) {
var memberList Members
resp, err := clientAccessInfo.Get("/db/info")

// find the address advertised for our own client URL, so that we don't connect to ourselves
ip, err := getAdvertiseAddress(selfIP)
if err != nil {
return nil, memberList, &MemberListError{Err: err}
return nil, memberList, err
}

if err := json.Unmarshal(resp, &memberList); err != nil {
// find the client URL of the server we're joining, so we can prioritize it
joinURL, err := url.Parse(clientAccessInfo.BaseURL)
if err != nil {
return nil, memberList, err
}
ip, err := getAdvertiseAddress(selfIP)

// get the full list from the server we're joining
resp, err := clientAccessInfo.Get("/db/info")
if err != nil {
return nil, memberList, &MemberListError{Err: err}
}
if err := json.Unmarshal(resp, &memberList); err != nil {
return nil, memberList, err
}

// Build a list of client URLs. Learners and the current node are excluded;
// the server we're joining is listed first if found.
var clientURLs []string
members:
for _, member := range memberList.Members {
// excluding learner member from the client list
if member.IsLearner {
continue
}
var isSelf, isPreferred bool
for _, clientURL := range member.ClientURLs {
u, err := url.Parse(clientURL)
if err != nil {
continue
if u, err := url.Parse(clientURL); err == nil {
switch u.Hostname() {
case ip:
isSelf = true
case joinURL.Hostname():
isPreferred = true
}
}
if u.Hostname() == ip {
continue members
}
if !member.IsLearner && !isSelf {
if isPreferred {
clientURLs = append(member.ClientURLs, clientURLs...)
} else {
clientURLs = append(clientURLs, member.ClientURLs...)
}
}
clientURLs = append(clientURLs, member.ClientURLs...)
}
return clientURLs, memberList, nil
}
Expand Down Expand Up @@ -1545,7 +1559,21 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]strin
// GetMembersClientURLs will list through the member lists in etcd and return
// back a combined list of client urls for each member in the cluster
func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
return e.client.Endpoints(), nil
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()

members, err := e.client.MemberList(ctx)
if err != nil {
return nil, err
}

var clientURLs []string
for _, member := range members.Members {
if !member.IsLearner {
clientURLs = append(clientURLs, member.ClientURLs...)
}
}
return clientURLs, nil
}

// GetMembersNames will list through the member lists in etcd and return
Expand Down

0 comments on commit abaea21

Please sign in to comment.