diff --git a/.circleci/config.yml b/.circleci/config.yml index ea532c6e..e5ddf199 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -19,21 +19,31 @@ jobs: - prometheus/setup_environment - run: go mod download - run: make + - run: sudo apt-get update && sudo apt-get install sqlite3 -y && sqlite3 --version + - run: CGO_BUILD=1 make test-arm: executor: arm steps: - checkout - run: uname -a - run: make test-e2e + - run: sudo apt-get update && sudo apt-get install sqlite3 -y && sqlite3 --version + - run: CGO_BUILD=1 make test-e2e build: machine: image: ubuntu-2204:current parallelism: 3 steps: - prometheus/setup_environment - - run: docker run --privileged linuxkit/binfmt:af88a591f9cc896a52ce596b9cf7ca26a061ef97 + # - run: docker run --privileged linuxkit/binfmt:af88a591f9cc896a52ce596b9cf7ca26a061ef97 - run: promu crossbuild -v --parallelism $CIRCLE_NODE_TOTAL --parallelism-thread $CIRCLE_NODE_INDEX - # - run: promu --config .promu-cgo.yml crossbuild -v --parallelism $CIRCLE_NODE_TOTAL --parallelism-thread $CIRCLE_NODE_INDEX + # Replace default CGO_BUILD before we run CGO. There is no way to inject this env + # variable into golang-builder container. So we artificially replace the default + # CGO_BUILD variable in Makefile and run CGO promu + # Do not use CGO_ENABLED env var as golang-builder image will set this var by default to zero + # and our override will not have any effect + - run: sed -i -e 's/CGO_BUILD ?= 0/CGO_BUILD ?= 1/g' Makefile + - run: promu --config .promu-cgo.yml crossbuild -v --parallelism $CIRCLE_NODE_TOTAL --parallelism-thread $CIRCLE_NODE_INDEX - persist_to_workspace: root: . 
paths: @@ -43,7 +53,7 @@ jobs: destination: /build workflows: version: 2 - batchjob_exporter: + batchjob_monitoring: jobs: - test: filters: diff --git a/.gitignore b/.gitignore index ed672d91..6a3b0d61 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ # Test binary, built with `go test -c` *.test +coverage.txt # Output of the go coverage tool, specifically when used with LiteIDE *.out @@ -38,4 +39,7 @@ run.sh # Ignore test files *.db *.prof -lasttimestamp \ No newline at end of file +lasttimestamp + +# Ignore promu related files +promu* diff --git a/.promu-cgo.yml b/.promu-cgo.yml index 1ea13b06..e4d487fd 100644 --- a/.promu-cgo.yml +++ b/.promu-cgo.yml @@ -7,10 +7,10 @@ repository: path: github.com/mahendrapaipuri/batchjob_monitoring build: binaries: - - name: batchjob_exporter - path: ./cmd/batchjob_exporter - - name: batchjob_stats - path: ./cmd/batchjob_stats + - name: batchjob_stats_db + path: ./cmd/batchjob_stats_db + - name: batchjob_stats_server + path: ./cmd/batchjob_stats_server flags: -a -tags 'netgo osusergo static_build' ldflags: | -X github.com/prometheus/common/version.Version={{.Version}} @@ -21,13 +21,17 @@ build: tarball: files: - LICENSE - - NOTICE + - README.md crossbuild: platforms: - - netbsd/amd64 - - netbsd/386 + - linux/386 + - linux/amd64 + - linux/arm64 - linux/mips - - linux/mipsle - linux/mips64 - linux/mips64le + - linux/mipsle + # - linux/ppc64 - linux/ppc64le + - linux/riscv64 + # - linux/s390x diff --git a/.promu.yml b/.promu.yml index a5ce1e71..58aff079 100644 --- a/.promu.yml +++ b/.promu.yml @@ -8,8 +8,6 @@ build: binaries: - name: batchjob_exporter path: ./cmd/batchjob_exporter - - name: batchjob_stats - path: ./cmd/batchjob_stats flags: -a -tags 'netgo osusergo static_build' ldflags: | -X github.com/prometheus/common/version.Version={{.Version}} @@ -20,9 +18,17 @@ build: tarball: files: - LICENSE - - NOTICE + - README.md crossbuild: platforms: - - linux/amd64 - linux/386 + - linux/amd64 - linux/arm64 + - linux/mips + - 
linux/mips64 + - linux/mips64le + - linux/mipsle + # - linux/ppc64 + - linux/ppc64le + - linux/riscv64 + # - linux/s390x diff --git a/Makefile b/Makefile index b6e5c807..ee8bc9ae 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,8 @@ CGROUPS_MODE ?= $([ $(stat -fc %T /sys/fs/cgroup/) = "cgroup2fs" ] && STATICCHECK_IGNORE = +CGO_BUILD ?= 0 + ifeq ($(GOHOSTOS), linux) test-e2e := test-e2e else @@ -33,30 +35,46 @@ else test-docker := test-docker endif -# Use CGO for non-Linux builds. -ifeq ($(GOOS), linux) +# Use CGO for batchjob_stats_* and GO for batchjob_exporter. +ifeq ($(CGO_BUILD), 1) + PROMU_CONF ?= .promu-cgo.yml + pkgs := ./pkg/jobstats ./cmd/batchjob_stats_db ./cmd/batchjob_stats_server +else PROMU_CONF ?= .promu.yml + pkgs := ./pkg/collector ./pkg/emissions ./cmd/batchjob_exporter +endif + +ifeq ($(GOHOSTOS), linux) + test-e2e := test-e2e else - ifndef GOOS - ifeq ($(GOHOSTOS), linux) - PROMU_CONF ?= .promu.yml - else - PROMU_CONF ?= .promu-cgo.yml - endif - else - # Do not use CGO for openbsd/amd64 builds - ifeq ($(GOOS), openbsd) - ifeq ($(GOARCH), amd64) - PROMU_CONF ?= .promu.yml - else - PROMU_CONF ?= .promu-cgo.yml - endif - else - PROMU_CONF ?= .promu-cgo.yml - endif - endif + test-e2e := skip-test-e2e endif +# We are using SQLite3 which needs CGO and thus, this logic +# is not relevant for us anymore +# ifeq ($(GOOS), linux) +# PROMU_CONF ?= .promu.yml +# else +# ifndef GOOS +# ifeq ($(GOHOSTOS), linux) +# PROMU_CONF ?= .promu.yml +# else +# PROMU_CONF ?= .promu-cgo.yml +# endif +# else +# # Do not use CGO for openbsd/amd64 builds +# ifeq ($(GOOS), openbsd) +# ifeq ($(GOARCH), amd64) +# PROMU_CONF ?= .promu.yml +# else +# PROMU_CONF ?= .promu-cgo.yml +# endif +# else +# PROMU_CONF ?= .promu-cgo.yml +# endif +# endif +# endif + PROMU := $(FIRST_GOPATH)/bin/promu --config $(PROMU_CONF) e2e-cgroupsv2-out = pkg/collector/fixtures/e2e-test-cgroupsv2-output.txt @@ -71,10 +89,12 @@ endif # 64bit -> 32bit mapping for cross-checking. 
At least for amd64/386, the 64bit CPU can execute 32bit code but not the other way around, so we don't support cross-testing upwards. cross-test = skip-test-32bit define goarch_pair - ifeq ($$(GOHOSTOS),linux) - ifeq ($$(GOHOSTARCH),$1) - GOARCH_CROSS = $2 - cross-test = test-32bit + ifeq ($$(GOHOSTOS), linux) + ifndef CGO_BUILD + ifeq ($$(GOHOSTARCH), $1) + GOARCH_CROSS = $2 + cross-test = test-32bit + endif endif endif endef @@ -110,10 +130,18 @@ update_fixtures: rm -vf pkg/collector/fixtures/sys/.unpacked ./scripts/ttar -C pkg/collector/fixtures -c -f pkg/collector/fixtures/sys.ttar sys +ifeq ($(CGO_BUILD), 0) .PHONY: test-e2e test-e2e: build pkg/collector/fixtures/sys/.unpacked @echo ">> running end-to-end tests" - ./scripts/e2e-test.sh + ./scripts/e2e-test.sh -p exporter +else +.PHONY: test-e2e +test-e2e: build pkg/collector/fixtures/sys/.unpacked + @echo ">> running end-to-end tests" + ./scripts/e2e-test.sh -p stats_db + ./scripts/e2e-test.sh -p stats_server +endif .PHONY: skip-test-e2e skip-test-e2e: diff --git a/cmd/batchjob_exporter/batchjob_exporter_test.go b/cmd/batchjob_exporter/batchjob_exporter_test.go index b5832068..f90f5594 100644 --- a/cmd/batchjob_exporter/batchjob_exporter_test.go +++ b/cmd/batchjob_exporter/batchjob_exporter_test.go @@ -14,7 +14,7 @@ import ( ) var ( - binary, _ = filepath.Abs("batchjob_exporter") + binary, _ = filepath.Abs("bin/batchjob_exporter") ) const ( diff --git a/cmd/batchjob_stats/batchjob_stats.go b/cmd/batchjob_stats_db/batchjob_stats_db.go similarity index 54% rename from cmd/batchjob_stats/batchjob_stats.go rename to cmd/batchjob_stats_db/batchjob_stats_db.go index 0aadfbcd..00d2c60b 100644 --- a/cmd/batchjob_stats/batchjob_stats.go +++ b/cmd/batchjob_stats_db/batchjob_stats_db.go @@ -3,17 +3,20 @@ package main import ( + "fmt" _ "net/http/pprof" "os" + "os/signal" "path/filepath" "runtime" + "syscall" + "time" - "github.com/alecthomas/kingpin/v2" "github.com/go-kit/log/level" + _ "github.com/mattn/go-sqlite3" 
"github.com/prometheus/common/promlog" "github.com/prometheus/common/promlog/flag" "github.com/prometheus/common/version" - _ "modernc.org/sqlite" "github.com/mahendrapaipuri/batchjob_monitoring/pkg/jobstats" ) @@ -25,40 +28,47 @@ var ( func main() { var ( - batchScheduler = kingpin.Flag( + batchScheduler = jobstats.JobstatDBApp.Flag( "batch.scheduler", "Name of batch scheduler (eg slurm, lsf, pbs).", ).Default("slurm").String() - dataPath = kingpin.Flag( + dataPath = jobstats.JobstatDBApp.Flag( "path.data", "Absolute path to a directory where job data is placed. SQLite DB that contains jobs stats will be saved to this directory.", ).Default("/var/lib/jobstats").String() - jobstatDBFile = kingpin.Flag( - "path.db.name", + jobstatDBFile = jobstats.JobstatDBApp.Flag( + "db.name", "Name of the SQLite DB file that contains jobs stats.", ).Default("jobstats.db").String() - jobstatDBTable = kingpin.Flag( - "path.db.table.name", + jobstatDBTable = jobstats.JobstatDBApp.Flag( + "db.table.name", "Name of the table in SQLite DB file that contains jobs stats.", ).Default("jobs").String() - retentionPeriod = kingpin.Flag( + updateInterval = jobstats.JobstatDBApp.Flag( + "db.update.interval", + "Time period in seconds at which DB will be updated with jobs stats.", + ).Default("1800").Int() + retentionPeriod = jobstats.JobstatDBApp.Flag( "data.retention.period", "Period in days for which job stats data will be retained.", ).Default("365").Int() - maxProcs = kingpin.Flag( + maxProcs = jobstats.JobstatDBApp.Flag( "runtime.gomaxprocs", "The target number of CPUs Go will run on (GOMAXPROCS)", ).Envar("GOMAXPROCS").Default("1").Int() ) promlogConfig := &promlog.Config{} - flag.AddFlags(kingpin.CommandLine, promlogConfig) - kingpin.Version(version.Print("batchjobstat")) - kingpin.CommandLine.UsageWriter(os.Stdout) - kingpin.HelpFlag.Short('h') - kingpin.Parse() + flag.AddFlags(jobstats.JobstatDBApp, promlogConfig) + 
jobstats.JobstatDBApp.Version(version.Print(jobstats.JobstatDBAppName)) + jobstats.JobstatDBApp.UsageWriter(os.Stdout) + jobstats.JobstatDBApp.HelpFlag.Short('h') + _, err := jobstats.JobstatDBApp.Parse(os.Args[1:]) + if err != nil { + panic(fmt.Sprintf("Failed to parse %s command", jobstats.JobstatDBAppName)) + } logger := promlog.New(promlogConfig) - level.Info(logger).Log("msg", "Running batchjob_jobstat", "version", version.Info()) + level.Info(logger).Log("msg", fmt.Sprintf("Running %s", jobstats.JobstatDBAppName), "version", version.Info()) level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext()) runtime.GOMAXPROCS(*maxProcs) @@ -74,7 +84,7 @@ func main() { // return // } // pprof.StartCPUProfile(f) - jobCollector := jobstats.NewJobStats( + jobCollector := jobstats.NewJobStatsDB( logger, *batchScheduler, jobstatDBPath, @@ -83,9 +93,30 @@ func main() { jobsLastTimeStampFile, vacuumLastTimeStampFile, ) - err := jobCollector.GetJobStats() - if err != nil { - level.Error(logger).Log("msg", "Failed to get job stats", "err", err) + + // Create a channel to propagate signals + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + // Start a ticker + ticker := time.NewTicker(time.Second * time.Duration(*updateInterval)) + defer ticker.Stop() + +loop: + for { + level.Info(logger).Log("msg", "Updating JobStats DB") + err := jobCollector.GetJobStats() + if err != nil { + level.Error(logger).Log("msg", "Failed to get job stats", "err", err) + } + + select { + case <-ticker.C: + continue + case <-interrupt: + level.Info(logger).Log("msg", "Received Interrupt. 
Stopping DB update") + break loop + } } // defer pprof.StopCPUProfile() } diff --git a/cmd/batchjob_stats/batchjob_stats_test.go b/cmd/batchjob_stats_db/batchjob_stats_db_test.go similarity index 77% rename from cmd/batchjob_stats/batchjob_stats_test.go rename to cmd/batchjob_stats_db/batchjob_stats_db_test.go index 6d8cb9d8..4db4a1be 100644 --- a/cmd/batchjob_stats/batchjob_stats_test.go +++ b/cmd/batchjob_stats_db/batchjob_stats_db_test.go @@ -9,12 +9,12 @@ import ( ) var ( - binary, _ = filepath.Abs("batchjob_stats") + binary, _ = filepath.Abs("bin/batchjob_stats_db") ) func TestBatchjobStatExecutable(t *testing.T) { if _, err := os.Stat(binary); err != nil { - t.Skipf("batchjob_stats binary not available, try to run `make build` first: %s", err) + t.Skipf("batchjob_stats_db binary not available, try to run `make build` first: %s", err) } tmpDir := t.TempDir() diff --git a/cmd/batchjob_stats_server/batchjob_stats_server.go b/cmd/batchjob_stats_server/batchjob_stats_server.go new file mode 100644 index 00000000..e40452cf --- /dev/null +++ b/cmd/batchjob_stats_server/batchjob_stats_server.go @@ -0,0 +1,112 @@ +// Main entrypoint for batchjob_stats_server + +package main + +import ( + "context" + "fmt" + _ "net/http/pprof" + "os" + "os/signal" + "runtime" + "syscall" + "time" + + "github.com/go-kit/log/level" + "github.com/mahendrapaipuri/batchjob_monitoring/pkg/jobstats" + "github.com/prometheus/common/promlog" + "github.com/prometheus/common/promlog/flag" + "github.com/prometheus/common/version" +) + +func main() { + var ( + webListenAddresses = jobstats.JobstatServerApp.Flag( + "web.listen-address", + "Addresses on which to expose metrics and web interface.", + ).Default(":9020").String() + webConfigFile = jobstats.JobstatServerApp.Flag( + "web.config.file", + "Path to configuration file that can enable TLS or authentication. 
See: https://github.com/prometheus/exporter-toolkit/blob/master/docs/web-configuration.md", + ).Default("").String() + jobstatDBFile = jobstats.JobstatServerApp.Flag( + "path.db", + "Absolute path to the SQLite DB file that contains jobs stats.", + ).Default("/var/lib/jobstats/jobstats.db").String() + jobstatDBTable = jobstats.JobstatServerApp.Flag( + "db.table.name", + "Name of the table in SQLite DB file that contains jobs stats.", + ).Default("jobs").String() + maxProcs = jobstats.JobstatServerApp.Flag( + "runtime.gomaxprocs", "The target number of CPUs Go will run on (GOMAXPROCS)", + ).Envar("GOMAXPROCS").Default("1").Int() + ) + systemdSocket := func() *bool { b := false; return &b }() // Socket activation only available on Linux + if runtime.GOOS == "linux" { + systemdSocket = jobstats.JobstatServerApp.Flag( + "web.systemd-socket", + "Use systemd socket activation listeners instead of port listeners (Linux only).", + ).Bool() + } + + promlogConfig := &promlog.Config{} + flag.AddFlags(jobstats.JobstatServerApp, promlogConfig) + jobstats.JobstatServerApp.Version(version.Print(jobstats.JobstatServerAppName)) + jobstats.JobstatServerApp.UsageWriter(os.Stdout) + jobstats.JobstatServerApp.HelpFlag.Short('h') + _, err := jobstats.JobstatServerApp.Parse(os.Args[1:]) + if err != nil { + panic(fmt.Sprintf("Failed to parse %s command", jobstats.JobstatServerAppName)) + } + logger := promlog.New(promlogConfig) + + level.Info(logger).Log("msg", fmt.Sprintf("Running %s", jobstats.JobstatServerAppName), "version", version.Info()) + level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext()) + + runtime.GOMAXPROCS(*maxProcs) + level.Debug(logger).Log("msg", "Go MAXPROCS", "procs", runtime.GOMAXPROCS(0)) + + // Create context that listens for the interrupt signal from the OS. 
+ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + config := &jobstats.Config{ + Logger: logger, + Address: *webListenAddresses, + WebSystemdSocket: *systemdSocket, + WebConfigFile: *webConfigFile, + JobstatDBFile: *jobstatDBFile, + JobstatDBTable: *jobstatDBTable, + } + + server, cleanup, err := jobstats.NewJobstatsServer(config) + defer cleanup() + if err != nil { + level.Error(logger).Log("msg", "Failed to create batchjob_stats_server", "err", err) + } + + // Initializing the server in a goroutine so that + // it won't block the graceful shutdown handling below + go func() { + if err := server.Start(); err != nil { + level.Error(logger).Log("msg", "Failed to start server", "err", err) + } + }() + + // Listen for the interrupt signal. + <-ctx.Done() + + // Restore default behavior on the interrupt signal and notify user of shutdown. + stop() + level.Info(logger).Log("msg", "Shutting down gracefully, press Ctrl+C again to force") + + // The context is used to inform the server it has 5 seconds to finish + // the request it is currently handling + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + level.Error(logger).Log("msg", "Failed to gracefully shutdown server", "err", err) + } + + level.Info(logger).Log("msg", "Server exiting") +} diff --git a/cmd/batchjob_stats_server/batchjob_stats_server_test.go b/cmd/batchjob_stats_server/batchjob_stats_server_test.go new file mode 100644 index 00000000..539f862a --- /dev/null +++ b/cmd/batchjob_stats_server/batchjob_stats_server_test.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "testing" +) + +var ( + binary, _ = filepath.Abs("bin/batchjob_stats_server") +) + +func TestBatchjobStatsServerExecutable(t *testing.T) { + if _, err := os.Stat(binary); err != nil { + t.Skipf("batchjob_stats_server binary not available, try 
to run `make build` first: %s", err) + } + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, "test.db") + + file, err := os.Create(dbPath) + if err != nil { + t.Errorf("Failed to create DB file: %s", err) + } + file.Close() + + jobstats := exec.Command(binary, "--path.db", dbPath) + if err := runCommandAndTests(jobstats); err != nil { + t.Error(err) + } +} + +func runCommandAndTests(cmd *exec.Cmd) error { + if err := cmd.Start(); err != nil { + return fmt.Errorf("failed to start command: %s", err) + } + return nil +} diff --git a/go.mod b/go.mod index bea16d0a..e9f22108 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,13 @@ require ( github.com/containerd/cgroups/v3 v3.0.2 github.com/go-kit/log v0.2.1 github.com/google/uuid v1.4.0 + github.com/gorilla/mux v1.8.1 + github.com/mattn/go-sqlite3 v1.14.18 github.com/prometheus/client_golang v1.17.0 github.com/prometheus/common v0.45.0 github.com/prometheus/exporter-toolkit v0.10.0 github.com/prometheus/procfs v0.12.0 github.com/zeebo/xxh3 v1.0.2 - modernc.org/sqlite v1.27.0 ) require ( @@ -21,40 +22,25 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/docker/go-units v0.4.0 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/jpillora/backoff v1.0.0 // indirect - github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/klauspost/cpuid/v2 v2.2.3 // indirect github.com/kr/text v0.2.0 // indirect - github.com/mattn/go-isatty v0.0.16 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/prometheus/client_model v0.5.0 // indirect - github.com/remyoudompheng/bigfft 
v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/stretchr/testify v1.8.4 // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect golang.org/x/crypto v0.14.0 // indirect - golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.12.0 // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect - golang.org/x/tools v0.6.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - lukechampine.com/uint128 v1.2.0 // indirect - modernc.org/cc/v3 v3.40.0 // indirect - modernc.org/ccgo/v3 v3.16.13 // indirect - modernc.org/libc v1.29.0 // indirect - modernc.org/mathutil v1.6.0 // indirect - modernc.org/memory v1.7.2 // indirect - modernc.org/opt v0.1.3 // indirect - modernc.org/strutil v1.1.3 // indirect - modernc.org/token v1.0.1 // indirect ) diff --git a/go.sum b/go.sum index 92f0e5c1..6e6dc644 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= @@ -32,24 +30,20 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= -github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= -github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= -github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= -github.com/mattn/go-sqlite3 v1.14.16/go.mod 
h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI= +github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= @@ -68,8 +62,6 @@ github.com/prometheus/exporter-toolkit v0.10.0 h1:yOAzZTi4M22ZzVxD+fhy1URTuNRj/3 github.com/prometheus/exporter-toolkit v0.10.0/go.mod h1:+sVFzuvV5JDyw+Ih6p3zFxZNVnKQa3x5qPmDSiPu4ZY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -85,8 +77,6 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net 
v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= @@ -96,7 +86,6 @@ golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -104,8 +93,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= @@ -121,31 +108,3 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 
h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= -lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= -modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw= -modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0= -modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw= -modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY= -modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk= -modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ= -modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= -modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM= -modernc.org/libc v1.29.0 h1:tTFRFq69YKCF2QyGNuRUQxKBm1uZZLubf6Cjh/pVHXs= -modernc.org/libc v1.29.0/go.mod h1:DaG/4Q3LRRdqpiLyP0C2m1B8ZMGkQ+cCgOIjEtQlYhQ= -modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= -modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= -modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= -modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= -modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= -modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= -modernc.org/sqlite v1.27.0 h1:MpKAHoyYB7xqcwnUwkuD+npwEa0fojF0B5QRbN+auJ8= -modernc.org/sqlite v1.27.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0= -modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY= -modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= -modernc.org/tcl v1.15.2 h1:C4ybAYCGJw968e+Me18oW55kD/FexcHbqH2xak1ROSY= -modernc.org/tcl v1.15.2/go.mod 
h1:3+k/ZaEbKrC8ePv8zJWPtBSW0V7Gg9g8rkmhI1Kfs3c= -modernc.org/token v1.0.1 h1:A3qvTqOwexpfZZeyI0FeGPDlSWX5pjZu9hF4lU+EKWg= -modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= -modernc.org/z v1.7.3 h1:zDJf6iHjrnB+WRD88stbXokugjyc0/pB91ri1gO6LZY= -modernc.org/z v1.7.3/go.mod h1:Ipv4tsdxZRbQyLq9Q1M6gdbkxYzdlrciF2Hi/lS7nWE= diff --git a/internal/helpers/helpers.go b/internal/helpers/helpers.go new file mode 100644 index 00000000..47e3b48a --- /dev/null +++ b/internal/helpers/helpers.go @@ -0,0 +1,30 @@ +package helpers + +import ( + "fmt" + "os/exec" + "strings" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/google/uuid" + "github.com/zeebo/xxh3" +) + +// Get a UUID5 for given slice of strings +func GetUuidFromString(stringSlice []string) (string, error) { + s := strings.Join(stringSlice[:], ",") + h := xxh3.HashString128(s).Bytes() + uuid, err := uuid.FromBytes(h[:]) + return uuid.String(), err +} + +// Execute command and return stdout/stderr +func Execute(cmd string, args []string, logger log.Logger) ([]byte, error) { + level.Debug(logger).Log("msg", "Executing", "command", cmd, "args", fmt.Sprintf("%+v", args)) + out, err := exec.Command(cmd, args...).CombinedOutput() + if err != nil { + err = fmt.Errorf("error running %s: %s", cmd, err) + } + return out, err +} diff --git a/pkg/collector/emissions.go b/pkg/collector/emissions.go index 17ca8c51..6c02a59d 100644 --- a/pkg/collector/emissions.go +++ b/pkg/collector/emissions.go @@ -4,8 +4,6 @@ package collector import ( - "bytes" - "encoding/json" "net/http" "time" @@ -14,7 +12,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" - utils "github.com/mahendrapaipuri/batchjob_monitoring/pkg/utils" + "github.com/mahendrapaipuri/batchjob_monitoring/pkg/emissions" ) const emissionsCollectorSubsystem = "emissions" @@ -29,57 +27,24 @@ type emissionsCollector struct { } var ( - countryCode = 
kingpin.Flag("collector.emissions.country.code", "ISO 3166-1 alpha-3 Country code. OWID energy data [https://github.com/owid/energy-data] estimated constant emission factor is used for all countries except for France. A real time emission factor will be used for France from RTE eCO2 mix [https://www.rte-france.com/en/eco2mix/co2-emissions] data."). - Default("FRA"). - String() - globalEnergyMixDataUrl = "https://raw.githubusercontent.com/mlco2/codecarbon/master/codecarbon/data/private_infra/global_energy_mix.json" - globalEmissionFactor = 475 - getRteEnergyMixData = utils.GetRteEnergyMixData + countryCode = kingpin.Flag( + "collector.emissions.country.code", + `ISO 3166-1 alpha-3 Country code. OWID energy data [https://github.com/owid/energy-data] +estimated constant emission factor is used for all countries except for France. +A real time emission factor will be used for France from RTE eCO2 mix +[https://www.rte-france.com/en/eco2mix/co2-emissions] data.`, + ).Default("FRA").String() + globalEmissionFactor = emissions.GlobalEmissionFactor + getRteEnergyMixData = emissions.GetRteEnergyMixEmissionData ) func init() { registerCollector(emissionsCollectorSubsystem, defaultDisabled, NewEmissionsCollector) } -// Read JSON file from GitHub to get global energy mix data -func readJSONFromUrl(url string, logger log.Logger) (map[string]float64, error) { - resp, err := http.Get(url) - if err != nil { - level.Error(logger).Log("msg", "Failed to get global energy mix data", "err", err) - return nil, err - } - - defer resp.Body.Close() - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(resp.Body) - if err != nil { - level.Error(logger).Log("msg", "Failed to read responde body", "err", err) - return nil, err - } - respByte := buf.Bytes() - // JSON might contain NaN, replace it with null that is allowed in JSON - respByte = bytes.Replace(respByte, []byte("NaN"), []byte("null"), -1) - var globalEmissionData map[string]energyMixDataFields - if err := json.Unmarshal(respByte, 
&globalEmissionData); err != nil { - level.Error(logger).Log("msg", "Failed to unmarshal JSON data", "err", err) - return nil, err - } - - var countryEmissionData = make(map[string]float64) - for country, data := range globalEmissionData { - // Set unavaible values to -1 to indicate they are indeed unavailable - if data.CarbonIntensity == 0 { - countryEmissionData[country] = -1 - } else { - countryEmissionData[country] = data.CarbonIntensity - } - } - return countryEmissionData, nil -} - // NewEmissionsCollector returns a new Collector exposing emission factor metrics. func NewEmissionsCollector(logger log.Logger) (Collector, error) { - energyData, err := readJSONFromUrl(globalEnergyMixDataUrl, logger) + energyData, err := emissions.GetEnergyMixData(http.DefaultClient, logger) if err != nil { level.Error(logger).Log("msg", "Failed to read Global energy mix data", "err", err) } @@ -146,7 +111,7 @@ func (c *emissionsCollector) getCachedEmissionFactorFrance() float64 { // Get current emission factor for France from RTE energy data mix func (c *emissionsCollector) getCurrentEmissionFactorFrance() float64 { - emissionFactor, err := getRteEnergyMixData() + emissionFactor, err := getRteEnergyMixData(http.DefaultClient, c.logger) if err != nil { level.Error(c.logger).Log("msg", "Failed to get emissions from RTE", "err", err) if emissionFactor, ok := c.energyData["FRA"]; ok { diff --git a/pkg/collector/emissions_test.go b/pkg/collector/emissions_test.go index 9fbee4d5..044ba4f1 100644 --- a/pkg/collector/emissions_test.go +++ b/pkg/collector/emissions_test.go @@ -7,16 +7,21 @@ import ( "testing" "github.com/go-kit/log" + "github.com/mahendrapaipuri/batchjob_monitoring/pkg/emissions" +) + +var ( + logger = log.NewNopLogger() ) func TestEmissionsMetrics(t *testing.T) { // Mock GetRteEnergyMixData expectedFactor := float64(55) - getRteEnergyMixData = func() (float64, error) { + getRteEnergyMixData = func(client emissions.Client, logger log.Logger) (float64, error) { return 
expectedFactor, nil } c := emissionsCollector{ - logger: log.NewNopLogger(), + logger: logger, countryCode: "FRA", energyData: map[string]float64{"FRA": 50, "DEU": 51, "CHL": 52}, } @@ -26,7 +31,7 @@ func TestEmissionsMetrics(t *testing.T) { } // Change mock to give different value - getRteEnergyMixData = func() (float64, error) { + getRteEnergyMixData = func(client emissions.Client, logger log.Logger) (float64, error) { return float64(65), nil } value = c.getCurrentEmissionFactor() diff --git a/pkg/collector/fixtures/e2e-test-cgroupsv1-output.txt b/pkg/collector/fixtures/e2e-test-cgroupsv1-output.txt index aff897d9..6b87f4c1 100644 --- a/pkg/collector/fixtures/e2e-test-cgroupsv1-output.txt +++ b/pkg/collector/fixtures/e2e-test-cgroupsv1-output.txt @@ -1,15 +1,15 @@ # HELP batchjob_cpu_system_seconds Cumulative CPU system seconds # TYPE batchjob_cpu_system_seconds gauge -batchjob_cpu_system_seconds{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0.45 +batchjob_cpu_system_seconds{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0.45 # HELP batchjob_cpu_total_seconds Cumulative CPU total seconds # TYPE batchjob_cpu_total_seconds gauge -batchjob_cpu_total_seconds{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 1.012410966 +batchjob_cpu_total_seconds{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 1.012410966 # HELP batchjob_cpu_user_seconds Cumulative CPU user seconds # TYPE batchjob_cpu_user_seconds gauge -batchjob_cpu_user_seconds{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0.39 
+batchjob_cpu_user_seconds{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0.39 # HELP batchjob_cpus Number of CPUs # TYPE batchjob_cpus gauge -batchjob_cpus{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0 +batchjob_cpus{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0 # HELP batchjob_exporter_build_info A metric with a constant '1' value labeled by version, revision, branch, goversion from which batchjob_exporter was built, and the goos and goarch for the build. # TYPE batchjob_exporter_build_info gauge # HELP batchjob_ipmi_dcmi_watts_total Current Power consumption in watts @@ -17,28 +17,28 @@ batchjob_cpus{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid= batchjob_ipmi_dcmi_watts_total 332 # HELP batchjob_memory_cache_bytes Memory cache used in bytes # TYPE batchjob_memory_cache_bytes gauge -batchjob_memory_cache_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 2.1086208e+07 +batchjob_memory_cache_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 2.1086208e+07 # HELP batchjob_memory_fail_count Memory fail count # TYPE batchjob_memory_fail_count gauge -batchjob_memory_fail_count{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0 +batchjob_memory_fail_count{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0 # HELP batchjob_memory_rss_bytes Memory RSS used in bytes # TYPE batchjob_memory_rss_bytes gauge -batchjob_memory_rss_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 
1.0407936e+07 +batchjob_memory_rss_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 1.0407936e+07 # HELP batchjob_memory_total_bytes Memory total in bytes # TYPE batchjob_memory_total_bytes gauge -batchjob_memory_total_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 2.01362030592e+11 +batchjob_memory_total_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 2.01362030592e+11 # HELP batchjob_memory_used_bytes Memory used in bytes # TYPE batchjob_memory_used_bytes gauge -batchjob_memory_used_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 4.0194048e+07 +batchjob_memory_used_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 4.0194048e+07 # HELP batchjob_memsw_fail_count Swap fail count # TYPE batchjob_memsw_fail_count gauge -batchjob_memsw_fail_count{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0 +batchjob_memsw_fail_count{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0 # HELP batchjob_memsw_total_bytes Swap total in bytes # TYPE batchjob_memsw_total_bytes gauge -batchjob_memsw_total_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 9.223372036854772e+18 +batchjob_memsw_total_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 9.223372036854772e+18 # HELP batchjob_memsw_used_bytes Swap used in bytes # TYPE batchjob_memsw_used_bytes gauge 
-batchjob_memsw_used_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 4.032512e+07 +batchjob_memsw_used_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 4.032512e+07 # HELP batchjob_nvidia_gpu_jobid Batch Job ID of current nVIDIA GPU # TYPE batchjob_nvidia_gpu_jobid gauge batchjob_nvidia_gpu_jobid{uuid="GPU-61a65011-6571-a6d2-5ab8-66cbb6f7f9c3"} 11000 diff --git a/pkg/collector/fixtures/e2e-test-cgroupsv2-output.txt b/pkg/collector/fixtures/e2e-test-cgroupsv2-output.txt index 602c252c..df5cded4 100644 --- a/pkg/collector/fixtures/e2e-test-cgroupsv2-output.txt +++ b/pkg/collector/fixtures/e2e-test-cgroupsv2-output.txt @@ -1,15 +1,15 @@ # HELP batchjob_cpu_system_seconds Cumulative CPU system seconds # TYPE batchjob_cpu_system_seconds gauge -batchjob_cpu_system_seconds{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 115.777502 +batchjob_cpu_system_seconds{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 115.777502 # HELP batchjob_cpu_total_seconds Cumulative CPU total seconds # TYPE batchjob_cpu_total_seconds gauge -batchjob_cpu_total_seconds{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 60491.070351 +batchjob_cpu_total_seconds{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 60491.070351 # HELP batchjob_cpu_user_seconds Cumulative CPU user seconds # TYPE batchjob_cpu_user_seconds gauge -batchjob_cpu_user_seconds{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 60375.292848 
+batchjob_cpu_user_seconds{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 60375.292848 # HELP batchjob_cpus Number of CPUs # TYPE batchjob_cpus gauge -batchjob_cpus{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 2 +batchjob_cpus{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 2 # HELP batchjob_exporter_build_info A metric with a constant '1' value labeled by version, revision, branch, goversion from which batchjob_exporter was built, and the goos and goarch for the build. # TYPE batchjob_exporter_build_info gauge # HELP batchjob_ipmi_dcmi_watts_total Current Power consumption in watts @@ -17,28 +17,28 @@ batchjob_cpus{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid= batchjob_ipmi_dcmi_watts_total 332 # HELP batchjob_memory_cache_bytes Memory cache used in bytes # TYPE batchjob_memory_cache_bytes gauge -batchjob_memory_cache_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0 +batchjob_memory_cache_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0 # HELP batchjob_memory_fail_count Memory fail count # TYPE batchjob_memory_fail_count gauge -batchjob_memory_fail_count{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0 +batchjob_memory_fail_count{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0 # HELP batchjob_memory_rss_bytes Memory RSS used in bytes # TYPE batchjob_memory_rss_bytes gauge -batchjob_memory_rss_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 4.098592768e+09 
+batchjob_memory_rss_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 4.098592768e+09 # HELP batchjob_memory_total_bytes Memory total in bytes # TYPE batchjob_memory_total_bytes gauge -batchjob_memory_total_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 4.294967296e+09 +batchjob_memory_total_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 4.294967296e+09 # HELP batchjob_memory_used_bytes Memory used in bytes # TYPE batchjob_memory_used_bytes gauge -batchjob_memory_used_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 4.111491072e+09 +batchjob_memory_used_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 4.111491072e+09 # HELP batchjob_memsw_fail_count Swap fail count # TYPE batchjob_memsw_fail_count gauge -batchjob_memsw_fail_count{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0 +batchjob_memsw_fail_count{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0 # HELP batchjob_memsw_total_bytes Swap total in bytes # TYPE batchjob_memsw_total_bytes gauge -batchjob_memsw_total_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0 +batchjob_memsw_total_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0 # HELP batchjob_memsw_used_bytes Swap used in bytes # TYPE batchjob_memsw_used_bytes gauge 
-batchjob_memsw_used_bytes{batch="slurm",jobgid="1000",jobid="1009248",jobuid="1000",jobuuid="8d4fad6d-c5e3-775b-8a8c-707e319114ec",step="",task=""} 0 +batchjob_memsw_used_bytes{batch="slurm",jobaccount="testacc",jobid="1009248",jobuuid="7f6c39b7-2740-fc1f-32c2-8fc28880829c",step="",task=""} 0 # HELP batchjob_nvidia_gpu_jobid Batch Job ID of current nVIDIA GPU # TYPE batchjob_nvidia_gpu_jobid gauge batchjob_nvidia_gpu_jobid{uuid="GPU-61a65011-6571-a6d2-5ab8-66cbb6f7f9c3"} 11000 diff --git a/pkg/collector/fixtures/slurmjobstat/1009248 b/pkg/collector/fixtures/slurmjobstat/1009248 index 4f0965a4..e0340a72 100644 --- a/pkg/collector/fixtures/slurmjobstat/1009248 +++ b/pkg/collector/fixtures/slurmjobstat/1009248 @@ -1 +1 @@ -1000 1000 compute-[0-2] /home/user/slurm +1000 testacc compute-[0-2] /home/user/slurm diff --git a/pkg/collector/helper.go b/pkg/collector/helper.go index 6625bb59..ba9292eb 100644 --- a/pkg/collector/helper.go +++ b/pkg/collector/helper.go @@ -2,6 +2,9 @@ package collector import ( "os" + "path/filepath" + "strconv" + "strings" "regexp" ) @@ -33,3 +36,42 @@ func fileExists(filename string) bool { func SanitizeMetricName(metricName string) string { return metricNameRegex.ReplaceAllString(metricName, "_") } + +// Load cgroups v2 metrics from a given path +func LoadCgroupsV2Metrics( + name string, + cgroupfsPath string, + controllers []string, +) (map[string]float64, error) { + data := make(map[string]float64) + + for _, fName := range controllers { + contents, err := os.ReadFile(filepath.Join(cgroupfsPath, name, fName)) + if err != nil { + return data, err + } + for _, line := range strings.Split(string(contents), "\n") { + // Some of the above have a single value and others have a "data_name 123" + parts := strings.Fields(line) + indName := fName + indData := 0 + if len(parts) == 1 || len(parts) == 2 { + if len(parts) == 2 { + indName += "." 
+ parts[0] + indData = 1 + } + if parts[indData] == "max" { + data[indName] = -1.0 + } else { + f, err := strconv.ParseFloat(parts[indData], 64) + if err == nil { + data[indName] = f + } else { + return data, err + } + } + } + } + } + return data, nil +} diff --git a/pkg/collector/ipmi.go b/pkg/collector/ipmi.go index 9a93c6f0..88cbc64b 100644 --- a/pkg/collector/ipmi.go +++ b/pkg/collector/ipmi.go @@ -15,9 +15,8 @@ import ( "github.com/alecthomas/kingpin/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/mahendrapaipuri/batchjob_monitoring/internal/helpers" "github.com/prometheus/client_golang/prometheus" - - utils "github.com/mahendrapaipuri/batchjob_monitoring/pkg/utils" ) const ipmiCollectorSubsystem = "ipmi_dcmi" @@ -29,9 +28,10 @@ type impiCollector struct { } var ( - ipmiDcmiWrapperExec = kingpin.Flag("collector.ipmi.dcmi.wrapper.path", "Path to IPMI DCMI executable wrapper."). - Default("ipmi-dcmi-wrapper"). - String() + ipmiDcmiWrapperExec = kingpin.Flag( + "collector.ipmi.dcmi.wrapper.path", + "Path to IPMI DCMI executable wrapper.", + ).Default("ipmi-dcmi-wrapper").String() ipmiDCMIPowerMeasurementRegex = regexp.MustCompile( `^Power Measurement\s*:\s*(?PActive|Not\sAvailable).*`, ) @@ -79,7 +79,7 @@ func getValue(ipmiOutput []byte, regex *regexp.Regexp) (string, error) { // Update implements Collector and exposes IPMI DCMI power related metrics. 
func (c *impiCollector) Update(ch chan<- prometheus.Metric) error { args := []string{""} - stdOut, err := utils.Execute(*ipmiDcmiWrapperExec, args, c.logger) + stdOut, err := helpers.Execute(*ipmiDcmiWrapperExec, args, c.logger) if err != nil { return err } diff --git a/pkg/collector/nvidia_gpus.go b/pkg/collector/nvidia_gpus.go index a57bd716..65d80daf 100644 --- a/pkg/collector/nvidia_gpus.go +++ b/pkg/collector/nvidia_gpus.go @@ -11,17 +11,17 @@ import ( "github.com/alecthomas/kingpin/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/mahendrapaipuri/batchjob_monitoring/internal/helpers" "github.com/prometheus/client_golang/prometheus" - - utils "github.com/mahendrapaipuri/batchjob_monitoring/pkg/utils" ) const nvidiaGpuJobMapCollectorSubsystem = "nvidia_gpu" var ( - gpuStatPath = kingpin.Flag("collector.nvidia.gpu.stat.path", "Path to gpustat file that maps GPU ordinals to job IDs."). - Default("/run/gpustat"). - String() + gpuStatPath = kingpin.Flag( + "collector.nvidia.gpu.stat.path", + "Path to gpustat file that maps GPU ordinals to job IDs.", + ).Default("/run/gpustat").String() ) type Device struct { @@ -58,7 +58,7 @@ func init() { // NOTE: Hoping this command returns MIG devices too func getAllDevices(logger log.Logger) ([]Device, error) { args := []string{"--query-gpu=name,uuid", "--format=csv"} - nvidiaSmiOutput, err := utils.Execute("nvidia-smi", args, logger) + nvidiaSmiOutput, err := helpers.Execute("nvidia-smi", args, logger) if err != nil { level.Error(logger). Log("msg", "nvidia-smi command to get list of devices failed", "err", err) diff --git a/pkg/collector/rapl.go b/pkg/collector/rapl.go index 5df9c1a3..4589d8c7 100644 --- a/pkg/collector/rapl.go +++ b/pkg/collector/rapl.go @@ -32,9 +32,10 @@ func init() { } var ( - raplZoneLabel = kingpin.Flag("collector.rapl.enable-zone-label", "Enables RAPL zone labels"). - Default("false"). 
- Bool() + raplZoneLabel = kingpin.Flag( + "collector.rapl.enable-zone-label", + "Enables RAPL zone labels", + ).Default("false").Bool() ) // NewRaplCollector returns a new Collector exposing RAPL metrics. diff --git a/pkg/collector/slurm.go b/pkg/collector/slurm.go index 57ef1462..69d60c40 100644 --- a/pkg/collector/slurm.go +++ b/pkg/collector/slurm.go @@ -17,9 +17,8 @@ import ( "github.com/containerd/cgroups/v3/cgroup1" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/mahendrapaipuri/batchjob_monitoring/internal/helpers" "github.com/prometheus/client_golang/prometheus" - - utils "github.com/mahendrapaipuri/batchjob_monitoring/pkg/utils" ) const slurmCollectorSubsystem = "slurm_job" @@ -27,15 +26,22 @@ const slurmCollectorSubsystem = "slurm_job" var ( cgroupV2 = false metricLock = sync.RWMutex{} - collectJobSteps = kingpin.Flag("collector.slurm.jobsteps.metrics", "Whether to collect metrics of all slurm job steps and tasks [WARNING: This option can result in very high cardinality of metrics]."). - Default("false"). - Bool() - useJobIdHash = kingpin.Flag("collector.slurm.unique.jobid", "Whether to calculate a hash based on job SLURM_JOBID, SLURM_JOB_UID, SLURM_JOB_GID, SLURM_JOB_NODELIST, SLURM_JOB_WORKDIR to get unique job identifier."). - Default("false"). - Bool() - jobStatPath = kingpin.Flag("collector.slurm.job.stat.path", "Path to jobstat files that contains a file for each job with line \"$SLURM_JOB_UID $SLURM_JOB_GID $SLURM_JOB_NODELIST $SLURM_JOB_WORKDIR\". An MD5 checksum is computed on this file to get an unique job ID if --collector.slurm.unique.jobid is used."). - Default("/run/slurmjobstat"). 
- String() + collectJobSteps = kingpin.Flag( + "collector.slurm.jobsteps.metrics", + `Whether to collect metrics of all slurm job steps and tasks +[WARNING: This option can result in very high cardinality of metrics].`, + ).Default("false").Bool() + // useJobIdHash = kingpin.Flag( + // "collector.slurm.unique.jobid", + // "Whether to calculate a hash based on job SLURM_JOBID, SLURM_JOB_UID, SLURM_JOB_ACCOUNT, SLURM_JOB_NODELIST, SLURM_JOB_WORKDIR to get unique job identifier.", + // ).Default("false").Bool() + jobStatPath = kingpin.Flag( + "collector.slurm.job.stat.path", + `Path to jobstat files that contain a file for each job with line +\"$SLURM_JOB_UID $SLURM_JOB_ACCOUNT $SLURM_JOB_NODELIST $SLURM_JOB_WORKDIR\". +A deterministic UUID is computed on the variables in this file and job ID to get a +unique job identifier.`, + ).Default("/run/slurmjobstat").String() ) type CgroupMetric struct { @@ -54,7 +60,7 @@ type CgroupMetric struct { memswFailCount float64 userslice bool jobuid string - jobgid string + jobaccount string jobid string jobuuid string step string @@ -98,79 +104,79 @@ func NewSlurmCollector(logger log.Logger) (Collector, error) { cpuUser: prometheus.NewDesc( prometheus.BuildFQName(namespace, "cpu", "user_seconds"), "Cumulative CPU user seconds", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), cpuSystem: prometheus.NewDesc( prometheus.BuildFQName(namespace, "cpu", "system_seconds"), "Cumulative CPU system seconds", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), cpuTotal: prometheus.NewDesc( prometheus.BuildFQName(namespace, "cpu", "total_seconds"), "Cumulative CPU total seconds", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), 
cpus: prometheus.NewDesc( prometheus.BuildFQName(namespace, "", "cpus"), "Number of CPUs", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), memoryRSS: prometheus.NewDesc( prometheus.BuildFQName(namespace, "memory", "rss_bytes"), "Memory RSS used in bytes", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), memoryCache: prometheus.NewDesc( prometheus.BuildFQName(namespace, "memory", "cache_bytes"), "Memory cache used in bytes", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), memoryUsed: prometheus.NewDesc( prometheus.BuildFQName(namespace, "memory", "used_bytes"), "Memory used in bytes", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), memoryTotal: prometheus.NewDesc( prometheus.BuildFQName(namespace, "memory", "total_bytes"), "Memory total in bytes", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), memoryFailCount: prometheus.NewDesc( prometheus.BuildFQName(namespace, "memory", "fail_count"), "Memory fail count", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), memswUsed: prometheus.NewDesc( prometheus.BuildFQName(namespace, "memsw", "used_bytes"), "Swap used in bytes", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), memswTotal: prometheus.NewDesc( prometheus.BuildFQName(namespace, "memsw", "total_bytes"), "Swap total in bytes", - 
[]string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), memswFailCount: prometheus.NewDesc( prometheus.BuildFQName(namespace, "memsw", "fail_count"), "Swap fail count", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), collectError: prometheus.NewDesc( prometheus.BuildFQName(namespace, "exporter", "collect_error"), "Indicates collection error, 0=no error, 1=error", - []string{"batch", "jobuid", "jobgid", "jobid", "jobuuid", "step", "task"}, + []string{"batch", "jobid", "jobaccount", "jobuuid", "step", "task"}, nil, ), logger: logger, @@ -196,9 +202,9 @@ func (c *slurmCollector) Update(ch chan<- prometheus.Metric) error { if m.err { ch <- prometheus.MustNewConstMetric(c.collectError, prometheus.GaugeValue, 1, m.name) } - ch <- prometheus.MustNewConstMetric(c.cpuUser, prometheus.GaugeValue, m.cpuUser, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.cpuSystem, prometheus.GaugeValue, m.cpuSystem, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.cpuTotal, prometheus.GaugeValue, m.cpuTotal, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.cpuUser, prometheus.GaugeValue, m.cpuUser, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.cpuSystem, prometheus.GaugeValue, m.cpuSystem, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.cpuTotal, prometheus.GaugeValue, m.cpuTotal, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) cpus := m.cpus if cpus == 0 { dir := filepath.Dir(n) @@ -207,15 +213,15 @@ func (c *slurmCollector) Update(ch chan<- prometheus.Metric) error { cpus = metrics[filepath.Dir(dir)].cpus } 
} - ch <- prometheus.MustNewConstMetric(c.cpus, prometheus.GaugeValue, float64(cpus), m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.memoryRSS, prometheus.GaugeValue, m.memoryRSS, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.memoryCache, prometheus.GaugeValue, m.memoryCache, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.memoryUsed, prometheus.GaugeValue, m.memoryUsed, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.memoryTotal, prometheus.GaugeValue, m.memoryTotal, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.memoryFailCount, prometheus.GaugeValue, m.memoryFailCount, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.memswUsed, prometheus.GaugeValue, m.memswUsed, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.memswTotal, prometheus.GaugeValue, m.memswTotal, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) - ch <- prometheus.MustNewConstMetric(c.memswFailCount, prometheus.GaugeValue, m.memswFailCount, m.batch, m.jobuid, m.jobgid, m.jobid, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.cpus, prometheus.GaugeValue, float64(cpus), m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.memoryRSS, prometheus.GaugeValue, m.memoryRSS, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.memoryCache, prometheus.GaugeValue, m.memoryCache, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.memoryUsed, prometheus.GaugeValue, m.memoryUsed, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- 
prometheus.MustNewConstMetric(c.memoryTotal, prometheus.GaugeValue, m.memoryTotal, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.memoryFailCount, prometheus.GaugeValue, m.memoryFailCount, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.memswUsed, prometheus.GaugeValue, m.memswUsed, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.memswTotal, prometheus.GaugeValue, m.memswTotal, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) + ch <- prometheus.MustNewConstMetric(c.memswFailCount, prometheus.GaugeValue, m.memswFailCount, m.batch, m.jobid, m.jobaccount, m.jobuuid, m.step, m.task) } return nil } @@ -362,7 +368,7 @@ func (c *slurmCollector) getCPUs(name string) ([]string, error) { func (c *slurmCollector) getJobLabels(jobid string) (string, string, string) { var jobUuid string var jobUid string = "" - var jobGid string = "" + var jobAccount string = "" var jobNodelist = "" var jobWorkDir = "" var slurmJobInfo = fmt.Sprintf("%s/%s", *jobStatPath, jobid) @@ -372,13 +378,13 @@ func (c *slurmCollector) getJobLabels(jobid string) (string, string, string) { level.Error(c.logger). 
Log("msg", "Failed to get metadata for job", "jobid", jobid, "err", err) } else { - fmt.Sscanf(string(content), "%s %s %s %s", &jobUid, &jobGid, &jobNodelist, &jobWorkDir) + fmt.Sscanf(string(content), "%s %s %s %s", &jobUid, &jobAccount, &jobNodelist, &jobWorkDir) } - jobUuid, err = utils.GetUuidFromString( + jobUuid, err = helpers.GetUuidFromString( []string{ jobid, jobUid, - jobGid, + strings.ToLower(jobAccount), strings.ToLower(jobNodelist), strings.ToLower(jobWorkDir), }, @@ -389,7 +395,7 @@ func (c *slurmCollector) getJobLabels(jobid string) (string, string, string) { jobUuid = jobid } } - return jobUuid, jobUid, jobGid + return jobUuid, jobUid, jobAccount } // Get job details from cgroups v1 @@ -469,7 +475,7 @@ func (c *slurmCollector) getCgroupsV1Metrics(name string) (CgroupMetric, error) metric.cpus = len(cpus) } c.getInfoV1(name, &metric) - metric.jobuuid, metric.jobuid, metric.jobgid = c.getJobLabels(metric.jobid) + metric.jobuuid, metric.jobuid, metric.jobaccount = c.getJobLabels(metric.jobid) return metric, nil } @@ -522,7 +528,7 @@ func (c *slurmCollector) getCgroupsV2Metrics(name string) (CgroupMetric, error) "memory.max", "memory.stat", } - data, err := utils.LoadCgroupsV2Metrics(name, *cgroupfsPath, controllers) + data, err := LoadCgroupsV2Metrics(name, *cgroupfsPath, controllers) if err != nil { level.Error(c.logger).Log("msg", "Failed to load cgroups", "path", name, "err", err) metric.err = true @@ -557,6 +563,6 @@ func (c *slurmCollector) getCgroupsV2Metrics(name string) (CgroupMetric, error) metric.cpus = len(cpus) } c.getInfoV2(name, &metric) - metric.jobuuid, metric.jobuid, metric.jobgid = c.getJobLabels(metric.jobid) + metric.jobuuid, metric.jobuid, metric.jobaccount = c.getJobLabels(metric.jobid) return metric, nil } diff --git a/pkg/collector/slurm_test.go b/pkg/collector/slurm_test.go index 9533f89b..24a5c5e6 100644 --- a/pkg/collector/slurm_test.go +++ b/pkg/collector/slurm_test.go @@ -14,7 +14,12 @@ import ( var expectedSlurmMetrics 
CgroupMetric func TestCgroupsV2SlurmJobMetrics(t *testing.T) { - if _, err := kingpin.CommandLine.Parse([]string{"--path.cgroupfs", "fixtures/sys/fs/cgroup", "--collector.slurm.unique.jobid", "--collector.slurm.job.stat.path", "fixtures/slurmjobstat"}); err != nil { + if _, err := kingpin.CommandLine.Parse( + []string{ + "--path.cgroupfs", "fixtures/sys/fs/cgroup", + "--collector.slurm.job.stat.path", "fixtures/slurmjobstat", + }, + ); err != nil { t.Fatal(err) } c := slurmCollector{cgroupV2: true, logger: log.NewNopLogger()} @@ -35,9 +40,9 @@ func TestCgroupsV2SlurmJobMetrics(t *testing.T) { memswFailCount: 0, userslice: false, jobuid: "1000", - jobgid: "1000", + jobaccount: "testacc", jobid: "1009248", - jobuuid: "8d4fad6d-c5e3-775b-8a8c-707e319114ec", + jobuuid: "7f6c39b7-2740-fc1f-32c2-8fc28880829c", step: "", task: "", batch: "slurm", @@ -51,7 +56,12 @@ func TestCgroupsV2SlurmJobMetrics(t *testing.T) { } func TestCgroupsV1SlurmJobMetrics(t *testing.T) { - if _, err := kingpin.CommandLine.Parse([]string{"--path.cgroupfs", "fixtures/sys/fs/cgroup", "--collector.slurm.unique.jobid", "--collector.slurm.job.stat.path", "fixtures/slurmjobstat"}); err != nil { + if _, err := kingpin.CommandLine.Parse( + []string{ + "--path.cgroupfs", "fixtures/sys/fs/cgroup", + "--collector.slurm.job.stat.path", "fixtures/slurmjobstat", + }, + ); err != nil { t.Fatal(err) } c := slurmCollector{cgroupV2: false, logger: log.NewNopLogger()} @@ -72,9 +82,9 @@ func TestCgroupsV1SlurmJobMetrics(t *testing.T) { memswFailCount: 0, userslice: false, jobuid: "1000", - jobgid: "1000", + jobaccount: "testacc", jobid: "1009248", - jobuuid: "8d4fad6d-c5e3-775b-8a8c-707e319114ec", + jobuuid: "7f6c39b7-2740-fc1f-32c2-8fc28880829c", step: "", task: "", batch: "slurm", diff --git a/pkg/collector/types.go b/pkg/collector/types.go deleted file mode 100644 index 596264fb..00000000 --- a/pkg/collector/types.go +++ /dev/null @@ -1,23 +0,0 @@ -package collector - -type energyMixDataFields struct { - BioFuel 
float64 `json:"biofuel_TWh"` - CarbonIntensity float64 `json:"carbon_intensity"` - Coal float64 `json:"coal_TWh"` - CountryName string `json:"country_name"` - Fossil float64 `json:"fossil_TWh"` - Gas float64 `json:"gas_TWh"` - Hydro float64 `json:"hydroelectricity_TWh"` - IsoCode string `json:"iso_code"` - LowCarbon float64 `json:"low_carbon_TWh"` - Nuclear float64 `json:"nuclear_TWh"` - Oil float64 `json:"oil_TWh"` - OtherRenewable float64 `json:"other_renewable_TWh"` - OtherRenewableExcluBioFuel float64 `json:"other_renewable_exc_biofuel_TWh"` - PerCapita float64 `json:"per_capita_Wh"` - Renewables float64 `json:"renewables_TWh"` - Solar float64 `json:"solar_TWh"` - Total float64 `json:"total_TWh"` - Wind float64 `json:"wind_TWh"` - Year int64 `json:"year"` -} diff --git a/pkg/emissions/emissions.go b/pkg/emissions/emissions.go new file mode 100644 index 00000000..a1998201 --- /dev/null +++ b/pkg/emissions/emissions.go @@ -0,0 +1,120 @@ +//go:build !emissions +// +build !emissions + +package emissions + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +const ( + opendatasoftAPIBaseUrl = "https://odre.opendatasoft.com" + opendatasoftAPIPath = `%s/api/records/1.0/search/?%s` + globalEnergyMixDataUrl = "https://raw.githubusercontent.com/mlco2/codecarbon/master/codecarbon/data/private_infra/global_energy_mix.json" + GlobalEmissionFactor = 475 +) + +type Client interface { + Do(req *http.Request) (*http.Response, error) +} + +// Read JSON file from GitHub to get global energy mix data +func GetEnergyMixData(client Client, logger log.Logger) (map[string]float64, error) { + req, err := http.NewRequest(http.MethodGet, globalEnergyMixDataUrl, nil) + if err != nil { + level.Error(logger).Log("msg", "Failed to create HTTP request", "err", err) + return nil, err + } + + resp, _ := client.Do(req) + // resp, err := http.DefaultClient.Get(globalEnergyMixDataUrl) + if err != 
nil { + level.Error(logger).Log("msg", "Failed to get global energy mix data", "err", err) + return nil, err + } + + defer resp.Body.Close() + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(resp.Body) + if err != nil { + level.Error(logger).Log("msg", "Failed to read responde body", "err", err) + return nil, err + } + respByte := buf.Bytes() + // JSON might contain NaN, replace it with null that is allowed in JSON + respByte = bytes.Replace(respByte, []byte("NaN"), []byte("null"), -1) + var globalEmissionData map[string]energyMixDataFields + if err := json.Unmarshal(respByte, &globalEmissionData); err != nil { + level.Error(logger).Log("msg", "Failed to unmarshal JSON data", "err", err) + return nil, err + } + + var countryEmissionData = make(map[string]float64) + for country, data := range globalEmissionData { + // Set unavaible values to -1 to indicate they are indeed unavailable + if data.CarbonIntensity == 0 { + countryEmissionData[country] = -1 + } else { + countryEmissionData[country] = data.CarbonIntensity + } + } + return countryEmissionData, nil +} + +// Request to OPENDATASOFT API to get RTE energy data for France +func GetRteEnergyMixEmissionData(client Client, logger log.Logger) (float64, error) { + params := url.Values{} + params.Add("dataset", "eco2mix-national-tr") + params.Add("facet", "nature") + params.Add("facet", "date_heure") + params.Add("start", "0") + params.Add("rows", "1") + params.Add("sort", "date_heure") + params.Add( + "q", + fmt.Sprintf( + "date_heure:[%s TO #now()] AND NOT #null(taux_co2)", + time.Now().Format("2006-01-02"), + ), + ) + queryString := params.Encode() + + req, err := http.NewRequest(http.MethodGet, + fmt.Sprintf(opendatasoftAPIPath, opendatasoftAPIBaseUrl, queryString), nil, + ) + if err != nil { + level.Error(logger).Log("msg", "Failed to create HTTP request", "err", err) + return -1, err + } + + resp, err := client.Do(req) + if err != nil { + return -1, err + } + + body, err := io.ReadAll(resp.Body) + if err != nil 
{ + return -1, err + } + + var data nationalRealTimeResponse + err = json.Unmarshal(body, &data) + if err != nil { + return -1, err + } + + var fields []nationalRealTimeFields + for _, r := range data.Records { + fields = append(fields, r.Fields) + } + return float64(fields[0].TauxCo2), nil +} diff --git a/pkg/utils/types.go b/pkg/emissions/types.go similarity index 81% rename from pkg/utils/types.go rename to pkg/emissions/types.go index 3d51a830..6cac531d 100644 --- a/pkg/utils/types.go +++ b/pkg/emissions/types.go @@ -1,4 +1,4 @@ -package utils +package emissions // Nicked from https://github.com/nmasse-itix/ego2mix type nationalRealTimeFields struct { @@ -78,3 +78,26 @@ type nationalRealTimeResponse struct { } `json:"parameters"` Records []nationalRealTimeRecord `json:"records"` } + +// code carbon global energy data mix interface +type energyMixDataFields struct { + BioFuel float64 `json:"biofuel_TWh"` + CarbonIntensity float64 `json:"carbon_intensity"` + Coal float64 `json:"coal_TWh"` + CountryName string `json:"country_name"` + Fossil float64 `json:"fossil_TWh"` + Gas float64 `json:"gas_TWh"` + Hydro float64 `json:"hydroelectricity_TWh"` + IsoCode string `json:"iso_code"` + LowCarbon float64 `json:"low_carbon_TWh"` + Nuclear float64 `json:"nuclear_TWh"` + Oil float64 `json:"oil_TWh"` + OtherRenewable float64 `json:"other_renewable_TWh"` + OtherRenewableExcluBioFuel float64 `json:"other_renewable_exc_biofuel_TWh"` + PerCapita float64 `json:"per_capita_Wh"` + Renewables float64 `json:"renewables_TWh"` + Solar float64 `json:"solar_TWh"` + Total float64 `json:"total_TWh"` + Wind float64 `json:"wind_TWh"` + Year int64 `json:"year"` +} diff --git a/pkg/jobstats/jobstats.go b/pkg/jobstats/db.go similarity index 75% rename from pkg/jobstats/jobstats.go rename to pkg/jobstats/db.go index ce64a633..99344a7d 100644 --- a/pkg/jobstats/jobstats.go +++ b/pkg/jobstats/db.go @@ -7,22 +7,30 @@ import ( "strings" "time" + "github.com/alecthomas/kingpin/v2" 
"github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/mahendrapaipuri/batchjob_monitoring/pkg/utils" ) var ( - dateFormat = "2006-01-02T15:04:05" - checksMap = map[string]interface{}{ - "slurm": slurmChecks, + JobstatDBAppName = "batchjob_stat_db" + JobstatDBApp = kingpin.New( + JobstatDBAppName, + "Application that conslidates the batch job stats into a local DB.", + ) + dateFormat = "2006-01-02T15:04:05" + pragmaStatements = []string{ + "PRAGMA synchronous = OFF", + "PRAGMA journal_mode = MEMORY", } - statsMap = map[string]interface{}{ - "slurm": getSlurmJobs, + indexStatements = []string{ + `CREATE INDEX i1 ON %s (Usr,Account,Start);`, + `CREATE INDEX i2 ON %s (Usr,Jobuuid);`, } + allFields = GetStructFieldName(BatchJob{}) ) -func NewJobStats( +func NewJobStatsDB( logger log.Logger, batchScheduler string, jobstatDBPath string, @@ -30,8 +38,8 @@ func NewJobStats( retentionPeriod int, jobsLastTimeStampFile string, vacuumLastTimeStampFile string, -) *jobStats { - return &jobStats{ +) *jobStatsDB { + return &jobStatsDB{ logger: logger, batchScheduler: batchScheduler, jobstatDBPath: jobstatDBPath, @@ -43,15 +51,15 @@ func NewJobStats( } // Do preliminary checks -func (j *jobStats) checks() { +func (j *jobStatsDB) checks() { if checksFunc, ok := checksMap[j.batchScheduler]; ok { checksFunc.(func(log.Logger))(j.logger) } } // Open DB connection and return connection poiner -func (j *jobStats) openDBConnection() (*sql.DB, error) { - dbConn, err := sql.Open("sqlite", j.jobstatDBPath) +func (j *jobStatsDB) openDBConnection() (*sql.DB, error) { + dbConn, err := sql.Open("sqlite3", j.jobstatDBPath) if err != nil { return nil, err } @@ -59,22 +67,20 @@ func (j *jobStats) openDBConnection() (*sql.DB, error) { } // Set all necessary PRAGMA statement on DB -func (j *jobStats) setPragmaDirectives(db *sql.DB) { +func (j *jobStatsDB) setPragmaDirectives(db *sql.DB) { // Ref: https://stackoverflow.com/questions/1711631/improve-insert-per-second-performance-of-sqlite 
// Ref: https://gitlab.com/gnufred/logslate/-/blob/8eda5cedc9a28da3793dcf73480d618c95cc322c/playground/sqlite3.go // Ref: https://github.com/mattn/go-sqlite3/issues/1145#issuecomment-1519012055 - _, err := db.Exec("PRAGMA synchronous = OFF") - if err != nil { - level.Error(j.logger).Log("msg", "Failed to set synchronous to OFF", "err", err) - } - _, err = db.Exec("PRAGMA journal_mode = MEMORY") - if err != nil { - level.Error(j.logger).Log("msg", "Failed to set journal_mode to MEMORY", "err", err) + for _, stmt := range pragmaStatements { + _, err := db.Exec(stmt) + if err != nil { + level.Error(j.logger).Log("msg", "Failed to execute pragma statement", "statement", stmt, "err", err) + } } } // Check if DB is locked -func (j *jobStats) checkDBLock(db *sql.DB) error { +func (j *jobStatsDB) checkDBLock(db *sql.DB) error { _, err := db.Exec("BEGIN EXCLUSIVE TRANSACTION;") if err != nil { return err @@ -87,8 +93,8 @@ func (j *jobStats) checkDBLock(db *sql.DB) error { return nil } -// Prepare DB and create table -func (j *jobStats) prepareDB() (*sql.DB, error) { +// Setup DB and create table +func (j *jobStatsDB) setupDB() (*sql.DB, error) { if _, err := os.Stat(j.jobstatDBPath); err == nil { // Open the created SQLite File db, err := j.openDBConnection() @@ -124,8 +130,8 @@ func (j *jobStats) prepareDB() (*sql.DB, error) { } // Create a table for storing job stats -func (j *jobStats) createTable(db *sql.DB) error { - allFields := utils.GetStructFieldName(BatchJob{}) +func (j *jobStatsDB) createTable(db *sql.DB) error { + // allFields := GetStructFieldName(BatchJob{}) fieldLines := []string{} for _, field := range allFields { fieldLines = append(fieldLines, fmt.Sprintf(" \"%s\" TEXT,", field)) @@ -153,41 +159,41 @@ func (j *jobStats) createTable(db *sql.DB) error { } // Prepare SQL DB index creation Statement - level.Info(j.logger).Log("msg", "Creating DB index with Usr,Account,Start columns") - createIndexSQL := fmt.Sprintf(`CREATE INDEX i1 ON %s (Usr,Account,Start);`, 
j.jobstatDBTable) - statement, err = db.Prepare(createIndexSQL) - if err != nil { - level.Error(j.logger). - Log("msg", "Failed to prepare SQL statement for index creation", "err", err) - return err - } + for _, stmt := range indexStatements { + level.Info(j.logger).Log("msg", "Creating DB index with Usr,Account,Start columns") + createIndexSQL := fmt.Sprintf(stmt, j.jobstatDBTable) + statement, err = db.Prepare(createIndexSQL) + if err != nil { + level.Error(j.logger). + Log("msg", "Failed to prepare SQL statement for index creation", "err", err) + return err + } - // Execute SQL DB index creation Statements - _, err = statement.Exec() - if err != nil { - level.Error(j.logger).Log("msg", "Failed to execute SQL command for index creation statement", "err", err) - return err + // Execute SQL DB index creation Statements + _, err = statement.Exec() + if err != nil { + level.Error(j.logger).Log("msg", "Failed to execute SQL command for index creation statement", "err", err) + return err + } } level.Info(j.logger).Log("msg", "SQLite DB table for jobstats successfully created") return nil } // Make and return prepare statement for inserting entries -func (j *jobStats) getSQLPrepareStatement(tx *sql.Tx) (*sql.Stmt, error) { - allFields := utils.GetStructFieldName(BatchJob{}) - fieldNamesString := strings.Join(allFields[:], ", ") - placeholderSlice := make([]string, len(allFields)) - for i := range placeholderSlice { - placeholderSlice[i] = "?" 
- } - placeholderString := strings.Join(placeholderSlice, ", ") - insertSQLPlaceholder := fmt.Sprintf( - "INSERT INTO %s(%s) VALUES (%s)", +func (j *jobStatsDB) prepareInsertStatement(tx *sql.Tx, numJobs int) (*sql.Stmt, error) { + placeHolderString := fmt.Sprintf( + "(%s)", + strings.Join(strings.Split(strings.Repeat("?", len(allFields)), ""), ","), + ) + fieldNamesString := strings.Join(allFields, ",") + insertStatement := fmt.Sprintf( + "INSERT INTO %s(%s) VALUES %s", j.jobstatDBTable, fieldNamesString, - placeholderString, + placeHolderString, ) - stmt, err := tx.Prepare(insertSQLPlaceholder) + stmt, err := tx.Prepare(insertStatement) if err != nil { return nil, err } @@ -195,7 +201,7 @@ func (j *jobStats) getSQLPrepareStatement(tx *sql.Tx) (*sql.Stmt, error) { } // Get start and end time to query for jobs -func (j *jobStats) getStartEndTimes() (time.Time, time.Time) { +func (j *jobStatsDB) getStartEndTimes() (time.Time, time.Time) { var startTime time.Time var foundStartTime bool = false if _, err := os.Stat(j.jobsLastTimeStampFile); err == nil { @@ -220,7 +226,7 @@ func (j *jobStats) getStartEndTimes() (time.Time, time.Time) { } // Write endTime to a file -func (j *jobStats) writeLastTimeStampFile(lastTimeStampFile string, endTime time.Time) { +func (j *jobStatsDB) writeLastTimeStampFile(lastTimeStampFile string, endTime time.Time) { lastTimeStamp := []byte(endTime.Format(dateFormat)) err := os.WriteFile(lastTimeStampFile, lastTimeStamp, 0644) if err != nil { @@ -229,7 +235,7 @@ func (j *jobStats) writeLastTimeStampFile(lastTimeStampFile string, endTime time } // Get job stats and insert them into DB -func (j *jobStats) GetJobStats() error { +func (j *jobStatsDB) GetJobStats() error { // First do basic checks j.checks() // Get start and end times for retrieving jobs @@ -247,8 +253,8 @@ func (j *jobStats) GetJobStats() error { } } - // Prepare DB and return db object - db, err := j.prepareDB() + // Setup DB and return db object + db, err := j.setupDB() if err 
!= nil { level.Error(j.logger).Log("msg", "Preparation of DB failed", "err", err) return err @@ -277,7 +283,7 @@ func (j *jobStats) GetJobStats() error { // Set pragma statements j.setPragmaDirectives(db) - stmt, err := j.getSQLPrepareStatement(tx) + stmt, err := j.prepareInsertStatement(tx, len(jobs)) if err != nil { level.Error(j.logger).Log("msg", "Failed to prepare insert job statement in DB", "err", err) return err @@ -314,10 +320,11 @@ func (j *jobStats) GetJobStats() error { } // Vacuum DB to reduce fragementation and size -func (j *jobStats) vacuumDB(db *sql.DB) error { +func (j *jobStatsDB) vacuumDB(db *sql.DB) error { weekday := time.Now().Weekday().String() hours, _, _ := time.Now().Clock() var nextVacuumTime time.Time + // Check if lasttimestamp for vacuum exists if _, err := os.Stat(j.vacuumLastTimeStampFile); err == nil { timestamp, err := os.ReadFile(j.vacuumLastTimeStampFile) @@ -354,7 +361,7 @@ func (j *jobStats) vacuumDB(db *sql.DB) error { } // Delete old entries in DB -func (j *jobStats) deleteOldJobs(tx *sql.Tx) error { +func (j *jobStatsDB) deleteOldJobs(tx *sql.Tx) error { deleteSQLCmd := fmt.Sprintf( "DELETE FROM %s WHERE Start <= date('now', '-%d day')", j.jobstatDBTable, @@ -369,7 +376,7 @@ func (j *jobStats) deleteOldJobs(tx *sql.Tx) error { } // Insert job stat into DB -func (j *jobStats) insertJobsInDB(statement *sql.Stmt, jobStats []BatchJob) { +func (j *jobStatsDB) insertJobsInDB(statement *sql.Stmt, jobStats []BatchJob) { for _, jobStat := range jobStats { // Empty job if jobStat == (BatchJob{}) { @@ -377,11 +384,22 @@ func (j *jobStats) insertJobsInDB(statement *sql.Stmt, jobStats []BatchJob) { } level.Debug(j.logger).Log("msg", "Inserting job", "jobid", jobStat.Jobid) _, err := statement.Exec( - jobStat.Jobid, jobStat.Jobuuid, - jobStat.Partition, jobStat.Account, jobStat.Grp, jobStat.Gid, - jobStat.Usr, jobStat.Uid, jobStat.Submit, jobStat.Start, - jobStat.End, jobStat.Elapsed, jobStat.Exitcode, - jobStat.State, jobStat.Nnodes, 
jobStat.Nodelist, + jobStat.Jobid, + jobStat.Jobuuid, + jobStat.Partition, + jobStat.Account, + jobStat.Grp, + jobStat.Gid, + jobStat.Usr, + jobStat.Uid, + jobStat.Submit, + jobStat.Start, + jobStat.End, + jobStat.Elapsed, + jobStat.Exitcode, + jobStat.State, + jobStat.Nnodes, + jobStat.Nodelist, jobStat.NodelistExp, ) if err != nil { diff --git a/pkg/jobstats/jobstats_test.go b/pkg/jobstats/db_test.go similarity index 90% rename from pkg/jobstats/jobstats_test.go rename to pkg/jobstats/db_test.go index 63312da8..98948455 100644 --- a/pkg/jobstats/jobstats_test.go +++ b/pkg/jobstats/db_test.go @@ -9,13 +9,13 @@ import ( "time" "github.com/go-kit/log" - _ "modernc.org/sqlite" + _ "github.com/mattn/go-sqlite3" ) -func populateDBWithMockData(db *sql.DB, j *jobStats) { +func populateDBWithMockData(db *sql.DB, j *jobStatsDB) { jobs := []BatchJob{{Jobid: "10000"}, {Jobid: "10001"}} tx, _ := db.Begin() - stmt, _ := j.getSQLPrepareStatement(tx) + stmt, _ := j.prepareInsertStatement(tx, len(jobs)) j.insertJobsInDB(stmt, jobs) tx.Commit() } @@ -24,13 +24,13 @@ func TestJobStatsDBPreparation(t *testing.T) { tmpDir := t.TempDir() jobstatDBTable := "jobstats" jobstatDBPath := filepath.Join(tmpDir, "jobstats.db") - j := jobStats{ + j := jobStatsDB{ logger: log.NewNopLogger(), batchScheduler: "slurm", jobstatDBPath: jobstatDBPath, jobstatDBTable: jobstatDBTable, } - db, err := j.prepareDB() + db, err := j.setupDB() if err != nil { t.Errorf("Failed to prepare DB due to %s", err) } @@ -52,13 +52,13 @@ func TestJobStatsDBLock(t *testing.T) { tmpDir := t.TempDir() jobstatDBTable := "jobstats" jobstatDBPath := filepath.Join(tmpDir, "jobstats.db") - j := jobStats{ + j := jobStatsDB{ logger: log.NewNopLogger(), batchScheduler: "slurm", jobstatDBPath: jobstatDBPath, jobstatDBTable: jobstatDBTable, } - db, err := j.prepareDB() + db, err := j.setupDB() if err != nil { t.Errorf("Failed to prepare DB") } @@ -77,13 +77,13 @@ func TestJobStatsDBVacuum(t *testing.T) { tmpDir := t.TempDir() 
jobstatDBTable := "jobstats" jobstatDBPath := filepath.Join(tmpDir, "jobstats.db") - j := jobStats{ + j := jobStatsDB{ logger: log.NewNopLogger(), batchScheduler: "slurm", jobstatDBPath: jobstatDBPath, jobstatDBTable: jobstatDBTable, } - db, err := j.prepareDB() + db, err := j.setupDB() if err != nil { t.Errorf("Failed to prepare DB") } @@ -103,14 +103,14 @@ func TestJobStatsDeleteOldJobs(t *testing.T) { jobstatDBTable := "jobstats" jobId := "1111" jobstatDBPath := filepath.Join(tmpDir, "jobstats.db") - j := jobStats{ + j := jobStatsDB{ logger: log.NewNopLogger(), batchScheduler: "slurm", jobstatDBPath: jobstatDBPath, jobstatDBTable: jobstatDBTable, retentionPeriod: 1, } - db, err := j.prepareDB() + db, err := j.setupDB() if err != nil { t.Errorf("Failed to prepare DB") } @@ -125,7 +125,7 @@ func TestJobStatsDeleteOldJobs(t *testing.T) { }, } tx, _ := db.Begin() - stmt, err := j.getSQLPrepareStatement(tx) + stmt, err := j.prepareInsertStatement(tx, len(jobs)) if err != nil { t.Errorf("Failed to prepare SQL statements") } diff --git a/pkg/jobstats/fixtures/e2e-test-stats-server-output.txt b/pkg/jobstats/fixtures/e2e-test-stats-server-output.txt new file mode 100644 index 00000000..1cbb435f --- /dev/null +++ b/pkg/jobstats/fixtures/e2e-test-stats-server-output.txt @@ -0,0 +1 @@ +{"status":"success","errorType":"","error":"","warnings":null,"data":[{"jobid":"1479763","id":"667127ae-68d0-47aa-78e3-76ae76e4aba7","partition":"part1","account":"acc1","group":"grp","gid":"1000","user":"usr","uid":"1000","submit":"2023-02-21T14:37:02","start":"2023-02-21T14:37:07","end":"2023-02-21T15:26:29","elapsed":"00:49:22","exitcode":"0:0","state":"CANCELLED by 
1000","nnodes":"1","nodelist":"compute-0","nodelistexp":"compute-0"},{"jobid":"1481508","id":"6bfbe049-b5ee-ea02-6cd8-3dd95acdd0e4","partition":"part1","account":"acc1","group":"grp","gid":"1000","user":"usr","uid":"1000","submit":"2023-02-21T15:48:20","start":"2023-02-21T15:49:06","end":"2023-02-21T15:57:23","elapsed":"00:08:17","exitcode":"0:0","state":"CANCELLED by 1000","nnodes":"2","nodelist":"compute-[0-2]","nodelistexp":"compute-0|compute-1|compute-2"}]} diff --git a/pkg/jobstats/fixtures/jobstats.dump b/pkg/jobstats/fixtures/jobstats.dump index 99592eda..66fd5ebc 100644 --- a/pkg/jobstats/fixtures/jobstats.dump +++ b/pkg/jobstats/fixtures/jobstats.dump @@ -20,9 +20,10 @@ CREATE TABLE jobs ( "Nodelist" TEXT, "NodelistExp" TEXT ); -INSERT INTO jobs VALUES(1,'1479763','aaaf154c-e784-9e49-2155-4aa52462782a','part1','acc1','grp','1000','usr','1000','2023-02-21T14:37:02','2023-02-21T14:37:07','2023-02-21T15:26:29','00:49:22','0:0','CANCELLED by 1000','1','compute-0','compute-0'); -INSERT INTO jobs VALUES(2,'1481508','69683fe8-5d89-9ec5-4b4f-8404c7cc37f2','part1','acc1','grp','1000','usr','1000','2023-02-21T15:48:20','2023-02-21T15:49:06','2023-02-21T15:57:23','00:08:17','0:0','CANCELLED by 1000','2','compute-[0-2]','compute-0|compute-1|compute-2'); +INSERT INTO jobs VALUES(1,'1479763','667127ae-68d0-47aa-78e3-76ae76e4aba7','part1','acc1','grp','1000','usr','1000','2023-02-21T14:37:02','2023-02-21T14:37:07','2023-02-21T15:26:29','00:49:22','0:0','CANCELLED by 1000','1','compute-0','compute-0'); +INSERT INTO jobs VALUES(2,'1481508','6bfbe049-b5ee-ea02-6cd8-3dd95acdd0e4','part1','acc1','grp','1000','usr','1000','2023-02-21T15:48:20','2023-02-21T15:49:06','2023-02-21T15:57:23','00:08:17','0:0','CANCELLED by 1000','2','compute-[0-2]','compute-0|compute-1|compute-2'); DELETE FROM sqlite_sequence; INSERT INTO sqlite_sequence VALUES('jobs',2); CREATE INDEX i1 ON jobs (Usr,Account,Start); +CREATE INDEX i2 ON jobs (Usr,Jobuuid); COMMIT; diff --git a/pkg/jobstats/helper.go 
b/pkg/jobstats/helper.go new file mode 100644 index 00000000..144a46a9 --- /dev/null +++ b/pkg/jobstats/helper.go @@ -0,0 +1,73 @@ +package jobstats + +import ( + "reflect" + "regexp" + "strconv" + "strings" +) + +var ( + nodelistRegExp = regexp.MustCompile(`(\[\d+\-\d+\])`) +) + +// Get all fields in a given struct +func GetStructFieldName(Struct interface{}) []string { + var fields []string + + v := reflect.ValueOf(Struct) + typeOfS := v.Type() + + for i := 0; i < v.NumField(); i++ { + fields = append(fields, typeOfS.Field(i).Name) + } + return fields +} + +// Get all values in a given struct +func GetStructFieldValue(Struct interface{}) []interface{} { + v := reflect.ValueOf(Struct) + values := make([]interface{}, v.NumField()) + + for i := 0; i < v.NumField(); i++ { + f := v.Field(i) + values = append(values, f.Interface()) + } + return values +} + +// Expand SLURM NODELIST into slice of nodenames +func NodelistParser(nodelistExp string) []string { + var nodeNames []string + // First split by , to get individual nodes + for _, nodeexp := range strings.Split(nodelistExp, ",") { + // If it contains "[", it means they are range of nodes + if strings.Contains(nodeexp, "[") { + matches := nodelistRegExp.FindAllString(nodeexp, -1) + if len(matches) == 0 { + continue + } + // Get only first match as we use recursion + for _, match := range matches[0:1] { + matchSansBrackets := match[1 : len(match)-1] + startIdx, err := strconv.Atoi(strings.Split(matchSansBrackets, "-")[0]) + if err != nil { + continue + } + endIdx, err := strconv.Atoi(strings.Split(matchSansBrackets, "-")[1]) + if err != nil { + continue + } + for i := startIdx; i <= endIdx; i++ { + nodename := strings.Replace(nodeexp, match, strconv.Itoa(i), -1) + // Add them to slice and call function again + nodeNames = append(nodeNames, NodelistParser(nodename)...) 
+ } + } + + } else { + nodeNames = append(nodeNames, regexp.QuoteMeta(nodeexp)) + } + } + return nodeNames +} diff --git a/pkg/utils/utils_test.go b/pkg/jobstats/helper_test.go similarity index 93% rename from pkg/utils/utils_test.go rename to pkg/jobstats/helper_test.go index 7f279634..51ae3b43 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/jobstats/helper_test.go @@ -1,7 +1,4 @@ -//go:build !utils -// +build !utils - -package utils +package jobstats import ( "reflect" diff --git a/pkg/jobstats/interface.go b/pkg/jobstats/interface.go new file mode 100644 index 00000000..5af3f65d --- /dev/null +++ b/pkg/jobstats/interface.go @@ -0,0 +1,10 @@ +package jobstats + +var ( + checksMap = map[string]interface{}{ + "slurm": slurmChecks, + } + statsMap = map[string]interface{}{ + "slurm": getSlurmJobs, + } +) diff --git a/pkg/jobstats/server.go b/pkg/jobstats/server.go new file mode 100644 index 00000000..bb9823b5 --- /dev/null +++ b/pkg/jobstats/server.go @@ -0,0 +1,390 @@ +package jobstats + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/alecthomas/kingpin/v2" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gorilla/mux" + _ "github.com/mattn/go-sqlite3" + "github.com/prometheus/exporter-toolkit/web" +) + +var ( + JobstatServerAppName = "batchjob_stats_server" + JobstatServerApp = kingpin.New( + JobstatServerAppName, + "API server to serve the job and user stats of batch job user.", + ) + db *sql.DB + jobstatDBFile string + jobstatDBTable string +) + +// Create new Jobstats server +func NewJobstatsServer(c *Config) (*JobstatsServer, func(), error) { + router := mux.NewRouter() + server := &JobstatsServer{ + logger: c.Logger, + server: &http.Server{ + Addr: c.Address, + Handler: router, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + }, + webConfig: &web.FlagConfig{ + WebListenAddresses: &[]string{c.Address}, + WebSystemdSocket: &c.WebSystemdSocket, + 
WebConfigFile: &c.WebConfigFile, + }, + AccountsGetter: getAccounts, + JobsGetter: getJobs, + HealthChecker: getDBStatus, + } + + jobstatDBFile = c.JobstatDBFile + jobstatDBTable = c.JobstatDBTable + + router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(http.StatusOK) + w.Write([]byte(` + Batch Job Stats API Server + +

Job Stats

+

Jobs

+

Accounts

+ + `)) + }) + + // Allow only GET methods + router.HandleFunc("/api/health", server.health).Methods("GET") + router.HandleFunc("/api/accounts", server.accounts).Methods("GET") + router.HandleFunc("/api/jobs", server.jobs).Methods("GET") + + // Open DB connection + var err error + db, err = sql.Open("sqlite3", jobstatDBFile) + if err != nil { + return nil, func() {}, err + } + return server, func() {}, nil +} + +// Start server +func (s *JobstatsServer) Start() error { + level.Info(s.logger).Log("msg", "Starting batchjob_stats_server") + if err := web.ListenAndServe(s.server, s.webConfig, s.logger); err != nil && err != http.ErrServerClosed { + level.Error(s.logger).Log("msg", "Failed to Listen and Server HTTP server", "err", err) + return err + } + return nil +} + +// Shutdown server +func (s *JobstatsServer) Shutdown(ctx context.Context) error { + if err := s.server.Shutdown(ctx); err != nil { + level.Error(s.logger).Log("msg", "Failed to shutdown HTTP server", "err", err) + return err + } + return nil +} + +// Get current user from the header +func (s *JobstatsServer) getUser(r *http.Request) string { + // Check if username header is available + user := r.Header.Get("X-Grafana-User") + if user == "" { + level.Warn(s.logger).Log("msg", "Header X-Grafana-User not found") + } + return user +} + +// Set response headers +func (s *JobstatsServer) setHeaders(w http.ResponseWriter) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Content-Type-Options", "nosniff") +} + +// GET /api/accounts +// Get all accounts of a user +func (s *JobstatsServer) accounts(w http.ResponseWriter, r *http.Request) { + var response AccountsResponse + s.setHeaders(w) + w.WriteHeader(http.StatusOK) + + // Get current user from header + user := s.getUser(r) + // If no user found, return empty response + if user == "" { + response = AccountsResponse{ + Response: Response{ + Status: "error", + ErrorType: "User Error", + Error: "No user identified", + }, + Data: 
[]Account{}, + } + err := json.NewEncoder(w).Encode(&response) + if err != nil { + level.Error(s.logger).Log("msg", "Failed to encode response", "err", err) + w.Write([]byte("KO")) + } + return + } + + // Get all user accounts + accounts, err := s.AccountsGetter(user, s.logger) + if err != nil { + level.Error(s.logger).Log("msg", "Failed to retrieve accounts", "user", user, "err", err) + response = AccountsResponse{ + Response: Response{ + Status: "error", + ErrorType: "Data error", + Error: "Failed to retrieve user accounts", + }, + Data: []Account{}, + } + err = json.NewEncoder(w).Encode(&response) + if err != nil { + level.Error(s.logger).Log("msg", "Failed to encode response", "err", err) + w.Write([]byte("KO")) + } + return + } + + // Write response + response = AccountsResponse{ + Response: Response{ + Status: "success", + }, + Data: accounts, + } + err = json.NewEncoder(w).Encode(&response) + if err != nil { + level.Error(s.logger).Log("msg", "Failed to encode response", "err", err) + w.Write([]byte("KO")) + } +} + +// GET /api/jobs +// Get jobs of a user based on query params +func (s *JobstatsServer) jobs(w http.ResponseWriter, r *http.Request) { + var response JobsResponse + s.setHeaders(w) + w.WriteHeader(http.StatusOK) + + // Get current user from header + user := s.getUser(r) + // If no user found, return empty response + if user == "" { + response = JobsResponse{ + Response: Response{ + Status: "error", + ErrorType: "User Error", + Error: "No user identified", + }, + Data: []BatchJob{}, + } + err := json.NewEncoder(w).Encode(&response) + if err != nil { + level.Error(s.logger).Log("msg", "Failed to encode response", "err", err) + w.Write([]byte("KO")) + } + return + } + + // Get query parameters + accounts := r.URL.Query()["account"] + var from, to string + // If no from provided, use 1 week from now as from + if from = r.URL.Query().Get("from"); from == "" { + from = time.Now().Add(-time.Duration(168) * time.Hour).Format(dateFormat) + } + if to = 
r.URL.Query().Get("to"); to == "" { + to = time.Now().Format(dateFormat) + } + + // Get all user jobs in the given time window + jobs, err := s.JobsGetter(user, accounts, from, to, s.logger) + if err != nil { + level.Error(s.logger).Log("msg", "Failed to retrieve jobs", "user", user, "err", err) + response = JobsResponse{ + Response: Response{ + Status: "error", + ErrorType: "Data error", + Error: "Failed to retrieve user jobs", + }, + Data: []BatchJob{}, + } + err = json.NewEncoder(w).Encode(&response) + if err != nil { + level.Error(s.logger).Log("msg", "Failed to encode response", "err", err) + w.Write([]byte("KO")) + } + return + } + + // Write response + response = JobsResponse{ + Response: Response{ + Status: "success", + }, + Data: jobs, + } + err = json.NewEncoder(w).Encode(&response) + if err != nil { + level.Error(s.logger).Log("msg", "Failed to encode response", "err", err) + w.Write([]byte("KO")) + } +} + +// Check status of server +func (s *JobstatsServer) health(w http.ResponseWriter, r *http.Request) { + if !s.HealthChecker(s.logger) { + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte("KO")) + } else { + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + } +} + +// Get user accounts using SQL query +func getAccounts(user string, logger log.Logger) ([]Account, error) { + // Prepare statement + stmt, err := db.Prepare(fmt.Sprintf("SELECT DISTINCT Account FROM %s WHERE Usr = ?", jobstatDBTable)) + if err != nil { + level.Error(logger).Log("msg", "Failed to prepare SQL statement for accounts query", "user", user, "err", err) + return nil, err + } + + defer stmt.Close() + rows, err := stmt.Query(user) + if err != nil { + level.Error(logger).Log("msg", "Failed to execute SQL statement for accounts query", "user", user, "err", err) + return nil, err + } + + // Loop through rows, using Scan to assign column data to struct fields. 
+ var accounts []Account + for rows.Next() { + var account string + if err := rows.Scan(&account); err != nil { + level.Error(logger).Log("msg", "Could not scan row for accounts query", "user", user, "err", err) + } + accounts = append(accounts, Account{ID: account}) + } + level.Debug(logger).Log("msg", "Accounts found for user", "user", user, "accounts", accounts) + return accounts, nil +} + +// Get user jobs using SQL query +func getJobs(user string, accounts []string, from string, to string, logger log.Logger) ([]BatchJob, error) { + allFields := GetStructFieldName(BatchJob{}) + + // Prepare SQL statement + stmt, err := db.Prepare( + fmt.Sprintf("SELECT %s FROM %s WHERE Usr = ? AND Start BETWEEN ? AND ? AND Account IN (%s)", + strings.Join(allFields, ","), + jobstatDBTable, + strings.Join(strings.Split(strings.Repeat("?", len(accounts)), ""), ","), + ), + ) + if err != nil { + level.Error(logger).Log("msg", "Failed to prepare SQL statement for jobs query", "user", user, "err", err) + return nil, err + } + + defer stmt.Close() + + // Prepare query args + var args = []any{user, from, to} + for _, account := range accounts { + args = append(args, account) + } + + rows, err := stmt.Query(args...) + if err != nil { + level.Error(logger).Log("msg", "Failed to execute SQL statement for jobs query", "user", user, "err", err) + return nil, err + } + + // Loop through rows, using Scan to assign column data to struct fields. 
+	var jobs []BatchJob
+	for rows.Next() {
+		var jobid, jobuuid,
+			partition, account,
+			group, gid, user, uid,
+			submit, start, end, elapsed,
+			exitcode, state,
+			nnodes, nodelist, nodelistExp string
+		if err := rows.Scan(
+			&jobid,
+			&jobuuid,
+			&partition,
+			&account,
+			&group,
+			&gid,
+			&user,
+			&uid,
+			&submit,
+			&start,
+			&end,
+			&elapsed,
+			&exitcode,
+			&state,
+			&nnodes,
+			&nodelist,
+			&nodelistExp,
+		); err != nil {
+			level.Error(logger).Log("msg", "Could not scan row for jobs query", "user", user, "err", err)
+		}
+		jobs = append(jobs,
+			BatchJob{
+				Jobid:       jobid,
+				Jobuuid:     jobuuid,
+				Partition:   partition,
+				Account:     account,
+				Grp:         group,
+				Gid:         gid,
+				Usr:         user,
+				Uid:         uid,
+				Submit:      submit,
+				Start:       start,
+				End:         end,
+				Elapsed:     elapsed,
+				Exitcode:    exitcode,
+				State:       state,
+				Nnodes:      nnodes,
+				Nodelist:    nodelist,
+				NodelistExp: nodelistExp,
+			},
+		)
+	}
+	level.Debug(logger).Log(
+		"msg", "Jobs found for user", "user", user,
+		"numjobs", len(jobs), "accounts", strings.Join(accounts, ","), "from", from, "to", to,
+	)
+	return jobs, nil
+}
+
+// Ping DB for connection test
+func getDBStatus(logger log.Logger) bool {
+	err := db.Ping()
+	if err != nil {
+		level.Error(logger).Log("msg", "DB Ping failed", "err", err)
+		return false
+	}
+	return true
+}
diff --git a/pkg/jobstats/server_test.go b/pkg/jobstats/server_test.go
new file mode 100644
index 00000000..c0e86b05
--- /dev/null
+++ b/pkg/jobstats/server_test.go
@@ -0,0 +1,164 @@
+package jobstats
+
+import (
+	"encoding/json"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"testing"
+
+	"github.com/go-kit/log"
+)
+
+func setupServer() *JobstatsServer {
+	logger := log.NewNopLogger()
+	server, _, _ := NewJobstatsServer(&Config{Logger: logger})
+	server.AccountsGetter = getMockAccounts
+	server.JobsGetter = getMockJobs
+	return server
+}
+
+func getMockAccounts(user string, logger log.Logger) ([]Account, error) {
+	return []Account{{ID: "foo"}, {ID: "bar"}}, nil
+}
+
+func getMockJobs(user 
string, accounts []string, from string, to string, logger log.Logger) ([]BatchJob, error) { + return []BatchJob{{Jobid: "1000"}, {Jobid: "10001"}}, nil +} + +// Test /api/accounts when no user header found +func TestAccountsHandlerNoUserHeader(t *testing.T) { + server := setupServer() + // Create request + req := httptest.NewRequest(http.MethodGet, "/api/accounts", nil) + + // Start recorder + w := httptest.NewRecorder() + server.accounts(w, req) + res := w.Result() + defer res.Body.Close() + + // Get body + data, err := io.ReadAll(res.Body) + if err != nil { + t.Errorf("expected error to be nil got %v", err) + } + + // Unmarshal byte into structs. + var response AccountsResponse + json.Unmarshal(data, &response) + + if response.Status != "error" { + t.Errorf("expected error status got %v", response.Status) + } + if response.ErrorType != "User Error" { + t.Errorf("expected User Error type got %v", response.ErrorType) + } + if !reflect.DeepEqual(response.Data, []Account{}) { + t.Errorf("expected empty data got %v", response.Data) + } +} + +// Test /api/accounts when header found +func TestAccountsHandlerWithUserHeader(t *testing.T) { + server := setupServer() + // Create request + req := httptest.NewRequest(http.MethodGet, "/api/accounts", nil) + // Add user header + req.Header.Set("X-Grafana-User", "foo") + + // Start recorder + w := httptest.NewRecorder() + server.accounts(w, req) + res := w.Result() + defer res.Body.Close() + + // Get body + data, err := io.ReadAll(res.Body) + if err != nil { + t.Errorf("expected error to be nil got %v", err) + } + + // Expected result + expectedAccounts, _ := getMockAccounts("foo", logger) + + // Unmarshal byte into structs. 
+ var response AccountsResponse + json.Unmarshal(data, &response) + + if response.Status != "success" { + t.Errorf("expected success status got %v", response.Status) + } + if !reflect.DeepEqual(response.Data, expectedAccounts) { + t.Errorf("expected %v got %v", expectedAccounts, response.Data) + } +} + +// Test /api/jobs when no user header found +func TestJobsHandlerNoUserHeader(t *testing.T) { + server := setupServer() + // Create request + req := httptest.NewRequest(http.MethodGet, "/api/jobs", nil) + + // Start recorder + w := httptest.NewRecorder() + server.jobs(w, req) + res := w.Result() + defer res.Body.Close() + + // Get body + data, err := io.ReadAll(res.Body) + if err != nil { + t.Errorf("expected error to be nil got %v", err) + } + + // Unmarshal byte into structs. + var response JobsResponse + json.Unmarshal(data, &response) + + if response.Status != "error" { + t.Errorf("expected error status got %v", response.Status) + } + if response.ErrorType != "User Error" { + t.Errorf("expected User Error type got %v", response.ErrorType) + } + if !reflect.DeepEqual(response.Data, []BatchJob{}) { + t.Errorf("expected empty data got %v", response.Data) + } +} + +// Test /api/jobs when user header found +func TestJobsHandlerWithUserHeader(t *testing.T) { + server := setupServer() + // Create request + req := httptest.NewRequest(http.MethodGet, "/api/jobs", nil) + // Add user header + req.Header.Set("X-Grafana-User", "foo") + + // Start recorder + w := httptest.NewRecorder() + server.jobs(w, req) + res := w.Result() + defer res.Body.Close() + + // Get body + data, err := io.ReadAll(res.Body) + if err != nil { + t.Errorf("expected error to be nil got %v", err) + } + + // Expected result + expectedJobs, _ := getMockJobs("foo", []string{"foo", "bar"}, "", "", logger) + + // Unmarshal byte into structs. 
+ var response JobsResponse + json.Unmarshal(data, &response) + + if response.Status != "success" { + t.Errorf("expected success status got %v", response.Status) + } + if !reflect.DeepEqual(response.Data, expectedJobs) { + t.Errorf("expected %v got %v", expectedJobs, response.Data) + } +} diff --git a/pkg/jobstats/slurm.go b/pkg/jobstats/slurm.go index 68725081..f41b35b2 100644 --- a/pkg/jobstats/slurm.go +++ b/pkg/jobstats/slurm.go @@ -6,16 +6,15 @@ import ( "sync" "time" - "github.com/alecthomas/kingpin/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/mahendrapaipuri/batchjob_monitoring/pkg/utils" + "github.com/mahendrapaipuri/batchjob_monitoring/internal/helpers" ) var ( jobLock = sync.RWMutex{} slurmDateFormat = "2006-01-02T15:04:05" - sacctPath = kingpin.Flag( + sacctPath = JobstatDBApp.Flag( "slurm.sacct.path", "Absolute path to sacct executable.", ).Default("/usr/bin/sacct").String() @@ -27,7 +26,7 @@ func runSacctCmd(startTime string, endTime string, logger log.Logger) ([]byte, e "--format", "jobid,partition,account,group,gid,user,uid,submit,start,end,elapsed,exitcode,state,nnodes,nodelist,jobname,workdir", "--state", "CANCELLED,COMPLETED,FAILED,NODE_FAIL,PREEMPTED,TIMEOUT", "--starttime", startTime, "--endtime", endTime} - return utils.Execute(*sacctPath, args, logger) + return helpers.Execute(*sacctPath, args, logger) } // Parse sacct command output and return batchjob slice @@ -59,12 +58,12 @@ func parseSacctCmdOutput(sacctOutput string, logger log.Logger) ([]BatchJob, int wg.Done() return } - // Generate UUID from jobID, uid, gid, nodelist(lowercase), workdir(lowercase) - jobUuid, err := utils.GetUuidFromString( + // Generate UUID from jobID, uid, account, nodelist(lowercase), workdir(lowercase) + jobUuid, err := helpers.GetUuidFromString( []string{ components[0], components[6], - components[4], + strings.ToLower(components[2]), strings.ToLower(components[14]), strings.ToLower(components[16]), }, @@ -74,7 +73,7 @@ func 
parseSacctCmdOutput(sacctOutput string, logger log.Logger) ([]BatchJob, int Log("msg", "Failed to generate UUID for job", "jobid", jobid, "err", err) jobUuid = jobid } - allNodes := utils.NodelistParser(components[14]) + allNodes := NodelistParser(components[14]) nodelistExp := strings.Join(allNodes, "|") jobStat = BatchJob{ components[0], diff --git a/pkg/jobstats/slurm_test.go b/pkg/jobstats/slurm_test.go index 7450a5a4..eb63ea2c 100644 --- a/pkg/jobstats/slurm_test.go +++ b/pkg/jobstats/slurm_test.go @@ -15,7 +15,7 @@ var ( expectedBatchJobs = []BatchJob{ { Jobid: "1479763", - Jobuuid: "aaaf154c-e784-9e49-2155-4aa52462782a", + Jobuuid: "667127ae-68d0-47aa-78e3-76ae76e4aba7", Partition: "part1", Account: "acc1", Grp: "grp", @@ -34,7 +34,7 @@ var ( }, { Jobid: "1481508", - Jobuuid: "69683fe8-5d89-9ec5-4b4f-8404c7cc37f2", + Jobuuid: "6bfbe049-b5ee-ea02-6cd8-3dd95acdd0e4", Partition: "part1", Account: "acc1", Grp: "grp", diff --git a/pkg/jobstats/types.go b/pkg/jobstats/types.go index 3efeb788..d8966c34 100644 --- a/pkg/jobstats/types.go +++ b/pkg/jobstats/types.go @@ -1,8 +1,13 @@ package jobstats -import "github.com/go-kit/log" +import ( + "net/http" -type jobStats struct { + "github.com/go-kit/log" + "github.com/prometheus/exporter-toolkit/web" +) + +type jobStatsDB struct { logger log.Logger batchScheduler string jobstatDBPath string @@ -13,21 +18,61 @@ type jobStats struct { } type BatchJob struct { - Jobid string - Jobuuid string - Partition string - Account string - Grp string - Gid string - Usr string - Uid string - Submit string - Start string - End string - Elapsed string - Exitcode string - State string - Nnodes string - Nodelist string - NodelistExp string + Jobid string `json:"jobid"` + Jobuuid string `json:"id"` + Partition string `json:"partition"` + Account string `json:"account"` + Grp string `json:"group"` + Gid string `json:"gid"` + Usr string `json:"user"` + Uid string `json:"uid"` + Submit string `json:"submit"` + Start string `json:"start"` + 
End string `json:"end"` + Elapsed string `json:"elapsed"` + Exitcode string `json:"exitcode"` + State string `json:"state"` + Nnodes string `json:"nnodes"` + Nodelist string `json:"nodelist"` + NodelistExp string `json:"nodelistexp"` +} + +type Config struct { + Logger log.Logger + Address string + WebSystemdSocket bool + WebConfigFile string + JobstatDBFile string + JobstatDBTable string +} + +type Account struct { + ID string `json:"id"` +} + +type Response struct { + Status string `json:"status"` + Data []Account `json:"data"` + ErrorType string `json:"errorType"` + Error string `json:"error"` + Warnings []string `json:"warnings"` +} + +type AccountsResponse struct { + Response + Data []Account `json:"data"` +} + +type JobsResponse struct { + Response + Data []BatchJob `json:"data"` +} + +type JobstatsServer struct { + logger log.Logger + server *http.Server + webConfig *web.FlagConfig + AccountsGetter func(string, log.Logger) ([]Account, error) + JobsGetter func(string, []string, string, string, log.Logger) ([]BatchJob, error) + HealthChecker func(log.Logger) bool } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go deleted file mode 100644 index 4bebf1af..00000000 --- a/pkg/utils/utils.go +++ /dev/null @@ -1,200 +0,0 @@ -//go:build !utils -// +build !utils - -package utils - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "os" - "os/exec" - "path/filepath" - "reflect" - "regexp" - "strconv" - "strings" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/google/uuid" - "github.com/zeebo/xxh3" -) - -const ( - OPENDATASOFT_API_PATH = `%s/api/records/1.0/search/?%s` - OPENDATASOFT_API_BASEURL = `https://odre.opendatasoft.com` -) - -var ( - nodelistRegExp = regexp.MustCompile(`(\[\d+\-\d+\])`) -) - -// Execute command and return stdout/stderr -func Execute(cmd string, args []string, logger log.Logger) ([]byte, error) { - level.Debug(logger).Log("msg", "Executing", "command", cmd, "args", fmt.Sprintf("%+v", 
args)) - out, err := exec.Command(cmd, args...).CombinedOutput() - if err != nil { - err = fmt.Errorf("error running %s: %s", cmd, err) - } - return out, err -} - -// Get all fields in a given struct -func GetStructFieldName(Struct interface{}) []string { - var fields []string - - v := reflect.ValueOf(Struct) - typeOfS := v.Type() - - for i := 0; i < v.NumField(); i++ { - fields = append(fields, typeOfS.Field(i).Name) - } - return fields -} - -// Get all values in a given struct -func GetStructFieldValue(Struct interface{}) []interface{} { - v := reflect.ValueOf(Struct) - values := make([]interface{}, v.NumField()) - - for i := 0; i < v.NumField(); i++ { - f := v.Field(i) - values = append(values, f.Interface()) - } - return values -} - -// Expand SLURM NODELIST into slice of nodenames -func NodelistParser(nodelistExp string) []string { - var nodeNames []string - // First split by , to get individual nodes - for _, nodeexp := range strings.Split(nodelistExp, ",") { - // If it contains "[", it means they are range of nodes - if strings.Contains(nodeexp, "[") { - matches := nodelistRegExp.FindAllString(nodeexp, -1) - if len(matches) == 0 { - continue - } - // Get only first match as we use recursion - for _, match := range matches[0:1] { - matchSansBrackets := match[1 : len(match)-1] - startIdx, err := strconv.Atoi(strings.Split(matchSansBrackets, "-")[0]) - if err != nil { - continue - } - endIdx, err := strconv.Atoi(strings.Split(matchSansBrackets, "-")[1]) - if err != nil { - continue - } - for i := startIdx; i <= endIdx; i++ { - nodename := strings.Replace(nodeexp, match, strconv.Itoa(i), -1) - // Add them to slice and call function again - nodeNames = append(nodeNames, NodelistParser(nodename)...) 
- } - } - - } else { - nodeNames = append(nodeNames, regexp.QuoteMeta(nodeexp)) - } - } - return nodeNames -} - -// Load cgroups v2 metrics from a given path -func LoadCgroupsV2Metrics( - name string, - cgroupfsPath string, - controllers []string, -) (map[string]float64, error) { - data := make(map[string]float64) - - for _, fName := range controllers { - contents, err := os.ReadFile(filepath.Join(cgroupfsPath, name, fName)) - if err != nil { - return data, err - } - for _, line := range strings.Split(string(contents), "\n") { - // Some of the above have a single value and others have a "data_name 123" - parts := strings.Fields(line) - indName := fName - indData := 0 - if len(parts) == 1 || len(parts) == 2 { - if len(parts) == 2 { - indName += "." + parts[0] - indData = 1 - } - if parts[indData] == "max" { - data[indName] = -1.0 - } else { - f, err := strconv.ParseFloat(parts[indData], 64) - if err == nil { - data[indName] = f - } else { - return data, err - } - } - } - } - } - return data, nil -} - -// Request to OPENDATASOFT API to get RTE energy data for France -func GetRteEnergyMixData() (float64, error) { - params := url.Values{} - params.Add("dataset", "eco2mix-national-tr") - params.Add("facet", "nature") - params.Add("facet", "date_heure") - params.Add("start", "0") - params.Add("rows", "1") - params.Add("sort", "date_heure") - params.Add( - "q", - fmt.Sprintf( - "date_heure:[%s TO #now()] AND NOT #null(taux_co2)", - time.Now().Format("2006-01-02"), - ), - ) - queryString := params.Encode() - - resp, err := http.DefaultClient.Get( - fmt.Sprintf(OPENDATASOFT_API_PATH, OPENDATASOFT_API_BASEURL, queryString), - ) - if err != nil { - return -1, err - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return -1, err - } - - var data nationalRealTimeResponse - err = json.Unmarshal(body, &data) - if err != nil { - return -1, err - } - - var fields []nationalRealTimeFields - for _, r := range data.Records { - fields = append(fields, r.Fields) - } - return 
float64(fields[0].TauxCo2), nil -} - -// Get a UUID5 for given slice of strings -func GetUuidFromString(stringSlice []string) (string, error) { - s := strings.Join(stringSlice[:], ",") - h := xxh3.HashString128(s).Bytes() - uuid, err := uuid.FromBytes(h[:]) - // hash := md5.Sum([]byte(s)) - // md5string := hex.EncodeToString(hash[:]) - // // generate the UUID from the - // // first 16 bytes of the MD5 hash - // uuid, err := uuid.FromBytes([]byte(md5string[0:16])) - return uuid.String(), err -} diff --git a/scripts/e2e-test.sh b/scripts/e2e-test.sh index 3d3ac716..1d1e73a2 100755 --- a/scripts/e2e-test.sh +++ b/scripts/e2e-test.sh @@ -11,21 +11,13 @@ skip_re="^(go_|batchjob_exporter_build_info|batchjob_scrape_collector_duration_s arch="$(uname -m)" -cgroups_mode=$([ $(stat -fc %T /sys/fs/cgroup/) = "cgroup2fs" ] && echo "unified" || ( [ -e /sys/fs/cgroup/unified/ ] && echo "hybrid" || echo "legacy")) -# cgroups_mode="legacy" -echo "cgroups mode detected is ${cgroups_mode}" - -case "${cgroups_mode}" in - legacy|hybrid) exporter_fixture='pkg/collector/fixtures/e2e-test-cgroupsv1-output.txt' ;; - *) exporter_fixture='pkg/collector/fixtures/e2e-test-cgroupsv2-output.txt' ;; -esac - -jobstats_fixture='pkg/jobstats/fixtures/jobstats.dump' - -keep=0; update=0; verbose=0 -while getopts 'hkuv' opt +package="exporter"; keep=0; update=0; verbose=0 +while getopts 'hp:kuv' opt do case "$opt" in + p) + package=$OPTARG + ;; k) keep=1 ;; @@ -37,7 +29,8 @@ do set -x ;; *) - echo "Usage: $0 [-k] [-u] [-v]" + echo "Usage: $0 [-p] [-k] [-u] [-v]" + echo " -p: package to test [options: exporter, stats_db, stats_server]" echo " -k: keep temporary files and leave batchjob_exporter running" echo " -u: update fixtures" echo " -v: verbose output" @@ -46,48 +39,54 @@ do esac done -if [ ! -x ./bin/batchjob_exporter ] +if [ "${package}" = "exporter" ] then - echo './bin/batchjob_exporter not found. Consider running `go build` first.' 
>&2 - exit 1 -fi + cgroups_mode=$([ $(stat -fc %T /sys/fs/cgroup/) = "cgroup2fs" ] && echo "unified" || ( [ -e /sys/fs/cgroup/unified/ ] && echo "hybrid" || echo "legacy")) + # cgroups_mode="legacy" + echo "cgroups mode detected: ${cgroups_mode}" -PATH=$PWD/pkg/collector/fixtures:$PATH ./bin/batchjob_exporter \ - --path.sysfs="pkg/collector/fixtures/sys" \ - --path.cgroupfs="pkg/collector/fixtures/sys/fs/cgroup" \ - --collector.slurm.unique.jobid \ - --collector.slurm.job.stat.path="pkg/collector/fixtures/slurmjobstat" \ - --collector.ipmi.dcmi.wrapper.path="pkg/collector/fixtures/ipmi-dcmi-wrapper.sh" \ - --collector.nvidia_gpu \ - --collector.nvidia.gpu.stat.path="pkg/collector/fixtures/gpustat" \ - --web.listen-address "127.0.0.1:${port}" \ - --log.level="debug" > "${tmpdir}/batchjob_exporter.log" 2>&1 & + case "${cgroups_mode}" in + legacy|hybrid) fixture='pkg/collector/fixtures/e2e-test-cgroupsv1-output.txt' ;; + *) fixture='pkg/collector/fixtures/e2e-test-cgroupsv2-output.txt' ;; + esac -echo $! > "${tmpdir}/batchjob_exporter.pid" + logfile="${tmpdir}/batchjob_exporter.log" + fixture_output="${tmpdir}/e2e-test-exporter-output.txt" + pidfile="${tmpdir}/batchjob_exporter.pid" +elif [ "${package}" = "stats_db" ] +then + fixture='pkg/jobstats/fixtures/jobstats.dump' + logfile="${tmpdir}/batchjob_stats_db.log" + fixture_output="${tmpdir}/e2e-test-stats-db-output.txt" + pidfile="${tmpdir}/batchjob_stats_db.pid" +else + fixture='pkg/jobstats/fixtures/jobstats.dump' + logfile="${tmpdir}/batchjob_stats_server.log" + fixture_output="${tmpdir}/e2e-test-stats-server-output.txt" + pidfile="${tmpdir}/batchjob_stats_server.pid" +fi finish() { if [ $? 
-ne 0 -o ${verbose} -ne 0 ] then cat << EOF >&2 LOG ===================== -$(cat "${tmpdir}/batchjob_exporter.log") -$(cat "${tmpdir}/batchjob_stats.log") +$(cat "${logfile}") ========================= EOF fi if [ ${update} -ne 0 ] then - cp "${tmpdir}/e2e-test-output.txt" "${exporter_fixture}" - cp "${tmpdir}/output.dump" "${jobstats_fixture}" + cp "${fixture_output}" "${fixture}" fi if [ ${keep} -eq 0 ] then - kill -9 "$(cat ${tmpdir}/batchjob_exporter.pid)" + kill -9 "$(cat ${pidfile})" # This silences the "Killed" message set +e - wait "$(cat ${tmpdir}/batchjob_exporter.pid)" > /dev/null 2>&1 + wait "$(cat ${pidfile})" > /dev/null 2>&1 rm -rf "${tmpdir}" fi } @@ -107,33 +106,82 @@ get() { fi } -sleep 1 +if [ "${package}" = "exporter" ] +then + if [ ! -x ./bin/batchjob_exporter ] + then + echo './bin/batchjob_exporter not found. Consider running `go build` first.' >&2 + exit 1 + fi -get "127.0.0.1:${port}/metrics" | grep -E -v "${skip_re}" > "${tmpdir}/e2e-test-output.txt" + PATH=$PWD/pkg/collector/fixtures:$PATH ./bin/batchjob_exporter \ + --path.sysfs="pkg/collector/fixtures/sys" \ + --path.cgroupfs="pkg/collector/fixtures/sys/fs/cgroup" \ + --collector.slurm.job.stat.path="pkg/collector/fixtures/slurmjobstat" \ + --collector.ipmi.dcmi.wrapper.path="pkg/collector/fixtures/ipmi-dcmi-wrapper.sh" \ + --collector.nvidia_gpu \ + --collector.nvidia.gpu.stat.path="pkg/collector/fixtures/gpustat" \ + --web.listen-address "127.0.0.1:${port}" \ + --log.level="debug" > "${logfile}" 2>&1 & -diff -u \ - "${exporter_fixture}" \ - "${tmpdir}/e2e-test-output.txt" + echo $! > "${pidfile}" + + sleep 1 -if [ ! -x ./bin/batchjob_stats ] + get "127.0.0.1:${port}/metrics" | grep -E -v "${skip_re}" > "${fixture_output}" +elif [ "${package}" = "stats_db" ] then - echo './bin/batchjob_stats not found. Consider running `go build` first.' >&2 - exit 1 -fi + if [ ! -x ./bin/batchjob_stats_db ] + then + echo './bin/batchjob_stats_db not found. Consider running `go build` first.' 
>&2
+        exit 1
+    fi
+
+    ./bin/batchjob_stats_db \
+        --slurm.sacct.path="pkg/jobstats/fixtures/sacct" \
+        --path.data="${tmpdir}" \
+        --log.level="debug" > "${logfile}" 2>&1 &
+
+    echo $! > "${pidfile}"
 
-./bin/batchjob_stats \
-    --slurm.sacct.path="pkg/jobstats/fixtures/sacct" \
-    --path.data="${tmpdir}" \
-    --log.level="debug" > "${tmpdir}/batchjob_stats.log" 2>&1
 
-if ! command -v sqlite3 &> /dev/null
+    sleep 2
+
+    if ! command -v sqlite3 &> /dev/null
+    then
+        echo "sqlite3 could not be found. Skipping batchjob_stats_db test..."
+        exit 0
+    fi
+
+    sqlite3 "${tmpdir}/jobstats.db" .dump >"${fixture_output}"
+elif [ "${package}" = "stats_server" ]
 then
-    echo "sqlite3 could not be found. Skipping batchjob_stats test..."
-    exit 0
-fi
+    if [ ! -x ./bin/batchjob_stats_server ]
+    then
+        echo './bin/batchjob_stats_server not found. Consider running `go build` first.' >&2
+        exit 1
+    fi
+
+    if ! command -v sqlite3 &> /dev/null
+    then
+        echo "sqlite3 could not be found. Skipping batchjob_stats_server test..."
+        exit 0
+    fi
+
+    cat "${fixture}" | sqlite3 "${tmpdir}/jobstats.db"
+    fixture='pkg/jobstats/fixtures/e2e-test-stats-server-output.txt'
 
-sqlite3 "${tmpdir}/jobstats.db" .dump >"${tmpdir}/output.dump"
+    ./bin/batchjob_stats_server \
+        --path.db="${tmpdir}/jobstats.db" \
+        --web.listen-address="127.0.0.1:${port}" \
+        --log.level="debug" > "${logfile}" 2>&1 &
+
+    echo $! > "${pidfile}"
+
+    sleep 2
+
+    get -H "X-Grafana-User: usr" "127.0.0.1:${port}/api/jobs?from=2023-02-20&to=2023-02-25&account=acc1" > "${fixture_output}"
+fi
 
 diff -u \
-    "${jobstats_fixture}" \
-    "${tmpdir}/output.dump"
+    "${fixture}" \
+    "${fixture_output}"