Skip to content

Commit

Permalink
Refine installation packages, implement stdstream redirection, -f CLI…
Browse files Browse the repository at this point in the history
… option
  • Loading branch information
nikita-vanyasin committed Dec 3, 2019
1 parent d416aed commit 7deac74
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 80 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ windows-sign:
scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/cagent_windows_amd64/cagent.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/cagent_64.exe
scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/csender_windows_386/csender.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/csender_386.exe
scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/csender_windows_amd64/csender.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/csender_64.exe
scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/jobmon_windows_386/jobmon.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/jobmon_386.exe
scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/jobmon_windows_amd64/jobmon.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/jobmon_64.exe
scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/jobmon_windows_386/jobmon.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/jobmon_386.exe
scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/dist/jobmon_windows_amd64/jobmon.exe ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/dist/jobmon_64.exe
# Copy other build dependencies
scp ${SCP_WIN_BUILD_MACHINE_OPTIONS} ${PROJECT_DIR}/build-win.bat ${WIN_BUILD_MACHINE_AUTH}:${WIN_BUILD_MACHINE_CI_DIR}/build-win.bat
ssh ${SSH_WIN_BUILD_MACHINE_OPTIONS} ${WIN_BUILD_MACHINE_AUTH} chmod +x ${WIN_BUILD_MACHINE_CI_DIR}/build-win.bat
Expand Down
3 changes: 2 additions & 1 deletion cmd/jobmon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
cfgPathPtr := flag.String("c", cagent.DefaultCfgPath, "config file path")

jobIDPtr := flag.String("id", "", fmt.Sprintf("id of the job, required, maximum %d characters", maxJobIDLength))
forceRunPtr := flag.Bool("f", false, "Force run of a job even if the job with the same ID is already running or its termination wasn't handled successfully.")
severityPtr := flag.String("s", "", "alert|warning|none process failed job with this severity. Overwrites the default severity of cagent.conf. Severity 'none' suppresses all messages.")
nextRunInPtr := flag.Duration("nr", 0, "<N>h|m indicates when the job should run for the next time. Allows triggering alters for not run jobs. The shortest interval is 5 minutes.")
maxExecutionTimePtr := flag.Duration("me", 0, "<N>h|m|s or just <N> for number of seconds. Max execution time for job.")
Expand Down Expand Up @@ -68,7 +69,7 @@ func main() {
)

