Skip to content

Commit

Permalink
resmgr: query Pod Resource API for extra hints.
Browse files Browse the repository at this point in the history
Try querying Pod Resource API and generate extra topology
hints using container device assignments listed there.

Signed-off-by: Krisztian Litkey <[email protected]>
  • Loading branch information
klihub committed Dec 2, 2024
1 parent fc90e52 commit 917408e
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 32 deletions.
61 changes: 42 additions & 19 deletions pkg/resmgr/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
nri "github.com/containerd/nri/pkg/api"
v1 "k8s.io/api/core/v1"

"github.com/containers/nri-plugins/pkg/agent/podresapi"
"github.com/containers/nri-plugins/pkg/utils/cpuset"

resmgr "github.com/containers/nri-plugins/pkg/apis/resmgr/v1alpha1"
Expand Down Expand Up @@ -114,6 +115,10 @@ type Pod interface {
// and return the value of the first key found.
GetEffectiveAnnotation(key, container string) (string, bool)

// GetPodResources returns the pod resources for this pod, waiting for any
// pending fetch to complete or a timeout.
GetPodResources() *podresapi.PodResources

// GetContainerAffinity returns the affinity expressions for the named container.
GetContainerAffinity(string) ([]*Affinity, error)
// ScopeExpression returns an affinity expression for defining this pod as the scope.
Expand All @@ -138,12 +143,16 @@ type Pod interface {

// A cached pod.
type pod struct {
cache *cache // our cache of object
Pod *nri.PodSandbox // pod data from NRI
QOSClass v1.PodQOSClass // pod QOS class
Affinity *podContainerAffinity // annotated container affinity
prettyName string // cached PrettyName()
ctime time.Time // time of pod creation
cache *cache // our cache of object
Pod *nri.PodSandbox // pod data from NRI
QOSClass v1.PodQOSClass // pod QOS class
Affinity *podContainerAffinity // annotated container affinity
PodResources *podresapi.PodResources // pod resources acquired from podresourceapi
podResCh <-chan *podresapi.PodResources // channel for pod resource fetch
waitResCh chan struct{} // channel for waiting for pod resource fetch
prettyName string // cached PrettyName()
ctime time.Time // time of pod creation

}

// ContainerState is the container state in the runtime.
Expand Down Expand Up @@ -222,6 +231,9 @@ type Container interface {
// The requirements are calculated from the containers cgroup parameters.
GetResourceRequirements() v1.ResourceRequirements

// GetPodResources gets container-specific resources acquired from podresourceapi.
GetPodResources() *podresapi.ContainerResources

// SetResourceUpdates sets updated resources for a container. Returns true if the
// resources were really updated.
SetResourceUpdates(*nri.LinuxResources) bool
Expand Down Expand Up @@ -322,6 +334,7 @@ type container struct {
State ContainerState // current state of the container

Requirements v1.ResourceRequirements
PodResources *podresapi.ContainerResources
ResourceUpdates *v1.ResourceRequirements
request interface{}

Expand Down Expand Up @@ -359,7 +372,7 @@ type Cacheable interface {
// itself upon startup.
type Cache interface {
// InsertPod inserts a pod into the cache, using a runtime request or reply.
InsertPod(pod *nri.PodSandbox) (Pod, error)
InsertPod(pod *nri.PodSandbox, ch <-chan *podresapi.PodResources) Pod
// DeletePod deletes a pod from the cache.
DeletePod(id string) Pod
// LookupPod looks up a pod in the cache.
Expand Down Expand Up @@ -410,7 +423,7 @@ type Cache interface {
Save() error

// RefreshPods purges/inserts stale/new pods/containers using a pod sandbox list response.
RefreshPods([]*nri.PodSandbox) ([]Pod, []Pod, []Container)
RefreshPods([]*nri.PodSandbox, <-chan podresapi.PodResourcesList) ([]Pod, []Pod, []Container)
// RefreshContainers purges/inserts stale/new containers using a container list response.
RefreshContainers([]*nri.Container) ([]Container, []Container)

Expand Down Expand Up @@ -523,12 +536,12 @@ func (cch *cache) ResetActivePolicy() error {
}

// Insert a pod into the cache.
func (cch *cache) InsertPod(nriPod *nri.PodSandbox) (Pod, error) {
p := cch.createPod(nriPod)
func (cch *cache) InsertPod(nriPod *nri.PodSandbox, ch <-chan *podresapi.PodResources) Pod {
p := cch.createPod(nriPod, ch)
cch.Pods[nriPod.GetId()] = p
cch.Save()

return p, nil
return p
}

// Delete a pod from the cache.
Expand Down Expand Up @@ -620,7 +633,7 @@ func (cch *cache) LookupContainerByCgroup(path string) (Container, bool) {
}

// RefreshPods purges/inserts stale/new pods/containers into the cache.
func (cch *cache) RefreshPods(pods []*nri.PodSandbox) ([]Pod, []Pod, []Container) {
func (cch *cache) RefreshPods(pods []*nri.PodSandbox, resCh <-chan podresapi.PodResourcesList) ([]Pod, []Pod, []Container) {
valid := make(map[string]struct{})

add := []Pod{}
Expand All @@ -631,13 +644,8 @@ func (cch *cache) RefreshPods(pods []*nri.PodSandbox) ([]Pod, []Pod, []Container
valid[item.Id] = struct{}{}
if _, ok := cch.Pods[item.Id]; !ok {
log.Debug("inserting discovered pod %s...", item.Id)
pod, err := cch.InsertPod(item)
if err != nil {
log.Error("failed to insert discovered pod %s to cache: %v",
item.Id, err)
} else {
add = append(add, pod)
}
pod := cch.InsertPod(item, nil)
add = append(add, pod)
}
}
for _, pod := range cch.Pods {
Expand All @@ -655,6 +663,16 @@ func (cch *cache) RefreshPods(pods []*nri.PodSandbox) ([]Pod, []Pod, []Container
}
}

if resCh != nil {
podResList := <-resCh
if len(podResList) > 0 {
podResMap := podResList.Map()
for _, pod := range cch.Pods {
pod.setPodResources(podResMap.GetPod(pod.GetNamespace(), pod.GetName()))
}
}
}

return add, del, containers
}

Expand Down Expand Up @@ -686,6 +704,11 @@ func (cch *cache) RefreshContainers(containers []*nri.Container) ([]Container, [
c.State = ContainerStateStale
del = append(del, c)
}

pod, ok := cch.Pods[c.GetPodID()]
if ok {
c.PodResources = pod.GetPodResources().GetContainer(c.GetName())
}
}

return add, del
Expand Down
38 changes: 32 additions & 6 deletions pkg/resmgr/cache/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

"github.com/containers/nri-plugins/pkg/agent/podresapi"
resmgr "github.com/containers/nri-plugins/pkg/apis/resmgr/v1alpha1"
"github.com/containers/nri-plugins/pkg/cgroups"
"github.com/containers/nri-plugins/pkg/kubernetes"
Expand Down Expand Up @@ -89,18 +90,25 @@ func (c *container) getDenyPathList() (*PathList, bool, error) {
// Create and initialize a cached container.
func (cch *cache) createContainer(nriCtr *nri.Container) (*container, error) {
podID := nriCtr.GetPodSandboxId()
_, ok := cch.Pods[podID]
pod, ok := cch.Pods[podID]
if !ok {
return nil, cacheError("can't find cached pod %s for container %s (%s)",
podID, nriCtr.GetId(), nriCtr.GetName())
}

c := &container{
cache: cch,
Ctr: nriCtr,
State: nriCtr.GetState(),
Tags: make(map[string]string),
ctime: time.Now(),
cache: cch,
Ctr: nriCtr,
State: nriCtr.GetState(),
Tags: make(map[string]string),
ctime: time.Now(),
PodResources: pod.GetPodResources().GetContainer(nriCtr.GetName()),
}

if c.PodResources == nil {
log.Info("no pod resources for container %s", c.PrettyName())
} else {
log.Info("got pod resources %+v", c.PodResources)
}

c.generateTopologyHints()
Expand Down Expand Up @@ -233,9 +241,19 @@ func (c *container) generateTopologyHints() {
}
}
}

checkDenied := func(path string) bool {
return checkAllowedAndDeniedPaths(path, allowPathList, denyPathList)
}

if podRes := c.GetPodResources(); podRes != nil {
hints := podRes.GetDeviceTopologyHints(checkDenied)
c.TopologyHints = topology.MergeTopologyHints(c.TopologyHints, hints)
}
} else {
log.Info("automatic topology hint generation disabled for devices")
}

}

func isReadOnlyMount(m *nri.Mount) bool {
Expand Down Expand Up @@ -452,6 +470,14 @@ func (c *container) GetResourceRequirements() v1.ResourceRequirements {
return c.Requirements
}

func (c *container) GetPodResources() *podresapi.ContainerResources {
pod, ok := c.GetPod()
if !ok {
return nil
}
return pod.GetPodResources().GetContainer(c.GetName())
}

func (c *container) SetResourceUpdates(r *nri.LinuxResources) bool {
r = mergeNRIResources(r, c.Ctr.GetLinux().GetResources())

Expand Down
31 changes: 30 additions & 1 deletion pkg/resmgr/cache/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ import (
nri "github.com/containerd/nri/pkg/api"
v1 "k8s.io/api/core/v1"

"github.com/containers/nri-plugins/pkg/agent/podresapi"
resmgr "github.com/containers/nri-plugins/pkg/apis/resmgr/v1alpha1"
"github.com/containers/nri-plugins/pkg/cgroups"
"github.com/containers/nri-plugins/pkg/kubernetes"
)

// Create and initialize a cached pod.
func (cch *cache) createPod(nriPod *nri.PodSandbox) *pod {
func (cch *cache) createPod(nriPod *nri.PodSandbox, ch <-chan *podresapi.PodResources) *pod {
p := &pod{
cache: cch,
Pod: nriPod,
ctime: time.Now(),
}

p.goFetchPodResources(ch)

if err := p.parseCgroupForQOSClass(); err != nil {
log.Error("pod %s: %v", p.PrettyName(), err)
}
Expand Down Expand Up @@ -133,6 +136,32 @@ func (p *pod) GetQOSClass() v1.PodQOSClass {
return p.QOSClass
}

func (p *pod) goFetchPodResources(ch <-chan *podresapi.PodResources) {
go func() {
p.podResCh = ch
p.waitResCh = make(chan struct{})
defer close(p.waitResCh)

if p.podResCh != nil {
p.PodResources = <-p.podResCh
log.Debug("fetched pod resources %+v for %s", p.PodResources, p.GetName())
}
}()
}

func (p *pod) setPodResources(podRes *podresapi.PodResources) {
p.PodResources = podRes
log.Debug("set pod resources %+v for %s", p.PodResources, p.GetName())
}

func (p *pod) GetPodResources() *podresapi.PodResources {
if p.waitResCh != nil {
log.Debug("waiting for pod resources fetch to complete...")
_ = <-p.waitResCh
}
return p.PodResources
}

func (p *pod) GetContainerAffinity(name string) ([]*Affinity, error) {
if p.Affinity != nil {
return (*p.Affinity)[name], nil
Expand Down
21 changes: 15 additions & 6 deletions pkg/resmgr/nri.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/containers/nri-plugins/pkg/instrumentation/metrics"
"github.com/containers/nri-plugins/pkg/instrumentation/tracing"
Expand All @@ -42,6 +43,11 @@ var (
nri = logger.NewLogger("nri-plugin")
)

const (
podResListTimeout = 2 * time.Second
podResGetTimeout = 1 * time.Second
)

func newNRIPlugin(resmgr *resmgr) (*nriPlugin, error) {
p := &nriPlugin{
resmgr: resmgr,
Expand Down Expand Up @@ -198,7 +204,7 @@ func (p *nriPlugin) syncWithNRI(pods []*api.PodSandbox, containers []*api.Contai

nri.Info("synchronizing cache state with NRI runtime...")

_, _, deleted := m.cache.RefreshPods(pods)
_, _, deleted := m.cache.RefreshPods(pods, m.agent.GoListPodResources(podResListTimeout))
for _, c := range deleted {
nri.Info("discovered stale container %s (%s)...", c.PrettyName(), c.GetID())
released = append(released, c)
Expand Down Expand Up @@ -290,18 +296,21 @@ func (p *nriPlugin) RunPodSandbox(ctx context.Context, pod *api.PodSandbox) (ret
span.End(tracing.WithStatus(retErr))
}()

m := p.resmgr
podResCh := m.agent.GoGetPodResources(pod.GetNamespace(), pod.GetName(), podResGetTimeout)

p.dump(in, event, pod)
defer func() {
p.dump(out, event, retErr)
}()

m := p.resmgr
m.Lock()
defer m.Unlock()
b := metrics.Block()
defer b.Done()

m.cache.InsertPod(pod)
m.cache.InsertPod(pod, podResCh)

return nil
}

Expand Down Expand Up @@ -383,19 +392,19 @@ func (p *nriPlugin) RemovePodSandbox(ctx context.Context, podSandbox *api.PodSan
return nil
}

func (p *nriPlugin) CreateContainer(ctx context.Context, podSandbox *api.PodSandbox, container *api.Container) (adjust *api.ContainerAdjustment, updates []*api.ContainerUpdate, retErr error) {
func (p *nriPlugin) CreateContainer(ctx context.Context, pod *api.PodSandbox, container *api.Container) (adjust *api.ContainerAdjustment, updates []*api.ContainerUpdate, retErr error) {
event := CreateContainer

_, span := tracing.StartSpan(
ctx,
event,
tracing.WithAttributes(containerSpanTags(podSandbox, container)...),
tracing.WithAttributes(containerSpanTags(pod, container)...),
)
defer func() {
span.End(tracing.WithStatus(retErr))
}()

p.dump(in, event, podSandbox, container)
p.dump(in, event, pod, container)
defer func() {
p.dump(out, event, adjust, updates, retErr)
}()
Expand Down

0 comments on commit 917408e

Please sign in to comment.