Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Backport #1914. (#2010)
Browse files Browse the repository at this point in the history
Fix data races in plan follower tests.
Also use the static gravity path inside the container.

Updates #2006.

(cherry picked from commit 722592a)

Co-authored-by: dmitri <[email protected]>
  • Loading branch information
Kevin Nisbet and a-palchikov authored Aug 14, 2020
1 parent afa1cd6 commit bfe9dbd
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 71 deletions.
11 changes: 6 additions & 5 deletions lib/app/hooks/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"fmt"

batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)

// diffPodSets returns a difference in Pods between existing and new.
Expand Down Expand Up @@ -126,12 +127,12 @@ func (p *podDiff) String() string {
return out.String()
}

func describe(v interface{}) string {
switch val := v.(type) {
func describe(obj runtime.Object) string {
switch obj := obj.(type) {
case *v1.Pod:
return fmt.Sprintf("Pod %q in namespace %q", val.Name, val.Namespace)
return fmt.Sprintf("Pod %q in namespace %q", obj.Name, obj.Namespace)
case *batchv1.Job:
return fmt.Sprintf("Job %q in namespace %q", val.Name, val.Namespace)
return fmt.Sprintf("Job %q in namespace %q", obj.Name, obj.Namespace)
}
return "<unknown>"
}
Expand Down
4 changes: 2 additions & 2 deletions lib/app/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (r *Runner) monitorPods(ctx context.Context, eventsC <-chan watch.Event,
err := r.checkJob(ctx, &job, &jobControl, podSet, w)
diff := humanize.RelTime(start, time.Now(), "elapsed", "elapsed")
if err == nil {
fmt.Fprintf(w, "%v has completed, %v.\n", describe(job), diff)
fmt.Fprintf(w, "%v has completed, %v.\n", describe(&job), diff)
return nil
}
log.Debugf("%v: %v", diff, err)
Expand All @@ -313,7 +313,7 @@ func (r *Runner) monitorPods(ctx context.Context, eventsC <-chan watch.Event,
diff = humanize.RelTime(start, time.Now(), "elapsed", "elapsed")
err = r.checkJob(ctx, &job, &jobControl, podSet, w)
if err == nil {
fmt.Fprintf(w, "%v has completed, %v.\n", describe(job), diff)
fmt.Fprintf(w, "%v has completed, %v.\n", describe(&job), diff)
return nil
}
log.Debugf("%v: %v", diff, err)
Expand Down
14 changes: 11 additions & 3 deletions lib/fsm/testhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/gravitational/gravity/lib/rpc"
Expand All @@ -39,9 +40,10 @@ func newTestEngine(getPlan func() storage.OperationPlan) *testEngine {

// testEngine is fsm engine used in tests. Keeps its changelog in memory.
type testEngine struct {
getPlan func() storage.OperationPlan
changelog storage.PlanChangelog
clock clockwork.Clock
getPlan func() storage.OperationPlan
clock clockwork.Clock
sync.Mutex // guards following fields
changelog storage.PlanChangelog
}

// GetExecutor returns one of the test executors depending on the specified phase.
Expand All @@ -58,28 +60,34 @@ func (t *testEngine) GetExecutor(p ExecutorParams, r Remote) (PhaseExecutor, err

// ChangePhaseState records the provided phase state change in the test engine.
func (t *testEngine) ChangePhaseState(ctx context.Context, ch StateChange) error {
	t.Lock()
	defer t.Unlock()
	// Make sure that new changelog entries get the most recent timestamp.
	created := t.clock.Now().Add(time.Duration(len(t.changelog)) * time.Minute)
	t.changelog = append(t.changelog, storage.PlanChange{
		PhaseID:  ch.Phase,
		NewState: ch.State,
		Created:  created,
	})
	return nil
}

// changePhaseStateWithTimestamp records the provided phase state change in the
// test engine with the specified timestamp.
func (t *testEngine) changePhaseStateWithTimestamp(ctx context.Context, ch StateChange, created time.Time) error {
	// Build the entry outside the critical section to keep the lock short.
	change := storage.PlanChange{
		PhaseID:  ch.Phase,
		NewState: ch.State,
		Created:  created,
	}
	t.Lock()
	defer t.Unlock()
	t.changelog = append(t.changelog, change)
	return nil
}

// GetPlan returns the test plan with the changelog applied.
func (t *testEngine) GetPlan() (*storage.OperationPlan, error) {
	t.Lock()
	defer t.Unlock()
	plan := ResolvePlan(t.getPlan(), t.changelog)
	return plan, nil
}

Expand Down
73 changes: 54 additions & 19 deletions lib/rpc/server/callable.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package server

import (
"bytes"
"io"
"os/exec"
"sync/atomic"
"syscall"
Expand All @@ -39,26 +41,26 @@ func osExec(ctx context.Context, stream pb.OutgoingMessageStream, args []string,
}

// exec executes the command specified with args streaming stdout/stderr to stream
// TODO: separate RPC failures (like failure to send messages to the stream) from command errors
func (c *osCommand) exec(ctx context.Context, stream pb.OutgoingMessageStream, args []string, log log.FieldLogger) error {
func (c *osCommand) exec(ctx context.Context, stream pb.OutgoingMessageStream, args []string, logger log.FieldLogger) error {
seq := atomic.AddInt32(&c.seq, 1)
var errOut bytes.Buffer
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
cmd.Stdout = &streamWriter{stream, pb.ExecOutput_STDOUT, seq}
cmd.Stderr = &streamWriter{stream, pb.ExecOutput_STDERR, seq}
cmd.Stderr = io.MultiWriter(
&streamWriter{stream, pb.ExecOutput_STDERR, seq},
&errOut,
)

err := cmd.Start()
if err != nil {
return trace.Wrap(err, "failed to start %v", cmd.Path)
return trace.Wrap(err, "failed to start").AddField("path", cmd.Path)
}

stream.Send(&pb.Message{Element: &pb.Message_ExecStarted{&pb.ExecStarted{
Args: args,
Seq: seq,
}}})
notifyAndLogError(stream, newCommandStartedEvent(seq, args))
err = cmd.Wait()
if err == nil {
err = stream.Send(&pb.Message{Element: &pb.Message_ExecCompleted{&pb.ExecCompleted{Seq: seq}}})
return trace.Wrap(err)
notifyAndLogError(stream, newCommandCompletedEvent(seq))
return nil
}

exitCode := ExitCodeUndefined
Expand All @@ -68,17 +70,50 @@ func (c *osCommand) exec(ctx context.Context, stream pb.OutgoingMessageStream, a
}
}

errWrite := stream.Send(&pb.Message{Element: &pb.Message_ExecCompleted{&pb.ExecCompleted{
Seq: seq,
ExitCode: int32(exitCode),
Error: pb.EncodeError(trace.Wrap(err)),
}}})
if errWrite != nil {
log.WithError(errWrite).Warnf("Failed to send exec completed message: %v.", errWrite)
}
logger.WithField("output", errOut.String()).Warn("Command finished with error.")
notifyAndLogError(stream, newCommandCompletedWithErrorEvent(seq, int32(exitCode), err))
return trace.Wrap(err)
}

// notifyAndLogError sends msg to the given stream and, if the send fails,
// logs a warning instead of propagating the error to the caller.
func notifyAndLogError(stream pb.OutgoingMessageStream, msg *pb.Message) {
	err := stream.Send(msg)
	if err == nil {
		return
	}
	log.WithError(err).Warnf("Failed to notify stream: %v.", msg)
}

// newCommandStartedEvent builds the stream message announcing that the
// command with the given sequence number started with the given arguments.
func newCommandStartedEvent(seq int32, args []string) *pb.Message {
	started := &pb.ExecStarted{
		Args: args,
		Seq:  seq,
	}
	return &pb.Message{Element: &pb.Message_ExecStarted{ExecStarted: started}}
}

// newCommandCompletedEvent builds the stream message marking successful
// completion of the command with the given sequence number.
func newCommandCompletedEvent(seq int32) *pb.Message {
	completed := &pb.ExecCompleted{Seq: seq}
	return &pb.Message{Element: &pb.Message_ExecCompleted{ExecCompleted: completed}}
}

// newCommandCompletedWithErrorEvent builds the stream message marking failed
// completion of the command with the given sequence number, carrying the
// process exit code and the encoded error.
func newCommandCompletedWithErrorEvent(seq, exitCode int32, err error) *pb.Message {
	completed := &pb.ExecCompleted{
		Seq:      seq,
		ExitCode: exitCode,
		Error:    pb.EncodeError(err),
	}
	return &pb.Message{Element: &pb.Message_ExecCompleted{ExecCompleted: completed}}
}

type osCommand struct {
seq int32
}
Expand All @@ -97,7 +132,7 @@ func (s *streamWriter) Write(p []byte) (n int, err error) {
Seq: s.seq,
}

err = s.stream.Send(&pb.Message{Element: &pb.Message_ExecOutput{data}})
err = s.stream.Send(&pb.Message{Element: &pb.Message_ExecOutput{ExecOutput: data}})
if err != nil {
return 0, err
}
Expand Down
74 changes: 32 additions & 42 deletions lib/update/cluster/phases/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,17 @@ func NewPhaseUpgradeEtcdBackup(logger log.FieldLogger) (fsm.PhaseExecutor, error
}, nil
}

func backupFile() (string, error) {
stateDir, err := state.GetStateDir()
if err != nil {
return "", trace.Wrap(err)
}
return filepath.Join(state.GravityUpdateDir(stateDir), defaults.EtcdUpgradeBackupFile), nil
// backupFile returns the path to the etcd upgrade backup file under the
// static gravity update directory inside the container.
func backupFile() (path string) {
	updateDir := state.GravityUpdateDir(defaults.GravityDir)
	return filepath.Join(updateDir, defaults.EtcdUpgradeBackupFile)
}

func (p *PhaseUpgradeEtcdBackup) Execute(ctx context.Context) error {
p.Info("Backup etcd.")
backupFile, err := backupFile()
if err != nil {
return trace.Wrap(err)
}
_, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "backup", backupFile)
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "backup", backupFile())
if err != nil {
return trace.Wrap(err, "failed to backup etcd")
return trace.Wrap(err, "failed to backup etcd").AddField("output", string(out))
}
p.Info("Command output: ", string(out))
return nil
}

Expand Down Expand Up @@ -130,19 +123,19 @@ func (p *PhaseUpgradeEtcdShutdown) Execute(ctx context.Context) error {
p.Info("Shutdown etcd.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--stop-api")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))
return nil
}

func (p *PhaseUpgradeEtcdShutdown) Rollback(ctx context.Context) error {
p.Info("Enable etcd.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))

if p.isLeader {
return trace.Wrap(restartGravitySite(ctx, p.Client, p.FieldLogger))
Expand Down Expand Up @@ -179,15 +172,15 @@ func (p *PhaseUpgradeEtcd) Execute(ctx context.Context) error {
// TODO(knisbet) only wipe the etcd database when required
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "upgrade")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))

out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable", "--upgrade")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))

return nil
}
Expand All @@ -196,15 +189,15 @@ func (p *PhaseUpgradeEtcd) Rollback(ctx context.Context) error {
p.Info("Rollback upgrade of etcd.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--upgrade")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))

out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "rollback")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))

