Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

libmem: implement policy-agnostic memory allocation/accounting. #332

Merged
merged 10 commits into from
Oct 7, 2024
115 changes: 105 additions & 10 deletions cmd/plugins/balloons/policy/balloons-policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/containers/nri-plugins/pkg/resmgr/cache"
cpucontrol "github.com/containers/nri-plugins/pkg/resmgr/control/cpu"
"github.com/containers/nri-plugins/pkg/resmgr/events"
libmem "github.com/containers/nri-plugins/pkg/resmgr/lib/memory"
policy "github.com/containers/nri-plugins/pkg/resmgr/policy"
"github.com/containers/nri-plugins/pkg/utils"
"github.com/containers/nri-plugins/pkg/utils/cpuset"
Expand Down Expand Up @@ -79,6 +80,7 @@ type balloons struct {
balloons []*Balloon // balloon instances: reserved, default and user-defined

cpuAllocator cpuallocator.CPUAllocator // CPU allocator used by the policy
memAllocator *libmem.Allocator // memory allocator used by the policy
}

// Balloon contains attributes of a balloon instance
Expand Down Expand Up @@ -170,6 +172,12 @@ func (p *balloons) Setup(policyOptions *policy.BackendOptions) error {
p.cch = policyOptions.Cache
p.cpuAllocator = cpuallocator.NewCPUAllocator(policyOptions.System)

malloc, err := libmem.NewAllocator(libmem.WithSystemNodes(policyOptions.System))
if err != nil {
return balloonsError("failed to create memory allocator: %w", err)
}
p.memAllocator = malloc

log.Info("setting up %s policy...", PolicyName)
if p.cpuTree, err = NewCpuTreeFromSystem(); err != nil {
log.Errorf("creating CPU topology tree failed: %s", err)
Expand Down Expand Up @@ -1268,14 +1276,7 @@ func (p *balloons) fillFarFromDevices(blnDefs []*BalloonDef) {
// closestMems returns memory node IDs good for pinning containers
// that run on given CPUs.
//
// The node selection is delegated to the libmem allocator, which knows
// the CPU-to-memory-node affinity of the system.
func (p *balloons) closestMems(cpus cpuset.CPUSet) idset.IDSet {
	// NOTE: the old hand-rolled implementation that scanned system
	// nodes directly was replaced by libmem's CPUSetAffinity(); the
	// stale body left above the new return was dead/unreachable code
	// and has been removed.
	return idset.NewIDSet(p.memAllocator.CPUSetAffinity(cpus).Slice()...)
}

// filterBalloons returns balloons for which the test function returns true
Expand Down Expand Up @@ -1465,6 +1466,9 @@ func (p *balloons) assignContainer(c cache.Container, bln *Balloon) {

// dismissContainer removes a container from a balloon
func (p *balloons) dismissContainer(c cache.Container, bln *Balloon) {
if err := p.memAllocator.Release(c.GetID()); err != nil {
log.Error("dismissContainer: failed to release memory for %s: %v", c.PrettyName(), err)
}
podID := c.GetPodID()
bln.PodIDs[podID] = removeString(bln.PodIDs[podID], c.GetID())
if len(bln.PodIDs[podID]) == 0 {
Expand All @@ -1486,11 +1490,102 @@ func (p *balloons) pinCpuMem(c cache.Container, cpus cpuset.CPUSet, mems idset.I
if p.bpoptions.PinMemory == nil || *p.bpoptions.PinMemory {
if c.PreserveMemoryResources() {
log.Debug(" - preserving %s pinning to memory %q", c.PrettyName, c.GetCpusetMems())
preserveMems, err := parseIDSet(c.GetCpusetMems())
if err != nil {
log.Error("failed to parse CpusetMems: %v", err)
} else {
zone := p.allocMem(c, preserveMems, 0, true)
log.Debug(" - allocated preserved memory %s", c.PrettyName, zone)
c.SetCpusetMems(zone.MemsetString())
klihub marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
log.Debug(" - pinning %s to memory %s", c.PrettyName(), mems)
c.SetCpusetMems(mems.String())
memTypeMask, err := c.MemoryTypes()
if err != nil {
log.Error("%v", err)
}
if memTypeMask != 0 {
// memory-type pod/container-specific
// annotation overrides balloon's
// memory options that are the default
// to all containers in the balloon.
log.Debug(" - %s memory-type annotation overrides balloon mems %s", c.PrettyName(), mems)
}
log.Debug(" - requested %s to memory %s (types %s)", c.PrettyName(), mems, memTypeMask)
zone := p.allocMem(c, mems, memTypeMask, false)
log.Debug(" - allocated %s to memory %s", c.PrettyName(), zone)
c.SetCpusetMems(zone.MemsetString())
}
}
}

// allocMem allocates or reallocates memory for the container from the
// given nodes, returning the memory zone the container should be pinned
// to. If the allocator already has a zone assigned to the container the
// existing allocation is adjusted with Realloc(); otherwise a fresh
// request is made (a "preserved" one when preserve is true). If other
// containers' zone assignments change as a side effect, they are
// repinned here. On allocation failure the requested nodes are returned
// unmodified as a fallback.
func (p *balloons) allocMem(c cache.Container, mems idset.IDSet, types libmem.TypeMask, preserve bool) libmem.NodeMask {
	var (
		limit = getMemoryLimit(c)
		nodes = libmem.NewNodeMask(mems.Members()...)
		zone  libmem.NodeMask
		moved map[string]libmem.NodeMask
		err   error
	)

	if _, assigned := p.memAllocator.AssignedZone(c.GetID()); assigned {
		// Already allocated: adjust the existing allocation.
		zone, moved, err = p.memAllocator.Realloc(c.GetID(), nodes, types)
	} else {
		var req *libmem.Request
		if preserve {
			req = libmem.PreservedContainer(
				c.GetID(),
				c.PrettyName(),
				limit,
				nodes,
			)
		} else {
			req = libmem.ContainerWithTypes(
				c.GetID(),
				c.PrettyName(),
				string(c.GetQOSClass()),
				limit,
				nodes,
				types,
			)
		}
		zone, moved, err = p.memAllocator.Allocate(req)
	}

	if err != nil {
		log.Error("allocMem: falling back to %s, failed to allocate memory for %s: %v",
			nodes, c.PrettyName(), err)
		return nodes
	}

	// Repin any other containers whose zones were changed by the allocator.
	for id, z := range moved {
		if other, ok := p.cch.LookupContainer(id); ok {
			other.SetCpusetMems(z.MemsetString())
		}
	}

	return zone
}

// parseIDSet parses a kernel-style list string (for example "0-2,4")
// into an IDSet. On parse failure it returns an empty IDSet together
// with the error.
func parseIDSet(mems string) (idset.IDSet, error) {
	ids := idset.NewIDSet()
	cset, err := cpuset.Parse(mems)
	if err != nil {
		return ids, err
	}
	for _, id := range cset.List() {
		ids.Add(id)
	}
	return ids, nil
}

// getMemoryLimit returns the effective memory limit of the container in
// bytes: the limit from pending resource updates if there are any,
// otherwise from the container's resource requirements. It returns 0
// when no memory limit is set.
func getMemoryLimit(c cache.Container) int64 {
	res, ok := c.GetResourceUpdates()
	if !ok {
		res = c.GetResourceRequirements()
	}

	limit, ok := res.Limits[corev1.ResourceMemory]
	if !ok {
		return 0
	}

	return limit.Value()
}

// balloonsError formats an error from this policy.
Expand Down
89 changes: 54 additions & 35 deletions cmd/plugins/topology-aware/policy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package topologyaware

import (
"encoding/json"
"errors"
"time"

"github.com/containers/nri-plugins/pkg/resmgr/cache"
libmem "github.com/containers/nri-plugins/pkg/resmgr/lib/memory"
"github.com/containers/nri-plugins/pkg/utils/cpuset"
idset "github.com/intel/goresctrl/pkg/utils"
)

const (
Expand Down Expand Up @@ -60,29 +61,57 @@ func (p *policy) restoreAllocations(allocations *allocations) error {

// reinstateGrants tries to restore the given grants exactly as such.
func (p *policy) reinstateGrants(grants map[string]Grant) error {
// TODO(klihub):
// Our grant reinstating is now too simplistic. restoreMemOffer
// blindly assumes it can take offers for known containers. But
// during reconfiguration (as opposed to restarts) we already
// have existing allocations for known containers, so we can't
// ask for offers for them. We could try to Reallocate() here,
// but the later parts on these code paths expect offers, not
// container zone updates. So the simplest for now is to just
// release memory for all grants... we'll then ask for offers
// for them with affinity to the zone they had been allocated
// to...

for id, grant := range grants {
if err := p.releaseMem(id); err != nil && !errors.Is(err, libmem.ErrUnknownRequest) {
log.Error("failed to release memory for grant %s: %v", grant, err)
}
}

for id, grant := range grants {
c := grant.GetContainer()

pool := grant.GetCPUNode()
supply := pool.FreeSupply()

if err := supply.Reserve(grant); err != nil {
return policyError("failed to update pool %q with CPU grant of %q: %v",
o, err := p.restoreMemOffer(grant)
if err != nil {
return policyError("failed to get libmem offer for pool %q, grant of %s: %w",
pool.Name(), c.PrettyName(), err)
}

log.Info("updated pool %q with reinstated CPU grant of %q",
pool.Name(), c.PrettyName())

pool = grant.GetMemoryNode()
if err := supply.ReserveMemory(grant); err != nil {
grant.GetCPUNode().FreeSupply().ReleaseCPU(grant)
return policyError("failed to update pool %q with extra memory of %q: %v",
updates, err := supply.Reserve(grant, o)
if err != nil {
return policyError("failed to update pool %q with CPU grant of %q: %v",
pool.Name(), c.PrettyName(), err)
}

log.Info("updated pool %q with reinstanted memory reservation of %q",
pool.Name(), c.PrettyName())
for uID, uZone := range updates {
if ug, ok := p.allocations.grants[uID]; !ok {
log.Error("failed to update grant %s to memory zone to %s, grant not found",
uID, uZone)
} else {
ug.SetMemoryZone(uZone)
if opt.PinMemory {
ug.GetContainer().SetCpusetMems(uZone.MemsetString())
}
log.Info("updated grant %s to memory zone %s", uID, uZone)
}
}

log.Info("updated pool %q with reinstated CPU grant of %q, memory zone %s",
pool.Name(), c.PrettyName(), grant.GetMemoryZone())

p.allocations.grants[id] = grant
p.applyGrant(grant)
Expand All @@ -94,16 +123,15 @@ func (p *policy) reinstateGrants(grants map[string]Grant) error {
}

// cachedGrant is the serializable form of a Grant, stored in the cache
// so allocations can be restored across restarts.
//
// NOTE: the diff render had left the pre-libmem field set (Exclusive,
// ..., Memset, MemoryLimit) duplicated above the new one, declaring
// every field twice; only the current field set is kept here.
type cachedGrant struct {
	Exclusive  string          // exclusively allocated CPUs, as a cpuset string
	Part       int             // milli-CPU portion of the grant
	CPUType    cpuClass        // class of the granted CPUs
	Container  string          // ID of the container this grant belongs to
	Pool       string          // name of the pool the CPUs were granted from
	MemoryPool libmem.NodeMask // memory zone assigned by the libmem allocator
	MemType    memoryType      // type of memory granted
	MemSize    int64           // size of the memory allocation in bytes
	ColdStart  time.Duration   // cold-start period, if any
}

func newCachedGrant(cg Grant) *cachedGrant {
Expand All @@ -113,15 +141,9 @@ func newCachedGrant(cg Grant) *cachedGrant {
ccg.CPUType = cg.CPUType()
ccg.Container = cg.GetContainer().GetID()
ccg.Pool = cg.GetCPUNode().Name()
ccg.MemoryPool = cg.GetMemoryNode().Name()
ccg.MemoryPool = cg.GetMemoryZone()
ccg.MemType = cg.MemoryType()
ccg.Memset = cg.Memset().Clone()

ccg.MemoryLimit = make(memoryMap)
for key, value := range cg.MemLimit() {
ccg.MemoryLimit[key] = value
}

ccg.MemSize = cg.GetMemorySize()
ccg.ColdStart = cg.ColdStart()

return ccg
Expand All @@ -144,14 +166,11 @@ func (ccg *cachedGrant) ToGrant(policy *policy) (Grant, error) {
cpuset.MustParse(ccg.Exclusive),
ccg.Part,
ccg.MemType,
ccg.MemoryLimit,
ccg.ColdStart,
)

if g.Memset().String() != ccg.Memset.String() {
log.Error("cache error: mismatch in stored/recalculated memset: %s != %s",
ccg.Memset, g.Memset())
}
g.SetMemoryZone(ccg.MemoryPool)
g.SetMemorySize(ccg.MemSize)

return g, nil
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/plugins/topology-aware/policy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ func TestAllocationMarshalling(t *testing.T) {
}{
{
name: "non-zero Exclusive",
data: []byte(`{"key1":{"Exclusive":"1","Part":1,"CPUType":0,"Container":"1","Pool":"testnode","MemoryPool":"testnode","MemType":"DRAM,PMEM,HBM","Memset":"","MemoryLimit":{},"ColdStart":0}}`),
data: []byte(`{"key1":{"Exclusive":"1","Part":1,"CPUType":0,"Container":"1","Pool":"testnode","MemoryPool":0,"MemType":"DRAM,PMEM,HBM","MemSize":0,"ColdStart":0}}`),
},
{
name: "zero Exclusive",
data: []byte(`{"key1":{"Exclusive":"","Part":1,"CPUType":0,"Container":"1","Pool":"testnode","MemoryPool":"testnode","MemType":"DRAM,PMEM,HBM","Memset":"","MemoryLimit":{},"ColdStart":0}}`),
data: []byte(`{"key1":{"Exclusive":"","Part":1,"CPUType":0,"Container":"1","Pool":"testnode","MemoryPool":0,"MemType":"DRAM,PMEM,HBM","MemSize":0,"ColdStart":0}}`),
},
}
for _, tc := range tcases {
Expand All @@ -104,8 +104,8 @@ func TestAllocationMarshalling(t *testing.T) {
node: node{
name: "testnode",
kind: UnknownNode,
noderes: newSupply(&node{}, cpuset.New(), cpuset.New(), cpuset.New(), 0, 0, createMemoryMap(0, 0, 0), createMemoryMap(0, 0, 0)),
freeres: newSupply(&node{}, cpuset.New(), cpuset.New(), cpuset.New(), 0, 0, createMemoryMap(0, 0, 0), createMemoryMap(0, 0, 0)),
noderes: newSupply(&node{}, cpuset.New(), cpuset.New(), cpuset.New(), 0, 0),
freeres: newSupply(&node{}, cpuset.New(), cpuset.New(), cpuset.New(), 0, 0),
},
},
},
Expand Down
10 changes: 8 additions & 2 deletions cmd/plugins/topology-aware/policy/coldstart.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/containers/nri-plugins/pkg/resmgr/cache"
"github.com/containers/nri-plugins/pkg/resmgr/events"
libmem "github.com/containers/nri-plugins/pkg/resmgr/lib/memory"
)

// trigger cold start for the container if necessary.
Expand Down Expand Up @@ -63,8 +64,13 @@ func (p *policy) finishColdStart(c cache.Container) (bool, error) {
return false, policyError("coldstart: no grant found for %s", c.PrettyName())
}

log.Info("restoring memset to grant %v", g)
g.RestoreMemset()
log.Info("reallocating %s after coldstart", g)
err := g.ReallocMemory(p.memZoneType(g.GetMemoryZone()) | libmem.TypeMaskDRAM)
if err != nil {
log.Error("failed to reallocate %s after coldstart: %v", g, err)
} else {
log.Info("reallocated %s", g)
}
g.ClearTimer()

return true, nil
Expand Down
Loading