Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
[6.1.x] Verify agents are active before upgrade resume or rollback (#…
Browse files Browse the repository at this point in the history
…2071)

* Verify agents are active before upgrade resume or rollback

* Attempt to re-deploy agents if not active
  • Loading branch information
bernardjkim authored Sep 22, 2020
1 parent fc2cf3b commit 180ec21
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 27 deletions.
3 changes: 0 additions & 3 deletions lib/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,6 @@ const (
// AgentStopTimeout is the amount of time an agent gets to shut down gracefully
AgentStopTimeout = 10 * time.Second

// AgentStatusTimeout specifies the timeout for agent status query
AgentStatusTimeout = 5 * time.Second

// PeerConnectTimeout is the timeout of an RPC agent connecting to its peer
PeerConnectTimeout = 10 * time.Second

Expand Down
32 changes: 31 additions & 1 deletion lib/rpc/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ package rpc

import (
"context"
"fmt"

"github.com/gravitational/gravity/lib/constants"
"github.com/gravitational/gravity/lib/defaults"
"github.com/gravitational/gravity/lib/rpc/proto"
"github.com/gravitational/gravity/lib/storage"
"github.com/gravitational/gravity/tool/common"

"github.com/buger/goterm"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
)

// AgentStatus contains a gravity agent's status information.
Expand All @@ -44,7 +48,7 @@ type AgentStatus struct {
}

// CollectAgentStatus collects the status from the specified agents.
func CollectAgentStatus(ctx context.Context, servers storage.Servers, rpc AgentRepository) []AgentStatus {
func CollectAgentStatus(ctx context.Context, servers storage.Servers, rpc AgentRepository) StatusList {
statusCh := make(chan AgentStatus, len(servers))
for _, srv := range servers {
go func(server storage.Server) {
Expand Down Expand Up @@ -104,3 +108,29 @@ func getVersion(ctx context.Context, addr string, rpc AgentRepository) (*proto.V

return version, nil
}

// StatusList is a list of AgentStatus entries. It provides helpers for
// rendering the list as a table (String) and for checking whether any agent
// is reported as offline (AgentsActive).
type StatusList []AgentStatus

// String renders the list as a text table with the columns Hostname, Address,
// Status and Version, one row per entry. As a side effect, any error attached
// to an entry is logged at debug level while the table is being built.
func (r StatusList) String() string {
	table := goterm.NewTable(0, 10, 5, ' ', 0)
	common.PrintTableHeader(table, []string{"Hostname", "Address", "Status", "Version"})
	for i := range r {
		agent := r[i]
		fmt.Fprintf(table, "%s\t%s\t%s\t%s\n", agent.Hostname, agent.Address, agent.Status, agent.Version)
		if agent.Error == nil {
			continue
		}
		// The error is not part of the rendered table; it is only logged.
		logrus.WithError(agent.Error).Debugf("Failed to collect agent status on %s.", agent.Address)
	}
	return table.String()
}

// AgentsActive reports whether no entry in the list has the
// constants.GravityAgentOffline status. An empty list is reported as active.
func (r StatusList) AgentsActive() bool {
	for i := 0; i < len(r); i++ {
		if r[i].Status == constants.GravityAgentOffline {
			// A single offline agent is enough to fail the check.
			return false
		}
	}
	return true
}
7 changes: 7 additions & 0 deletions tool/gravity/cli/clusterupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func executeUpdatePhase(env *localenv.LocalEnvironment, environ LocalEnvironment
if operation.Type != ops.OperationUpdate {
return trace.NotFound("no active update operation found")
}

return executeUpdatePhaseForOperation(env, environ, params, operation.SiteOperation)
}

Expand All @@ -197,6 +198,12 @@ const gravityResumeServiceName = "gravity-resume.service"
// executeOrForkPhase either directly executes the specified operation phase,
// or launches a one-shot systemd service that executes it in the background.
func executeOrForkPhase(env *localenv.LocalEnvironment, updater updater, params PhaseParams, operation ops.SiteOperation) error {
if params.isResume() {
if err := verifyOrDeployAgents(env); err != nil {
return trace.Wrap(err)
}
}

// If given the --block flag, we're running as a systemd unit (or a user
// requested the command to execute in foreground), so proceed to perform
// the command (resume or single phase) directly.
Expand Down
5 changes: 5 additions & 0 deletions tool/gravity/cli/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ func rollbackPlan(localEnv *localenv.LocalEnvironment, environ LocalEnvironmentF
default:
return trace.BadParameter(unsupportedRollbackWarning, op.TypeString())
}

if err := verifyOrDeployAgents(localEnv); err != nil {
return trace.Wrap(err)
}

if !confirmed && !params.DryRun {
localEnv.Printf(planRollbackWarning, operationList{*op}.formatTable())
if err := enforceConfirmation("Proceed?"); err != nil {
Expand Down
60 changes: 37 additions & 23 deletions tool/gravity/cli/rpcagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ import (
"github.com/gravitational/gravity/lib/update"
clusterupdate "github.com/gravitational/gravity/lib/update/cluster"
"github.com/gravitational/gravity/lib/utils"
"github.com/gravitational/gravity/tool/common"

"github.com/buger/goterm"
"github.com/cenkalti/backoff"
teleclient "github.com/gravitational/teleport/lib/client"
"github.com/gravitational/trace"
Expand Down Expand Up @@ -455,49 +453,65 @@ func rpcAgentShutdown(env *localenv.LocalEnvironment) error {
func rpcAgentStatus(env *localenv.LocalEnvironment) error {
env.PrintStep("Collecting RPC agent status")

operator, err := env.SiteOperator()
statusList, err := collectAgentStatus(env)
if err != nil {
return trace.Wrap(err)
}

env.Println(statusList.String())

if !statusList.AgentsActive() {
return trace.BadParameter("some agents are offline")
}

return nil
}

// collectAgentStatus collects the gravity agent status from all members of the
// cluster.
func collectAgentStatus(env *localenv.LocalEnvironment) (statusList rpc.StatusList, err error) {
operator, err := env.SiteOperator()
if err != nil {
return statusList, trace.Wrap(err)
}

creds, err := fsm.GetClientCredentials()
if err != nil {
return trace.Wrap(err)
return statusList, trace.Wrap(err)
}

cluster, err := operator.GetLocalSite(context.TODO())
if err != nil {
return trace.Wrap(err)
return statusList, trace.Wrap(err)
}

timeout, err := utils.GetenvDuration(constants.AgentStatusTimeoutEnvVar)
if err != nil {
timeout = defaults.AgentStatusTimeout
timeout = defaults.AgentRequestTimeout
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

statusList := rpc.CollectAgentStatus(ctx, cluster.ClusterState.Servers, fsm.NewAgentRunner(creds))

var errs []error
statusList = rpc.CollectAgentStatus(ctx, cluster.ClusterState.Servers, fsm.NewAgentRunner(creds))
return statusList, nil
}

t := goterm.NewTable(0, 10, 5, ' ', 0)
common.PrintTableHeader(t, []string{"Hostname", "Address", "Status", "Version"})
for _, status := range statusList {
fmt.Fprintf(t, "%s\t%s\t%s\t%s\n", status.Hostname, status.Address, status.Status, status.Version)
if status.Error != nil {
log.WithError(status.Error).Debugf("Failed to collect agent status on %s.", status.Address)
errs = append(errs, status.Error)
}
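// verifyOrDeployAgents verifies that all agents are active and, if some are
// not, attempts to re-deploy the agents. It returns an error if the agent
// status cannot be collected or if the re-deployment fails.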
func verifyOrDeployAgents(env *localenv.LocalEnvironment) error {
statusList, err := collectAgentStatus(env)
if err != nil {
return trace.Wrap(err, "failed to collect agent status")
}
env.Println(t.String())

if len(errs) > 0 {
log.Warn("Some agents are offline.")
return trace.BadParameter("some agents are offline")
if statusList.AgentsActive() {
return nil
}
if err := rpcAgentDeploy(env, deployOptions{}); err != nil {
log.WithError(err).Error("Failed to deploy agents.")
env.Println(statusList.String())
return trace.BadParameter("some agents are offline; ensure all agents are deployed with `./gravity agent deploy`")
}

return nil
}

Expand Down

0 comments on commit 180ec21

Please sign in to comment.