return nil
}
Expand Down Expand Up @@ -235,14 +228,11 @@ func NewPhaseUpgradeEtcdRestore(phase storage.OperationPhase, logger log.FieldLo
// 10. Restart etcd on the correct ports on first node // API outage ends
func (p *PhaseUpgradeEtcdRestore) Execute(ctx context.Context) error {
p.Info("Restore etcd data from backup.")
backupFile, err := backupFile()
if err != nil {
return trace.Wrap(err)
}
_, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "restore", backupFile)
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "restore", backupFile())
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("Command output: ", string(out))

return nil
}
Expand All @@ -256,9 +246,9 @@ func (p *PhaseUpgradeEtcdRestore) PreCheck(ctx context.Context) error {
out, err := utils.RunCommand(ctx, p.FieldLogger,
utils.PlanetCommandArgs(defaults.WaitForEtcdScript, "https://127.0.0.2:2379")...)
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))
return nil
}

Expand All @@ -283,31 +273,31 @@ func (p *PhaseUpgradeEtcdRestart) Execute(ctx context.Context) error {
p.Info("Restart etcd after upgrade.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--upgrade")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))

out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))
return nil
}

func (p *PhaseUpgradeEtcdRestart) Rollback(ctx context.Context) error {
p.Info("Reenable etcd upgrade service.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--stop-api")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))

out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable", "--upgrade")
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("command output: ", string(out))
p.Info("Command output: ", string(out))
return nil
}

Expand Down Expand Up @@ -358,9 +348,9 @@ func restartGravitySite(ctx context.Context, client *kubeapi.Clientset, l log.Fi
// wait for etcd to form a cluster
out, err := utils.RunCommand(ctx, l, utils.PlanetCommandArgs(defaults.WaitForEtcdScript)...)
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err).AddField("output", string(out))
}
l.Info("command output: ", string(out))
l.Info("Command output: ", string(out))

// delete the gravity-site pods, in order to force them to restart
// This is because the leader election process seems to break during the etcd upgrade
Expand Down

0 comments on commit bfe9dbd

Please sign in to comment.