Skip to content

Commit

Permalink
Fix issues with defragment and alarm clear on etcd startup
Browse files Browse the repository at this point in the history
* Use clientv3.NewCtxClient instead of New to avoid automatic retry of all RPCs
* Only timeout status requests; allow defrag and alarm clear requests to run to completion.
* Only clear alarms on the local cluster member, not ALL cluster members

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Oct 26, 2024
1 parent 518b88d commit 8a22fcb
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 75 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@ require (
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/yl2chen/cidranger v1.0.2
go.etcd.io/etcd/api/v3 v3.5.16
go.etcd.io/etcd/client/pkg/v3 v3.5.16
go.etcd.io/etcd/client/v3 v3.5.16
go.etcd.io/etcd/etcdutl/v3 v3.5.13
go.etcd.io/etcd/server/v3 v3.5.16
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.27.0
golang.org/x/net v0.28.0
golang.org/x/sync v0.8.0
Expand Down Expand Up @@ -422,7 +424,6 @@ require (
github.com/xlab/treeprint v1.2.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
go.etcd.io/bbolt v1.3.11 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.16 // indirect
go.etcd.io/etcd/client/v2 v2.305.16 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.16 // indirect
go.etcd.io/etcd/raft/v3 v3.5.16 // indirect
Expand All @@ -444,7 +445,6 @@ require (
go.uber.org/fx v1.20.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
Expand Down
181 changes: 110 additions & 71 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,15 @@ import (
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/credentials"
snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -55,7 +62,7 @@ import (
)

const (
testTimeout = time.Second * 30
statusTimeout = time.Second * 30
manageTickerTime = time.Second * 15
learnerMaxStallTime = time.Minute * 5
memberRemovalTimeout = time.Minute * 1
Expand Down Expand Up @@ -206,42 +213,44 @@ func (e *ETCD) Test(ctx context.Context) error {
return errors.New("etcd datastore is not started")
}

ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()

endpoints := getEndpoints(e.config)
status, err := e.client.Status(ctx, endpoints[0])
status, err := e.status(ctx)
if err != nil {
return err
return errors.Wrap(err, "failed to get etcd status")
} else if status.IsLearner {
return errors.New("this server has not yet been promoted from learner to voting member")
} else if status.Leader == 0 {
return etcdserver.ErrNoLeader
}

if status.IsLearner {
return errors.New("this server has not yet been promoted from learner to voting member")
logrus.Infof("Connected to etcd %s - datastore using %d of %d bytes", status.Version, status.DbSizeInUse, status.DbSize)
if len(status.Errors) > 0 {
logrus.Warnf("Errors present on etcd cluster: %s", strings.Join(status.Errors, ","))
}

// defrag this node to reclaim freed space from compacted revisions
if err := e.defragment(ctx); err != nil {
return errors.Wrap(err, "failed to defragment etcd database")
}

if err := e.clearAlarms(ctx); err != nil {
return errors.Wrap(err, "failed to report and disarm etcd alarms")
// clear alarms on this node
if err := e.clearAlarms(ctx, status.Header.MemberId); err != nil {
return errors.Wrap(err, "failed to disarm etcd alarms")
}

// refresh status to see if any errors remain after clearing alarms
status, err = e.client.Status(ctx, endpoints[0])
if err != nil {
// refresh status - note that errors may remain on other nodes, but this
// should not prevent us from continuing with startup.
if status, err := e.status(ctx); err != nil {
return err
}

if len(status.Errors) > 0 {
return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", "))
} else if len(status.Errors) > 0 {
logrus.Warnf("Errors present on etcd cluster after defragment: %s", strings.Join(status.Errors, ","))
}

members, err := e.client.MemberList(ctx)
if err != nil {
return err
}

// Ensure that there is a cluster member with our peerURL and name
var memberNameUrls []string
for _, member := range members.Members {
for _, peerURL := range member.PeerURLs {
Expand All @@ -253,6 +262,8 @@ func (e *ETCD) Test(ctx context.Context) error {
memberNameUrls = append(memberNameUrls, member.Name+"="+member.PeerURLs[0])
}
}

// no matching PeerURL on any Member, return an error that indicates what was expected vs what we found.
return &membershipError{members: memberNameUrls, self: e.name + "=" + e.peerURL()}
}

Expand Down Expand Up @@ -523,17 +534,16 @@ func (e *ETCD) startClient(ctx context.Context) error {
e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert
e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey

client, err := getClient(ctx, e.config, endpoints...)
client, conn, err := getClient(ctx, e.config, endpoints...)
if err != nil {
return err
}
e.client = client

go func() {
<-ctx.Done()
client := e.client
e.client = nil
client.Close()
conn.Close()
}()

return nil
Expand All @@ -554,11 +564,11 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
return err
}

client, err := getClient(clientCtx, e.config, clientURLs...)
client, conn, err := getClient(clientCtx, e.config, clientURLs...)
if err != nil {
return err
}
defer client.Close()
defer conn.Close()

for _, member := range memberList.Members {
for _, peer := range member.PeerURLs {
Expand Down Expand Up @@ -725,13 +735,53 @@ func (e *ETCD) infoHandler() http.Handler {
// If the runtime config does not list any endpoints, the default endpoint is used.
// The returned client should be closed when no longer needed, in order to avoid leaking GRPC
// client goroutines.
func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, error) {
func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, *grpc.ClientConn, error) {
logger, err := logutil.CreateDefaultZapLogger(zapcore.DebugLevel)
if err != nil {
return nil, nil, err
}

cfg, err := getClientConfig(ctx, control, endpoints...)
if err != nil {
return nil, err
return nil, nil, err
}

// Set up dialer and resolver options.
// This is normally handled by clientv3.New() but that wraps all the GRPC
// service with retry handlers and uses deprecated grpc.DialContext() which
// tries to establish a connection even when one isn't wanted.
if cfg.DialKeepAliveTime > 0 {
params := keepalive.ClientParameters{
Time: cfg.DialKeepAliveTime,
Timeout: cfg.DialKeepAliveTimeout,
PermitWithoutStream: cfg.PermitWithoutStream,
}
cfg.DialOptions = append(cfg.DialOptions, grpc.WithKeepaliveParams(params))
}

return clientv3.New(*cfg)
if cfg.TLS != nil {
creds := credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(creds))
} else {
cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

cfg.DialOptions = append(cfg.DialOptions, grpc.WithResolvers(NewSimpleResolver(cfg.Endpoints[0])))

target := fmt.Sprintf("%s://%p/%s", scheme, cfg, authority(cfg.Endpoints[0]))
conn, err := grpc.NewClient(target, cfg.DialOptions...)
if err != nil {
return nil, nil, err
}

// Create a new client and wire up the GRPC service interfaces.
// Ref: https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/client.go#L87
client := clientv3.NewCtxClient(ctx, clientv3.WithZapLogger(logger.Named(version.Program+"-etcd-client")))
client.Cluster = clientv3.NewClusterFromClusterClient(etcdserverpb.NewClusterClient(conn), client)
client.KV = clientv3.NewKVFromKVClient(etcdserverpb.NewKVClient(conn), client)
client.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(etcdserverpb.NewMaintenanceClient(conn), client)

return client, conn, nil
}

// getClientConfig generates an etcd client config connected to the specified endpoints.
Expand Down Expand Up @@ -851,11 +901,11 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error {
}
defer sqliteClient.Close()

etcdClient, err := getClient(ctx, e.config)
etcdClient, conn, err := getClient(ctx, e.config)
if err != nil {
return err
}
defer etcdClient.Close()
defer conn.Close()

values, err := sqliteClient.List(ctx, "/registry/", 0)
if err != nil {
Expand Down Expand Up @@ -984,17 +1034,16 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
return errors.New("etcd datastore already started")
}

client, err := getClient(ctx, e.config)
client, conn, err := getClient(ctx, e.config)
if err != nil {
return err
}
e.client = client

go func() {
<-ctx.Done()
client := e.client
e.client = nil
client.Close()
conn.Close()
}()

if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil {
Expand Down Expand Up @@ -1251,8 +1300,6 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre
}

func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) {
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
resp, err := e.client.Status(ctx, url)
if err != nil {
return resp, errors.Wrap(err, "failed to check etcd member status")
Expand Down Expand Up @@ -1363,12 +1410,10 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress)
return err
}

// clearAlarms checks for any alarms on the local etcd member. If found, they are
// reported and the alarm state is cleared.
func (e *ETCD) clearAlarms(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()

// clearAlarms checks for any NOSPACE alarms on the local etcd member.
// If found, they are reported and the alarm state is cleared.
// Other alarm types are not handled.
func (e *ETCD) clearAlarms(ctx context.Context, memberID uint64) error {
if e.client == nil {
return errors.New("etcd client was nil")
}
Expand All @@ -1379,22 +1424,37 @@ func (e *ETCD) clearAlarms(ctx context.Context) error {
}

for _, alarm := range alarmList.Alarms {
logrus.Warnf("Alarm on etcd member %d: %s", alarm.MemberID, alarm.Alarm)
}

if len(alarmList.Alarms) > 0 {
if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil {
return fmt.Errorf("etcd alarm disarm failed: %v", err)
if alarm.MemberID != memberID {
// ignore alarms on other cluster members, they should manage their own problems
continue
}
if alarm.Alarm == etcdserverpb.AlarmType_NOSPACE {
if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{MemberID: alarm.MemberID, Alarm: alarm.Alarm}); err != nil {
return fmt.Errorf("%s disarm failed: %v", alarm.Alarm, err)
}
logrus.Infof("%s disarmed successfully", alarm.Alarm)
} else {
return fmt.Errorf("%s alarm must be disarmed manually", alarm.Alarm)
}
logrus.Infof("Alarms disarmed on etcd server")
}
return nil
}

func (e *ETCD) defragment(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
// status returns status using the first etcd endpoint.
func (e *ETCD) status(ctx context.Context) (*clientv3.StatusResponse, error) {
if e.client == nil {
return nil, errors.New("etcd client was nil")
}

ctx, cancel := context.WithTimeout(ctx, statusTimeout)
defer cancel()

endpoints := getEndpoints(e.config)
return e.client.Status(ctx, endpoints[0])
}

// defragment defragments the etcd datastore using the first etcd endpoint
func (e *ETCD) defragment(ctx context.Context) error {
if e.client == nil {
return errors.New("etcd client was nil")
}
Expand Down Expand Up @@ -1550,11 +1610,11 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error)
// GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd
// and unmarshal it to a list of apiserver endpoints.
func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) {
cl, err := getClient(ctx, cfg)
cl, conn, err := getClient(ctx, cfg)
if err != nil {
return nil, err
}
defer cl.Close()
defer conn.Close()

etcdResp, err := cl.KV.Get(ctx, AddressKey)
if err != nil {
Expand All @@ -1576,9 +1636,6 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]strin
// GetMembersClientURLs will list through the member lists in etcd and return
// back a combined list of client urls for each member in the cluster
func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()

members, err := e.client.MemberList(ctx)
if err != nil {
return nil, err
Expand All @@ -1593,24 +1650,6 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
return clientURLs, nil
}

// GetMembersNames will list through the member lists in etcd and return
// back a combined list of member names
func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()

members, err := e.client.MemberList(ctx)
if err != nil {
return nil, err
}

var memberNames []string
for _, member := range members.Members {
memberNames = append(memberNames, member.Name)
}
return memberNames, nil
}

// RemoveSelf will remove the member if it exists in the cluster. This should
// only be called on a node that may have previously run etcd, but will not
// currently run etcd, to ensure that it is not a member of the cluster.
Expand Down
Loading

0 comments on commit 8a22fcb

Please sign in to comment.