Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
[6.1.x] use etcd quorum reads for querying the operation plan (#2144)
Browse files Browse the repository at this point in the history
* Only refuse to use etcd if itis been explicitly disabled on a node (e.g.
during etcd upgrades) - otherwise use exponential backoff in case of
transient errors.

* Reverse the etcd condition - check for disabled instead as enabled has many interpretations

* Refine service disabled status detection

* Extend etcd backend implementation to enable setting etcd-specific
configuration for reads to be able to do consistent reads when querying
operation plan during upgrades.

* Actually use the quorum reads in the reconciler

* Address review comments
  • Loading branch information
a-palchikov authored Sep 30, 2020
1 parent 8d030d5 commit 0ff7f47
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 63 deletions.
1 change: 1 addition & 0 deletions lib/fsm/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func FormatOperationPlanJSON(w io.Writer, plan storage.OperationPlan) error {
func FormatOperationPlanText(w io.Writer, plan storage.OperationPlan) {
var t tabwriter.Writer
t.Init(w, 0, 10, 5, ' ', 0)
fmt.Fprintln(&t, "Operation: ", OperationString(plan))
common.PrintTableHeader(&t, []string{"Phase", "Description", "State", "Node", "Requires", "Updated"})
for _, phase := range plan.Phases {
printPhase(&t, phase, 0)
Expand Down
21 changes: 11 additions & 10 deletions lib/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,10 +488,10 @@ func (f *FSM) executeOnePhase(ctx context.Context, p Params, phase storage.Opera
if err != nil {
return trace.Wrap(err)
}

logger := executor.WithField("phase", phase.ID)
err = executor.PreCheck(ctx)
if err != nil {
executor.Errorf("Phase precheck failed: %v.", err)
logger.WithError(err).Error("Phase precheck failed.")
return trace.Wrap(err)
}

Expand All @@ -504,11 +504,11 @@ func (f *FSM) executeOnePhase(ctx context.Context, p Params, phase storage.Opera
return trace.Wrap(err)
}

executor.Infof("Executing phase: %v.", phase.ID)
logger.Info("Executing phase.")

err = executor.Execute(ctx)
if err != nil {
executor.Errorf("Phase execution failed: %v.", err)
logger.WithError(err).Error("Phase execution failed.")
if err := f.ChangePhaseState(ctx,
StateChange{
Phase: phase.ID,
Expand All @@ -522,7 +522,7 @@ func (f *FSM) executeOnePhase(ctx context.Context, p Params, phase storage.Opera

err = executor.PostCheck(ctx)
if err != nil {
executor.Errorf("Phase postcheck failed: %v.", err)
logger.WithError(err).Error("Phase postcheck failed.")
return trace.Wrap(err)
}

Expand Down Expand Up @@ -589,11 +589,12 @@ func (f *FSM) rollbackPhaseLocally(ctx context.Context, p Params, phase storage.

func (f *FSM) rollbackPhaseRemotely(ctx context.Context, p Params, phase storage.OperationPhase, server storage.Server) error {
return f.RunCommand(ctx, f.Runner, server, Params{
PhaseID: p.PhaseID,
Force: p.Force,
Resume: p.Resume,
Rollback: true,
Progress: p.Progress,
PhaseID: p.PhaseID,
OperationID: p.OperationID,
Force: p.Force,
Resume: p.Resume,
Rollback: true,
Progress: p.Progress,
})
}

Expand Down
9 changes: 9 additions & 0 deletions lib/fsm/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package fsm

import (
"context"
"fmt"

"github.com/gravitational/gravity/lib/constants"
"github.com/gravitational/gravity/lib/ops"
Expand Down Expand Up @@ -261,6 +262,14 @@ func OperationKey(plan storage.OperationPlan) ops.SiteOperationKey {
}
}

// OperationString returns the textual representation of this operation
func OperationString(plan storage.OperationPlan) string {
return fmt.Sprintf("%v(%v), cluster=%v",
ops.OperationTypeString(plan.OperationType),
plan.OperationID,
plan.ClusterName)
}

// CompleteOrFailOperation completes or fails the operation given by the plan in the specified operator.
// planErr optionally specifies the error to record in the failed message and record operation failure
func CompleteOrFailOperation(ctx context.Context, plan *storage.OperationPlan, operator ops.Operator, planErr string) (err error) {
Expand Down
3 changes: 1 addition & 2 deletions lib/localenv/clusterenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/gravitational/gravity/lib/ops/opsservice"
"github.com/gravitational/gravity/lib/pack"
"github.com/gravitational/gravity/lib/pack/localpack"
"github.com/gravitational/gravity/lib/storage"
"github.com/gravitational/gravity/lib/storage/keyval"
"github.com/gravitational/gravity/lib/systeminfo"
"github.com/gravitational/gravity/lib/users"
Expand Down Expand Up @@ -85,7 +84,7 @@ func (r *LocalEnvironment) NewClusterEnvironment(opts ...ClusterEnvironmentOptio
// ClusterEnvironment provides access to local cluster services
type ClusterEnvironment struct {
// Backend is the cluster etcd backend
Backend storage.Backend
Backend keyval.EtcdBackend
// Packages is the package service that talks to local storage
Packages pack.PackageService
// ClusterPackages is the package service that talks to cluster API
Expand Down
4 changes: 2 additions & 2 deletions lib/ops/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func Emit(ctx context.Context, operator ops.Operator, event events.Event, fields
}
err := emit(ctx, operator, event, allFields)
if err != nil {
log.Errorf("Failed to emit audit event %v %v: %v.",
event, fields, trace.DebugReport(err))
log.WithError(err).Errorf("Failed to emit audit event %v %v.",
event, fields)
}
}

Expand Down
10 changes: 8 additions & 2 deletions lib/ops/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,13 @@ func (s *SiteOperation) String() string {

// TypeString returns the textual representation of the operation's type
func (s *SiteOperation) TypeString() string {
switch s.Type {
return OperationTypeString(s.Type)
}

// OperationTypeString formats the specified operation type
// as readable text
func OperationTypeString(typ string) string {
switch typ {
case OperationInstall:
return "install"
case OperationExpand:
Expand All @@ -1182,7 +1188,7 @@ func (s *SiteOperation) TypeString() string {
case OperationUpdateConfig:
return "update configuration"
default:
return s.Type
return typ
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/storage/keyval/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"encoding/json"
"time"

log "github.com/sirupsen/logrus"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
)

// backend implements storage interface, it also acts as a codec
Expand Down
77 changes: 56 additions & 21 deletions lib/storage/keyval/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/gravitational/gravity/lib/defaults"
"github.com/gravitational/gravity/lib/state"
"github.com/gravitational/gravity/lib/storage"
"github.com/gravitational/gravity/lib/utils"

"github.com/cenkalti/backoff"
Expand Down Expand Up @@ -59,13 +60,15 @@ func NewETCD(cfg ETCDConfig) (*electingBackend, error) {
return nil, trace.Wrap(err)
}

backend := &backend{
Clock: clock,
kvengine: engine,
}
return &electingBackend{
Backend: &backend{
Clock: clock,
kvengine: engine,
},
Leader: leader,
client: engine.client,
Backend: backend,
Leader: leader,
backend: backend,
engine: engine,
}, nil
}

Expand Down Expand Up @@ -101,6 +104,30 @@ func LocalEtcdConfig(retryTimeout time.Duration) (*ETCDConfig, error) {
}, nil
}

// EtcdBackend enables access to etcd-specific features
type EtcdBackend interface {
storage.Backend
// CloneWithOptions creates a shallow copy of this backend
// with the specified options applied
CloneWithOptions(opts ...EtcdOption) storage.Backend
}

// EtcdOption is a functional option to configure an etcd backend
type EtcdOption func(*etcdOptions)

// WithReadQuorum specifies that reads should go through the quorum,
// e.g. return the latest committed value applied in a quorum of members
func WithReadQuorum(quorum bool) EtcdOption {
return func(config *etcdOptions) {
config.GetOptions.Quorum = quorum
}
}

type etcdOptions struct {
// GetOptions specifies the options for Get API
GetOptions client.GetOptions
}

// Check checks if all the parameters are valid and sets defaults
func (cfg *ETCDConfig) Check() error {
if len(cfg.Key) == 0 {
Expand All @@ -124,11 +151,9 @@ func newEngine(cfg ETCDConfig, codec Codec) (*engine, error) {
return nil, trace.Wrap(err)
}
e := &engine{
cfg: cfg,
config: cfg,
nodes: cfg.Nodes,
etcdKey: strings.Split(cfg.Key, "/"),
cancelC: make(chan bool, 1),
stopC: make(chan bool, 1),
codec: codec,
}
if err := e.reconnect(); err != nil {
Expand All @@ -141,11 +166,21 @@ type engine struct {
client.KeysAPI
nodes []string
codec Codec
cfg ETCDConfig
config ETCDConfig
etcdKey []string
client client.Client
cancelC chan bool
stopC chan bool
options etcdOptions
}

func (e *engine) copyWithOptions(opts ...EtcdOption) *engine {
options := e.options
for _, opt := range opts {
opt(&options)
}
// Create a shallow copy of the engine
engine := *e
engine.options = options
return &engine
}

func (e *engine) key(prefix string, keys ...string) key {
Expand All @@ -171,9 +206,9 @@ func (e *engine) Close() error {

func (e *engine) reconnect() error {
info := transport.TLSInfo{
CAFile: e.cfg.TLSCAFile,
CertFile: e.cfg.TLSCertFile,
KeyFile: e.cfg.TLSKeyFile,
CAFile: e.config.TLSCAFile,
CertFile: e.config.TLSCertFile,
KeyFile: e.config.TLSKeyFile,
}
cfg, err := info.ClientConfig()
if err != nil {
Expand Down Expand Up @@ -201,7 +236,7 @@ func (e *engine) reconnect() error {
e.client = clt
e.KeysAPI = retryApi{
api: client.NewKeysAPI(e.client),
interval: e.cfg.RetryInterval,
interval: e.config.RetryInterval,
}

return nil
Expand Down Expand Up @@ -360,7 +395,7 @@ func (e *engine) compareAndSwapBytes(key key, val, prevVal []byte, outVal *[]byt
}

func (e *engine) getValBytes(key key) ([]byte, error) {
re, err := e.Get(context.TODO(), ekey(key), nil)
re, err := e.Get(context.TODO(), ekey(key), &e.options.GetOptions)
if err != nil {
return nil, convertErr(err)
}
Expand All @@ -371,7 +406,7 @@ func (e *engine) getValBytes(key key) ([]byte, error) {
}

func (e *engine) getVal(key key, val interface{}) error {
re, err := e.Get(context.TODO(), ekey(key), nil)
re, err := e.Get(context.TODO(), ekey(key), &e.options.GetOptions)
if err != nil {
return convertErr(err)
}
Expand Down Expand Up @@ -435,7 +470,7 @@ func (e *engine) releaseLock(key key) error {

func (e *engine) getKeys(key key) ([]string, error) {
var vals []string
re, err := e.Get(context.TODO(), ekey(key), nil)
re, err := e.Get(context.TODO(), ekey(key), &e.options.GetOptions)
err = convertErr(err)
if err != nil {
if trace.IsNotFound(err) {
Expand Down Expand Up @@ -572,11 +607,11 @@ func (r retryApi) retry(ctx context.Context, fn apiCall) (resp *client.Response,
err = backoff.Retry(func() (err error) {
resp, err = fn()
if utils.IsTransientClusterError(err) {
log.Debugf("retrying on transient etcd error: %v", err)
log.WithField("err", trace.UserMessage(err)).Debug("Retry on transient etcd error.")
return trace.Wrap(err)
}
if err != nil {
return &backoff.PermanentError{err}
return &backoff.PermanentError{Err: err}
}
return nil
}, b)
Expand Down
17 changes: 15 additions & 2 deletions lib/storage/keyval/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
type electingBackend struct {
storage.Backend
storage.Leader
client etcd.Client

backend *backend
engine *engine
}

// AddWatch starts watching the key for changes and sending them
Expand All @@ -48,7 +50,18 @@ func (b *electingBackend) StepDown() {
b.Leader.StepDown()
}

// CloneWithOptions returns a shallow copy of this backend with the specified options applied
func (b *electingBackend) CloneWithOptions(opts ...EtcdOption) storage.Backend {
engine := b.engine.copyWithOptions(opts...)
return &electingBackend{
Backend: b.backend,
Leader: b.Leader,
backend: b.backend,
engine: engine,
}
}

// api returns etcd API client used by tests
func (b *electingBackend) api() etcd.KeysAPI {
return etcd.NewKeysAPI(b.client)
return etcd.NewKeysAPI(b.engine.client)
}
5 changes: 4 additions & 1 deletion lib/update/cluster/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,12 @@ func (f *engine) reconcilePlan(ctx context.Context) error {
return trace.Wrap(err)
}
f.plan = *plan
// Use a type alias and not the string type directly as a workaround to
// log the plan unescaped.
var buf bytes.Buffer
type s string
fsm.FormatOperationPlanText(&buf, f.plan)
f.Debugf("Reconciled plan: %v.", buf.String())
f.WithField("plan", s(buf.String())).Debug("Reconciled plan.")
return nil
}

Expand Down
Loading

0 comments on commit 0ff7f47

Please sign in to comment.