Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Do not stream stderr to the report tarball. (#326)
Browse files Browse the repository at this point in the history
  • Loading branch information
a-palchikov authored Mar 19, 2019
1 parent 04eaf7c commit 0146844
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 99 deletions.
36 changes: 18 additions & 18 deletions lib/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,21 +885,21 @@ func defaultPortChecker(options *validationpb.ValidateOptions) health.Checker {
}

var portRanges = []monitoring.PortRange{
monitoring.PortRange{Protocol: "tcp", From: 7496, To: 7496, Description: "serf (health check agents) peer to peer"},
monitoring.PortRange{Protocol: "tcp", From: 7373, To: 7373, Description: "serf (health check agents) peer to peer"},
monitoring.PortRange{Protocol: "tcp", From: 2379, To: 2380, Description: "etcd"},
monitoring.PortRange{Protocol: "tcp", From: 4001, To: 4001, Description: "etcd"},
monitoring.PortRange{Protocol: "tcp", From: 7001, To: 7001, Description: "etcd"},
monitoring.PortRange{Protocol: "tcp", From: 6443, To: 6443, Description: "kubernetes API server"},
monitoring.PortRange{Protocol: "tcp", From: 30000, To: 32767, Description: "kubernetes internal services range"},
monitoring.PortRange{Protocol: "tcp", From: 10248, To: 10255, Description: "kubernetes internal services range"},
monitoring.PortRange{Protocol: "tcp", From: 5000, To: 5000, Description: "docker registry"},
monitoring.PortRange{Protocol: "tcp", From: 3022, To: 3025, Description: "teleport internal SSH control panel"},
monitoring.PortRange{Protocol: "tcp", From: 3080, To: 3080, Description: "teleport Web UI"},
monitoring.PortRange{Protocol: "tcp", From: 3008, To: 3011, Description: "internal Gravity services"},
monitoring.PortRange{Protocol: "tcp", From: 32009, To: 32009, Description: "Gravity OpsCenter control panel"},
monitoring.PortRange{Protocol: "tcp", From: 7575, To: 7575, Description: "Gravity RPC agent"},
monitoring.PortRange{Protocol: "udp", From: vxlanPort, To: vxlanPort, Description: "overlay network"},
{Protocol: "tcp", From: 7496, To: 7496, Description: "serf (health check agents) peer to peer"},
{Protocol: "tcp", From: 7373, To: 7373, Description: "serf (health check agents) peer to peer"},
{Protocol: "tcp", From: 2379, To: 2380, Description: "etcd"},
{Protocol: "tcp", From: 4001, To: 4001, Description: "etcd"},
{Protocol: "tcp", From: 7001, To: 7001, Description: "etcd"},
{Protocol: "tcp", From: 6443, To: 6443, Description: "kubernetes API server"},
{Protocol: "tcp", From: 30000, To: 32767, Description: "kubernetes internal services range"},
{Protocol: "tcp", From: 10248, To: 10255, Description: "kubernetes internal services range"},
{Protocol: "tcp", From: 5000, To: 5000, Description: "docker registry"},
{Protocol: "tcp", From: 3022, To: 3025, Description: "teleport internal SSH control panel"},
{Protocol: "tcp", From: 3080, To: 3080, Description: "teleport Web UI"},
{Protocol: "tcp", From: 3008, To: 3011, Description: "internal Gravity services"},
{Protocol: "tcp", From: 32009, To: 32009, Description: "Gravity OpsCenter control panel"},
{Protocol: "tcp", From: 7575, To: 7575, Description: "Gravity RPC agent"},
{Protocol: "udp", From: vxlanPort, To: vxlanPort, Description: "overlay network"},
}

dnsConfig := storage.DefaultDNSConfig
Expand Down Expand Up @@ -984,7 +984,7 @@ func constructBandwidthRequest(servers []Server) (PingPongGame, error) {
}
game[server.AdvertiseIP] = PingPongRequest{
Duration: defaults.BandwidthTestDuration,
Listen: []validationpb.Addr{validationpb.Addr{
Listen: []validationpb.Addr{{
Addr: server.AdvertiseIP,
}},
Ping: remote,
Expand Down Expand Up @@ -1026,8 +1026,8 @@ func ifTestsDisabled() bool {

// RunStream executes the specified command on r.server.
// Implements utils.CommandRunner
func (r *serverRemote) RunStream(w io.Writer, args ...string) error {
return trace.Wrap(r.remote.Exec(context.TODO(), r.server.AdvertiseIP, args, w))
func (r *serverRemote) RunStream(ctx context.Context, w io.Writer, args ...string) error {
return trace.Wrap(r.remote.Exec(ctx, r.server.AdvertiseIP, args, w))
}

type serverRemote struct {
Expand Down
3 changes: 2 additions & 1 deletion lib/ops/opsservice/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ func (s *site) collectDebugInfo(reportWriter report.Writer, runner *serverRunner
defer w.Close()

err = runner.RunStream(w, s.gravityCommand("system", "report",
fmt.Sprintf("--filter=%v", constants.ReportFilterSystem), "--compressed")...)
fmt.Sprintf("--filter=%v", constants.ReportFilterSystem),
"--compressed")...)
if err != nil {
return trace.Wrap(err, "failed to collect diagnostics")
}
Expand Down
5 changes: 3 additions & 2 deletions lib/report/bash.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ limitations under the License.
package report

import (
"context"
"fmt"
"io"
"os"
"path/filepath"

"github.com/gravitational/gravity/lib/utils"

log "github.com/sirupsen/logrus"
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
)

// Collect fetches shell histories for all users from passwd.
// Collect implements Collector
func (r bashHistoryCollector) Collect(reportWriter Writer, runner utils.CommandRunner) error {
func (r bashHistoryCollector) Collect(ctx context.Context, reportWriter Writer, runner utils.CommandRunner) error {
log.Debug("collecting bash histories")
passwd, err := utils.GetPasswd()
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions lib/report/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package report

import (
"context"
"fmt"
"io"

Expand All @@ -30,7 +31,7 @@ import (

// KubernetesInfo returns a list of collectors to fetch kubernetes-related
// diagnostics.
func KubernetesInfo(runner utils.CommandRunner) Collectors {
func KubernetesInfo(ctx context.Context, runner utils.CommandRunner) Collectors {
runner = planetContextRunner{runner}
// general kubernetes info
commands := Collectors{
Expand All @@ -44,7 +45,7 @@ func KubernetesInfo(runner utils.CommandRunner) Collectors {
"get", "events", "--all-namespaces"))...),
}

namespaces, err := kubectl.GetNamespaces(runner)
namespaces, err := kubectl.GetNamespaces(ctx, runner)
if err != nil || len(namespaces) == 0 {
namespaces = defaults.UsedNamespaces
}
Expand All @@ -58,13 +59,13 @@ func KubernetesInfo(runner utils.CommandRunner) Collectors {
}

// fetch pod logs
pods, err := kubectl.GetPods(namespace, runner)
pods, err := kubectl.GetPods(ctx, namespace, runner)
if err != nil {
log.Errorf("failed to query pods in namespace %v: %v", namespace, trace.DebugReport(err))
continue
}
for _, pod := range pods {
containers, err := kubectl.GetPodContainers(namespace, pod, runner)
containers, err := kubectl.GetPodContainers(ctx, namespace, pod, runner)
if err != nil {
log.Errorf("failed to query container in pod %v in namespace %v: %v",
pod, namespace, trace.DebugReport(err))
Expand All @@ -85,8 +86,8 @@ func KubernetesInfo(runner utils.CommandRunner) Collectors {

// RunStream executes the command specified with args in the context of the planet container
// Implements utils.CommandRunner
func (r planetContextRunner) RunStream(w io.Writer, args ...string) error {
return r.CommandRunner.RunStream(w, utils.PlanetCommandSlice(args)...)
func (r planetContextRunner) RunStream(ctx context.Context, w io.Writer, args ...string) error {
return r.CommandRunner.RunStream(ctx, w, utils.PlanetCommandSlice(args)...)
}

type planetContextRunner struct {
Expand Down
15 changes: 8 additions & 7 deletions lib/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package report

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -86,17 +87,17 @@ type Writer func(name string) (io.WriteCloser, error)
type Collector interface {
// Collect collects diagnostics using CommandRunner and serializes
// them using specified Writer
Collect(Writer, utils.CommandRunner) error
Collect(context.Context, Writer, utils.CommandRunner) error
}

// Collectors is a list of Collectors
type Collectors []Collector

// Collect implements Collector for a list of Collectors
func (r Collectors) Collect(reportWriter Writer, runner utils.CommandRunner) error {
func (r Collectors) Collect(ctx context.Context, reportWriter Writer, runner utils.CommandRunner) error {
var errors []error
for _, collector := range r {
err := collector.Collect(reportWriter, runner)
err := collector.Collect(ctx, reportWriter, runner)
if err != nil {
errors = append(errors, err)
}
Expand Down Expand Up @@ -126,7 +127,7 @@ type Command struct {
}

// Collect implements Collector for this Command
func (r Command) Collect(reportWriter Writer, runner utils.CommandRunner) error {
func (r Command) Collect(ctx context.Context, reportWriter Writer, runner utils.CommandRunner) error {
w, err := reportWriter(r.name)
if err != nil {
return trace.Wrap(err)
Expand All @@ -135,7 +136,7 @@ func (r Command) Collect(reportWriter Writer, runner utils.CommandRunner) error

args := []string{r.cmd}
args = append(args, r.args...)
return runner.RunStream(w, args...)
return runner.RunStream(ctx, w, args...)
}

// Script creates a new script collector
Expand All @@ -144,15 +145,15 @@ func Script(name, script string) ScriptCollector {
}

// Collect implements Collector using a bash script
func (r ScriptCollector) Collect(reportWriter Writer, runner utils.CommandRunner) error {
func (r ScriptCollector) Collect(ctx context.Context, reportWriter Writer, runner utils.CommandRunner) error {
args := []string{"/bin/bash", "-c", r.script}
w, err := reportWriter(r.name)
if err != nil {
return trace.Wrap(err)
}
defer w.Close()

return runner.RunStream(w, args...)
return runner.RunStream(ctx, w, args...)
}

// ScriptCollector is a convenience Collector to execute bash scripts
Expand Down
2 changes: 1 addition & 1 deletion lib/system/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// GetFilesystem detects the filesystem on device specified with path
func GetFilesystem(ctx context.Context, path string, runner utils.CommandRunner) (filesystem string, err error) {
var out bytes.Buffer
err = runner.RunStream(&out, "lsblk", "--noheading", "--output", "FSTYPE", path)
err = runner.RunStream(ctx, &out, "lsblk", "--noheading", "--output", "FSTYPE", path)
if err != nil {
return "", trace.Wrap(err, "failed to determine filesystem type on %v", path)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/system/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ xfs
}
}

func (r testRunner) RunStream(w io.Writer, args ...string) error {
func (r testRunner) RunStream(ctx context.Context, w io.Writer, args ...string) error {
fmt.Fprintf(w, string(r))
return nil
}

type testRunner string

func (r failingRunner) RunStream(w io.Writer, args ...string) error {
func (r failingRunner) RunStream(context.Context, io.Writer, ...string) error {
return r.error
}

Expand Down
3 changes: 1 addition & 2 deletions lib/system/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ func formatDevice(path string) (filesystem string, err error) {
{"ext4", []string{"mkfs.ext4", "-F"}},
}

runner := utils.NewRunner(nil)
filesystem, err = system.GetFilesystem(context.TODO(), path, runner)
filesystem, err = system.GetFilesystem(context.TODO(), path, utils.Runner)
if err != nil {
return "", trace.Wrap(err)
}
Expand Down
58 changes: 21 additions & 37 deletions lib/utils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,61 +87,45 @@ func RunInPlanetCommand(ctx context.Context, log log.FieldLogger, args ...string

// RunCommand executes the command specified with args
func RunCommand(ctx context.Context, log log.FieldLogger, args ...string) ([]byte, error) {
r := NewRunnerWithContext(ctx, log)
var out bytes.Buffer
if err := r.RunStream(&out, args...); err != nil {
if err := RunStream(ctx, &out, args...); err != nil {
return out.Bytes(), trace.Wrap(err)
}
return out.Bytes(), nil
}

// NewRunnerWithContext creates a new CommandRunner using the specified context
func NewRunnerWithContext(ctx context.Context, log log.FieldLogger, setters ...CommandOptionSetter) *runner {
runner := NewRunner(log, setters...)
runner.ctx = ctx
return runner
}

// NewRunner creates a new CommandRunner using ExecX APIs
func NewRunner(logger log.FieldLogger, setters ...CommandOptionSetter) *runner {
if logger == nil {
logger = log.StandardLogger()
}
return &runner{
setters: setters,
FieldLogger: logger,
}
}
// Runner is the default CommandRunner
var Runner CommandRunner = CommandRunnerFunc(RunStream)

// CommandRunner abstracts command execution.
// w specifies the sink for command's output.
// The command is given with args
type CommandRunner interface {
// RunStream executes a command specified with args and streams
// output to w
RunStream(w io.Writer, args ...string) error
// output to w using ctx for cancellation
RunStream(ctx context.Context, w io.Writer, args ...string) error
}

// RunStream executes a command specified with args and streams
// output to w
// RunStream invokes r with the specified arguments.
// Implements CommandRunner
func (r *runner) RunStream(w io.Writer, args ...string) error {
func (r CommandRunnerFunc) RunStream(ctx context.Context, w io.Writer, args ...string) error {
return r(ctx, w, args...)
}

// CommandRunnerFunc is the wrapper that allows standalone functions
// to act as CommandRunners
type CommandRunnerFunc func(ctx context.Context, w io.Writer, args ...string) error

// RunStream executes a command specified with args and streams output to w
func RunStream(ctx context.Context, w io.Writer, args ...string) error {
name := args[0]
args = args[1:]

var cmd *exec.Cmd
if r.ctx != nil {
cmd = exec.CommandContext(r.ctx, name, args...)
} else {
cmd = exec.Command(name, args...)
cmd := exec.CommandContext(ctx, name, args...)
cmd.Stdout = w
if err := cmd.Start(); err != nil {
return trace.Wrap(err)
}
return ExecL(cmd, w, r.FieldLogger, r.setters...)
}

type runner struct {
setters []CommandOptionSetter
ctx context.Context
log.FieldLogger
return trace.Wrap(cmd.Wait())
}

// ExecL executes the specified cmd and logs the command line to the specified entry
Expand Down
23 changes: 6 additions & 17 deletions lib/utils/kubectl/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -83,22 +82,12 @@ func RunCommand(cmd *Cmd, options ...optionSetter) ([]byte, error) {
return exec.Command(cmd.command, cmd.args...).CombinedOutput()
}

// Stream runs a kubectl command specified with args and streams stdout and stderr to the provided io.Writers
func Stream(ctx context.Context, out io.Writer, err io.Writer, args ...string) error {
cmd := exec.CommandContext(ctx, defaults.KubectlBin, args...)
log.Debugf("executing %v", cmd)
cmd.Stdout = out
cmd.Stderr = err

return cmd.Run()
}

// GetNamespaces fetches the names of all namespaces
func GetNamespaces(runner utils.CommandRunner) ([]string, error) {
func GetNamespaces(ctx context.Context, runner utils.CommandRunner) ([]string, error) {
cmd := Command("get", "namespaces", "--output", "jsonpath={.items..metadata.name}")
var buf bytes.Buffer

err := runner.RunStream(&buf, cmd.Args()...)
err := runner.RunStream(ctx, &buf, cmd.Args()...)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -109,13 +98,13 @@ func GetNamespaces(runner utils.CommandRunner) ([]string, error) {
}

// GetPods fetches the names of the pods from the given namespace
func GetPods(namespace string, runner utils.CommandRunner) ([]string, error) {
func GetPods(ctx context.Context, namespace string, runner utils.CommandRunner) ([]string, error) {
cmd := Command("get", "pods",
"--namespace", namespace,
"--output", "jsonpath={.items..metadata.name}")
var buf bytes.Buffer

err := runner.RunStream(&buf, cmd.Args()...)
err := runner.RunStream(ctx, &buf, cmd.Args()...)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -132,13 +121,13 @@ func GetPods(namespace string, runner utils.CommandRunner) ([]string, error) {

// GetPodContainers fetches the names of the containers from the specified pod
// in the given namespace
func GetPodContainers(namespace, pod string, runner utils.CommandRunner) ([]string, error) {
func GetPodContainers(ctx context.Context, namespace, pod string, runner utils.CommandRunner) ([]string, error) {
cmd := Command("get", "pod", pod,
"--namespace", namespace,
"--output", "jsonpath={.status.containerStatuses..name}")
var buf bytes.Buffer

err := runner.RunStream(&buf, cmd.Args()...)
err := runner.RunStream(ctx, &buf, cmd.Args()...)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
Loading

0 comments on commit 0146844

Please sign in to comment.