Skip to content

Commit

Permalink
WiP: policy: implement policy system metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: Krisztian Litkey <[email protected]>
  • Loading branch information
klihub committed Nov 10, 2024
1 parent 342a491 commit 067c3a8
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 25 deletions.
25 changes: 21 additions & 4 deletions pkg/resmgr/policy/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,28 @@ type Collector struct {

var _ prometheus.Collector = &Collector{}

func (p *policy) registerCollector() error {
collector := &Collector{policy: p}
p.collector = collector
func (p *policy) registerMetrics() (*Collector, *SystemMetrics, error) {
pc := p.NewCollector()
sm := p.NewSystemMetrics()

return metrics.Register(p.ActivePolicy(), collector, metrics.WithGroup("policy"))
if err := pc.Register(); err != nil {
return nil, nil, err
}
if err := sm.Register(); err != nil {
return nil, nil, err
}

return pc, sm, nil
}

func (p *policy) NewCollector() *Collector {
return &Collector{
policy: p,
}
}

func (c *Collector) Register() error {
return metrics.Register(c.policy.ActivePolicy(), c, metrics.WithGroup("policy"))
}

func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
Expand Down
47 changes: 26 additions & 21 deletions pkg/resmgr/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,13 @@ type ZoneAttribute struct {

// Policy instance/state.
type policy struct {
options Options // policy options
cache cache.Cache // system state cache
active Backend // our active backend
system system.System // system/HW/topology info
sendEvent SendEventFn // function to send event up to the resource manager
collector *Collector // policy metrics collector
options Options // policy options
cache cache.Cache // system state cache
active Backend // our active backend
system system.System // system/HW/topology info
sendEvent SendEventFn // function to send event up to the resource manager
pcollect *Collector // policy metrics collector
smetrics *SystemMetrics // system metrics collector
}

// backend is a registered Backend.
Expand All @@ -226,10 +227,20 @@ func NewPolicy(backend Backend, cache cache.Cache, o *Options) (Policy, error) {
active: backend,
}

if err := p.registerCollector(); err != nil {
sys, err := system.DiscoverSystem()
if err != nil {
return nil, policyError("failed to discover system topology: %v", err)
}
p.system = sys

pc, sm, err := p.registerMetrics()
if err != nil {
return nil, err
}

p.pcollect = pc
p.smetrics = sm

return p, nil
}

Expand All @@ -242,12 +253,6 @@ func (p *policy) ActivePolicy() string {

// Start starts up policy, preparing it for serving requests.
func (p *policy) Start(cfg interface{}) error {
sys, err := system.DiscoverSystem()
if err != nil {
return policyError("failed to discover system topology: %v", err)
}
p.system = sys

log.Info("activating '%s' policy...", p.active.Name())

if err := p.active.Setup(&BackendOptions{
Expand All @@ -273,29 +278,29 @@ func (p *policy) Reconfigure(cfg interface{}) error {

// Sync synchronizes the active policy state.
func (p *policy) Sync(add []cache.Container, del []cache.Container) error {
p.collector.Block()
defer p.collector.Unblock()
p.pcollect.Block()
defer p.pcollect.Unblock()
return p.active.Sync(add, del)
}

// AllocateResources allocates resources for a container.
func (p *policy) AllocateResources(c cache.Container) error {
p.collector.Block()
defer p.collector.Unblock()
p.pcollect.Block()
defer p.pcollect.Unblock()
return p.active.AllocateResources(c)
}

// ReleaseResources release resources of a container.
func (p *policy) ReleaseResources(c cache.Container) error {
p.collector.Block()
defer p.collector.Unblock()
p.pcollect.Block()
defer p.pcollect.Unblock()
return p.active.ReleaseResources(c)
}

// UpdateResources updates resource allocations of a container.
func (p *policy) UpdateResources(c cache.Container) error {
p.collector.Block()
defer p.collector.Unblock()
p.pcollect.Block()
defer p.pcollect.Unblock()
return p.active.UpdateResources(c)
}

Expand Down
244 changes: 244 additions & 0 deletions pkg/resmgr/policy/system-metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright The NRI Plugins Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package policy

import (
"strconv"

"github.com/containers/nri-plugins/pkg/metrics"
"github.com/containers/nri-plugins/pkg/resmgr/cache"
system "github.com/containers/nri-plugins/pkg/sysfs"
"github.com/containers/nri-plugins/pkg/utils/cpuset"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
)

const (
nodeCapacity = iota
nodeUsage
nodeContainers
cpuAllocation
cpuContainers
metricsCount
)

type (
SystemMetrics struct {
cache cache.Cache
system system.System
Nodes map[int]*NodeMetric
Cpus map[int]*CpuMetric
Metrics []*prometheus.GaugeVec
}
NodeMetric struct {
Id int
IdLabel string
Type string
Capacity int64
Usage int64
ContainerCount int
}
CpuMetric struct {
Id int
IdLabel string
Allocation int
ContainerCount int
}
)

func (p *policy) NewSystemMetrics() *SystemMetrics {
s := &SystemMetrics{
cache: p.cache,
system: p.system,
Nodes: map[int]*NodeMetric{},
Cpus: map[int]*CpuMetric{},
Metrics: make([]*prometheus.GaugeVec, metricsCount, metricsCount),
}

s.Metrics[nodeCapacity] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "mem_node_capacity",
Help: "Capacity of the memory node.",
},
[]string{
"node_id",
},
)
s.Metrics[nodeUsage] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "mem_node_usage",
Help: "Usage of the memory node",
},
[]string{
"node_id",
},
)
s.Metrics[nodeContainers] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "mem_node_container_count",
Help: "Number of containers assigned to the memory node.",
},
[]string{
"node_id",
},
)
s.Metrics[cpuAllocation] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cpu_allocation",
Help: "Total allocation of the CPU.",
},
[]string{
"cpu_id",
},
)
s.Metrics[cpuContainers] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cpu_container_count",
Help: "Number of containers assigned to the CPU.",
},
[]string{
"cpu_id",
},
)

