diff --git a/pkg/resmgr/policy/metrics.go b/pkg/resmgr/policy/metrics.go index 12ff57d3e..368bb18bf 100644 --- a/pkg/resmgr/policy/metrics.go +++ b/pkg/resmgr/policy/metrics.go @@ -15,13 +15,21 @@ package policy import ( + "strconv" + "sync" + "github.com/prometheus/client_golang/prometheus" + v1 "k8s.io/api/core/v1" "github.com/containers/nri-plugins/pkg/metrics" + "github.com/containers/nri-plugins/pkg/resmgr/cache" + system "github.com/containers/nri-plugins/pkg/sysfs" + "github.com/containers/nri-plugins/pkg/utils/cpuset" ) type PolicyCollector struct { policy *policy + block sync.Mutex } func (p *policy) newPolicyCollector() *PolicyCollector { @@ -41,3 +49,222 @@ func (c *PolicyCollector) Describe(ch chan<- *prometheus.Desc) { func (c *PolicyCollector) Collect(ch chan<- prometheus.Metric) { c.policy.active.GetMetrics().Collect(ch) } + +const ( + nodeCapacity = iota + nodeUsage + nodeContainers + cpuAllocation + cpuContainers + metricsCount +) + +type ( + SystemCollector struct { + cache cache.Cache + system system.System + Nodes map[int]*NodeMetric + Cpus map[int]*CpuMetric + Metrics []*prometheus.GaugeVec + } + NodeMetric struct { + Id int + IdLabel string + Type string + Capacity int64 + Usage int64 + ContainerCount int + } + CpuMetric struct { + Id int + IdLabel string + Allocation int + ContainerCount int + } +) + +func (p *policy) newSystemCollector() *SystemCollector { + s := &SystemCollector{ + cache: p.cache, + system: p.system, + Nodes: map[int]*NodeMetric{}, + Cpus: map[int]*CpuMetric{}, + Metrics: make([]*prometheus.GaugeVec, metricsCount, metricsCount), + } + + s.Metrics[nodeCapacity] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "mem_node_capacity", + Help: "Capacity of the memory node.", + }, + []string{ + "node_id", + }, + ) + s.Metrics[nodeUsage] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "mem_node_usage", + Help: "Usage of the memory node", + }, + []string{ + "node_id", + }, + ) + s.Metrics[nodeContainers] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "mem_node_container_count", + Help: "Number of containers assigned to the memory node.", + }, + []string{ + "node_id", + }, + ) + s.Metrics[cpuAllocation] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cpu_allocation", + Help: "Total allocation of the CPU.", + }, + []string{ + "cpu_id", + }, + ) + s.Metrics[cpuContainers] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cpu_container_count", + Help: "Number of containers assigned to the CPU.", + }, + []string{ + "cpu_id", + }, + ) + + for _, id := range s.system.NodeIDs() { + var ( + sys = s.system.Node(id) + capa, used = s.getMemInfo(sys) + node = &NodeMetric{ + Id: sys.ID(), + IdLabel: strconv.Itoa(sys.ID()), + Type: sys.GetMemoryType().String(), + Capacity: capa, + Usage: used, + } + ) + s.Nodes[id] = node + } + + for _, id := range s.system.CPUIDs() { + cpu := &CpuMetric{ + Id: id, + IdLabel: strconv.Itoa(id), + } + s.Cpus[id] = cpu + } + + return s +} + +func (s *SystemCollector) Describe(ch chan<- *prometheus.Desc) { + s.Metrics[nodeCapacity].Describe(ch) + s.Metrics[nodeUsage].Describe(ch) + s.Metrics[nodeContainers].Describe(ch) + s.Metrics[cpuAllocation].Describe(ch) + s.Metrics[cpuContainers].Describe(ch) +} + +func (s *SystemCollector) Collect(ch chan<- prometheus.Metric) { + s.Update() + s.Metrics[nodeCapacity].Collect(ch) + s.Metrics[nodeUsage].Collect(ch) + s.Metrics[nodeContainers].Collect(ch) + s.Metrics[cpuAllocation].Collect(ch) + s.Metrics[cpuContainers].Collect(ch) +} + +func (s *SystemCollector) register() error { + return metrics.Register("system", s, metrics.WithGroup("policy")) +} + +func (s *SystemCollector) Update() { + for _, n := range s.Nodes { + sys := s.system.Node(n.Id) + capa, used := s.getMemInfo(sys) + + if n.Capacity == 0 { + n.Capacity = capa + s.Metrics[nodeCapacity].WithLabelValues(n.IdLabel).Set(float64(n.Capacity)) + } + + n.Usage = used + n.ContainerCount = 0 + } + + for _, c := range s.Cpus { + c.ContainerCount = 0 + c.Allocation = 0 + } + + for _, ctr := range s.cache.GetContainers() { + switch ctr.GetState() { + case cache.ContainerStateCreated: + case cache.ContainerStateRunning: + default: + continue + } + + var ( + cpu, mem = s.getCpuAndMemset(ctr) + req, _ = s.getCpuResources(ctr) + ) + + for _, id := range mem.List() { + if n, ok := s.Nodes[id]; ok { + n.ContainerCount++ + } + } + + for _, id := range cpu.List() { + if c, ok := s.Cpus[id]; ok { + c.ContainerCount++ + if cpu.Size() > 0 { + c.Allocation += req / cpu.Size() + } + } + } + } + + for _, n := range s.Nodes { + s.Metrics[nodeUsage].WithLabelValues(n.IdLabel).Set(float64(n.Usage)) + } + for _, c := range s.Cpus { + s.Metrics[cpuAllocation].WithLabelValues(c.IdLabel).Set(float64(c.Allocation)) + s.Metrics[cpuContainers].WithLabelValues(c.IdLabel).Set(float64(c.ContainerCount)) + } +} + +func (s *SystemCollector) getMemInfo(n system.Node) (capacity, used int64) { + if n != nil { + if i, _ := n.MemoryInfo(); i != nil { + return int64(i.MemTotal), int64(i.MemUsed) + } + } + return 0, 0 +} + +func (s *SystemCollector) getCpuAndMemset(ctr cache.Container) (cpu, mem cpuset.CPUSet) { + cset, _ := cpuset.Parse(ctr.GetCpusetCpus()) + mset, _ := cpuset.Parse(ctr.GetCpusetMems()) + return cset, mset +} + +func (s *SystemCollector) getCpuResources(ctr cache.Container) (request, limit int) { + res := ctr.GetResourceRequirements() + if qty, ok := res.Requests[v1.ResourceCPU]; ok { + request = int(qty.MilliValue()) + } + if qty, ok := res.Limits[v1.ResourceCPU]; ok { + limit = int(qty.MilliValue()) + } + + return request, limit +} diff --git a/pkg/resmgr/policy/policy.go b/pkg/resmgr/policy/policy.go index 5b7901e4f..8b74d8995 100644 --- a/pkg/resmgr/policy/policy.go +++ b/pkg/resmgr/policy/policy.go @@ -204,6 +204,7 @@ type policy struct { system system.System // system/HW/topology info sendEvent SendEventFn // function to send event up to the resource manager pcollect *PolicyCollector // policy metrics collector + scollect *SystemCollector // system metrics collector } // backend is a registered Backend. @@ -238,6 +239,12 @@ func NewPolicy(backend Backend, cache cache.Cache, o *Options) (Policy, error) { } p.pcollect = pcollect + scollect := p.newSystemCollector() + if err = scollect.register(); err != nil { + return nil, policyError("failed to register system collector: %v", err) + } + p.scollect = scollect + return p, nil }