balloons: add PreferSpreadOnPhysicalCores
- Enable using different CPU allocators in different balloon types.
  Previously the same CPU allocator, with the same options, was used
  for all CPU allocations in the policy.
- Add a new PreferSpreadOnPhysicalCores option for toggling between
  packing and spreading CPU allocations on physical CPU cores. The
  option is specific to the balloon type, with a policy-level default.
- Make the existing AllocatorTopologyBalancing option specific to the
  balloon type, too.

Signed-off-by: Antti Kervinen <[email protected]>
askervin authored and klihub committed Oct 17, 2023
1 parent 543a642 commit ea841f6
Showing 7 changed files with 434 additions and 31 deletions.
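
To make the commit message concrete: both options listed above follow the same resolution rule, where a balloon-type-specific value, when set, overrides the policy-level default. Below is a minimal, self-contained Go sketch of that rule; pickBool is a hypothetical helper written for this illustration, not code from the commit.

package main

import "fmt"

// pickBool resolves an effective option value: a balloon-type-specific
// *bool overrides the policy-level default when it is non-nil.
// Hypothetical helper, not from this commit.
func pickBool(typeSpecific *bool, policyDefault bool) bool {
	if typeSpecific != nil {
		return *typeSpecific
	}
	return policyDefault
}

func main() {
	spread := true
	fmt.Println(pickBool(nil, false))     // false: balloon type inherits the policy default
	fmt.Println(pickBool(&spread, false)) // true: balloon type overrides the default
}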
43 changes: 32 additions & 11 deletions cmd/plugins/balloons/policy/balloons-policy.go
@@ -90,7 +90,8 @@ type Balloon struct {
 	// - len(PodIDs) is the number of pods in the balloon.
 	// - len(PodIDs[podID]) is the number of containers of podID
 	//   currently assigned to the balloon.
-	PodIDs map[string][]string
+	PodIDs           map[string][]string
+	cpuTreeAllocator *cpuTreeAllocator
 }
 
 var log logger.Logger = logger.NewLogger("policy")
@@ -540,14 +541,32 @@ func (p *balloons) newBalloon(blnDef *BalloonDef, confCpus bool) (*Balloon, erro
 			break
 		}
 	}
+	// Configure a new cpuTreeAllocator for this balloon if there
+	// are type specific allocator options, otherwise use the
+	// policy default allocator.
+	cpuTreeAllocator := p.cpuTreeAllocator
+	if blnDef.AllocatorTopologyBalancing != nil || blnDef.PreferSpreadOnPhysicalCores != nil {
+		allocatorOptions := cpuTreeAllocatorOptions{
+			topologyBalancing:           p.bpoptions.AllocatorTopologyBalancing,
+			preferSpreadOnPhysicalCores: p.bpoptions.PreferSpreadOnPhysicalCores,
+		}
+		if blnDef.AllocatorTopologyBalancing != nil {
+			allocatorOptions.topologyBalancing = *blnDef.AllocatorTopologyBalancing
+		}
+		if blnDef.PreferSpreadOnPhysicalCores != nil {
+			allocatorOptions.preferSpreadOnPhysicalCores = *blnDef.PreferSpreadOnPhysicalCores
+		}
+		cpuTreeAllocator = p.cpuTree.NewAllocator(allocatorOptions)
+	}
+
 	// Allocate CPUs
 	if blnDef == p.reservedBalloonDef ||
 		(blnDef == p.defaultBalloonDef && blnDef.MinCpus == 0 && blnDef.MaxCpus == 0) {
 		// The reserved balloon uses ReservedResources CPUs.
 		// So does the default balloon unless its CPU counts are tweaked.
 		cpus = p.reserved
 	} else {
-		addFromCpus, _, err := p.cpuTreeAllocator.ResizeCpus(cpuset.New(), p.freeCpus, blnDef.MinCpus)
+		addFromCpus, _, err := cpuTreeAllocator.ResizeCpus(cpuset.New(), p.freeCpus, blnDef.MinCpus)
 		if err != nil {
 			return nil, balloonsError("failed to choose a cpuset for allocating first %d CPUs from %#s", blnDef.MinCpus, p.freeCpus)
 		}
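
The nil checks in the hunk above imply the rough shape of the option fields: policy-level options are plain booleans, while balloon-type options are pointers whose nil value means "inherit". A reduced sketch follows; the field shapes are inferred from this hunk, the real BalloonsOptions and BalloonDef structs carry many more fields, and the package name is assumed.

package policy

// Reduced sketch inferred from the nil checks in newBalloon above;
// not the full definitions from this repository.
type BalloonsOptions struct {
	AllocatorTopologyBalancing  bool // policy-level default
	PreferSpreadOnPhysicalCores bool // policy-level default
}

type BalloonDef struct {
	AllocatorTopologyBalancing  *bool // nil: inherit the policy-level value
	PreferSpreadOnPhysicalCores *bool // nil: inherit the policy-level value
}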
@@ -558,12 +577,13 @@ func (p *balloons) newBalloon(blnDef *BalloonDef, confCpus bool) (*Balloon, erro
 		p.freeCpus = p.freeCpus.Difference(cpus)
 	}
 	bln := &Balloon{
-		Def:            blnDef,
-		Instance:       freeInstance,
-		PodIDs:         make(map[string][]string),
-		Cpus:           cpus,
-		SharedIdleCpus: cpuset.New(),
-		Mems:           p.closestMems(cpus),
+		Def:              blnDef,
+		Instance:         freeInstance,
+		PodIDs:           make(map[string][]string),
+		Cpus:             cpus,
+		SharedIdleCpus:   cpuset.New(),
+		Mems:             p.closestMems(cpus),
+		cpuTreeAllocator: cpuTreeAllocator,
 	}
 	if confCpus {
 		if err = p.useCpuClass(bln); err != nil {
@@ -1042,7 +1062,8 @@ func (p *balloons) setConfig(bpoptions *BalloonsOptions) error {
 	p.freeCpus = p.allowed.Clone()
 	p.freeCpus = p.freeCpus.Difference(p.reserved)
 	p.cpuTreeAllocator = p.cpuTree.NewAllocator(cpuTreeAllocatorOptions{
-		topologyBalancing: bpoptions.AllocatorTopologyBalancing,
+		topologyBalancing:           bpoptions.AllocatorTopologyBalancing,
+		preferSpreadOnPhysicalCores: bpoptions.PreferSpreadOnPhysicalCores,
 	})
 	// Instantiate built-in reserved and default balloons.
 	reservedBalloon, err := p.newBalloon(p.reservedBalloonDef, false)
@@ -1149,7 +1170,7 @@ func (p *balloons) resizeBalloon(bln *Balloon, newMilliCpus int) error {
 	defer p.useCpuClass(bln)
 	if cpuCountDelta > 0 {
 		// Inflate the balloon.
-		addFromCpus, _, err := p.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
+		addFromCpus, _, err := bln.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
 		if err != nil {
 			return balloonsError("resize/inflate: failed to choose a cpuset for allocating additional %d CPUs: %w", cpuCountDelta, err)
 		}
@@ -1163,7 +1184,7 @@
 		p.updatePinning(p.shareIdleCpus(p.freeCpus, newCpus)...)
 	} else {
 		// Deflate the balloon.
-		_, removeFromCpus, err := p.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
+		_, removeFromCpus, err := bln.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
 		if err != nil {
 			return balloonsError("resize/deflate: failed to choose a cpuset for releasing %d CPUs: %w", -cpuCountDelta, err)
 		}
182 changes: 180 additions & 2 deletions cmd/plugins/balloons/policy/cputree.go
@@ -79,7 +79,8 @@ type cpuTreeAllocatorOptions struct {
 	// topologyBalancing true prefers allocating from branches
 	// with most free CPUs (spread allocations), while false is
 	// the opposite (packed allocations).
-	topologyBalancing bool
+	topologyBalancing           bool
+	preferSpreadOnPhysicalCores bool
 }
 
 // Strings returns topology level as a string
@@ -131,6 +132,19 @@ func (t *cpuTreeNode) String() string {
 	return fmt.Sprintf("%s%v", t.name, t.children)
 }
 
+func (t *cpuTreeNode) PrettyPrint() string {
+	origDepth := t.Depth()
+	lines := []string{}
+	t.DepthFirstWalk(func(tn *cpuTreeNode) error {
+		lines = append(lines,
+			fmt.Sprintf("%s%s: %q cpus: %s",
+				strings.Repeat(" ", (tn.Depth()-origDepth)*4),
+				tn.level, tn.name, tn.cpus))
+		return nil
+	})
+	return strings.Join(lines, "\n")
+}
+
 // String returns cpuTreeNodeAttributes as a string.
 func (tna cpuTreeNodeAttributes) String() string {
 	return fmt.Sprintf("%s{%d,%v,%d,%d}", tna.t.name, tna.depth,
@@ -146,6 +160,34 @@ func NewCpuTree(name string) *cpuTreeNode {
 	}
 }
 
+func (t *cpuTreeNode) CopyTree() *cpuTreeNode {
+	newNode := t.CopyNode()
+	newNode.children = make([]*cpuTreeNode, 0, len(t.children))
+	for _, child := range t.children {
+		newNode.AddChild(child.CopyTree())
+	}
+	return newNode
+}
+
+func (t *cpuTreeNode) CopyNode() *cpuTreeNode {
+	newNode := cpuTreeNode{
+		name:     t.name,
+		level:    t.level,
+		parent:   t.parent,
+		children: t.children,
+		cpus:     t.cpus,
+	}
+	return &newNode
+}
+
+// Depth returns the distance from the root node.
+func (t *cpuTreeNode) Depth() int {
+	if t.parent == nil {
+		return 0
+	}
+	return t.parent.Depth() + 1
+}
+
 // AddChild adds new child node to a CPU tree node.
 func (t *cpuTreeNode) AddChild(child *cpuTreeNode) {
 	child.parent = t
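
A subtlety worth noting in the hunk above: CopyNode copies only the struct, so the copy still shares the original's children slice and parent pointer, whereas CopyTree rebuilds the children recursively and reparents them through AddChild. A self-contained sketch of the distinction (simplified types, not the commit's code):

package main

import "fmt"

type node struct {
	name     string
	children []*node
}

// copyNode copies only the struct: the copy shares the children slice.
func (n *node) copyNode() *node { c := *n; return &c }

// copyTree copies the node and, recursively, all of its children.
func (n *node) copyTree() *node {
	c := &node{name: n.name}
	for _, ch := range n.children {
		c.children = append(c.children, ch.copyTree())
	}
	return c
}

func main() {
	orig := &node{name: "root", children: []*node{{name: "child"}}}
	shallow := orig.copyNode()
	deep := orig.copyTree()
	fmt.Println(shallow.children[0] == orig.children[0]) // true: shared child
	deep.children[0].name = "renamed"
	fmt.Println(orig.children[0].name) // "child": deep copy left the original intact
}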
@@ -165,6 +207,38 @@ func (t *cpuTreeNode) Cpus() cpuset.CPUSet {
 	return t.cpus
 }
 
+// SiblingIndex returns the index of this node among its parent's
+// children. It returns -1 for the root node, and -2 if this node
+// is not listed among the children of its parent.
+func (t *cpuTreeNode) SiblingIndex() int {
+	if t.parent == nil {
+		return -1
+	}
+	for idx, child := range t.parent.children {
+		if child == t {
+			return idx
+		}
+	}
+	return -2
+}
+
+func (t *cpuTreeNode) FindLeafWithCpu(cpu int) *cpuTreeNode {
+	var found *cpuTreeNode
+	t.DepthFirstWalk(func(tn *cpuTreeNode) error {
+		if len(tn.children) > 0 {
+			return nil
+		}
+		for _, cpuHere := range tn.cpus.List() {
+			if cpu == cpuHere {
+				found = tn
+				return WalkStop
+			}
+		}
+		return nil // not found here, no more children to search
+	})
+	return found
+}
+
 // WalkSkipChildren error returned from a DepthFirstWalk handler
 // prevents walking deeper in the tree. The caller of the
 // DepthFirstWalk will get no error.
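
Later in this file, NewAllocator combines these two helpers to classify a CPU by its hyperthread position: FindLeafWithCpu walks down to the thread-level leaf holding the CPU, and SiblingIndex reports its position among the parent core's children. A self-contained analogue of SiblingIndex (simplified types):

package main

import "fmt"

type node struct {
	parent   *node
	children []*node
}

// siblingIndex mirrors SiblingIndex above: the position of a node in
// its parent's child list; -1 for the root, -2 if the node is unlinked.
func (n *node) siblingIndex() int {
	if n.parent == nil {
		return -1
	}
	for i, c := range n.parent.children {
		if c == n {
			return i
		}
	}
	return -2
}

func main() {
	core := &node{}
	t0, t1 := &node{parent: core}, &node{parent: core}
	core.children = []*node{t0, t1}
	fmt.Println(core.siblingIndex(), t0.siblingIndex(), t1.siblingIndex()) // -1 0 1
}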
@@ -236,13 +310,18 @@ func NewCpuTreeFromSystem() (*cpuTreeNode, error) {
 			nodeTree.level = CPUTopologyLevelNuma
 			dieTree.AddChild(nodeTree)
 			node := sys.Node(nodeID)
+			threadsSeen := map[int]struct{}{}
 			for _, cpuID := range node.CPUSet().List() {
+				if _, alreadySeen := threadsSeen[cpuID]; alreadySeen {
+					continue
+				}
 				cpuTree := NewCpuTree(fmt.Sprintf("p%dd%dn%dcpu%d", packageID, dieID, nodeID, cpuID))
 
 				cpuTree.level = CPUTopologyLevelCore
 				nodeTree.AddChild(cpuTree)
 				cpu := sys.CPU(cpuID)
 				for _, threadID := range cpu.ThreadCPUSet().List() {
+					threadsSeen[threadID] = struct{}{}
 					threadTree := NewCpuTree(fmt.Sprintf("p%dd%dn%dcpu%dt%d", packageID, dieID, nodeID, cpuID, threadID))
 					threadTree.level = CPUTopologyLevelThread
 					cpuTree.AddChild(threadTree)
@@ -312,13 +391,83 @@ func (t *cpuTreeNode) toAttributedSlice(
 	}
 }
 
+// SplitLevel returns the root node of a new CPU tree where all
+// branches of a topology level have been split into new classes.
+func (t *cpuTreeNode) SplitLevel(splitLevel CPUTopologyLevel, cpuClassifier func(int) int) *cpuTreeNode {
+	newRoot := t.CopyTree()
+	newRoot.DepthFirstWalk(func(tn *cpuTreeNode) error {
+		// Dive into the level that will be split.
+		if tn.level != splitLevel {
+			return nil
+		}
+		// Classify CPUs to the map: class -> list of cpus
+		classCpus := map[int][]int{}
+		for _, cpu := range t.cpus.List() {
+			class := cpuClassifier(cpu)
+			classCpus[class] = append(classCpus[class], cpu)
+		}
+		// Clear existing children of this node. New children
+		// will be classes whose children are masked versions
+		// of original children of this node.
+		origChildren := tn.children
+		tn.children = make([]*cpuTreeNode, 0, len(classCpus))
+		// Add a new child corresponding to each class.
+		for class, cpus := range classCpus {
+			cpuMask := cpuset.New(cpus...)
+			newNode := NewCpuTree(fmt.Sprintf("%sclass%d", tn.name, class))
+			tn.AddChild(newNode)
+			newNode.cpus = tn.cpus.Intersection(cpuMask)
+			newNode.level = tn.level
+			newNode.parent = tn
+			for _, child := range origChildren {
+				newChild := child.CopyTree()
+				newChild.DepthFirstWalk(func(cn *cpuTreeNode) error {
+					cn.cpus = cn.cpus.Intersection(cpuMask)
+					if cn.cpus.Size() == 0 && cn.parent != nil {
+						// all cpus masked
+						// out: cut out this
+						// branch
+						newSiblings := []*cpuTreeNode{}
+						for _, child := range cn.parent.children {
+							if child != cn {
+								newSiblings = append(newSiblings, child)
+							}
+						}
+						cn.parent.children = newSiblings
+						return WalkSkipChildren
+					}
+					return nil
+				})
+				newNode.AddChild(newChild)
+			}
+		}
+		return WalkSkipChildren
+	})
+	return newRoot
+}
+
 // NewAllocator returns new CPU allocator for allocating CPUs from a
 // CPU tree branch.
 func (t *cpuTreeNode) NewAllocator(options cpuTreeAllocatorOptions) *cpuTreeAllocator {
 	ta := &cpuTreeAllocator{
 		root:    t,
 		options: options,
 	}
+	if options.preferSpreadOnPhysicalCores {
+		newTree := t.SplitLevel(CPUTopologyLevelNuma,
+			// CPU classifier: a CPU's class is its index in
+			// the child list of its parent tree node. The
+			// leaf is expected to be a hyperthread, its
+			// parent a physical core.
+			func(cpu int) int {
+				leaf := t.FindLeafWithCpu(cpu)
+				if leaf == nil {
+					log.Fatalf("SplitLevel CPU classifier: cpu %d not in tree:\n%s\n\n", cpu, t.PrettyPrint())
+				}
+				return leaf.SiblingIndex()
+			})
+		ta.root = newTree
+	}
 	return ta
 }
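
As a usage sketch of the constructor above: two allocators can now share one detected topology while placing CPUs differently. This assumes it compiles in the same package as cputree.go and uses only names appearing in this diff; the comments state the options' intent, and actual CPU choices depend on the machine.

// Usage sketch, same package as cputree.go; names are from this diff.
func exampleAllocators() error {
	tree, err := NewCpuTreeFromSystem()
	if err != nil {
		return err
	}
	// Default options: packed allocations.
	packed := tree.NewAllocator(cpuTreeAllocatorOptions{})
	// Prefer unused physical cores over free sibling hyperthreads.
	spread := tree.NewAllocator(cpuTreeAllocatorOptions{
		preferSpreadOnPhysicalCores: true,
	})
	if _, _, err := packed.ResizeCpus(cpuset.New(), tree.Cpus(), 2); err != nil {
		return err
	}
	_, _, err = spread.ResizeCpus(cpuset.New(), tree.Cpus(), 2)
	return err
}

With per-type allocators like these, resizeBalloon in balloons-policy.go now calls bln.cpuTreeAllocator, so each balloon type keeps its own placement behavior across resizes.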

@@ -409,7 +558,36 @@ func (ta *cpuTreeAllocator) sorterRelease(tnas []cpuTreeNodeAttributes) func(int
 // abs(delta) CPUs can be freed.
 func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
 	if delta > 0 {
-		return ta.resizeCpus(currentCpus, freeCpus, delta)
+		addFromSuperset, removeFromSuperset, err := ta.resizeCpus(currentCpus, freeCpus, delta)
+		if !ta.options.preferSpreadOnPhysicalCores || addFromSuperset.Size() == delta {
+			return addFromSuperset, removeFromSuperset, err
+		}
+		// addFromSuperset contains more CPUs (equally good
+		// choices) than actually needed. With
+		// preferSpreadOnPhysicalCores, however, selecting just
+		// any of them does not give an equally good result.
+		// Therefore, in this case, construct the addFrom set
+		// by adding one CPU at a time.
+		addFrom := cpuset.New()
+		for n := 0; n < delta; n++ {
+			addSingleFrom, _, err := ta.resizeCpus(currentCpus, freeCpus, 1)
+			if err != nil {
+				return addFromSuperset, removeFromSuperset, err
+			}
+			if addSingleFrom.Size() != 1 {
+				return addFromSuperset, removeFromSuperset, fmt.Errorf("internal error: failed to find single CPU to allocate, "+
+					"currentCpus=%s freeCpus=%s expectedSingle=%s",
+					currentCpus, freeCpus, addSingleFrom)
+			}
+			addFrom = addFrom.Union(addSingleFrom)
+			if addFrom.Size() != n+1 {
+				return addFromSuperset, removeFromSuperset, fmt.Errorf("internal error: double add the same CPU (%s) to cpuset %s on round %d",
+					addSingleFrom, addFrom, n+1)
+			}
+			currentCpus = currentCpus.Union(addSingleFrom)
+			freeCpus = freeCpus.Difference(addSingleFrom)
+		}
+		return addFrom, removeFromSuperset, nil
+	}
 	// In multi-CPU removal, remove CPUs one by one instead of
 	// trying to find a single topology element from which all of
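
The loop added in the last hunk is a greedy selection pattern: when a single delta-CPU query returns more equally ranked candidates than needed, and (under preferSpreadOnPhysicalCores) those candidates are not interchangeable, the allocator requests one CPU at a time, feeding each pick back into currentCpus and freeCpus so the next pick sees its effect. A self-contained sketch of the pattern, where pickOne stands in for resizeCpus(current, free, 1) and lowest-CPU-first is illustrative only:

package main

import "fmt"

// pickOne stands in for resizeCpus(current, free, 1): return exactly
// one CPU chosen from free. Lowest-first is illustrative only.
func pickOne(current, free map[int]bool) int {
	best := -1
	for cpu := range free {
		if best == -1 || cpu < best {
			best = cpu
		}
	}
	return best
}

func main() {
	current := map[int]bool{}
	free := map[int]bool{0: true, 1: true, 2: true, 3: true}
	// Greedy selection: each pick updates the state the next pick sees.
	for n := 0; n < 2; n++ {
		cpu := pickOne(current, free)
		current[cpu] = true
		delete(free, cpu)
	}
	fmt.Println(current) // map[0:true 1:true]
}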