for _, id := range s.system.NodeIDs() {
var (
sys = s.system.Node(id)
capa, used = s.getMemInfo(sys)
node = &NodeMetric{
Id: sys.ID(),
IdLabel: strconv.Itoa(sys.ID()),
Type: sys.GetMemoryType().String(),
Capacity: capa,
Usage: used,
}
)
s.Nodes[id] = node
}

for _, id := range s.system.CPUIDs() {
cpu := &CpuMetric{
Id: id,
IdLabel: strconv.Itoa(id),
}
s.Cpus[id] = cpu
}

return s
}

func (s *SystemMetrics) Describe(ch chan<- *prometheus.Desc) {
s.Metrics[nodeCapacity].Describe(ch)
s.Metrics[nodeUsage].Describe(ch)
s.Metrics[nodeContainers].Describe(ch)
s.Metrics[cpuAllocation].Describe(ch)
s.Metrics[cpuContainers].Describe(ch)
}

func (s *SystemMetrics) Collect(ch chan<- prometheus.Metric) {
s.Update()
s.Metrics[nodeCapacity].Collect(ch)
s.Metrics[nodeUsage].Collect(ch)
s.Metrics[nodeContainers].Collect(ch)
s.Metrics[cpuAllocation].Collect(ch)
s.Metrics[cpuContainers].Collect(ch)
}

func (s *SystemMetrics) Register() error {
return metrics.Register("system", s, metrics.WithGroup("policy"))
}

func (s *SystemMetrics) Update() {
for _, n := range s.Nodes {
sys := s.system.Node(n.Id)
capa, used := s.getMemInfo(sys)

if n.Capacity == 0 {
n.Capacity = capa
s.Metrics[nodeCapacity].WithLabelValues(n.IdLabel).Set(float64(n.Capacity))
}

n.Usage = used
n.ContainerCount = 0
}

for _, c := range s.Cpus {
c.ContainerCount = 0
}

for _, ctr := range s.cache.GetContainers() {
switch ctr.GetState() {
case cache.ContainerStateCreated:
case cache.ContainerStateRunning:
default:
continue
}

var (
cpu, mem = s.getCpuAndMemset(ctr)
req, _ = s.getCpuResources(ctr)
)

for _, id := range mem.List() {
if n, ok := s.Nodes[id]; ok {
n.ContainerCount++
}
}

for _, id := range cpu.List() {
if c, ok := s.Cpus[id]; ok {
c.ContainerCount++
if cpu.Size() > 0 {
c.Allocation += req / cpu.Size()
}
}
}
}

for _, n := range s.Nodes {
s.Metrics[nodeUsage].WithLabelValues(n.IdLabel).Set(float64(n.Usage))
}
for _, c := range s.Cpus {
s.Metrics[cpuAllocation].WithLabelValues(c.IdLabel).Set(float64(c.Allocation))
s.Metrics[cpuContainers].WithLabelValues(c.IdLabel).Set(float64(c.ContainerCount))
}
}

func (s *SystemMetrics) getMemInfo(n system.Node) (capacity, used int64) {
if n != nil {
if i, _ := n.MemoryInfo(); i != nil {
return int64(i.MemTotal), int64(i.MemUsed)
}
}
return 0, 0
}

func (s *SystemMetrics) getCpuAndMemset(ctr cache.Container) (cpu, mem cpuset.CPUSet) {
cset, _ := cpuset.Parse(ctr.GetCpusetCpus())
mset, _ := cpuset.Parse(ctr.GetCpusetMems())
return cset, mset
}

func (s *SystemMetrics) getCpuResources(ctr cache.Container) (request, limit int) {
res := ctr.GetResourceRequirements()
if qty, ok := res.Requests[v1.ResourceCPU]; ok {
request = int(qty.MilliValue())
}
if qty, ok := res.Limits[v1.ResourceCPU]; ok {
limit = int(qty.MilliValue())
}

return request, limit
}

0 comments on commit 067c3a8

Please sign in to comment.