diff --git a/Makefile b/Makefile
index 7ccaae5..d5dc26e 100644
--- a/Makefile
+++ b/Makefile
@@ -64,8 +64,8 @@ windows-sign:
 	scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/cagent_windows_amd64/cagent.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/cagent_64.exe
 	scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/csender_windows_386/csender.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/csender_386.exe
 	scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/csender_windows_amd64/csender.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/csender_64.exe
-	scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/jobmon_windows_386/jobmon.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/jobmon_386.exe
-	scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/jobmon_windows_amd64/jobmon.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/jobmon_64.exe
+	scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/jobmon_windows_386/jobmon.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/jobmon_386.exe
+	scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/jobmon_windows_amd64/jobmon.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/jobmon_64.exe
 	# Copy other build dependencies
 	scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/build-win.bat ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/build-win.bat
 	ssh ${SSH_WIN_BUILD_MACHINE_OPTIONS} ${WIN_BUILD_MACHINE_AUTH} chmod +x ${WIN_BUILD_MACHINE_CI_DIR}/build-win.bat
diff --git a/cmd/jobmon/main.go b/cmd/jobmon/main.go
index ade43ec..d9263df 100644
--- a/cmd/jobmon/main.go
+++ b/cmd/jobmon/main.go
@@ -37,6 +37,7 @@ func main() {
 	cfgPathPtr := flag.String("c", cagent.DefaultCfgPath, "config file path")
 	jobIDPtr := flag.String("id", "", fmt.Sprintf("id of the job, required, maximum %d characters", maxJobIDLength))
+	forceRunPtr := flag.Bool("f", false, "Force the run of a job even if a job with the same ID is already running or its termination wasn't handled successfully.")
 	severityPtr := flag.String("s", "", "alert|warning|none: process a failed job with this severity. Overwrites the default severity of cagent.conf. Severity 'none' suppresses all messages.")
 	nextRunInPtr := flag.Duration("nr", 0, "h|m: indicates when the job should run next. Allows triggering alerts for jobs that did not run. The shortest interval is 5 minutes.")
 	maxExecutionTimePtr := flag.Duration("me", 0, "h|m|s or just a number of seconds. Max execution time for the job.")
@@ -68,7 +69,7 @@ func main() {
 	)

 	jobMonRunner := jobmon.NewRunner(cfg.JobMonitoring.SpoolDirPath, jobConfig, logger)
-	err = jobMonRunner.RunJob(sigChan)
+	err = jobMonRunner.RunJob(sigChan, *forceRunPtr)
 	if err != nil {
 		logger.Fatalf("Could not start the job: %s", err.Error())
 		return
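With the flag wired through to RunJob, a forced run can be requested from the command line. A hypothetical invocation (how the job command itself is supplied follows the existing CLI and is not shown in this diff):

    jobmon -id nightly-backup -f <job command>

-f only changes the spool handling: leftover "running" entries for the same job ID are removed instead of the run being rejected with ErrJobAlreadyRunning (see the spool.go changes below).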
Default: alert"` } func (j *JobMonitoringConfig) Validate() error { diff --git a/example.config.toml b/example.config.toml index 9f961a8..03cb704 100644 --- a/example.config.toml +++ b/example.config.toml @@ -88,4 +88,4 @@ temperature_monitoring = true # default true spool_dir = '/var/lib/cagent/jobmon' # Linux record_stderr = true # Record the last 4 KB of the error output. Default: true record_stdout = false # Record the last 4 KB of the standard output. Default: false - severity = "alert" # Failed jobs will be processed as alerts. Possible values alert or warning. Default: alert + severity = "alert" # Failed jobs will be processed as alerts. Possible values alert, warning or none. Default: alert diff --git a/handler.go b/handler.go index cffa1ee..4e0602c 100644 --- a/handler.go +++ b/handler.go @@ -13,12 +13,34 @@ import ( "github.com/cloudradar-monitoring/cagent/pkg/common" "github.com/cloudradar-monitoring/cagent/pkg/hwinfo" + "github.com/cloudradar-monitoring/cagent/pkg/jobmon" "github.com/cloudradar-monitoring/cagent/pkg/monitoring/docker" "github.com/cloudradar-monitoring/cagent/pkg/monitoring/networking" "github.com/cloudradar-monitoring/cagent/pkg/monitoring/sensors" "github.com/cloudradar-monitoring/cagent/pkg/monitoring/services" ) +type Cleaner interface { + Cleanup() error +} + +// cleanupCommand allow to group multiple Cleanup steps into one object +type cleanupCommand struct { + steps []func() error +} + +func (c *cleanupCommand) AddStep(f func() error) { + c.steps = append(c.steps, f) +} + +func (c *cleanupCommand) Cleanup() error { + errs := common.ErrorCollector{} + for _, step := range c.steps { + errs.Add(step()) + } + return errs.Combine() +} + func (ca *Cagent) Run(outputFile *os.File, interrupt chan struct{}) { for { err := ca.RunOnce(outputFile, ca.Config.OperationMode == OperationModeFull) @@ -36,12 +58,17 @@ func (ca *Cagent) Run(outputFile *os.File, interrupt chan struct{}) { } func (ca *Cagent) RunOnce(outputFile *os.File, fullMode bool) error { - measurements := ca.collectMeasurements(fullMode) - return ca.reportMeasurements(measurements, outputFile) + measurements, cleaner := ca.collectMeasurements(fullMode) + err := ca.reportMeasurements(measurements, outputFile) + if err == nil { + err = cleaner.Cleanup() + } + return err } -func (ca *Cagent) collectMeasurements(fullMode bool) common.MeasurementsMap { +func (ca *Cagent) collectMeasurements(fullMode bool) (common.MeasurementsMap, Cleaner) { var errCollector = common.ErrorCollector{} + var cleanupCommand = &cleanupCommand{} var measurements = make(common.MeasurementsMap) cpum, err := ca.CPUWatcher().Results() @@ -138,6 +165,14 @@ func (ca *Cagent) collectMeasurements(fullMode bool) common.MeasurementsMap { if len(smartMeas) > 0 { measurements = measurements.AddInnerWithPrefix("smartmon", smartMeas) } + + spool := jobmon.NewSpoolManager(ca.Config.JobMonitoring.SpoolDirPath, log.StandardLogger()) + ids, jobs, err := spool.GetFinishedJobs() + errCollector.Add(err) + measurements = measurements.AddWithPrefix("", common.MeasurementsMap{"jobmon": jobs}) + cleanupCommand.AddStep(func() error { + return spool.RemoveJobs(ids) + }) } measurements["operation_mode"] = ca.Config.OperationMode @@ -149,7 +184,7 @@ func (ca *Cagent) collectMeasurements(fullMode bool) common.MeasurementsMap { measurements["cagent.success"] = 1 } - return measurements + return measurements, cleanupCommand } func (ca *Cagent) reportMeasurements(measurements common.MeasurementsMap, outputFile *os.File) error { diff --git 
diff --git a/pkg-scripts/msi-templates/product.wxs b/pkg-scripts/msi-templates/product.wxs
index 4e9e6c3..dab65b9 100644
--- a/pkg-scripts/msi-templates/product.wxs
+++ b/pkg-scripts/msi-templates/product.wxs
@@ -188,6 +188,20 @@
   {{end}}
+  <!-- [14 added lines of WiX markup were lost in extraction and are not reproduced here] -->
   {{if gt (.Files.Items | len) 0}}
@@ -197,6 +211,7 @@
   {{end}}
+  <!-- [1 added line of WiX markup lost in extraction] -->
diff --git a/pkg-scripts/postinstall-rpm.sh b/pkg-scripts/postinstall-rpm.sh
index 8f0d17e..76b7052 100755
--- a/pkg-scripts/postinstall-rpm.sh
+++ b/pkg-scripts/postinstall-rpm.sh
@@ -7,6 +7,8 @@ CONFIG_PATH=/etc/cagent/cagent.conf
 versionsCount=$1
 if [ ${versionsCount} == 1 ]; then # fresh install
+    chown cagent:cagent -R /var/lib/cagent
+    chmod 6777 /var/lib/cagent/jobmon
     /usr/bin/cagent -y -s cagent -c ${CONFIG_PATH}
 else # package update
     serviceStatus=`/usr/bin/cagent -y -service_status -c ${CONFIG_PATH}`
diff --git a/pkg-scripts/postinstall.sh b/pkg-scripts/postinstall.sh
index 044e37c..a600b33 100755
--- a/pkg-scripts/postinstall.sh
+++ b/pkg-scripts/postinstall.sh
@@ -5,6 +5,8 @@ CONFIG_PATH=/etc/cagent/cagent.conf
 if [ "$1" = configure ]; then
     # $2 contains previous version number
     if [ -z "$2" ]; then # fresh install
+        chown cagent:cagent -R /var/lib/cagent
+        chmod 6777 /var/lib/cagent/jobmon
         /usr/bin/cagent -y -s cagent -c ${CONFIG_PATH}
     else # package update
         serviceStatus=`/usr/bin/cagent -y -service_status -c ${CONFIG_PATH}`
diff --git a/pkg/common/types.go b/pkg/common/types.go
index c752b64..6cca615 100644
--- a/pkg/common/types.go
+++ b/pkg/common/types.go
@@ -1,8 +1,12 @@
 package common

 import (
+	"encoding/json"
 	"fmt"
+	"strconv"
 	"time"
+
+	"github.com/pkg/errors"
 )

 type MeasurementsMap map[string]interface{}
@@ -27,36 +31,20 @@ func (t *Timestamp) MarshalJSON() ([]byte, error) {
 	ts := time.Time(*t).Unix()
 	stamp := fmt.Sprint(ts)

-	return []byte(stamp), nil
-}
-
-// LimitedBuffer allows to store last N bytes written to it, discarding unneeded bytes
-type LimitedBuffer struct {
-	buf []byte
-	n   int
-}
-
-func NewLimitedBuffer(n int) *LimitedBuffer {
-	return &LimitedBuffer{buf: make([]byte, 0, n), n: n}
-}
-
-func (w *LimitedBuffer) String() string {
-	return string(w.buf)
-}
+	return json.Marshal(stamp)
+}

-func (w *LimitedBuffer) Write(p []byte) (n int, err error) {
-	gotLen := len(p)
-	if gotLen >= w.n {
-		w.buf = p[gotLen-w.n-1:]
-	} else if gotLen > 0 {
-		newLength := len(w.buf) + gotLen
-		if newLength <= w.n {
-			w.buf = append(w.buf, p...)
-		} else {
-			truncateIndex := newLength - w.n - 1
-			w.buf = append(w.buf[truncateIndex-1:], p...)
-		}
+func (t *Timestamp) UnmarshalJSON(raw []byte) error {
+	var strTimestamp string
+	if err := json.Unmarshal(raw, &strTimestamp); err != nil {
+		return err
+	}
+
+	timestamp, err := strconv.ParseInt(strTimestamp, 10, 0)
+	if err != nil {
+		return errors.Wrap(err, "input is not a Unix timestamp")
 	}
-	return gotLen, nil
+	*t = Timestamp(time.Unix(timestamp, 0))
+	return nil
 }
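With both MarshalJSON and UnmarshalJSON defined, a Timestamp now round-trips through JSON as a string of Unix seconds (e.g. "1600000000"), which the spool entries rely on when job runs are written and read back. A standalone sketch of the equivalent logic, assuming the same string-of-seconds encoding:

    package main

    import (
        "encoding/json"
        "fmt"
        "strconv"
        "time"
    )

    type Timestamp time.Time

    // MarshalJSON encodes the timestamp as a JSON string holding the Unix seconds.
    func (t *Timestamp) MarshalJSON() ([]byte, error) {
        return json.Marshal(strconv.FormatInt(time.Time(*t).Unix(), 10))
    }

    // UnmarshalJSON parses the JSON string back into a Timestamp.
    func (t *Timestamp) UnmarshalJSON(raw []byte) error {
        var s string
        if err := json.Unmarshal(raw, &s); err != nil {
            return err
        }
        sec, err := strconv.ParseInt(s, 10, 64)
        if err != nil {
            return fmt.Errorf("input is not a Unix timestamp: %w", err)
        }
        *t = Timestamp(time.Unix(sec, 0))
        return nil
    }

    func main() {
        ts := Timestamp(time.Unix(1600000000, 0))
        b, _ := json.Marshal(&ts)
        fmt.Println(string(b)) // "1600000000"

        var back Timestamp
        _ = json.Unmarshal(b, &back)
        fmt.Println(time.Time(back).Unix()) // 1600000000
    }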
diff --git a/pkg/jobmon/jobmon.go b/pkg/jobmon/jobmon.go
index 46fcd45..f2db9a3 100644
--- a/pkg/jobmon/jobmon.go
+++ b/pkg/jobmon/jobmon.go
@@ -12,11 +12,12 @@ type Severity string
 const (
 	SeverityAlert   = "alert"
 	SeverityWarning = "warning"
+	SeverityNone    = "none"

-	maxStdStreamBufferSize = 100 // 4 * 1024
+	maxStdStreamBufferSize = 4 * 1024
 )

-var ValidSeverities = []Severity{SeverityAlert, SeverityWarning}
+var ValidSeverities = []Severity{SeverityAlert, SeverityWarning, SeverityNone}

 func IsValidJobMonitoringSeverity(s Severity) bool {
 	for _, validSeverity := range ValidSeverities {
diff --git a/pkg/jobmon/jobmon_windows.go b/pkg/jobmon/jobmon_windows.go
index f1581bc..9d88899 100644
--- a/pkg/jobmon/jobmon_windows.go
+++ b/pkg/jobmon/jobmon_windows.go
@@ -11,4 +11,4 @@ func osSpecificCommandConfig(cmd *exec.Cmd) {

 func osSpecificCommandTermination(cmd *exec.Cmd) {
 	_ = cmd.Process.Kill()
-}
\ No newline at end of file
+}
diff --git a/pkg/jobmon/runner.go b/pkg/jobmon/runner.go
index 7e859e7..70c52da 100644
--- a/pkg/jobmon/runner.go
+++ b/pkg/jobmon/runner.go
@@ -25,20 +25,17 @@ func NewRunner(spoolDirPath string, runConfig *JobRunConfig, logger *logrus.Logg
 	}
 }

-func (r *Runner) RunJob(interruptionSignalsChan chan os.Signal) error {
+func (r *Runner) RunJob(interruptionSignalsChan chan os.Signal, forceRun bool) error {
 	var job = newJobRun(r.cfg)
 	var cmd = r.createJobCommand()

-	stdOutBuffer := common.NewLimitedBuffer(maxStdStreamBufferSize)
-	if r.cfg.RecordStdOut {
-		cmd.Stdout = stdOutBuffer
-	}
-	stdErrBuffer := common.NewLimitedBuffer(maxStdStreamBufferSize)
-	if r.cfg.RecordStdErr {
-		cmd.Stderr = stdErrBuffer
-	}
+	stdOutBuffer := newCaptureWriter(os.Stdout, maxStdStreamBufferSize)
+	cmd.Stdout = stdOutBuffer
+
+	stdErrBuffer := newCaptureWriter(os.Stderr, maxStdStreamBufferSize)
+	cmd.Stderr = stdErrBuffer

-	uid, err := r.spool.NewJob(job)
+	uid, err := r.spool.NewJob(job, forceRun)
 	if err != nil {
 		return err
 	}
@@ -67,6 +64,9 @@ func (r *Runner) RunJob(interruptionSignalsChan chan os.Signal) error {
 		} else {
 			job.AddError(err.Error())
 		}
+	} else {
+		code := cmd.ProcessState.ExitCode()
+		exitCode = &code
 	}

 	endTimestamp := common.Timestamp(endedAt)
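The new else branch records the exit code of a job that ran to completion. cmd.ProcessState.ExitCode() (Go 1.12+) is also what distinguishes "ran and failed" from "could not be started". A minimal sketch of the capture; the command and variable names are illustrative:

    package main

    import (
        "fmt"
        "os/exec"
    )

    func main() {
        cmd := exec.Command("sh", "-c", "exit 3")

        var exitCode *int
        if err := cmd.Run(); err != nil {
            if _, ok := err.(*exec.ExitError); ok {
                // the job started but exited non-zero: ProcessState is populated
                code := cmd.ProcessState.ExitCode()
                exitCode = &code
            }
            // otherwise the command could not be started at all
        } else {
            code := cmd.ProcessState.ExitCode() // 0 on success
            exitCode = &code
        }

        if exitCode != nil {
            fmt.Println("exit code:", *exitCode) // exit code: 3
        }
    }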
"", err } - if alreadyRunning { + alreadyRunning := len(duplicateRunEntries) > 0 + if !forcedRun && alreadyRunning { r.AddError(ErrJobAlreadyRunning.Error()) } - jsonBytes, err := json.Marshal(r) - if err != nil { - return "", err + if forcedRun { + err = removeFiles(duplicateRunEntries) + if err != nil { + return "", err + } } uniqID := getUniqJobRunID(r.ID, alreadyRunning, r.StartedAt) filePath := s.getFilePath(uniqID) - err = saveJSONFile(filePath, jsonBytes) + err = s.saveJobRun(filePath, r) if err != nil { return "", err } - if alreadyRunning { + if !forcedRun && alreadyRunning { err = ErrJobAlreadyRunning } return uniqID, err } -func saveJSONFile(filePath string, jsonBytes []byte) error { - err := ioutil.WriteFile(filePath, jsonBytes, spoolEntryPermissions) +func (s *SpoolManager) FinishJob(uniqID string, r *JobRun) error { + err := s.getLock() if err != nil { - return errors.Wrapf(err, "while writing new file %s", filePath) + return err } - return nil + defer s.releaseLock() + + filePath := s.getFilePath(uniqID) + newFilePath := s.getFilePath(getUniqJobRunID(r.ID, true, r.StartedAt)) + err = os.Rename(filePath, newFilePath) + if err != nil { + return errors.Wrapf(err, "could not mark job %s as finished", uniqID) + } + + return s.saveJobRun(newFilePath, r) } -func (s *SpoolManager) FinishJob(uniqID string, r *JobRun) error { - jsonBytes, err := json.Marshal(r) +func (s *SpoolManager) GetFinishedJobs() ([]string, []*JobRun, error) { + err := s.getLock() if err != nil { - return err + return nil, nil, err + } + + pattern := fmt.Sprintf("%s/%s_*_*.%s", s.dirPath, markerFinished, jsonExtension) + fileNames, err := filepath.Glob(pattern) + s.releaseLock() + if err != nil { + return nil, nil, err + } + + ids := make([]string, 0) + jobs := make([]*JobRun, 0) + for _, f := range fileNames { + j, err := s.readEntryFile(f) + if err != nil { + return nil, nil, err + } + ids = append(ids, getUniqJobRunID(j.ID, true, j.StartedAt)) + jobs = append(jobs, j) + } + + return ids, jobs, nil +} + +func (s *SpoolManager) readEntryFile(path string) (*JobRun, error) { + jsonFile, err := os.Open(path) + if err != nil { + return nil, errors.Wrapf(err, "while opening file %s", path) } + defer jsonFile.Close() + var j JobRun + err = json.NewDecoder(jsonFile).Decode(&j) + if err != nil { + return nil, errors.Wrapf(err, "while decoding file %s", path) + } + return &j, nil +} - err = s.getLock() +func (s *SpoolManager) RemoveJobs(ids []string) error { + err := s.getLock() if err != nil { return err } defer s.releaseLock() - filePath := s.getFilePath(uniqID) - newFilePath := s.getFilePath(getUniqJobRunID(r.ID, true, r.StartedAt)) - err = os.Rename(filePath, newFilePath) + var filePaths []string + for _, uniqID := range ids { + filePaths = append(filePaths, s.getFilePath(uniqID)) + } + + return removeFiles(filePaths) +} + +func (s *SpoolManager) ensureSpoolDirExists() error { + _, err := os.Stat(s.dirPath) + if os.IsNotExist(err) { + err = os.MkdirAll(s.dirPath, spoolDirPermissions) + if err != nil { + return errors.Wrapf( + err, + "could not create spool dir %s. 
diff --git a/pkg/jobmon/writer.go b/pkg/jobmon/writer.go
new file mode 100644
index 0000000..f133c93
--- /dev/null
+++ b/pkg/jobmon/writer.go
@@ -0,0 +1,37 @@
+package jobmon
+
+import (
+	"io"
+)
+
+// captureWriter copies all data to the destination writer and captures the last N bytes
+type captureWriter struct {
+	buf []byte
+	n   int
+	dst io.Writer
+}
+
+func newCaptureWriter(dst io.Writer, n int) *captureWriter {
+	return &captureWriter{buf: make([]byte, 0, n), n: n, dst: dst}
+}
+
+func (w *captureWriter) String() string {
+	return string(w.buf)
+}
+
+func (w *captureWriter) Write(p []byte) (n int, err error) {
+	gotLen := len(p)
+	if gotLen >= w.n {
+		w.buf = append(w.buf[:0], p[gotLen-w.n:]...) // keep only the last n bytes, copied out of p
+	} else if gotLen > 0 {
+		newLength := len(w.buf) + gotLen
+		if newLength <= w.n {
+			w.buf = append(w.buf, p...)
+		} else {
+			truncateIndex := newLength - w.n // drop just enough of the head to fit
+			w.buf = append(w.buf[truncateIndex:], p...)
+		}
+	}
+
+	return w.dst.Write(p)
+}
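captureWriter replaces the removed common.LimitedBuffer: job output is now always passed through to the parent's stdout/stderr while the last N bytes are retained for the spool entry (the RecordStdOut/RecordStdErr checks move out of the buffer setup and are presumably applied when the captured text is attached to the job, not shown in this diff). A short usage sketch; captureWriter is unexported, so this would live inside package jobmon, and the command name is illustrative:

    // inside package jobmon
    w := newCaptureWriter(os.Stdout, 4*1024) // tee to stdout, keep the last 4 KB
    cmd := exec.Command("some-job")
    cmd.Stdout = w
    _ = cmd.Run()
    last := w.String() // at most the last 4 KB of the job's stdout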