diff --git a/agentlocal/agent_local.go b/agentlocal/agent_local.go index 2da5b4250..a7f2c3fbf 100644 --- a/agentlocal/agent_local.go +++ b/agentlocal/agent_local.go @@ -16,10 +16,13 @@ package agentlocal import ( + "archive/zip" "bytes" "context" _ "expvar" // register /debug/vars + "fmt" "html/template" + "io" "log" "net" "net/http" @@ -47,6 +50,7 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "github.com/percona/pmm-agent/config" + "github.com/percona/pmm-agent/storelogs" ) const ( @@ -61,6 +65,7 @@ type Server struct { configFilepath string l *logrus.Entry + ringLogs *storelogs.LogsStore reload chan struct{} reloadCloseOnce sync.Once @@ -70,14 +75,18 @@ type Server struct { // NewServer creates new server. // // Caller should call Run. -func NewServer(cfg *config.Config, supervisor supervisor, client client, configFilepath string) *Server { +func NewServer(cfg *config.Config, supervisor supervisor, client client, configFilepath string, ringLog *storelogs.LogsStore) *Server { + logger := logrus.New() + logger.Out = io.MultiWriter(os.Stderr, ringLog) + return &Server{ cfg: cfg, supervisor: supervisor, client: client, configFilepath: configFilepath, - l: logrus.WithField("component", "local-server"), + l: logger.WithField("component", "local-server"), reload: make(chan struct{}), + ringLogs: ringLog, } } @@ -289,6 +298,7 @@ func (s *Server) runJSONServer(ctx context.Context, grpcAddress string) { mux.Handle("/debug/", http.DefaultServeMux) mux.Handle("/debug", debugPageHandler) mux.Handle("/", proxyMux) + mux.HandleFunc("/logs.zip", s.Zip) server := &http.Server{ Addr: address, @@ -314,7 +324,53 @@ func (s *Server) runJSONServer(ctx context.Context, grpcAddress string) { _ = server.Close() // call Close() in all cases } +// addData add data to zip file +func addData(zipW *zip.Writer, name string, data []byte) { + f, err := zipW.Create(name) + if err != nil { + log.Fatal(err) + } + _, err = f.Write(data) + if err != nil { + log.Fatal(err) + } +} + // check interfaces var ( _ agentlocalpb.AgentLocalServer = (*Server)(nil) ) + +func (s *Server) Zip(w http.ResponseWriter, r *http.Request) { + buf := &bytes.Buffer{} + writer := zip.NewWriter(buf) + b := &bytes.Buffer{} + for _, serverLog := range s.ringLogs.GetLogs() { + _, err := b.WriteString(serverLog) + if err != nil { + log.Fatal(err) + } + } + addData(writer, "pmm-agent.txt", b.Bytes()) + + for id, logs := range s.supervisor.AgentsLogs() { + b := &bytes.Buffer{} + for _, l := range logs { + _, err := b.WriteString(l + "\n") + if err != nil { + log.Fatal(err) + } + } + addData(writer, fmt.Sprintf("%s.txt", id), b.Bytes()) + } + err := writer.Close() + if err != nil { + log.Fatal(err) + } + w.Header().Set("Content-Type", "application/zip") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s.zip\"", "logs")) + _, err = w.Write(buf.Bytes()) + if err != nil { + log.Fatal(err) + } +} diff --git a/agentlocal/agent_local_test.go b/agentlocal/agent_local_test.go index db99612c3..6f7051ad5 100644 --- a/agentlocal/agent_local_test.go +++ b/agentlocal/agent_local_test.go @@ -16,7 +16,14 @@ package agentlocal import ( + "archive/zip" + "bufio" + "bytes" "context" + "fmt" + "io/ioutil" + "log" + "net/http/httptest" "testing" "time" @@ -28,6 +35,7 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "github.com/percona/pmm-agent/config" + "github.com/percona/pmm-agent/storelogs" ) func TestServerStatus(t *testing.T) { @@ -61,7 +69,8 @@ func TestServerStatus(t *testing.T) { agentInfo, supervisor, client, cfg := setup(t) defer supervisor.AssertExpectations(t) defer client.AssertExpectations(t) - s := NewServer(cfg, supervisor, client, "/some/dir/pmm-agent.yaml") + ringLog := storelogs.New(500) + s := NewServer(cfg, supervisor, client, "/some/dir/pmm-agent.yaml", ringLog) // without network info actual, err := s.Status(context.Background(), &agentlocalpb.StatusRequest{GetNetworkInfo: false}) @@ -87,7 +96,8 @@ func TestServerStatus(t *testing.T) { client.On("GetNetworkInformation").Return(latency, clockDrift, nil) defer supervisor.AssertExpectations(t) defer client.AssertExpectations(t) - s := NewServer(cfg, supervisor, client, "/some/dir/pmm-agent.yaml") + ringLog := storelogs.New(500) + s := NewServer(cfg, supervisor, client, "/some/dir/pmm-agent.yaml", ringLog) // with network info actual, err := s.Status(context.Background(), &agentlocalpb.StatusRequest{GetNetworkInfo: true}) @@ -108,3 +118,128 @@ func TestServerStatus(t *testing.T) { assert.Equal(t, expected, actual) }) } + +func TestGetZipFile(t *testing.T) { + setup := func(t *testing.T) ([]*agentlocalpb.AgentInfo, *mockSupervisor, *mockClient, *config.Config) { + agentInfo := []*agentlocalpb.AgentInfo{{ + AgentId: "/agent_id/00000000-0000-4000-8000-000000000002", + AgentType: inventorypb.AgentType_NODE_EXPORTER, + Status: inventorypb.AgentStatus_RUNNING, + }} + var supervisor mockSupervisor + supervisor.Test(t) + supervisor.On("AgentsList").Return(agentInfo) + agentLogs := make(map[string][]string) + agentLogs[inventorypb.AgentType_NODE_EXPORTER.String()] = []string{ + "logs1", + "logs2", + } + supervisor.On("AgentsLogs").Return(agentLogs) + var client mockClient + client.Test(t) + client.On("GetServerConnectMetadata").Return(&agentpb.ServerConnectMetadata{ + AgentRunsOnNodeID: "/node_id/00000000-0000-4000-8000-000000000003", + ServerVersion: "2.0.0-dev", + }) + cfg := &config.Config{ + ID: "/agent_id/00000000-0000-4000-8000-000000000001", + Server: config.Server{ + Address: "127.0.0.1:8443", + Username: "username", + Password: "password", + }, + } + return agentInfo, &supervisor, &client, cfg + } + + t.Run("test zip file", func(t *testing.T) { + _, supervisor, client, cfg := setup(t) + defer supervisor.AssertExpectations(t) + defer client.AssertExpectations(t) + ringLog := storelogs.New(10) + s := NewServer(cfg, supervisor, client, "/some/dir/pmm-agent.yaml", ringLog) + _, err := s.Status(context.Background(), &agentlocalpb.StatusRequest{GetNetworkInfo: false}) + require.NoError(t, err) + + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/logs.zip", nil) + s.Zip(rec, req) + existFile, err := ioutil.ReadAll(rec.Body) + require.NoError(t, err) + + expectedFile, err := generateTestZip(s) + require.NoError(t, err) + bufExp := bytes.NewReader(expectedFile) + bufExs := bytes.NewReader(existFile) + + zipExp, err := zip.NewReader(bufExp, bufExp.Size()) + require.NoError(t, err) + zipExs, err := zip.NewReader(bufExp, bufExs.Size()) + require.NoError(t, err) + + for i, ex := range zipExp.File { + assert.Equal(t, ex.Name, zipExs.File[i].Name) + deepCompare(t, ex, zipExs.File[i]) + } + require.NoError(t, err) + assert.Equal(t, expectedFile, existFile) + }) +} + +// generateTestZip generate test zip file. +func generateTestZip(s *Server) ([]byte, error) { + agentLogs := make(map[string][]string) + agentLogs[inventorypb.AgentType_NODE_EXPORTER.String()] = []string{ + "logs1", + "logs2", + } + buf := &bytes.Buffer{} + writer := zip.NewWriter(buf) + b := &bytes.Buffer{} + for _, serverLog := range s.ringLogs.GetLogs() { + _, err := b.WriteString(serverLog) + if err != nil { + return nil, err + } + } + addData(writer, "pmm-agent.txt", b.Bytes()) + + for id, logs := range agentLogs { + b := &bytes.Buffer{} + for _, l := range logs { + _, err := b.WriteString(l + "\n") + if err != nil { + return nil, err + } + } + addData(writer, fmt.Sprintf("%s.txt", id), b.Bytes()) + } + err := writer.Close() + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// deepCompare compare two zip files. +func deepCompare(t *testing.T, file1, file2 *zip.File) bool { + sf, err := file1.Open() + if err != nil { + log.Fatal(err) + } + + df, err := file2.Open() + if err != nil { + log.Fatal(err) + } + + sscan := bufio.NewScanner(sf) + dscan := bufio.NewScanner(df) + + for sscan.Scan() { + dscan.Scan() + assert.Equal(t, sscan.Bytes(), dscan.Bytes()) + } + + return false +} diff --git a/agentlocal/deps.go b/agentlocal/deps.go index 2ff667d19..a7031b848 100644 --- a/agentlocal/deps.go +++ b/agentlocal/deps.go @@ -39,4 +39,5 @@ type client interface { // We use it instead of real type for testing and to avoid dependency cycle. type supervisor interface { AgentsList() []*agentlocalpb.AgentInfo + AgentsLogs() map[string][]string } diff --git a/agentlocal/mock_supervisor_test.go b/agentlocal/mock_supervisor_test.go index 720b4c9bc..342574e6f 100644 --- a/agentlocal/mock_supervisor_test.go +++ b/agentlocal/mock_supervisor_test.go @@ -27,3 +27,19 @@ func (_m *mockSupervisor) AgentsList() []*agentlocalpb.AgentInfo { return r0 } + +// AgentsLogs provides a mock function with given fields: +func (_m *mockSupervisor) AgentsLogs() map[string][]string { + ret := _m.Called() + + var r0 map[string][]string + if rf, ok := ret.Get(0).(func() map[string][]string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string][]string) + } + } + + return r0 +} diff --git a/agents/process/process_child.go b/agents/process/process_child.go index 6c61e4199..b597854ee 100644 --- a/agents/process/process_child.go +++ b/agents/process/process_child.go @@ -25,7 +25,7 @@ import ( "context" "flag" "fmt" - "io" + "io/ioutil" "time" "github.com/percona/pmm/api/inventorypb" @@ -37,9 +37,8 @@ import ( func main() { flag.Parse() logger := logrus.New() - logger.SetOutput(io.Discard) + logger.SetOutput(ioutil.Discard) l := logrus.NewEntry(logger) - p := process.New(&process.Params{Path: "sleep", Args: []string{"100500"}}, nil, l) go p.Run(context.Background()) diff --git a/agents/process/process_test.go b/agents/process/process_test.go index 8d06e6722..7cb36f3a3 100644 --- a/agents/process/process_test.go +++ b/agents/process/process_test.go @@ -17,6 +17,7 @@ package process import ( "context" + "io/ioutil" "os" "os/exec" "runtime" @@ -123,7 +124,7 @@ func TestProcess(t *testing.T) { }) t.Run("Killed", func(t *testing.T) { - f, err := os.CreateTemp("", "pmm-agent-process-test-noterm") + f, err := ioutil.TempFile("", "pmm-agent-process-test-noterm") require.NoError(t, err) require.NoError(t, f.Close()) defer func() { @@ -146,7 +147,7 @@ func TestProcess(t *testing.T) { t.Skip("Pdeathsig is implemented only on Linux") } - f, err := os.CreateTemp("", "pmm-agent-process-test-child") + f, err := ioutil.TempFile("", "pmm-agent-process-test-child") require.NoError(t, err) require.NoError(t, f.Close()) defer func() { diff --git a/agents/supervisor/supervisor.go b/agents/supervisor/supervisor.go index 16f29c4db..e1702065a 100644 --- a/agents/supervisor/supervisor.go +++ b/agents/supervisor/supervisor.go @@ -19,6 +19,8 @@ package supervisor import ( "context" "fmt" + "io" + "os" "path/filepath" "regexp" "runtime/pprof" @@ -43,6 +45,7 @@ import ( "github.com/percona/pmm-agent/agents/postgres/pgstatstatements" "github.com/percona/pmm-agent/agents/process" "github.com/percona/pmm-agent/config" + "github.com/percona/pmm-agent/storelogs" "github.com/percona/pmm-agent/utils/templates" ) @@ -71,6 +74,7 @@ type agentProcessInfo struct { requestedState *agentpb.SetStateRequest_AgentProcess listenPort uint16 processExecPath string + logs *storelogs.LogsStore // store logs } // builtinAgentInfo describes built-in Agent. @@ -80,6 +84,7 @@ type builtinAgentInfo struct { requestedState *agentpb.SetStateRequest_BuiltinAgent describe func(chan<- *prometheus.Desc) // agent's func to describe Prometheus metrics collect func(chan<- prometheus.Metric) // agent's func to provide Prometheus metrics + logs *storelogs.LogsStore // store logs } // NewSupervisor creates new Supervisor object. @@ -143,6 +148,26 @@ func (s *Supervisor) AgentsList() []*agentlocalpb.AgentInfo { return res } +// AgentsLogs returns logs for all Agents managed by this supervisor. +func (s *Supervisor) AgentsLogs() map[string][]string { + s.rw.RLock() + defer s.rw.RUnlock() + s.arw.RLock() + defer s.arw.RUnlock() + res := make(map[string][]string) + + for id, agent := range s.agentProcesses { + newID := strings.ReplaceAll(id, "/agent_id/", "") + res[fmt.Sprintf("%s %s", agent.requestedState.Type.String(), newID)] = agent.logs.GetLogs() + } + + for id, agent := range s.builtinAgents { + newID := strings.ReplaceAll(id, "/agent_id/", "") + res[fmt.Sprintf("%s %s", agent.requestedState.Type.String(), newID)] = agent.logs.GetLogs() + } + return res +} + // Changes returns channel with Agent's state changes. func (s *Supervisor) Changes() <-chan *agentpb.StateChangedRequest { return s.changes @@ -329,6 +354,7 @@ func filter(existing, new map[string]agentpb.AgentParams) (toStart, toRestart, t const ( type_TEST_SLEEP inventorypb.AgentType = 998 // process type_TEST_NOOP inventorypb.AgentType = 999 // built-in + maxAgentLogs = 200 // max number logs can store each agent ) // startProcess starts Agent's process. @@ -341,13 +367,8 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState ctx, cancel := context.WithCancel(s.ctx) agentType := strings.ToLower(agentProcess.Type.String()) - l := logrus.WithFields(logrus.Fields{ - "component": "agent-process", - "agentID": agentID, - "type": agentType, - }) + ringLog, l := s.newLogger("agent-process", agentID, agentType) l.Debugf("Starting: %s.", processParams) - process := process.New(processParams, agentProcess.RedactWords, l) go pprof.Do(ctx, pprof.Labels("agentID", agentID, "type", agentType), process.Run) @@ -372,20 +393,30 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState requestedState: proto.Clone(agentProcess).(*agentpb.SetStateRequest_AgentProcess), listenPort: port, processExecPath: processParams.Path, + logs: ringLog, } return nil } +func (s *Supervisor) newLogger(component string, agentID string, agentType string) (*storelogs.LogsStore, *logrus.Entry) { + ringLog := storelogs.New(maxAgentLogs) + logger := logrus.New() + logger.SetFormatter(logrus.StandardLogger().Formatter) + logger.Out = io.MultiWriter(os.Stderr, ringLog) + l := logger.WithFields(logrus.Fields{ + "component": component, + "agentID": agentID, + "type": agentType, + }) + return ringLog, l +} + // startBuiltin starts built-in Agent. // Must be called with s.rw held for writing. func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentpb.SetStateRequest_BuiltinAgent) error { ctx, cancel := context.WithCancel(s.ctx) agentType := strings.ToLower(builtinAgent.Type.String()) - l := logrus.WithFields(logrus.Fields{ - "component": "agent-builtin", - "agentID": agentID, - "type": agentType, - }) + ringLog, l := s.newLogger("agent-process", agentID, agentType) done := make(chan struct{}) var agent agents.BuiltinAgent @@ -490,6 +521,7 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentpb.SetState requestedState: proto.Clone(builtinAgent).(*agentpb.SetStateRequest_BuiltinAgent), describe: agent.Describe, collect: agent.Collect, + logs: ringLog, } return nil } diff --git a/commands/run.go b/commands/run.go index 005abd799..c30b2ef38 100644 --- a/commands/run.go +++ b/commands/run.go @@ -28,9 +28,12 @@ import ( "github.com/percona/pmm-agent/client" "github.com/percona/pmm-agent/config" "github.com/percona/pmm-agent/connectionchecker" + "github.com/percona/pmm-agent/storelogs" "github.com/percona/pmm-agent/versioner" ) +const maxServerLogs = 500 // max number logs can store server + // Run implements `pmm-agent run` default command. func Run() { l := logrus.WithField("component", "main") @@ -40,6 +43,7 @@ func Run() { // handle termination signals signals := make(chan os.Signal, 1) signal.Notify(signals, unix.SIGTERM, unix.SIGINT) + ringLog := storelogs.New(maxServerLogs) go func() { s := <-signals signal.Stop(signals) @@ -55,7 +59,7 @@ func Run() { config.ConfigureLogger(cfg) l.Debugf("Loaded configuration: %+v", cfg) - run(ctx, cfg, configFilepath) + run(ctx, cfg, configFilepath, ringLog) if ctx.Err() != nil { return @@ -65,7 +69,7 @@ func Run() { // run runs all pmm-agent components with given configuration until ctx is cancellled. // See documentation for NewXXX, Run, and Done -func run(ctx context.Context, cfg *config.Config, configFilepath string) { +func run(ctx context.Context, cfg *config.Config, configFilepath string, ringLog *storelogs.LogsStore) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) @@ -77,7 +81,8 @@ func run(ctx context.Context, cfg *config.Config, configFilepath string) { connectionChecker := connectionchecker.New(&cfg.Paths) v := versioner.New(&versioner.RealExecFunctions{}) client := client.New(cfg, supervisor, connectionChecker, v) - localServer := agentlocal.NewServer(cfg, supervisor, client, configFilepath) + + localServer := agentlocal.NewServer(cfg, supervisor, client, configFilepath, ringLog) go func() { _ = client.Run(ctx) diff --git a/main.go b/main.go index 9d6193045..80898cd3c 100644 --- a/main.go +++ b/main.go @@ -33,7 +33,6 @@ func main() { if version.Version == "" { panic("pmm-agent version is not set during build.") } - // we don't have configuration options for formatter, so set it once there logrus.SetFormatter(&logrus.TextFormatter{ // Enable multiline-friendly formatter in both development (with terminal) and production (without terminal): @@ -63,7 +62,6 @@ func main() { kingpin.HelpFlag = app.HelpFlag kingpin.HelpCommand = app.HelpCommand cmd := kingpin.Parse() - switch cmd { case "run": // delay logger configuration until we read configuration file diff --git a/storelogs/storelogs.go b/storelogs/storelogs.go new file mode 100644 index 000000000..076e81b22 --- /dev/null +++ b/storelogs/storelogs.go @@ -0,0 +1,63 @@ +// pmm-agent +// Copyright 2019 Percona LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package storelogs help to store logs +package storelogs + +import ( + "container/ring" + "fmt" + "strings" + "sync" +) + +// LogsStore implement ring save logs. +type LogsStore struct { + log *ring.Ring + m sync.RWMutex +} + +// New creates LogsStore. +func New(count int) *LogsStore { + return &LogsStore{ + log: ring.New(count), + m: sync.RWMutex{}, + } +} + +// Write writes log for store. +func (l *LogsStore) Write(b []byte) (n int, err error) { + l.m.Lock() + l.log.Value = string(b) + l.log = l.log.Next() + l.m.Unlock() + return len(b), nil +} + +// GetLogs return all logs. +func (l *LogsStore) GetLogs() (logs []string) { + if l != nil { + l.m.Lock() + l.log.Do(func(p interface{}) { + log := fmt.Sprint(p) + replacer := strings.NewReplacer("\u001B[36m", "", "\u001B[0m", "", "\u001B[33", "", "\u001B[31m", "", " ", " ") + if p != nil { + logs = append(logs, replacer.Replace(log)) + } + }) + l.m.Unlock() + } + return logs +}