From bfe9dbd0189b2967e4be6528e89a159b3f8cd2f7 Mon Sep 17 00:00:00 2001 From: Kevin Nisbet Date: Fri, 14 Aug 2020 12:14:32 -0400 Subject: [PATCH] Backport https://github.com/gravitational/gravity/pull/1914. (#2010) Fix data races in plan follower tests. Also use the static gravity path inside the container. Updates https://github.com/gravitational/gravity/issues/2006. (cherry picked from commit 722592a65f0de454d5d09572addde7177116913b) Co-authored-by: dmitri --- lib/app/hooks/diff.go | 11 ++--- lib/app/hooks/hooks.go | 4 +- lib/fsm/testhelpers.go | 14 ++++-- lib/rpc/server/callable.go | 73 ++++++++++++++++++++++-------- lib/update/cluster/phases/etcd.go | 74 +++++++++++++------------------ 5 files changed, 105 insertions(+), 71 deletions(-) diff --git a/lib/app/hooks/diff.go b/lib/app/hooks/diff.go index 0b971eed41..9e409dd1f6 100644 --- a/lib/app/hooks/diff.go +++ b/lib/app/hooks/diff.go @@ -21,7 +21,8 @@ import ( "fmt" batchv1 "k8s.io/api/batch/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" ) // diffPodSets returns a difference in Pods between existing and new. @@ -126,12 +127,12 @@ func (p *podDiff) String() string { return out.String() } -func describe(v interface{}) string { - switch val := v.(type) { +func describe(obj runtime.Object) string { + switch obj := obj.(type) { case *v1.Pod: - return fmt.Sprintf("Pod %q in namespace %q", val.Name, val.Namespace) + return fmt.Sprintf("Pod %q in namespace %q", obj.Name, obj.Namespace) case *batchv1.Job: - return fmt.Sprintf("Job %q in namespace %q", val.Name, val.Namespace) + return fmt.Sprintf("Job %q in namespace %q", obj.Name, obj.Namespace) } return "" } diff --git a/lib/app/hooks/hooks.go b/lib/app/hooks/hooks.go index d08a90d45a..298b52abf3 100644 --- a/lib/app/hooks/hooks.go +++ b/lib/app/hooks/hooks.go @@ -297,7 +297,7 @@ func (r *Runner) monitorPods(ctx context.Context, eventsC <-chan watch.Event, err := r.checkJob(ctx, &job, &jobControl, podSet, w) diff := humanize.RelTime(start, time.Now(), "elapsed", "elapsed") if err == nil { - fmt.Fprintf(w, "%v has completed, %v.\n", describe(job), diff) + fmt.Fprintf(w, "%v has completed, %v.\n", describe(&job), diff) return nil } log.Debugf("%v: %v", diff, err) @@ -313,7 +313,7 @@ func (r *Runner) monitorPods(ctx context.Context, eventsC <-chan watch.Event, diff = humanize.RelTime(start, time.Now(), "elapsed", "elapsed") err = r.checkJob(ctx, &job, &jobControl, podSet, w) if err == nil { - fmt.Fprintf(w, "%v has completed, %v.\n", describe(job), diff) + fmt.Fprintf(w, "%v has completed, %v.\n", describe(&job), diff) return nil } log.Debugf("%v: %v", diff, err) diff --git a/lib/fsm/testhelpers.go b/lib/fsm/testhelpers.go index 836cc5a94e..dcab78d055 100644 --- a/lib/fsm/testhelpers.go +++ b/lib/fsm/testhelpers.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/gravitational/gravity/lib/rpc" @@ -39,9 +40,10 @@ func newTestEngine(getPlan func() storage.OperationPlan) *testEngine { // testEngine is fsm engine used in tests. Keeps its changelog in memory. type testEngine struct { - getPlan func() storage.OperationPlan - changelog storage.PlanChangelog - clock clockwork.Clock + getPlan func() storage.OperationPlan + clock clockwork.Clock + sync.Mutex // guards following fields + changelog storage.PlanChangelog } // GetExecutor returns one of the test executors depending on the specified phase. @@ -58,28 +60,34 @@ func (t *testEngine) GetExecutor(p ExecutorParams, r Remote) (PhaseExecutor, err // ChangePhaseState records the provided phase state change in the test engine. func (t *testEngine) ChangePhaseState(ctx context.Context, ch StateChange) error { + t.Lock() t.changelog = append(t.changelog, storage.PlanChange{ PhaseID: ch.Phase, NewState: ch.State, // Make sure that new changelog entries get the most recent timestamp. Created: t.clock.Now().Add(time.Duration(len(t.changelog)) * time.Minute), }) + t.Unlock() return nil } // changePhaseStateWithTimestamp records the provided phase state change in the // test engine with the specified timestamp. func (t *testEngine) changePhaseStateWithTimestamp(ctx context.Context, ch StateChange, created time.Time) error { + t.Lock() t.changelog = append(t.changelog, storage.PlanChange{ PhaseID: ch.Phase, NewState: ch.State, Created: created, }) + t.Unlock() return nil } // GetPlan returns the test plan with the changelog applied. func (t *testEngine) GetPlan() (*storage.OperationPlan, error) { + t.Lock() + defer t.Unlock() return ResolvePlan(t.getPlan(), t.changelog), nil } diff --git a/lib/rpc/server/callable.go b/lib/rpc/server/callable.go index 0b02637409..58ca595ed1 100644 --- a/lib/rpc/server/callable.go +++ b/lib/rpc/server/callable.go @@ -17,6 +17,8 @@ limitations under the License. package server import ( + "bytes" + "io" "os/exec" "sync/atomic" "syscall" @@ -39,26 +41,26 @@ func osExec(ctx context.Context, stream pb.OutgoingMessageStream, args []string, } // exec executes the command specified with args streaming stdout/stderr to stream -// TODO: separate RPC failures (like failure to send messages to the stream) from command errors -func (c *osCommand) exec(ctx context.Context, stream pb.OutgoingMessageStream, args []string, log log.FieldLogger) error { +func (c *osCommand) exec(ctx context.Context, stream pb.OutgoingMessageStream, args []string, logger log.FieldLogger) error { seq := atomic.AddInt32(&c.seq, 1) + var errOut bytes.Buffer cmd := exec.CommandContext(ctx, args[0], args[1:]...) cmd.Stdout = &streamWriter{stream, pb.ExecOutput_STDOUT, seq} - cmd.Stderr = &streamWriter{stream, pb.ExecOutput_STDERR, seq} + cmd.Stderr = io.MultiWriter( + &streamWriter{stream, pb.ExecOutput_STDERR, seq}, + &errOut, + ) err := cmd.Start() if err != nil { - return trace.Wrap(err, "failed to start %v", cmd.Path) + return trace.Wrap(err, "failed to start").AddField("path", cmd.Path) } - stream.Send(&pb.Message{Element: &pb.Message_ExecStarted{&pb.ExecStarted{ - Args: args, - Seq: seq, - }}}) + notifyAndLogError(stream, newCommandStartedEvent(seq, args)) err = cmd.Wait() if err == nil { - err = stream.Send(&pb.Message{Element: &pb.Message_ExecCompleted{&pb.ExecCompleted{Seq: seq}}}) - return trace.Wrap(err) + notifyAndLogError(stream, newCommandCompletedEvent(seq)) + return nil } exitCode := ExitCodeUndefined @@ -68,17 +70,50 @@ func (c *osCommand) exec(ctx context.Context, stream pb.OutgoingMessageStream, a } } - errWrite := stream.Send(&pb.Message{Element: &pb.Message_ExecCompleted{&pb.ExecCompleted{ - Seq: seq, - ExitCode: int32(exitCode), - Error: pb.EncodeError(trace.Wrap(err)), - }}}) - if errWrite != nil { - log.WithError(errWrite).Warnf("Failed to send exec completed message: %v.", errWrite) - } + logger.WithField("output", errOut.String()).Warn("Command finished with error.") + notifyAndLogError(stream, newCommandCompletedWithErrorEvent(seq, int32(exitCode), err)) return trace.Wrap(err) } +func notifyAndLogError(stream pb.OutgoingMessageStream, msg *pb.Message) { + if err := stream.Send(msg); err != nil { + log.WithError(err).Warnf("Failed to notify stream: %v.", msg) + } +} + +func newCommandStartedEvent(seq int32, args []string) *pb.Message { + return &pb.Message{ + Element: &pb.Message_ExecStarted{ + ExecStarted: &pb.ExecStarted{ + Args: args, + Seq: seq, + }, + }, + } +} + +func newCommandCompletedEvent(seq int32) *pb.Message { + return &pb.Message{ + Element: &pb.Message_ExecCompleted{ + ExecCompleted: &pb.ExecCompleted{ + Seq: seq, + }, + }, + } +} + +func newCommandCompletedWithErrorEvent(seq, exitCode int32, err error) *pb.Message { + return &pb.Message{ + Element: &pb.Message_ExecCompleted{ + ExecCompleted: &pb.ExecCompleted{ + Seq: seq, + ExitCode: exitCode, + Error: pb.EncodeError(err), + }, + }, + } +} + type osCommand struct { seq int32 } @@ -97,7 +132,7 @@ func (s *streamWriter) Write(p []byte) (n int, err error) { Seq: s.seq, } - err = s.stream.Send(&pb.Message{Element: &pb.Message_ExecOutput{data}}) + err = s.stream.Send(&pb.Message{Element: &pb.Message_ExecOutput{ExecOutput: data}}) if err != nil { return 0, err } diff --git a/lib/update/cluster/phases/etcd.go b/lib/update/cluster/phases/etcd.go index 5b3b588b43..31aca1453b 100644 --- a/lib/update/cluster/phases/etcd.go +++ b/lib/update/cluster/phases/etcd.go @@ -73,24 +73,17 @@ func NewPhaseUpgradeEtcdBackup(logger log.FieldLogger) (fsm.PhaseExecutor, error }, nil } -func backupFile() (string, error) { - stateDir, err := state.GetStateDir() - if err != nil { - return "", trace.Wrap(err) - } - return filepath.Join(state.GravityUpdateDir(stateDir), defaults.EtcdUpgradeBackupFile), nil +func backupFile() (path string) { + return filepath.Join(state.GravityUpdateDir(defaults.GravityDir), defaults.EtcdUpgradeBackupFile) } func (p *PhaseUpgradeEtcdBackup) Execute(ctx context.Context) error { p.Info("Backup etcd.") - backupFile, err := backupFile() - if err != nil { - return trace.Wrap(err) - } - _, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "backup", backupFile) + out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "backup", backupFile()) if err != nil { - return trace.Wrap(err, "failed to backup etcd") + return trace.Wrap(err, "failed to backup etcd").AddField("output", string(out)) } + p.Info("Command output: ", string(out)) return nil } @@ -130,9 +123,9 @@ func (p *PhaseUpgradeEtcdShutdown) Execute(ctx context.Context) error { p.Info("Shutdown etcd.") out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--stop-api") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) return nil } @@ -140,9 +133,9 @@ func (p *PhaseUpgradeEtcdShutdown) Rollback(ctx context.Context) error { p.Info("Enable etcd.") out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) if p.isLeader { return trace.Wrap(restartGravitySite(ctx, p.Client, p.FieldLogger)) @@ -179,15 +172,15 @@ func (p *PhaseUpgradeEtcd) Execute(ctx context.Context) error { // TODO(knisbet) only wipe the etcd database when required out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "upgrade") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable", "--upgrade") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) return nil } @@ -196,15 +189,15 @@ func (p *PhaseUpgradeEtcd) Rollback(ctx context.Context) error { p.Info("Rollback upgrade of etcd.") out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--upgrade") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "rollback") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) return nil } @@ -235,14 +228,11 @@ func NewPhaseUpgradeEtcdRestore(phase storage.OperationPhase, logger log.FieldLo // 10. Restart etcd on the correct ports on first node // API outage ends func (p *PhaseUpgradeEtcdRestore) Execute(ctx context.Context) error { p.Info("Restore etcd data from backup.") - backupFile, err := backupFile() - if err != nil { - return trace.Wrap(err) - } - _, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "restore", backupFile) + out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "restore", backupFile()) if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } + p.Info("Command output: ", string(out)) return nil } @@ -256,9 +246,9 @@ func (p *PhaseUpgradeEtcdRestore) PreCheck(ctx context.Context) error { out, err := utils.RunCommand(ctx, p.FieldLogger, utils.PlanetCommandArgs(defaults.WaitForEtcdScript, "https://127.0.0.2:2379")...) if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) return nil } @@ -283,15 +273,15 @@ func (p *PhaseUpgradeEtcdRestart) Execute(ctx context.Context) error { p.Info("Restart etcd after upgrade.") out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--upgrade") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) return nil } @@ -299,15 +289,15 @@ func (p *PhaseUpgradeEtcdRestart) Rollback(ctx context.Context) error { p.Info("Reenable etcd upgrade service.") out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--stop-api") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable", "--upgrade") if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - p.Info("command output: ", string(out)) + p.Info("Command output: ", string(out)) return nil } @@ -358,9 +348,9 @@ func restartGravitySite(ctx context.Context, client *kubeapi.Clientset, l log.Fi // wait for etcd to form a cluster out, err := utils.RunCommand(ctx, l, utils.PlanetCommandArgs(defaults.WaitForEtcdScript)...) if err != nil { - return trace.Wrap(err) + return trace.Wrap(err).AddField("output", string(out)) } - l.Info("command output: ", string(out)) + l.Info("Command output: ", string(out)) // delete the gravity-site pods, in order to force them to restart // This is because the leader election process seems to break during the etcd upgrade