jobMonRunner := jobmon.NewRunner(cfg.JobMonitoring.SpoolDirPath, jobConfig, logger)
err = jobMonRunner.RunJob(sigChan)
err = jobMonRunner.RunJob(sigChan, *forceRunPtr)
if err != nil {
logger.Fatalf("Could not start a job: %s", err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type JobMonitoringConfig struct {
SpoolDirPath string `toml:"spool_dir" comment:"Path to spool dir"`
RecordStdErr bool `toml:"record_stderr" comment:"Record the last 4 KB of the error output. Default: true"`
RecordStdOut bool `toml:"record_stdout" comment:"Record the last 4 KB of the standard output. Default: false"`
Severity jobmon.Severity `toml:"severity" comment:"Failed jobs will be processed as alerts. Possible values alert or warning. Default: alert"`
Severity jobmon.Severity `toml:"severity" comment:"Failed jobs will be processed as alerts. Possible values alert, warning or none. Default: alert"`
}

func (j *JobMonitoringConfig) Validate() error {
Expand Down
2 changes: 1 addition & 1 deletion example.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,4 @@ temperature_monitoring = true # default true
spool_dir = '/var/lib/cagent/jobmon' # Linux
record_stderr = true # Record the last 4 KB of the error output. Default: true
record_stdout = false # Record the last 4 KB of the standard output. Default: false
severity = "alert" # Failed jobs will be processed as alerts. Possible values alert or warning. Default: alert
severity = "alert" # Failed jobs will be processed as alerts. Possible values alert, warning or none. Default: alert
43 changes: 39 additions & 4 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,34 @@ import (

"github.com/cloudradar-monitoring/cagent/pkg/common"
"github.com/cloudradar-monitoring/cagent/pkg/hwinfo"
"github.com/cloudradar-monitoring/cagent/pkg/jobmon"
"github.com/cloudradar-monitoring/cagent/pkg/monitoring/docker"
"github.com/cloudradar-monitoring/cagent/pkg/monitoring/networking"
"github.com/cloudradar-monitoring/cagent/pkg/monitoring/sensors"
"github.com/cloudradar-monitoring/cagent/pkg/monitoring/services"
)

type Cleaner interface {
Cleanup() error
}

// cleanupCommand allow to group multiple Cleanup steps into one object
type cleanupCommand struct {
steps []func() error
}

func (c *cleanupCommand) AddStep(f func() error) {
c.steps = append(c.steps, f)
}

func (c *cleanupCommand) Cleanup() error {
errs := common.ErrorCollector{}
for _, step := range c.steps {
errs.Add(step())
}
return errs.Combine()
}

func (ca *Cagent) Run(outputFile *os.File, interrupt chan struct{}) {
for {
err := ca.RunOnce(outputFile, ca.Config.OperationMode == OperationModeFull)
Expand All @@ -36,12 +58,17 @@ func (ca *Cagent) Run(outputFile *os.File, interrupt chan struct{}) {
}

func (ca *Cagent) RunOnce(outputFile *os.File, fullMode bool) error {
measurements := ca.collectMeasurements(fullMode)
return ca.reportMeasurements(measurements, outputFile)
measurements, cleaner := ca.collectMeasurements(fullMode)
err := ca.reportMeasurements(measurements, outputFile)
if err == nil {
err = cleaner.Cleanup()
}
return err
}

func (ca *Cagent) collectMeasurements(fullMode bool) common.MeasurementsMap {
func (ca *Cagent) collectMeasurements(fullMode bool) (common.MeasurementsMap, Cleaner) {
var errCollector = common.ErrorCollector{}
var cleanupCommand = &cleanupCommand{}
var measurements = make(common.MeasurementsMap)

cpum, err := ca.CPUWatcher().Results()
Expand Down Expand Up @@ -138,6 +165,14 @@ func (ca *Cagent) collectMeasurements(fullMode bool) common.MeasurementsMap {
if len(smartMeas) > 0 {
measurements = measurements.AddInnerWithPrefix("smartmon", smartMeas)
}

spool := jobmon.NewSpoolManager(ca.Config.JobMonitoring.SpoolDirPath, log.StandardLogger())
ids, jobs, err := spool.GetFinishedJobs()
errCollector.Add(err)
measurements = measurements.AddWithPrefix("", common.MeasurementsMap{"jobmon": jobs})
cleanupCommand.AddStep(func() error {
return spool.RemoveJobs(ids)
})
}

measurements["operation_mode"] = ca.Config.OperationMode
Expand All @@ -149,7 +184,7 @@ func (ca *Cagent) collectMeasurements(fullMode bool) common.MeasurementsMap {
measurements["cagent.success"] = 1
}

return measurements
return measurements, cleanupCommand
}

func (ca *Cagent) reportMeasurements(measurements common.MeasurementsMap, outputFile *os.File) error {
Expand Down
15 changes: 15 additions & 0 deletions pkg-scripts/msi-templates/product.wxs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,20 @@
{{end}}
</InstallExecuteSequence>
<DirectoryRef Id="TARGETDIR">
<Directory Id="CommonAppDataFolder">
<Directory Id="CommonAppCagentFolder" Name="cagent">
<Directory Id="CommonAppJobmonFolder" Name="jobmon">
<Component Id="CreateProgramDataJobmonFolder" Guid="8d8a4c84-8135-4b5b-9815-057ba94e8025">
<CreateFolder>
<util:PermissionEx User="Users" Domain="[MachineName]" GenericAll="yes" />
</CreateFolder>
</Component>
</Directory>
</Directory>
</Directory>
</DirectoryRef>
<Feature Id="DefaultFeature" Level="1">
<ComponentRef Id="ENVS"/>
{{if gt (.Files.Items | len) 0}}
Expand All @@ -197,6 +211,7 @@
<ComponentGroupRef Id="AppFiles{{$i}}" />
{{end}}
<ComponentRef Id="RegistryEntries"/>
<ComponentRef Id="CreateProgramDataJobmonFolder" />
<Feature Id="Uninstall">
<ComponentRef Id="UninstallFolder" Primary="yes"/>
</Feature>
Expand Down
2 changes: 2 additions & 0 deletions pkg-scripts/postinstall-rpm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ CONFIG_PATH=/etc/cagent/cagent.conf
versionsCount=$1

if [ ${versionsCount} == 1 ]; then # fresh install
chown cagent:cagent -R /var/lib/cagent
chmod 6777 /var/lib/cagent/jobmon
/usr/bin/cagent -y -s cagent -c ${CONFIG_PATH}
else # package update
serviceStatus=`/usr/bin/cagent -y -service_status -c ${CONFIG_PATH}`
Expand Down
2 changes: 2 additions & 0 deletions pkg-scripts/postinstall.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ CONFIG_PATH=/etc/cagent/cagent.conf
if [ "$1" = configure ]; then
# $2 contains previous version number
if [ -z "$2" ]; then # fresh install
chown cagent:cagent -R /var/lib/cagent
chmod 6777 /var/lib/cagent/jobmon
/usr/bin/cagent -y -s cagent -c ${CONFIG_PATH}
else # package update
serviceStatus=`/usr/bin/cagent -y -service_status -c ${CONFIG_PATH}`
Expand Down
42 changes: 15 additions & 27 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package common

import (
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/pkg/errors"
)

type MeasurementsMap map[string]interface{}
Expand All @@ -27,36 +31,20 @@ func (t *Timestamp) MarshalJSON() ([]byte, error) {
ts := time.Time(*t).Unix()
stamp := fmt.Sprint(ts)

return []byte(stamp), nil
}

// LimitedBuffer allows to store last N bytes written to it, discarding unneeded bytes
type LimitedBuffer struct {
buf []byte
n int
return json.Marshal(stamp)
}

func NewLimitedBuffer(n int) *LimitedBuffer {
return &LimitedBuffer{buf: make([]byte, 0, n), n: n}
}

func (w *LimitedBuffer) String() string {
return string(w.buf)
}
func (t *Timestamp) UnmarshalJSON(raw []byte) error {
var strTimestamp string
if err := json.Unmarshal(raw, &strTimestamp); err != nil {
return err
}

func (w *LimitedBuffer) Write(p []byte) (n int, err error) {
gotLen := len(p)
if gotLen >= w.n {
w.buf = p[gotLen-w.n-1:]
} else if gotLen > 0 {
newLength := len(w.buf) + gotLen
if newLength <= w.n {
w.buf = append(w.buf, p...)
} else {
truncateIndex := newLength - w.n - 1
w.buf = append(w.buf[truncateIndex-1:], p...)
}
timestamp, err := strconv.ParseInt(strTimestamp, 10, 0)
if err != nil {
return errors.Wrap(err, "input is not Unix timestamp")
}

return gotLen, nil
*t = Timestamp(time.Unix(timestamp, 0))
return nil
}
5 changes: 3 additions & 2 deletions pkg/jobmon/jobmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ type Severity string
const (
SeverityAlert = "alert"
SeverityWarning = "warning"
SeverityNone = "none"

maxStdStreamBufferSize = 100 // 4 * 1024
maxStdStreamBufferSize = 4 * 1024
)

var ValidSeverities = []Severity{SeverityAlert, SeverityWarning}
var ValidSeverities = []Severity{SeverityAlert, SeverityWarning, SeverityNone}

func IsValidJobMonitoringSeverity(s Severity) bool {
for _, validSeverity := range ValidSeverities {
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobmon/jobmon_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ func osSpecificCommandConfig(cmd *exec.Cmd) {

func osSpecificCommandTermination(cmd *exec.Cmd) {
_ = cmd.Process.Kill()
}
}
20 changes: 10 additions & 10 deletions pkg/jobmon/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,17 @@ func NewRunner(spoolDirPath string, runConfig *JobRunConfig, logger *logrus.Logg
}
}

func (r *Runner) RunJob(interruptionSignalsChan chan os.Signal) error {
func (r *Runner) RunJob(interruptionSignalsChan chan os.Signal, forceRun bool) error {
var job = newJobRun(r.cfg)
var cmd = r.createJobCommand()

stdOutBuffer := common.NewLimitedBuffer(maxStdStreamBufferSize)
if r.cfg.RecordStdOut {
cmd.Stdout = stdOutBuffer
}
stdErrBuffer := common.NewLimitedBuffer(maxStdStreamBufferSize)
if r.cfg.RecordStdErr {
cmd.Stderr = stdErrBuffer
}
stdOutBuffer := newCaptureWriter(os.Stdout, maxStdStreamBufferSize)
cmd.Stdout = stdOutBuffer

stdErrBuffer := newCaptureWriter(os.Stderr, maxStdStreamBufferSize)
cmd.Stderr = stdErrBuffer

uid, err := r.spool.NewJob(job)
uid, err := r.spool.NewJob(job, forceRun)
if err != nil {
return err
}
Expand Down Expand Up @@ -67,6 +64,9 @@ func (r *Runner) RunJob(interruptionSignalsChan chan os.Signal) error {
} else {
job.AddError(err.Error())
}
} else {
code := cmd.ProcessState.ExitCode()
exitCode = &code
}

endTimestamp := common.Timestamp(endedAt)
Expand Down
Loading

0 comments on commit 7deac74

Please sign in to comment.