Add preferCloseToDevices balloon type option #203

Merged

merged 5 commits on Dec 15, 2023
39 changes: 38 additions & 1 deletion cmd/plugins/balloons/policy/balloons-policy.go
@@ -568,10 +568,12 @@ func (p *balloons) newBalloon(blnDef *BalloonDef, confCpus bool) (*Balloon, error) {
// are type specific allocator options, otherwise use policy
// default allocator.
cpuTreeAllocator := p.cpuTreeAllocator
if blnDef.AllocatorTopologyBalancing != nil || blnDef.PreferSpreadOnPhysicalCores != nil {
if blnDef.AllocatorTopologyBalancing != nil || blnDef.PreferSpreadOnPhysicalCores != nil || len(blnDef.PreferCloseToDevices) > 0 || len(blnDef.PreferFarFromDevices) > 0 {
allocatorOptions := cpuTreeAllocatorOptions{
topologyBalancing: p.bpoptions.AllocatorTopologyBalancing,
preferSpreadOnPhysicalCores: p.bpoptions.PreferSpreadOnPhysicalCores,
preferCloseToDevices: blnDef.PreferCloseToDevices,
preferFarFromDevices: blnDef.PreferFarFromDevices,
}
if blnDef.AllocatorTopologyBalancing != nil {
allocatorOptions.topologyBalancing = *blnDef.AllocatorTopologyBalancing
@@ -1091,6 +1093,8 @@ func (p *balloons) setConfig(bpoptions *BalloonsOptions) error {
p.balloons = []*Balloon{}
p.freeCpus = p.allowed.Clone()
p.freeCpus = p.freeCpus.Difference(p.reserved)
p.fillFarFromDevices(bpoptions.BalloonDefs)

p.cpuTreeAllocator = p.cpuTree.NewAllocator(cpuTreeAllocatorOptions{
topologyBalancing: bpoptions.AllocatorTopologyBalancing,
preferSpreadOnPhysicalCores: bpoptions.PreferSpreadOnPhysicalCores,
@@ -1146,6 +1150,39 @@ func (p *balloons) setConfig(bpoptions *BalloonsOptions) error {
return nil
}

// fillFarFromDevices adds implicit device anti-affinities to
// BalloonDefs: a balloon type is implicitly pushed far from every
// device that some other balloon type prefers to be close to,
// unless it prefers to be close to that device itself.
func (p *balloons) fillFarFromDevices(blnDefs []*BalloonDef) {
// devDefClose[device][blnDef.Name] equals true if and
// only if the blnDef prefers to be close to the device.
devDefClose := map[string]map[string]bool{}
// avoidDevs is a list of devices that at least one
// balloon type prefers to be close to. The order of devices
// in the avoidDevs list is significant: devices in the
// beginning of the list will be more effectively avoided than
// devices later in the list.
avoidDevs := []string{}
for _, blnDef := range blnDefs {
for _, closeDev := range blnDef.PreferCloseToDevices {
if _, ok := devDefClose[closeDev]; !ok {
avoidDevs = append(avoidDevs, closeDev)
devDefClose[closeDev] = map[string]bool{}
}
devDefClose[closeDev][blnDef.Name] = true
}
}
// Add every device in avoidDevs to the PreferFarFromDevices lists
// of those balloon types that do not prefer to be close to
// the device.
for _, avoidDev := range avoidDevs {
for _, blnDef := range blnDefs {
if !devDefClose[avoidDev][blnDef.Name] {
blnDef.PreferFarFromDevices = append(blnDef.PreferFarFromDevices, avoidDev)
}
}
}
}
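
For illustration, a minimal standalone sketch of the same fill logic with a trimmed-down BalloonDef; the balloon type names and device paths below are hypothetical, not part of this PR:

package main

import "fmt"

type BalloonDef struct {
	Name                 string
	PreferCloseToDevices []string
	PreferFarFromDevices []string
}

// fillFarFromDevices mirrors the logic above: every balloon type is
// pushed far from each device that some other balloon type wants to
// be close to.
func fillFarFromDevices(blnDefs []*BalloonDef) {
	devDefClose := map[string]map[string]bool{}
	avoidDevs := []string{}
	for _, blnDef := range blnDefs {
		for _, closeDev := range blnDef.PreferCloseToDevices {
			if _, ok := devDefClose[closeDev]; !ok {
				avoidDevs = append(avoidDevs, closeDev)
				devDefClose[closeDev] = map[string]bool{}
			}
			devDefClose[closeDev][blnDef.Name] = true
		}
	}
	for _, avoidDev := range avoidDevs {
		for _, blnDef := range blnDefs {
			if !devDefClose[avoidDev][blnDef.Name] {
				blnDef.PreferFarFromDevices = append(blnDef.PreferFarFromDevices, avoidDev)
			}
		}
	}
}

func main() {
	gpu := &BalloonDef{Name: "gpu", PreferCloseToDevices: []string{"/sys/class/drm/card0"}}
	net := &BalloonDef{Name: "net", PreferCloseToDevices: []string{"/sys/class/net/eth0"}}
	fillFarFromDevices([]*BalloonDef{gpu, net})
	fmt.Println(gpu.PreferFarFromDevices) // [/sys/class/net/eth0]
	fmt.Println(net.PreferFarFromDevices) // [/sys/class/drm/card0]
}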

// closestMems returns memory node IDs good for pinning containers
// that run on given CPUs
func (p *balloons) closestMems(cpus cpuset.CPUSet) idset.IDSet {
189 changes: 180 additions & 9 deletions cmd/plugins/balloons/policy/cputree.go
@@ -21,6 +21,7 @@ import (
"strings"

system "github.com/containers/nri-plugins/pkg/sysfs"
"github.com/containers/nri-plugins/pkg/topology"
"github.com/containers/nri-plugins/pkg/utils/cpuset"
)

@@ -55,8 +56,9 @@ type cpuTreeNodeAttributes struct {
// cpuTreeAllocator allocates CPUs from the branch of a CPU tree
// where the "root" node is the topmost CPU of the branch.
type cpuTreeAllocator struct {
options cpuTreeAllocatorOptions
root *cpuTreeNode
options cpuTreeAllocatorOptions
root *cpuTreeNode
cacheCloseCpuSets map[string][]cpuset.CPUSet
}

// cpuTreeAllocatorOptions contains parameters for the CPU allocator
@@ -67,8 +69,12 @@ type cpuTreeAllocatorOptions struct {
// the opposite (packed allocations).
topologyBalancing bool
preferSpreadOnPhysicalCores bool
preferCloseToDevices []string
preferFarFromDevices []string
}

var emptyCpuSet = cpuset.New()

// String returns string representation of a CPU tree node.
func (t *cpuTreeNode) String() string {
if len(t.children) == 0 {
@@ -395,8 +401,9 @@ func (t *cpuTreeNode) SplitLevel(splitLevel CPUTopologyLevel, cpuClassifier func
// CPU tree branch.
func (t *cpuTreeNode) NewAllocator(options cpuTreeAllocatorOptions) *cpuTreeAllocator {
ta := &cpuTreeAllocator{
root: t,
options: options,
root: t,
options: options,
cacheCloseCpuSets: map[string][]cpuset.CPUSet{},
}
if options.preferSpreadOnPhysicalCores {
newTree := t.SplitLevel(CPUTopologyLevelNuma,
@@ -502,8 +509,172 @@ func (ta *cpuTreeAllocator) sorterRelease(tnas []cpuTreeNodeAttributes) func(int
// - removeFromCpus contains CPUs in the currentCpus set from which
// abs(delta) CPUs can be freed.
func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
resizers := []cpuResizerFunc{
ta.resizeCpusOnlyIfNecessary,
ta.resizeCpusWithDevices,
ta.resizeCpusOneAtATime,
ta.resizeCpusMaxLocalSet,
ta.resizeCpusNow}
return ta.nextCpuResizer(resizers, currentCpus, freeCpus, delta)
}

type cpuResizerFunc func(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error)

func (ta *cpuTreeAllocator) nextCpuResizer(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
if len(resizers) == 0 {
return freeCpus, currentCpus, fmt.Errorf("internal error: a CPU resizer consulted next resizer but there was no one left")
}
remainingResizers := resizers[1:]
log.Debugf("- resizer-%d(%q, %q, %d)", len(remainingResizers), currentCpus, freeCpus, delta)
addFrom, removeFrom, err := resizers[0](remainingResizers, currentCpus, freeCpus, delta)
return addFrom, removeFrom, err
}
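
nextCpuResizer implements a chain of responsibility: each resizer either decides the result itself or narrows the candidate sets and delegates to the remaining resizers. A minimal sketch of the same pattern on plain integers; the names here are illustrative only:

package main

import "fmt"

type resizer func(rest []resizer, n int) (int, error)

// next consults the first remaining resizer, handing it the rest of
// the chain, just like nextCpuResizer above.
func next(rest []resizer, n int) (int, error) {
	if len(rest) == 0 {
		return 0, fmt.Errorf("no resizer left")
	}
	return rest[0](rest[1:], n)
}

// halveIfEven handles even inputs itself and delegates odd ones.
func halveIfEven(rest []resizer, n int) (int, error) {
	if n%2 == 0 {
		return n / 2, nil
	}
	return next(rest, n)
}

// identity is the terminal element of the chain, in the same role as
// resizeCpusNow below.
func identity(rest []resizer, n int) (int, error) {
	return n, nil
}

func main() {
	chain := []resizer{halveIfEven, identity}
	fmt.Println(next(chain, 8)) // 4 <nil>
	fmt.Println(next(chain, 7)) // 7 <nil>
}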

// resizeCpusNow does not call the next resizer. Instead it treats
// all CPU allocations from freeCpus and all CPU releases from
// currentCpus as equally good. This is the terminal block of the
// resizer chain.
func (ta *cpuTreeAllocator) resizeCpusNow(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
return freeCpus, currentCpus, nil
}

// resizeCpusOnlyIfNecessary is the fast path for making trivial
// reservations, and for failing early if resizing is not possible.
func (ta *cpuTreeAllocator) resizeCpusOnlyIfNecessary(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
switch {
case delta == 0:
// Nothing to do.
return emptyCpuSet, emptyCpuSet, nil
case delta > 0:
if freeCpus.Size() < delta {
return freeCpus, emptyCpuSet, fmt.Errorf("not enough free CPUs (%d) to resize current CPU set from %d to %d CPUs", freeCpus.Size(), currentCpus.Size(), currentCpus.Size()+delta)
} else if freeCpus.Size() == delta {
// Allocate all the remaining free CPUs.
return freeCpus, emptyCpuSet, nil
}
case delta < 0:
if currentCpus.Size() < -delta {
return emptyCpuSet, currentCpus, fmt.Errorf("not enough current CPUs (%d) to release %d CPUs", currentCpus.Size(), -delta)
} else if currentCpus.Size() == -delta {
// Free all allocated CPUs.
return emptyCpuSet, currentCpus, nil
}
}
return ta.nextCpuResizer(resizers, currentCpus, freeCpus, delta)
}

// resizeCpusWithDevices prefers allocating CPUs from those freeCpus
// that are topologically close to preferred devices, and releasing
// those currentCpus that are not.
func (ta *cpuTreeAllocator) resizeCpusWithDevices(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
// allCloseCpuSets contains cpusets in priority order: cpusets
// earlier in the list take precedence over those after them.
allCloseCpuSets := [][]cpuset.CPUSet{}
for _, devPath := range ta.options.preferCloseToDevices {
if closeCpuSets := ta.topologyHintCpus(devPath); len(closeCpuSets) > 0 {
allCloseCpuSets = append(allCloseCpuSets, closeCpuSets)
}
}
for _, devPath := range ta.options.preferFarFromDevices {
for _, farCpuSet := range ta.topologyHintCpus(devPath) {
allCloseCpuSets = append(allCloseCpuSets, []cpuset.CPUSet{freeCpus.Difference(farCpuSet)})
}
}
if len(allCloseCpuSets) == 0 {
return ta.nextCpuResizer(resizers, currentCpus, freeCpus, delta)
}
if delta > 0 {
// Allocate N=delta CPUs from freeCpus based on topology hints.
// Build a new set of freeCpus with at least N CPUs based on
// intersection with CPU hints.
// In case of conflicting topology hints the first
// hints in the list are the most important.
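// For example, with freeCpus 0-7 and delta=2, hint cpusets
// 0-3 and then 2-5 narrow the candidates first to 0-3 and
// then to 2-3; a hint is skipped if the intersection would
// leave fewer than delta free CPUs.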
remainingFreeCpus := freeCpus
appliedHints := 0
totalHints := 0
for _, closeCpuSets := range allCloseCpuSets {
for _, cpus := range closeCpuSets {
totalHints++
newRemainingFreeCpus := remainingFreeCpus.Intersection(cpus)
if newRemainingFreeCpus.Size() >= delta {
appliedHints++
log.Debugf(" - take hinted cpus %q, common free %q", cpus, newRemainingFreeCpus)
remainingFreeCpus = newRemainingFreeCpus
} else {
log.Debugf(" - drop hinted cpus %q, not enough common free in %q", cpus, newRemainingFreeCpus)
}
}
}
log.Debugf(" - original free cpus %q, took %d/%d hints, remaining free: %q",
freeCpus, appliedHints, totalHints, remainingFreeCpus)
return ta.nextCpuResizer(resizers, currentCpus, remainingFreeCpus, delta)
} else if delta < 0 {
// Free N=-delta CPUs from currentCpus based on topology hints.
// 1. Sort currentCpus based on topology hints (leastHintedCpus).
// 2. Pick largest hint value that has to be released (maxHints).
// 3. Free all CPUs that have a hint value smaller than maxHints.
// 4. Let next CPU resizer choose CPUs to be freed among
// CPUs with hint value maxHints.
currentCpuHints := map[int]uint64{}
for hintPriority, closeCpuSets := range allCloseCpuSets {
for _, cpus := range closeCpuSets {
for _, cpu := range cpus.Intersection(currentCpus).UnsortedList() {
currentCpuHints[cpu] += 1 << (len(allCloseCpuSets) - 1 - hintPriority)
}
}
}
leastHintedCpus := currentCpus.UnsortedList()
sort.Slice(leastHintedCpus, func(i, j int) bool {
return currentCpuHints[leastHintedCpus[i]] < currentCpuHints[leastHintedCpus[j]]
})
maxHints := currentCpuHints[leastHintedCpus[-delta]]
currentToFreeForSure := cpuset.New()
currentToFreeMaybe := cpuset.New()
for i := 0; i < len(leastHintedCpus) && currentCpuHints[leastHintedCpus[i]] <= maxHints; i++ {
if currentCpuHints[leastHintedCpus[i]] < maxHints {
currentToFreeForSure = currentToFreeForSure.Union(cpuset.New(leastHintedCpus[i]))
} else {
currentToFreeMaybe = currentToFreeMaybe.Union(cpuset.New(leastHintedCpus[i]))
}
}
remainingDelta := delta + currentToFreeForSure.Size()
log.Debugf(" - device hints: from cpus %q: free for sure: %q and %d more from: %q",
currentCpus, currentToFreeForSure, -remainingDelta, currentToFreeMaybe)
_, freeFromMaybe, err := ta.nextCpuResizer(resizers, currentToFreeMaybe, freeCpus, remainingDelta)
// Take CPUs from freeFromMaybe only until -delta CPUs are
// freed in total, so that every CPU with fewer hints than
// maxHints is guaranteed to be freed.
for _, cpu := range freeFromMaybe.UnsortedList() {
if currentToFreeForSure.Size() >= -delta {
break
}
currentToFreeForSure = currentToFreeForSure.Union(cpuset.New(cpu))
}
return freeCpus, currentToFreeForSure, err
}
return freeCpus, currentCpus, nil
}
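
The release branch encodes hint priority as a bit weight: with k hint groups, a CPU covered by the group at priority p gains 1<<(k-1-p), so one higher-priority hint outweighs all lower-priority hints combined, and sorting by the accumulated weight frees the least hinted CPUs first. A small self-contained sketch with hypothetical cpusets:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Two hint groups: group 0 (highest priority) covers CPUs 2-3,
	// group 1 covers CPUs 3-4.
	hints := [][]int{{2, 3}, {3, 4}}
	k := len(hints)
	weight := map[int]uint64{}
	for p, cpus := range hints {
		for _, cpu := range cpus {
			weight[cpu] += 1 << (k - 1 - p)
		}
	}
	cpus := []int{1, 2, 3, 4}
	sort.Slice(cpus, func(i, j int) bool {
		return weight[cpus[i]] < weight[cpus[j]]
	})
	// Weights: cpu1=0, cpu4=1, cpu2=2, cpu3=3, so CPU 1 is freed
	// first and CPU 3 is kept the longest.
	fmt.Println(cpus) // [1 4 2 3]
}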

// topologyHintCpus returns the cpusets of a device's topology
// hints, cached so that an error on a bad device is logged only
// once.
func (ta *cpuTreeAllocator) topologyHintCpus(dev string) []cpuset.CPUSet {
if closeCpuSets, ok := ta.cacheCloseCpuSets[dev]; ok {
return closeCpuSets
}
topologyHints, err := topology.NewTopologyHints(dev)
if err != nil {
log.Errorf("failed to find topology of device %q: %v", dev, err)
ta.cacheCloseCpuSets[dev] = []cpuset.CPUSet{}
} else {
for _, topologyHint := range topologyHints {
ta.cacheCloseCpuSets[dev] = append(ta.cacheCloseCpuSets[dev], cpuset.MustParse(topologyHint.CPUs))
}
}
return ta.cacheCloseCpuSets[dev]
}
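
For reference, a minimal sketch of resolving hints for one device directly with the topology package; the device path is hypothetical and only the fields used above are assumed:

package main

import (
	"fmt"

	"github.com/containers/nri-plugins/pkg/topology"
)

func main() {
	// Hypothetical sysfs path; a real one would point at a NIC,
	// GPU or other device on the host.
	hints, err := topology.NewTopologyHints("/sys/class/net/eth0")
	if err != nil {
		fmt.Println("no topology hints:", err)
		return
	}
	for _, hint := range hints {
		// hint.CPUs is a cpulist string such as "0-15", parsed
		// with cpuset.MustParse in topologyHintCpus above.
		fmt.Println(hint.CPUs)
	}
}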

func (ta *cpuTreeAllocator) resizeCpusOneAtATime(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
if delta > 0 {
addFromSuperset, removeFromSuperset, err := ta.resizeCpus(currentCpus, freeCpus, delta)
addFromSuperset, removeFromSuperset, err := ta.nextCpuResizer(resizers, currentCpus, freeCpus, delta)
if !ta.options.preferSpreadOnPhysicalCores || addFromSuperset.Size() == delta {
return addFromSuperset, removeFromSuperset, err
}
@@ -515,7 +686,7 @@ func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
// set by adding one CPU at a time.
addFrom := cpuset.New()
for n := 0; n < delta; n++ {
addSingleFrom, _, err := ta.resizeCpus(currentCpus, freeCpus, 1)
addSingleFrom, _, err := ta.nextCpuResizer(resizers, currentCpus, freeCpus, 1)
if err != nil {
return addFromSuperset, removeFromSuperset, err
}
@@ -540,7 +711,7 @@ func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
removeFrom := cpuset.New()
addFrom := cpuset.New()
for n := 0; n < -delta; n++ {
_, removeSingleFrom, err := ta.resizeCpus(currentCpus, freeCpus, -1)
_, removeSingleFrom, err := ta.nextCpuResizer(resizers, currentCpus, freeCpus, -1)
if err != nil {
return addFrom, removeFrom, err
}
@@ -563,7 +734,7 @@ func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
return addFrom, removeFrom, nil
}

func (ta *cpuTreeAllocator) resizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
func (ta *cpuTreeAllocator) resizeCpusMaxLocalSet(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
tnas := ta.root.ToAttributedSlice(currentCpus, freeCpus,
func(tna *cpuTreeNodeAttributes) bool {
// filter out branches with insufficient cpus
Expand All @@ -587,5 +758,5 @@ func (ta *cpuTreeAllocator) resizeCpus(currentCpus, freeCpus cpuset.CPUSet, delt
if len(tnas) == 0 {
return freeCpus, currentCpus, fmt.Errorf("not enough free CPUs")
}
return tnas[0].freeCpus, tnas[0].currentCpus, nil
return ta.nextCpuResizer(resizers, tnas[0].currentCpus, tnas[0].freeCpus, delta)
}