diff --git a/lib/fsm/format.go b/lib/fsm/format.go index b966a439a5..d1c50a749b 100644 --- a/lib/fsm/format.go +++ b/lib/fsm/format.go @@ -59,6 +59,7 @@ func FormatOperationPlanJSON(w io.Writer, plan storage.OperationPlan) error { func FormatOperationPlanText(w io.Writer, plan storage.OperationPlan) { var t tabwriter.Writer t.Init(w, 0, 10, 5, ' ', 0) + fmt.Fprintln(&t, "Operation: ", OperationString(plan)) common.PrintTableHeader(&t, []string{"Phase", "Description", "State", "Node", "Requires", "Updated"}) for _, phase := range plan.Phases { printPhase(&t, phase, 0) diff --git a/lib/fsm/fsm.go b/lib/fsm/fsm.go index 1906369f62..28bfdd66a9 100644 --- a/lib/fsm/fsm.go +++ b/lib/fsm/fsm.go @@ -488,10 +488,10 @@ func (f *FSM) executeOnePhase(ctx context.Context, p Params, phase storage.Opera if err != nil { return trace.Wrap(err) } - + logger := executor.WithField("phase", phase.ID) err = executor.PreCheck(ctx) if err != nil { - executor.Errorf("Phase precheck failed: %v.", err) + logger.WithError(err).Error("Phase precheck failed.") return trace.Wrap(err) } @@ -504,11 +504,11 @@ func (f *FSM) executeOnePhase(ctx context.Context, p Params, phase storage.Opera return trace.Wrap(err) } - executor.Infof("Executing phase: %v.", phase.ID) + logger.Info("Executing phase.") err = executor.Execute(ctx) if err != nil { - executor.Errorf("Phase execution failed: %v.", err) + logger.WithError(err).Error("Phase execution failed.") if err := f.ChangePhaseState(ctx, StateChange{ Phase: phase.ID, @@ -522,7 +522,7 @@ func (f *FSM) executeOnePhase(ctx context.Context, p Params, phase storage.Opera err = executor.PostCheck(ctx) if err != nil { - executor.Errorf("Phase postcheck failed: %v.", err) + logger.WithError(err).Error("Phase postcheck failed.") return trace.Wrap(err) } @@ -589,11 +589,12 @@ func (f *FSM) rollbackPhaseLocally(ctx context.Context, p Params, phase storage. func (f *FSM) rollbackPhaseRemotely(ctx context.Context, p Params, phase storage.OperationPhase, server storage.Server) error { return f.RunCommand(ctx, f.Runner, server, Params{ - PhaseID: p.PhaseID, - Force: p.Force, - Resume: p.Resume, - Rollback: true, - Progress: p.Progress, + PhaseID: p.PhaseID, + OperationID: p.OperationID, + Force: p.Force, + Resume: p.Resume, + Rollback: true, + Progress: p.Progress, }) } diff --git a/lib/fsm/utils.go b/lib/fsm/utils.go index 33393c4d65..cc59389135 100644 --- a/lib/fsm/utils.go +++ b/lib/fsm/utils.go @@ -18,6 +18,7 @@ package fsm import ( "context" + "fmt" "github.com/gravitational/gravity/lib/constants" "github.com/gravitational/gravity/lib/ops" @@ -261,6 +262,14 @@ func OperationKey(plan storage.OperationPlan) ops.SiteOperationKey { } } +// OperationString returns the textual representation of this operation +func OperationString(plan storage.OperationPlan) string { + return fmt.Sprintf("%v(%v), cluster=%v", + ops.OperationTypeString(plan.OperationType), + plan.OperationID, + plan.ClusterName) +} + // CompleteOrFailOperation completes or fails the operation given by the plan in the specified operator. // planErr optionally specifies the error to record in the failed message and record operation failure func CompleteOrFailOperation(ctx context.Context, plan *storage.OperationPlan, operator ops.Operator, planErr string) (err error) { diff --git a/lib/localenv/clusterenv.go b/lib/localenv/clusterenv.go index 262dfbc74d..344a049e98 100644 --- a/lib/localenv/clusterenv.go +++ b/lib/localenv/clusterenv.go @@ -31,7 +31,6 @@ import ( "github.com/gravitational/gravity/lib/ops/opsservice" "github.com/gravitational/gravity/lib/pack" "github.com/gravitational/gravity/lib/pack/localpack" - "github.com/gravitational/gravity/lib/storage" "github.com/gravitational/gravity/lib/storage/keyval" "github.com/gravitational/gravity/lib/systeminfo" "github.com/gravitational/gravity/lib/users" @@ -85,7 +84,7 @@ func (r *LocalEnvironment) NewClusterEnvironment(opts ...ClusterEnvironmentOptio // ClusterEnvironment provides access to local cluster services type ClusterEnvironment struct { // Backend is the cluster etcd backend - Backend storage.Backend + Backend keyval.EtcdBackend // Packages is the package service that talks to local storage Packages pack.PackageService // ClusterPackages is the package service that talks to cluster API diff --git a/lib/ops/events/events.go b/lib/ops/events/events.go index 4d8719feaa..f505120ae8 100644 --- a/lib/ops/events/events.go +++ b/lib/ops/events/events.go @@ -41,8 +41,8 @@ func Emit(ctx context.Context, operator ops.Operator, event events.Event, fields } err := emit(ctx, operator, event, allFields) if err != nil { - log.Errorf("Failed to emit audit event %v %v: %v.", - event, fields, trace.DebugReport(err)) + log.WithError(err).Errorf("Failed to emit audit event %v %v.", + event, fields) } } diff --git a/lib/ops/ops.go b/lib/ops/ops.go index 058297b9d3..8058128943 100644 --- a/lib/ops/ops.go +++ b/lib/ops/ops.go @@ -1164,7 +1164,13 @@ func (s *SiteOperation) String() string { // TypeString returns the textual representation of the operation's type func (s *SiteOperation) TypeString() string { - switch s.Type { + return OperationTypeString(s.Type) +} + +// OperationTypeString formats the specified operation type +// as readable text +func OperationTypeString(typ string) string { + switch typ { case OperationInstall: return "install" case OperationExpand: @@ -1182,7 +1188,7 @@ func (s *SiteOperation) TypeString() string { case OperationUpdateConfig: return "update configuration" default: - return s.Type + return typ } } diff --git a/lib/storage/keyval/backend.go b/lib/storage/keyval/backend.go index 529efea24d..281f9c0069 100644 --- a/lib/storage/keyval/backend.go +++ b/lib/storage/keyval/backend.go @@ -21,9 +21,9 @@ import ( "encoding/json" "time" - log "github.com/sirupsen/logrus" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + log "github.com/sirupsen/logrus" ) // backend implements storage interface, it also acts as a codec diff --git a/lib/storage/keyval/etcd.go b/lib/storage/keyval/etcd.go index cb25f1df9c..5937dbe7d1 100644 --- a/lib/storage/keyval/etcd.go +++ b/lib/storage/keyval/etcd.go @@ -26,6 +26,7 @@ import ( "github.com/gravitational/gravity/lib/defaults" "github.com/gravitational/gravity/lib/state" + "github.com/gravitational/gravity/lib/storage" "github.com/gravitational/gravity/lib/utils" "github.com/cenkalti/backoff" @@ -59,13 +60,15 @@ func NewETCD(cfg ETCDConfig) (*electingBackend, error) { return nil, trace.Wrap(err) } + backend := &backend{ + Clock: clock, + kvengine: engine, + } return &electingBackend{ - Backend: &backend{ - Clock: clock, - kvengine: engine, - }, - Leader: leader, - client: engine.client, + Backend: backend, + Leader: leader, + backend: backend, + engine: engine, }, nil } @@ -101,6 +104,30 @@ func LocalEtcdConfig(retryTimeout time.Duration) (*ETCDConfig, error) { }, nil } +// EtcdBackend enables access to etcd-specific features +type EtcdBackend interface { + storage.Backend + // CloneWithOptions creates a shallow copy of this backend + // with the specified options applied + CloneWithOptions(opts ...EtcdOption) storage.Backend +} + +// EtcdOption is a functional option to configure an etcd backend +type EtcdOption func(*etcdOptions) + +// WithReadQuorum specifies that reads should go through the quorum, +// e.g. return the latest committed value applied in a quorum of members +func WithReadQuorum(quorum bool) EtcdOption { + return func(config *etcdOptions) { + config.GetOptions.Quorum = quorum + } +} + +type etcdOptions struct { + // GetOptions specifies the options for Get API + GetOptions client.GetOptions +} + // Check checks if all the parameters are valid and sets defaults func (cfg *ETCDConfig) Check() error { if len(cfg.Key) == 0 { @@ -124,11 +151,9 @@ func newEngine(cfg ETCDConfig, codec Codec) (*engine, error) { return nil, trace.Wrap(err) } e := &engine{ - cfg: cfg, + config: cfg, nodes: cfg.Nodes, etcdKey: strings.Split(cfg.Key, "/"), - cancelC: make(chan bool, 1), - stopC: make(chan bool, 1), codec: codec, } if err := e.reconnect(); err != nil { @@ -141,11 +166,21 @@ type engine struct { client.KeysAPI nodes []string codec Codec - cfg ETCDConfig + config ETCDConfig etcdKey []string client client.Client - cancelC chan bool - stopC chan bool + options etcdOptions +} + +func (e *engine) copyWithOptions(opts ...EtcdOption) *engine { + options := e.options + for _, opt := range opts { + opt(&options) + } + // Create a shallow copy of the engine + engine := *e + engine.options = options + return &engine } func (e *engine) key(prefix string, keys ...string) key { @@ -171,9 +206,9 @@ func (e *engine) Close() error { func (e *engine) reconnect() error { info := transport.TLSInfo{ - CAFile: e.cfg.TLSCAFile, - CertFile: e.cfg.TLSCertFile, - KeyFile: e.cfg.TLSKeyFile, + CAFile: e.config.TLSCAFile, + CertFile: e.config.TLSCertFile, + KeyFile: e.config.TLSKeyFile, } cfg, err := info.ClientConfig() if err != nil { @@ -201,7 +236,7 @@ func (e *engine) reconnect() error { e.client = clt e.KeysAPI = retryApi{ api: client.NewKeysAPI(e.client), - interval: e.cfg.RetryInterval, + interval: e.config.RetryInterval, } return nil @@ -360,7 +395,7 @@ func (e *engine) compareAndSwapBytes(key key, val, prevVal []byte, outVal *[]byt } func (e *engine) getValBytes(key key) ([]byte, error) { - re, err := e.Get(context.TODO(), ekey(key), nil) + re, err := e.Get(context.TODO(), ekey(key), &e.options.GetOptions) if err != nil { return nil, convertErr(err) } @@ -371,7 +406,7 @@ func (e *engine) getValBytes(key key) ([]byte, error) { } func (e *engine) getVal(key key, val interface{}) error { - re, err := e.Get(context.TODO(), ekey(key), nil) + re, err := e.Get(context.TODO(), ekey(key), &e.options.GetOptions) if err != nil { return convertErr(err) } @@ -435,7 +470,7 @@ func (e *engine) releaseLock(key key) error { func (e *engine) getKeys(key key) ([]string, error) { var vals []string - re, err := e.Get(context.TODO(), ekey(key), nil) + re, err := e.Get(context.TODO(), ekey(key), &e.options.GetOptions) err = convertErr(err) if err != nil { if trace.IsNotFound(err) { @@ -572,11 +607,11 @@ func (r retryApi) retry(ctx context.Context, fn apiCall) (resp *client.Response, err = backoff.Retry(func() (err error) { resp, err = fn() if utils.IsTransientClusterError(err) { - log.Debugf("retrying on transient etcd error: %v", err) + log.WithField("err", trace.UserMessage(err)).Debug("Retry on transient etcd error.") return trace.Wrap(err) } if err != nil { - return &backoff.PermanentError{err} + return &backoff.PermanentError{Err: err} } return nil }, b) diff --git a/lib/storage/keyval/leader.go b/lib/storage/keyval/leader.go index be0f6a0a49..dc89d5f55f 100644 --- a/lib/storage/keyval/leader.go +++ b/lib/storage/keyval/leader.go @@ -27,7 +27,9 @@ import ( type electingBackend struct { storage.Backend storage.Leader - client etcd.Client + + backend *backend + engine *engine } // AddWatch starts watching the key for changes and sending them @@ -48,7 +50,18 @@ func (b *electingBackend) StepDown() { b.Leader.StepDown() } +// CloneWithOptions returns a shallow copy of this backend with the specified options applied +func (b *electingBackend) CloneWithOptions(opts ...EtcdOption) storage.Backend { + engine := b.engine.copyWithOptions(opts...) + return &electingBackend{ + Backend: b.backend, + Leader: b.Leader, + backend: b.backend, + engine: engine, + } +} + // api returns etcd API client used by tests func (b *electingBackend) api() etcd.KeysAPI { - return etcd.NewKeysAPI(b.client) + return etcd.NewKeysAPI(b.engine.client) } diff --git a/lib/update/cluster/engine.go b/lib/update/cluster/engine.go index b72cf85ad0..edfca40afc 100644 --- a/lib/update/cluster/engine.go +++ b/lib/update/cluster/engine.go @@ -315,9 +315,12 @@ func (f *engine) reconcilePlan(ctx context.Context) error { return trace.Wrap(err) } f.plan = *plan + // Use a type alias and not the string type directly as a workaround to + // log the plan unescaped. var buf bytes.Buffer + type s string fsm.FormatOperationPlanText(&buf, f.plan) - f.Debugf("Reconciled plan: %v.", buf.String()) + f.WithField("plan", s(buf.String())).Debug("Reconciled plan.") return nil } diff --git a/lib/update/reconciler.go b/lib/update/reconciler.go index 874b04da45..195cd2dea3 100644 --- a/lib/update/reconciler.go +++ b/lib/update/reconciler.go @@ -18,12 +18,15 @@ package update import ( "context" "os/exec" + "time" "github.com/gravitational/gravity/lib/defaults" "github.com/gravitational/gravity/lib/fsm" "github.com/gravitational/gravity/lib/storage" + "github.com/gravitational/gravity/lib/storage/keyval" "github.com/gravitational/gravity/lib/utils" + "github.com/cenkalti/backoff" "github.com/gravitational/trace" "github.com/sirupsen/logrus" ) @@ -36,7 +39,7 @@ type Reconciler interface { // NewDefaultReconciler returns an implementation of Reconciler that syncs changes between // the authoritative and the remote backends -func NewDefaultReconciler(remote, authoritative storage.Backend, clusterName, operationID string, logger logrus.FieldLogger) *reconciler { +func NewDefaultReconciler(remote keyval.EtcdBackend, authoritative storage.Backend, clusterName, operationID string, logger logrus.FieldLogger) *reconciler { return &reconciler{ FieldLogger: logger, backend: remote, @@ -71,7 +74,7 @@ func (r *reconciler) trySyncChangelogToEtcd(ctx context.Context) error { } if shouldSync { - return trace.Wrap(r.syncChangelog(r.localBackend, r.backend)) + return trace.Wrap(r.syncChangelog(ctx, r.localBackend, r.backend)) } return nil @@ -84,22 +87,35 @@ func (r *reconciler) trySyncChangelogFromEtcd(ctx context.Context) error { } if shouldSync { - return trace.Wrap(r.syncChangelog(r.backend, r.localBackend)) + r.Debug("Use quorum reads for plan sync.") + // Use consistent reads when querying the operation plan to avoid + // reading stale values. + // TODO(v3): This is only required for etcd client v2 as client v3 defaults + // to quorum reads and offers clientv3.WithSerializable() as an opt-out. + // See https://etcd.io/docs/v2/faq/ and https://github.com/etcd-io/etcd/issues/6829 for details + consistentSrc := r.backend.CloneWithOptions(keyval.WithReadQuorum(true)) + return trace.Wrap(r.syncChangelog(ctx, consistentSrc, r.localBackend)) } return nil } // syncChangelog will sync changelog entries from src to dst storage -func (r *reconciler) syncChangelog(src storage.Backend, dst storage.Backend) error { - return trace.Wrap(SyncChangelog(src, dst, r.cluster, r.operationID)) +func (r *reconciler) syncChangelog(ctx context.Context, src, dst storage.Backend) error { + b := backoff.NewExponentialBackOff() + b.MaxInterval = 15 * time.Second + b.MaxElapsedTime = 1 * time.Minute + return utils.RetryTransient(ctx, b, func() error { + return SyncChangelog(src, dst, r.cluster, r.operationID) + }) } type reconciler struct { logrus.FieldLogger - backend, localBackend storage.Backend - cluster string - operationID string + backend keyval.EtcdBackend + localBackend storage.Backend + cluster string + operationID string } // SyncChangelog will sync changelog entries from src to dst storage diff --git a/lib/update/updater.go b/lib/update/updater.go index 32164889e2..e40fbf239a 100644 --- a/lib/update/updater.go +++ b/lib/update/updater.go @@ -31,6 +31,7 @@ import ( "github.com/gravitational/gravity/lib/pack" "github.com/gravitational/gravity/lib/rpc" "github.com/gravitational/gravity/lib/storage" + "github.com/gravitational/gravity/lib/storage/keyval" "github.com/gravitational/gravity/lib/utils" "github.com/gravitational/trace" @@ -281,7 +282,7 @@ type Config struct { // Operator is the cluster operator service Operator ops.Operator // Backend specifies the cluster backend - Backend storage.Backend + Backend keyval.EtcdBackend // LocalBackend specifies the authoritative source for operation state LocalBackend storage.Backend // Runner specifies the runner for remote commands diff --git a/tool/gravity/cli/clusterupdate.go b/tool/gravity/cli/clusterupdate.go index 2482540cc8..652d0dd7f1 100644 --- a/tool/gravity/cli/clusterupdate.go +++ b/tool/gravity/cli/clusterupdate.go @@ -41,12 +41,12 @@ import ( "github.com/gravitational/gravity/lib/systemservice" "github.com/gravitational/gravity/lib/update" clusterupdate "github.com/gravitational/gravity/lib/update/cluster" - "github.com/gravitational/version" "github.com/coreos/go-semver/semver" "github.com/fatih/color" "github.com/ghodss/yaml" "github.com/gravitational/trace" + "github.com/gravitational/version" ) func updateCheck(env *localenv.LocalEnvironment, updatePackage string) error { @@ -183,11 +183,16 @@ func executeUpdatePhaseForOperation(env *localenv.LocalEnvironment, environ Loca return trace.Wrap(err) } defer updateEnv.Close() - updater, err := getClusterUpdater(env, updateEnv, operation, params.SkipVersionCheck) + updater, err := getClusterUpdater(env, updateEnv, operation) if err != nil { return trace.Wrap(err) } defer updater.Close() + if !params.SkipVersionCheck { + if err := validateBinaryVersion(updater); err != nil { + return trace.Wrap(err) + } + } return executeOrForkPhase(env, updater, params, operation) } @@ -284,11 +289,16 @@ func rollbackUpdatePhaseForOperation(env *localenv.LocalEnvironment, environ Loc return trace.Wrap(err) } defer updateEnv.Close() - updater, err := getClusterUpdater(env, updateEnv, operation, params.SkipVersionCheck) + updater, err := getClusterUpdater(env, updateEnv, operation) if err != nil { return trace.Wrap(err) } defer updater.Close() + if !params.SkipVersionCheck { + if err := validateBinaryVersion(updater); err != nil { + return trace.Wrap(err) + } + } err = updater.RollbackPhase(context.TODO(), fsm.Params{ PhaseID: params.PhaseID, Force: params.Force, @@ -303,7 +313,7 @@ func setUpdatePhaseForOperation(env *localenv.LocalEnvironment, environ LocalEnv return trace.Wrap(err) } defer updateEnv.Close() - updater, err := getClusterUpdater(env, updateEnv, operation, true) + updater, err := getClusterUpdater(env, updateEnv, operation) if err != nil { return trace.Wrap(err) } @@ -317,7 +327,7 @@ func completeUpdatePlanForOperation(env *localenv.LocalEnvironment, environ Loca return trace.Wrap(err) } defer updateEnv.Close() - updater, err := getClusterUpdater(env, updateEnv, operation, true) + updater, err := getClusterUpdater(env, updateEnv, operation) if err != nil { return trace.Wrap(err) } @@ -331,7 +341,7 @@ func completeUpdatePlanForOperation(env *localenv.LocalEnvironment, environ Loca return nil } -func getClusterUpdater(localEnv, updateEnv *localenv.LocalEnvironment, operation ops.SiteOperation, noValidateVersion bool) (*update.Updater, error) { +func getClusterUpdater(localEnv, updateEnv *localenv.LocalEnvironment, operation ops.SiteOperation) (*update.Updater, error) { clusterEnv, err := localEnv.NewClusterEnvironment() if err != nil { return nil, trace.Wrap(err) @@ -364,12 +374,6 @@ func getClusterUpdater(localEnv, updateEnv *localenv.LocalEnvironment, operation if err != nil { return nil, trace.Wrap(err) } - if noValidateVersion { - return updater, nil - } - if err := validateBinaryVersion(updater); err != nil { - return nil, trace.Wrap(err) - } return updater, nil } diff --git a/tool/gravity/cli/rpcagent.go b/tool/gravity/cli/rpcagent.go index ecf10ad593..d1eafd0fd4 100644 --- a/tool/gravity/cli/rpcagent.go +++ b/tool/gravity/cli/rpcagent.go @@ -17,6 +17,7 @@ limitations under the License. package cli import ( + "bytes" "context" "fmt" "io" @@ -546,6 +547,30 @@ func getGravityPackage() loc.Locator { } } +// String returns a textual representation of this request suitable +// for logging +func (r deployAgentsRequest) String() string { + var buf bytes.Buffer + fmt.Fprint(&buf, "deploy(cluster=", r.cluster.Domain) + if r.leader != nil { + fmt.Fprint(&buf, ",leader(addr=", r.leader.AdvertiseIP, ",params=", r.leaderParams, ")") + } + if r.version != "" { + fmt.Fprint(&buf, ",version=", r.version) + } + if len(r.servers) != 0 { + fmt.Fprint(&buf, ",servers(") + } + for _, s := range r.servers { + fmt.Fprint(&buf, "addr=", s.AdvertiseIP, ",") + } + if len(r.servers) != 0 { + fmt.Fprint(&buf, ")") + } + fmt.Fprint(&buf, ")") + return buf.String() +} + type deployAgentsRequest struct { clusterEnv *localenv.ClusterEnvironment clusterState storage.ClusterState diff --git a/tool/gravity/cli/update.go b/tool/gravity/cli/update.go index 260d23677b..80eb9cb8ce 100644 --- a/tool/gravity/cli/update.go +++ b/tool/gravity/cli/update.go @@ -126,7 +126,7 @@ func newUpdater(ctx context.Context, localEnv, updateEnv *localenv.LocalEnvironm }) deployCtx, cancel := context.WithTimeout(ctx, defaults.AgentDeployTimeout) defer cancel() - logger.WithField("request", req).Debug("Deploying agents on cluster nodes.") + logger.WithField("request", req.String()).Debug("Deploying agents on cluster nodes.") localEnv.PrintStep("Deploying agents on cluster nodes") creds, err := deployAgents(deployCtx, localEnv, req) if err != nil {