diff --git a/pkg/resmgr/cache/cache.go b/pkg/resmgr/cache/cache.go index ee11ce88a..dc0982617 100644 --- a/pkg/resmgr/cache/cache.go +++ b/pkg/resmgr/cache/cache.go @@ -27,6 +27,7 @@ import ( nri "github.com/containerd/nri/pkg/api" v1 "k8s.io/api/core/v1" + "github.com/containers/nri-plugins/pkg/agent/podresapi" "github.com/containers/nri-plugins/pkg/utils/cpuset" resmgr "github.com/containers/nri-plugins/pkg/apis/resmgr/v1alpha1" @@ -114,6 +115,10 @@ type Pod interface { // and return the value of the first key found. GetEffectiveAnnotation(key, container string) (string, bool) + // GetPodResources returns the pod resources for this pod, waiting for any + // pending fetch to complete or a timeout. + GetPodResources() *podresapi.PodResources + // GetContainerAffinity returns the affinity expressions for the named container. GetContainerAffinity(string) ([]*Affinity, error) // ScopeExpression returns an affinity expression for defining this pod as the scope. @@ -138,12 +143,16 @@ type Pod interface { // A cached pod. type pod struct { - cache *cache // our cache of object - Pod *nri.PodSandbox // pod data from NRI - QOSClass v1.PodQOSClass // pod QOS class - Affinity *podContainerAffinity // annotated container affinity - prettyName string // cached PrettyName() - ctime time.Time // time of pod creation + cache *cache // our cache of object + Pod *nri.PodSandbox // pod data from NRI + QOSClass v1.PodQOSClass // pod QOS class + Affinity *podContainerAffinity // annotated container affinity + PodResources *podresapi.PodResources // pod resources acquired from podresourceapi + podResCh <-chan *podresapi.PodResources // channel for pod resource fetch + waitResCh chan struct{} // channel for waiting for pod resource fetch + prettyName string // cached PrettyName() + ctime time.Time // time of pod creation + } // ContainerState is the container state in the runtime. @@ -222,6 +231,9 @@ type Container interface { // The requirements are calculated from the containers cgroup parameters. GetResourceRequirements() v1.ResourceRequirements + // GetPodResources gets container-specific resources acquired from podresourceapi. + GetPodResources() *podresapi.ContainerResources + // SetResourceUpdates sets updated resources for a container. Returns true if the // resources were really updated. SetResourceUpdates(*nri.LinuxResources) bool @@ -322,6 +334,7 @@ type container struct { State ContainerState // current state of the container Requirements v1.ResourceRequirements + PodResources *podresapi.ContainerResources ResourceUpdates *v1.ResourceRequirements request interface{} @@ -359,7 +372,7 @@ type Cacheable interface { // itself upon startup. type Cache interface { // InsertPod inserts a pod into the cache, using a runtime request or reply. - InsertPod(pod *nri.PodSandbox) (Pod, error) + InsertPod(pod *nri.PodSandbox, ch <-chan *podresapi.PodResources) Pod // DeletePod deletes a pod from the cache. DeletePod(id string) Pod // LookupPod looks up a pod in the cache. @@ -410,7 +423,7 @@ type Cache interface { Save() error // RefreshPods purges/inserts stale/new pods/containers using a pod sandbox list response. - RefreshPods([]*nri.PodSandbox) ([]Pod, []Pod, []Container) + RefreshPods([]*nri.PodSandbox, <-chan podresapi.PodResourcesList) ([]Pod, []Pod, []Container) // RefreshContainers purges/inserts stale/new containers using a container list response. RefreshContainers([]*nri.Container) ([]Container, []Container) @@ -523,12 +536,12 @@ func (cch *cache) ResetActivePolicy() error { } // Insert a pod into the cache. -func (cch *cache) InsertPod(nriPod *nri.PodSandbox) (Pod, error) { - p := cch.createPod(nriPod) +func (cch *cache) InsertPod(nriPod *nri.PodSandbox, ch <-chan *podresapi.PodResources) Pod { + p := cch.createPod(nriPod, ch) cch.Pods[nriPod.GetId()] = p cch.Save() - return p, nil + return p } // Delete a pod from the cache. @@ -620,7 +633,7 @@ func (cch *cache) LookupContainerByCgroup(path string) (Container, bool) { } // RefreshPods purges/inserts stale/new pods/containers into the cache. -func (cch *cache) RefreshPods(pods []*nri.PodSandbox) ([]Pod, []Pod, []Container) { +func (cch *cache) RefreshPods(pods []*nri.PodSandbox, resCh <-chan podresapi.PodResourcesList) ([]Pod, []Pod, []Container) { valid := make(map[string]struct{}) add := []Pod{} @@ -631,13 +644,8 @@ func (cch *cache) RefreshPods(pods []*nri.PodSandbox) ([]Pod, []Pod, []Container valid[item.Id] = struct{}{} if _, ok := cch.Pods[item.Id]; !ok { log.Debug("inserting discovered pod %s...", item.Id) - pod, err := cch.InsertPod(item) - if err != nil { - log.Error("failed to insert discovered pod %s to cache: %v", - item.Id, err) - } else { - add = append(add, pod) - } + pod := cch.InsertPod(item, nil) + add = append(add, pod) } } for _, pod := range cch.Pods { @@ -655,6 +663,16 @@ func (cch *cache) RefreshPods(pods []*nri.PodSandbox) ([]Pod, []Pod, []Container } } + if resCh != nil { + podResList := <-resCh + if len(podResList) > 0 { + podResMap := podResList.Map() + for _, pod := range cch.Pods { + pod.setPodResources(podResMap.GetPod(pod.GetNamespace(), pod.GetName())) + } + } + } + return add, del, containers } @@ -686,6 +704,11 @@ func (cch *cache) RefreshContainers(containers []*nri.Container) ([]Container, [ c.State = ContainerStateStale del = append(del, c) } + + pod, ok := cch.Pods[c.GetPodID()] + if ok { + c.PodResources = pod.GetPodResources().GetContainer(c.GetName()) + } } return add, del diff --git a/pkg/resmgr/cache/container.go b/pkg/resmgr/cache/container.go index 306492883..3031e1f5b 100644 --- a/pkg/resmgr/cache/container.go +++ b/pkg/resmgr/cache/container.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/containers/nri-plugins/pkg/agent/podresapi" resmgr "github.com/containers/nri-plugins/pkg/apis/resmgr/v1alpha1" "github.com/containers/nri-plugins/pkg/cgroups" "github.com/containers/nri-plugins/pkg/kubernetes" @@ -89,18 +90,25 @@ func (c *container) getDenyPathList() (*PathList, bool, error) { // Create and initialize a cached container. func (cch *cache) createContainer(nriCtr *nri.Container) (*container, error) { podID := nriCtr.GetPodSandboxId() - _, ok := cch.Pods[podID] + pod, ok := cch.Pods[podID] if !ok { return nil, cacheError("can't find cached pod %s for container %s (%s)", podID, nriCtr.GetId(), nriCtr.GetName()) } c := &container{ - cache: cch, - Ctr: nriCtr, - State: nriCtr.GetState(), - Tags: make(map[string]string), - ctime: time.Now(), + cache: cch, + Ctr: nriCtr, + State: nriCtr.GetState(), + Tags: make(map[string]string), + ctime: time.Now(), + PodResources: pod.GetPodResources().GetContainer(nriCtr.GetName()), + } + + if c.PodResources == nil { + log.Info("no pod resources for container %s", c.PrettyName()) + } else { + log.Info("got pod resources %+v", c.PodResources) } c.generateTopologyHints() @@ -233,9 +241,19 @@ func (c *container) generateTopologyHints() { } } } + + checkDenied := func(path string) bool { + return checkAllowedAndDeniedPaths(path, allowPathList, denyPathList) + } + + if podRes := c.GetPodResources(); podRes != nil { + hints := podRes.GetDeviceTopologyHints(checkDenied) + c.TopologyHints = topology.MergeTopologyHints(c.TopologyHints, hints) + } } else { log.Info("automatic topology hint generation disabled for devices") } + } func isReadOnlyMount(m *nri.Mount) bool { @@ -452,6 +470,14 @@ func (c *container) GetResourceRequirements() v1.ResourceRequirements { return c.Requirements } +func (c *container) GetPodResources() *podresapi.ContainerResources { + pod, ok := c.GetPod() + if !ok { + return nil + } + return pod.GetPodResources().GetContainer(c.GetName()) +} + func (c *container) SetResourceUpdates(r *nri.LinuxResources) bool { r = mergeNRIResources(r, c.Ctr.GetLinux().GetResources()) diff --git a/pkg/resmgr/cache/pod.go b/pkg/resmgr/cache/pod.go index c69bcbe56..2c82083b5 100644 --- a/pkg/resmgr/cache/pod.go +++ b/pkg/resmgr/cache/pod.go @@ -21,19 +21,22 @@ import ( nri "github.com/containerd/nri/pkg/api" v1 "k8s.io/api/core/v1" + "github.com/containers/nri-plugins/pkg/agent/podresapi" resmgr "github.com/containers/nri-plugins/pkg/apis/resmgr/v1alpha1" "github.com/containers/nri-plugins/pkg/cgroups" "github.com/containers/nri-plugins/pkg/kubernetes" ) // Create and initialize a cached pod. -func (cch *cache) createPod(nriPod *nri.PodSandbox) *pod { +func (cch *cache) createPod(nriPod *nri.PodSandbox, ch <-chan *podresapi.PodResources) *pod { p := &pod{ cache: cch, Pod: nriPod, ctime: time.Now(), } + p.goFetchPodResources(ch) + if err := p.parseCgroupForQOSClass(); err != nil { log.Error("pod %s: %v", p.PrettyName(), err) } @@ -133,6 +136,32 @@ func (p *pod) GetQOSClass() v1.PodQOSClass { return p.QOSClass } +func (p *pod) goFetchPodResources(ch <-chan *podresapi.PodResources) { + go func() { + p.podResCh = ch + p.waitResCh = make(chan struct{}) + defer close(p.waitResCh) + + if p.podResCh != nil { + p.PodResources = <-p.podResCh + log.Debug("fetched pod resources %+v for %s", p.PodResources, p.GetName()) + } + }() +} + +func (p *pod) setPodResources(podRes *podresapi.PodResources) { + p.PodResources = podRes + log.Debug("set pod resources %+v for %s", p.PodResources, p.GetName()) +} + +func (p *pod) GetPodResources() *podresapi.PodResources { + if p.waitResCh != nil { + log.Debug("waiting for pod resources fetch to complete...") + _ = <-p.waitResCh + } + return p.PodResources +} + func (p *pod) GetContainerAffinity(name string) ([]*Affinity, error) { if p.Affinity != nil { return (*p.Affinity)[name], nil diff --git a/pkg/resmgr/nri.go b/pkg/resmgr/nri.go index 379b992db..ba7c58eba 100644 --- a/pkg/resmgr/nri.go +++ b/pkg/resmgr/nri.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "os" + "time" "github.com/containers/nri-plugins/pkg/instrumentation/metrics" "github.com/containers/nri-plugins/pkg/instrumentation/tracing" @@ -42,6 +43,11 @@ var ( nri = logger.NewLogger("nri-plugin") ) +const ( + podResListTimeout = 2 * time.Second + podResGetTimeout = 1 * time.Second +) + func newNRIPlugin(resmgr *resmgr) (*nriPlugin, error) { p := &nriPlugin{ resmgr: resmgr, @@ -198,7 +204,7 @@ func (p *nriPlugin) syncWithNRI(pods []*api.PodSandbox, containers []*api.Contai nri.Info("synchronizing cache state with NRI runtime...") - _, _, deleted := m.cache.RefreshPods(pods) + _, _, deleted := m.cache.RefreshPods(pods, m.agent.GoListPodResources(podResListTimeout)) for _, c := range deleted { nri.Info("discovered stale container %s (%s)...", c.PrettyName(), c.GetID()) released = append(released, c) @@ -290,18 +296,21 @@ func (p *nriPlugin) RunPodSandbox(ctx context.Context, pod *api.PodSandbox) (ret span.End(tracing.WithStatus(retErr)) }() + m := p.resmgr + podResCh := m.agent.GoGetPodResources(pod.GetNamespace(), pod.GetName(), podResGetTimeout) + p.dump(in, event, pod) defer func() { p.dump(out, event, retErr) }() - m := p.resmgr m.Lock() defer m.Unlock() b := metrics.Block() defer b.Done() - m.cache.InsertPod(pod) + m.cache.InsertPod(pod, podResCh) + return nil } @@ -383,19 +392,19 @@ func (p *nriPlugin) RemovePodSandbox(ctx context.Context, podSandbox *api.PodSan return nil } -func (p *nriPlugin) CreateContainer(ctx context.Context, podSandbox *api.PodSandbox, container *api.Container) (adjust *api.ContainerAdjustment, updates []*api.ContainerUpdate, retErr error) { +func (p *nriPlugin) CreateContainer(ctx context.Context, pod *api.PodSandbox, container *api.Container) (adjust *api.ContainerAdjustment, updates []*api.ContainerUpdate, retErr error) { event := CreateContainer _, span := tracing.StartSpan( ctx, event, - tracing.WithAttributes(containerSpanTags(podSandbox, container)...), + tracing.WithAttributes(containerSpanTags(pod, container)...), ) defer func() { span.End(tracing.WithStatus(retErr)) }() - p.dump(in, event, podSandbox, container) + p.dump(in, event, pod, container) defer func() { p.dump(out, event, adjust, updates, retErr) }()