Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: report global labels with heartbeat #1066

Merged
merged 3 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,33 @@ import (
"log"
"net"
"net/http"
"os"
osExec "os/exec"
"runtime"
"strconv"
"strings"
"time"

cpuUtil "github.com/shirou/gopsutil/v3/cpu"

"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs/system"
cpuUtil "github.com/shirou/gopsutil/v3/cpu"
"flashcat.cloud/categraf/pkg/cmdx"
)

const collinterval = 3

type (
HeartbeatResponse struct {
Data UpdateInfo `json:"dat"`
Msg string `json:"err"`
}
UpdateInfo struct {
NewVersion string `json:"new_version"`
UpdateURL string `json:"download_url"`
}
)

func Work() {
conf := config.Config.Heartbeat

Expand Down Expand Up @@ -114,6 +129,7 @@ func work(ps *system.SystemPS, client *http.Client) {
"cpu_util": cpuUsagePercent,
"mem_util": memUsagePercent,
"unixtime": time.Now().UnixMilli(),
"global_labels": config.GlobalLabels(),
"host_ip": hostIP,
}

Expand Down Expand Up @@ -191,6 +207,38 @@ func work(ps *system.SystemPS, client *http.Client) {
if debug() {
log.Println("D! heartbeat response:", string(bs), "status code:", res.StatusCode)
}

hr := HeartbeatResponse{}
err = json.Unmarshal(bs, &hr)
if err != nil {
log.Println("W! failed to unmarshal heartbeat response:", err)
return
}
if len(hr.Data.NewVersion) != 0 && len(hr.Data.UpdateURL) != 0 && hr.Data.NewVersion != shortVersion && hr.Data.NewVersion != config.Version {
var (
out bytes.Buffer
stderr bytes.Buffer
)
exe, err := os.Executable()
if err != nil {
log.Println("E! failed to get current executable:", err)
return
}
cmd := osExec.Command(exe, "-update", "-update_url", hr.Data.UpdateURL)
cmd.Stdout = &out
cmd.Stderr = &stderr
err, timeout := cmdx.RunTimeout(cmd, time.Second*300)
if timeout {
log.Printf("E! exec %s timeout", cmd.String())
return
}
if err != nil {
log.Println("E! failed to update categraf:", err, "stderr:", stderr.String(), "stdout:",
out.String(), "command:", cmd.String())
return
}
log.Printf("update categraf(%s) from %s success, new version: %s", version(), hr.Data.UpdateURL, hr.Data.NewVersion)
}
}

func memUsage(ps *system.SystemPS) float64 {
Expand Down
2 changes: 1 addition & 1 deletion inputs/apache/apache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (ins *Instance) Init() error {
e, err := exporter.New(logger, &ins.Config)

if err != nil {
return fmt.Errorf("could not instantiate mongodb lag exporter: %w", err)
return fmt.Errorf("could not instantiate mongodb lag exporter: %v", err)
}

ins.e = e
Expand Down
5 changes: 3 additions & 2 deletions inputs/mtail/internal/runtime/compiler/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"fmt"
"strings"

"flashcat.cloud/categraf/inputs/mtail/internal/runtime/compiler/position"
"github.com/pkg/errors"

"flashcat.cloud/categraf/inputs/mtail/internal/runtime/compiler/position"
)

type compileError struct {
Expand All @@ -26,7 +27,7 @@ type ErrorList []*compileError
// Add appends an error at a position to the list of errors.
func (p *ErrorList) Add(pos *position.Position, msg string) {
if pos == nil {
pos = &position.Position{"", -1, -1, -1}
pos = &position.Position{Filename: "", Line: -1, Startcol: -1, Endcol: -1}
}
*p = append(*p, &compileError{*pos, msg})
}
Expand Down
2 changes: 1 addition & 1 deletion inputs/mtail/internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
// common change pattern anyway.
newfi, serr := os.Stat(fs.pathname)
if serr != nil {
log.Printf("stream(%s): stat error: %v", serr)
log.Printf("stream(%s): stat error: %v", fs.pathname, serr)
// If this is a NotExist error, then we should wrap up this
// goroutine. The Tailer will create a new logstream if the
// file is in the middle of a rotation and gets recreated
Expand Down
2 changes: 1 addition & 1 deletion inputs/mtail/internal/tailer/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st
if err != nil {
return nil, err
}
log.Println("Parsed url as %v", u)
log.Printf("Parsed url as %v", u)

path := pathname
switch u.Scheme {
Expand Down
Loading