Merge pull request kubernetes#28893 from wojtek-t/optimize_priorities_4
Automatic merge from submit-queue

More optimizations to scheduler throughput

Ref kubernetes#28590

@davidopp @gmarek
k8s-merge-robot authored Jul 13, 2016
2 parents 7e6a856 + c929d95 commit 7823c57
Showing 5 changed files with 94 additions and 61 deletions.
18 changes: 7 additions & 11 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -491,22 +491,18 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
         return true, nil
     }

-    allocatable := node.Status.Allocatable
-    totalMilliCPU := allocatable.Cpu().MilliValue()
-    totalMemory := allocatable.Memory().Value()
-    totalNvidiaGPU := allocatable.NvidiaGPU().Value()
-
-    if totalMilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU {
+    allocatable := nodeInfo.AllocatableResource()
+    if allocatable.MilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU {
         return false,
-            newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, totalMilliCPU)
+            newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU)
     }
-    if totalMemory < podRequest.memory+nodeInfo.RequestedResource().Memory {
+    if allocatable.Memory < podRequest.memory+nodeInfo.RequestedResource().Memory {
         return false,
-            newInsufficientResourceError(memoryResourceName, podRequest.memory, nodeInfo.RequestedResource().Memory, totalMemory)
+            newInsufficientResourceError(memoryResourceName, podRequest.memory, nodeInfo.RequestedResource().Memory, allocatable.Memory)
     }
-    if totalNvidiaGPU < podRequest.nvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
+    if allocatable.NvidiaGPU < podRequest.nvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
         return false,
-            newInsufficientResourceError(nvidiaGpuResourceName, podRequest.nvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, totalNvidiaGPU)
+            newInsufficientResourceError(nvidiaGpuResourceName, podRequest.nvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU)
     }
     if glog.V(10) {
         // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
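The predicate now reads CPU, memory, and GPU capacity from the int64 fields cached on NodeInfo instead of converting resource.Quantity values out of node.Status.Allocatable on every call. Below is a hypothetical micro-benchmark sketch of the two access paths; it is not part of this PR, the fixture values are made up, and it assumes it sits in the schedulercache package so it can use NewNodeInfo, SetNode, and AllocatableResource directly.

```go
// Hypothetical micro-benchmark (not part of this PR) contrasting the two ways of
// reading node capacity.
package schedulercache

import (
    "testing"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/resource"
)

func benchNode() *api.Node {
    return &api.Node{
        ObjectMeta: api.ObjectMeta{Name: "bench-node"},
        Status: api.NodeStatus{
            Allocatable: api.ResourceList{
                api.ResourceCPU:    resource.MustParse("4"),
                api.ResourceMemory: resource.MustParse("8Gi"),
            },
        },
    }
}

// Old path: convert resource.Quantity values out of the allocatable map on every call.
func BenchmarkAllocatableViaQuantity(b *testing.B) {
    node := benchNode()
    for i := 0; i < b.N; i++ {
        _ = node.Status.Allocatable.Cpu().MilliValue()
        _ = node.Status.Allocatable.Memory().Value()
    }
}

// New path: read the int64 fields that SetNode caches on NodeInfo.
func BenchmarkAllocatableCached(b *testing.B) {
    ni := NewNodeInfo()
    if err := ni.SetNode(benchNode()); err != nil {
        b.Fatal(err)
    }
    for i := 0; i < b.N; i++ {
        allocatable := ni.AllocatableResource()
        _ = allocatable.MilliCPU
        _ = allocatable.Memory
    }
}
```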
49 changes: 20 additions & 29 deletions plugin/pkg/scheduler/algorithm/priorities/priorities.go
@@ -28,18 +28,13 @@ import (
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

-type resources struct {
-    millicpu int64
-    memory   int64
-}
-
-func getNonZeroRequests(pod *api.Pod) *resources {
-    result := &resources{}
+func getNonZeroRequests(pod *api.Pod) *schedulercache.Resource {
+    result := &schedulercache.Resource{}
     for i := range pod.Spec.Containers {
         container := &pod.Spec.Containers[i]
         cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
-        result.millicpu += cpu
-        result.memory += memory
+        result.MilliCPU += cpu
+        result.Memory += memory
     }
     return result
 }
@@ -61,24 +56,22 @@ func calculateScore(requested int64, capacity int64, node string) int64 {
 // Calculate the resource occupancy on a node. 'node' has information about the resources on the node.
 // 'pods' is a list of pods currently scheduled on the node.
 // TODO: Use Node() from nodeInfo instead of passing it.
-func calculateResourceOccupancy(pod *api.Pod, podRequests *resources, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-    capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
-    capacityMemory := node.Status.Allocatable.Memory().Value()
-
+func calculateResourceOccupancy(pod *api.Pod, podRequests *schedulercache.Resource, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+    allocatableResources := nodeInfo.AllocatableResource()
     totalResources := *podRequests
-    totalResources.millicpu += nodeInfo.NonZeroRequest().MilliCPU
-    totalResources.memory += nodeInfo.NonZeroRequest().Memory
+    totalResources.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
+    totalResources.Memory += nodeInfo.NonZeroRequest().Memory

-    cpuScore := calculateScore(totalResources.millicpu, capacityMilliCPU, node.Name)
-    memoryScore := calculateScore(totalResources.memory, capacityMemory, node.Name)
+    cpuScore := calculateScore(totalResources.MilliCPU, allocatableResources.MilliCPU, node.Name)
+    memoryScore := calculateScore(totalResources.Memory, allocatableResources.Memory, node.Name)
     if glog.V(10) {
         // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
         // not logged. There is visible performance gain from it.
         glog.V(10).Infof(
             "%v -> %v: Least Requested Priority, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d CPU %d memory",
             pod.Name, node.Name,
-            capacityMilliCPU, capacityMemory,
-            totalResources.millicpu, totalResources.memory,
+            allocatableResources.MilliCPU, allocatableResources.Memory,
+            totalResources.MilliCPU, totalResources.Memory,
             cpuScore, memoryScore,
         )
     }
@@ -243,16 +236,14 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul
 }

 // TODO: Use Node() from nodeInfo instead of passing it.
-func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *resources, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-    capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
-    capacityMemory := node.Status.Allocatable.Memory().Value()
-
+func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *schedulercache.Resource, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+    allocatableResources := nodeInfo.AllocatableResource()
     totalResources := *podRequests
-    totalResources.millicpu += nodeInfo.NonZeroRequest().MilliCPU
-    totalResources.memory += nodeInfo.NonZeroRequest().Memory
+    totalResources.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
+    totalResources.Memory += nodeInfo.NonZeroRequest().Memory

-    cpuFraction := fractionOfCapacity(totalResources.millicpu, capacityMilliCPU)
-    memoryFraction := fractionOfCapacity(totalResources.memory, capacityMemory)
+    cpuFraction := fractionOfCapacity(totalResources.MilliCPU, allocatableResources.MilliCPU)
+    memoryFraction := fractionOfCapacity(totalResources.Memory, allocatableResources.Memory)
     score := int(0)
     if cpuFraction >= 1 || memoryFraction >= 1 {
         // if requested >= capacity, the corresponding host should never be preferrred.
@@ -271,8 +262,8 @@ func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *resources, n
         glog.V(10).Infof(
             "%v -> %v: Balanced Resource Allocation, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d",
             pod.Name, node.Name,
-            capacityMilliCPU, capacityMemory,
-            totalResources.millicpu, totalResources.memory,
+            allocatableResources.MilliCPU, allocatableResources.Memory,
+            totalResources.MilliCPU, totalResources.Memory,
             score,
         )
     }
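Both priority functions now take capacity from the same cached Resource that PodFitsResources uses, instead of parsing node.Status.Allocatable per node per pod. For reference, here is a standalone sketch of the scoring math those capacities feed into; the helper names are illustrative, and the formulas reflect my reading of calculateScore ((capacity - requested) * 10 / capacity) and of the balanced score (10 - |cpuFraction - memFraction| * 10), which are not shown in this diff.

```go
// Standalone illustration of the two scores; not the scheduler's actual code.
package main

import (
    "fmt"
    "math"
)

// leastRequestedScore mirrors calculateScore: the unused fraction of capacity, scaled to 0-10.
func leastRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 || requested > capacity {
        return 0
    }
    return ((capacity - requested) * 10) / capacity
}

// balancedScore favors nodes where CPU and memory utilization end up closest to each other.
func balancedScore(cpuRequested, cpuCapacity, memRequested, memCapacity int64) int {
    if cpuCapacity == 0 || memCapacity == 0 {
        return 0
    }
    cpuFraction := float64(cpuRequested) / float64(cpuCapacity)
    memFraction := float64(memRequested) / float64(memCapacity)
    if cpuFraction >= 1 || memFraction >= 1 {
        return 0 // requested >= capacity: never prefer this node
    }
    diff := math.Abs(cpuFraction - memFraction)
    return int(10 - diff*10)
}

func main() {
    // Capacities now come from nodeInfo.AllocatableResource() (cached int64s),
    // e.g. 4000 millicores and 8Gi of memory.
    fmt.Println(leastRequestedScore(2500, 4000))         // 3
    fmt.Println(balancedScore(2500, 4000, 4<<30, 8<<30)) // 8
}
```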
18 changes: 18 additions & 0 deletions plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
@@ -138,6 +138,12 @@ func TestZeroRequest(t *testing.T) {
     const expectedPriority int = 25
     for _, test := range tests {
         nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+        for _, node := range test.nodes {
+            if _, ok := nodeNameToInfo[node.Name]; !ok {
+                nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
+            }
+            nodeNameToInfo[node.Name].SetNode(node)
+        }
         list, err := scheduler.PrioritizeNodes(
             test.pod,
             nodeNameToInfo,
@@ -389,6 +395,12 @@ func TestLeastRequested(t *testing.T) {

     for _, test := range tests {
         nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+        for _, node := range test.nodes {
+            if _, ok := nodeNameToInfo[node.Name]; !ok {
+                nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
+            }
+            nodeNameToInfo[node.Name].SetNode(node)
+        }
         list, err := LeastRequestedPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(test.nodes))
         if err != nil {
             t.Errorf("unexpected error: %v", err)
@@ -722,6 +734,12 @@ func TestBalancedResourceAllocation(t *testing.T) {

     for _, test := range tests {
         nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
+        for _, node := range test.nodes {
+            if _, ok := nodeNameToInfo[node.Name]; !ok {
+                nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
+            }
+            nodeNameToInfo[node.Name].SetNode(node)
+        }
         list, err := BalancedResourceAllocation(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(test.nodes))
         if err != nil {
             t.Errorf("unexpected error: %v", err)
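The same six-line setup is repeated in all three tests because the priorities now read capacity from NodeInfo.AllocatableResource(); a NodeInfo that never had SetNode called on it would report zero allocatable and skew the expected scores. A condensed, hypothetical form of that setup, written as if it sat in the same test file (makeNodeInfoMap is an illustrative helper name, not part of the PR):

```go
// Hypothetical helper capturing the setup shared by the three tests above.
func makeNodeInfoMap(pods []*api.Pod, nodes []*api.Node) map[string]*schedulercache.NodeInfo {
    nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods)
    for _, node := range nodes {
        if _, ok := nodeNameToInfo[node.Name]; !ok {
            nodeNameToInfo[node.Name] = schedulercache.NewNodeInfo()
        }
        // SetNode caches the allocatable values that LeastRequestedPriority and
        // BalancedResourceAllocation now read via AllocatableResource().
        nodeNameToInfo[node.Name].SetNode(node)
    }
    return nodeNameToInfo
}
```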
33 changes: 22 additions & 11 deletions plugin/pkg/scheduler/schedulercache/cache_test.go
@@ -64,7 +64,8 @@ func TestAssumePodScheduled(t *testing.T) {
                 MilliCPU: 100,
                 Memory:   500,
             },
-            pods: []*api.Pod{testPods[0]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[0]},
         },
     }, {
         pods: []*api.Pod{testPods[1], testPods[2]},
@@ -77,7 +78,8 @@
                 MilliCPU: 300,
                 Memory:   1524,
             },
-            pods: []*api.Pod{testPods[1], testPods[2]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[1], testPods[2]},
         },
     }, { // test non-zero request
         pods: []*api.Pod{testPods[3]},
@@ -90,7 +92,8 @@
                 MilliCPU: priorityutil.DefaultMilliCpuRequest,
                 Memory:   priorityutil.DefaultMemoryRequest,
             },
-            pods: []*api.Pod{testPods[3]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[3]},
         },
     }}

@@ -147,7 +150,8 @@ func TestExpirePod(t *testing.T) {
                 MilliCPU: 200,
                 Memory:   1024,
             },
-            pods: []*api.Pod{testPods[1]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[1]},
         },
     }}

@@ -194,7 +198,8 @@ func TestAddPodWillConfirm(t *testing.T) {
                 MilliCPU: 100,
                 Memory:   500,
             },
-            pods: []*api.Pod{testPods[0]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[0]},
         },
     }}

@@ -237,7 +242,8 @@ func TestAddPodAfterExpiration(t *testing.T) {
                 MilliCPU: 100,
                 Memory:   500,
             },
-            pods: []*api.Pod{basePod},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{basePod},
         },
     }}

@@ -288,7 +294,8 @@ func TestUpdatePod(t *testing.T) {
                 MilliCPU: 200,
                 Memory:   1024,
             },
-            pods: []*api.Pod{testPods[1]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[1]},
         }, {
             requestedResource: &Resource{
                 MilliCPU: 100,
@@ -298,7 +305,8 @@
                 MilliCPU: 100,
                 Memory:   500,
             },
-            pods: []*api.Pod{testPods[0]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[0]},
         }},
     }}

@@ -351,7 +359,8 @@ func TestExpireAddUpdatePod(t *testing.T) {
                 MilliCPU: 200,
                 Memory:   1024,
             },
-            pods: []*api.Pod{testPods[1]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[1]},
         }, {
             requestedResource: &Resource{
                 MilliCPU: 100,
@@ -361,7 +370,8 @@
                 MilliCPU: 100,
                 Memory:   500,
             },
-            pods: []*api.Pod{testPods[0]},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{testPods[0]},
         }},
     }}

@@ -414,7 +424,8 @@ func TestRemovePod(t *testing.T) {
                 MilliCPU: 100,
                 Memory:   500,
             },
-            pods: []*api.Pod{basePod},
+            allocatableResource: &Resource{},
+            pods:                []*api.Pod{basePod},
         },
     }}

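Each expected NodeInfo in these tables gains an explicit allocatableResource: &Resource{} because the cache tests compare the cache's NodeInfo against the fixture with a deep comparison (assumed from the test style); once NewNodeInfo initializes the new field, a fixture that leaves it nil no longer matches. A tiny standalone illustration of that effect:

```go
// Standalone illustration; the structs here are stand-ins, not the real schedulercache types.
package main

import (
    "fmt"
    "reflect"
)

type Resource struct{ MilliCPU, Memory int64 }

type nodeInfo struct {
    requestedResource   *Resource
    allocatableResource *Resource
}

func main() {
    got := nodeInfo{requestedResource: &Resource{}, allocatableResource: &Resource{}}
    wantOld := nodeInfo{requestedResource: &Resource{}} // pre-PR style fixture
    wantNew := nodeInfo{requestedResource: &Resource{}, allocatableResource: &Resource{}}

    fmt.Println(reflect.DeepEqual(got, wantOld)) // false: nil vs &Resource{}
    fmt.Println(reflect.DeepEqual(got, wantNew)) // true
}
```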
37 changes: 27 additions & 10 deletions plugin/pkg/scheduler/schedulercache/node_info.go
@@ -39,6 +39,9 @@ type NodeInfo struct {
     requestedResource *Resource
     pods              []*api.Pod
     nonzeroRequest    *Resource
+    // We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
+    // as int64, to avoid conversions and accessing map.
+    allocatableResource *Resource
     // We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
     // explicitly as int, to avoid conversions and improve performance.
     allowedPodNumber int
@@ -60,10 +63,11 @@ type Resource struct {
 // the returned object.
 func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
     ni := &NodeInfo{
-        requestedResource: &Resource{},
-        nonzeroRequest:    &Resource{},
-        allowedPodNumber:  0,
-        generation:        0,
+        requestedResource:   &Resource{},
+        nonzeroRequest:      &Resource{},
+        allocatableResource: &Resource{},
+        allowedPodNumber:    0,
+        generation:          0,
     }
     for _, pod := range pods {
         ni.addPod(pod)
@@ -110,15 +114,24 @@ func (n *NodeInfo) NonZeroRequest() Resource {
     return *n.nonzeroRequest
 }

+// AllocatableResource returns allocatable resources on a given node.
+func (n *NodeInfo) AllocatableResource() Resource {
+    if n == nil {
+        return emptyResource
+    }
+    return *n.allocatableResource
+}
+
 func (n *NodeInfo) Clone() *NodeInfo {
     pods := append([]*api.Pod(nil), n.pods...)
     clone := &NodeInfo{
-        node:              n.node,
-        requestedResource: &(*n.requestedResource),
-        nonzeroRequest:    &(*n.nonzeroRequest),
-        allowedPodNumber:  n.allowedPodNumber,
-        pods:              pods,
-        generation:        n.generation,
+        node:                n.node,
+        requestedResource:   &(*n.requestedResource),
+        nonzeroRequest:      &(*n.nonzeroRequest),
+        allocatableResource: &(*n.allocatableResource),
+        allowedPodNumber:    n.allowedPodNumber,
+        pods:                pods,
+        generation:          n.generation,
     }
     return clone
 }
@@ -193,6 +206,9 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, no
 // Sets the overall node information.
 func (n *NodeInfo) SetNode(node *api.Node) error {
     n.node = node
+    n.allocatableResource.MilliCPU = node.Status.Allocatable.Cpu().MilliValue()
+    n.allocatableResource.Memory = node.Status.Allocatable.Memory().Value()
+    n.allocatableResource.NvidiaGPU = node.Status.Allocatable.NvidiaGPU().Value()
     n.allowedPodNumber = int(node.Status.Allocatable.Pods().Value())
     n.generation++
     return nil
@@ -205,6 +221,7 @@ func (n *NodeInfo) RemoveNode(node *api.Node) error {
     // and thus can potentially be observed later, even though they happened before
     // node removal. This is handled correctly in cache.go file.
     n.node = nil
+    n.allocatableResource = &Resource{}
     n.allowedPodNumber = 0
     n.generation++
     return nil
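The new allocatableResource field follows the same pattern as allowedPodNumber: SetNode flattens Node.Status.Allocatable into plain int64s once, each later read is just a struct copy, and RemoveNode resets the cache. A hedged usage sketch follows; the schedulercache and api identifiers come from the diff above, while the node fixture and printed values are invented for illustration.

```go
package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/resource"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

func main() {
    node := &api.Node{
        ObjectMeta: api.ObjectMeta{Name: "node-1"},
        Status: api.NodeStatus{
            Allocatable: api.ResourceList{
                api.ResourceCPU:    resource.MustParse("2"),
                api.ResourceMemory: resource.MustParse("4Gi"),
                api.ResourcePods:   resource.MustParse("110"),
            },
        },
    }

    nodeInfo := schedulercache.NewNodeInfo()
    if err := nodeInfo.SetNode(node); err != nil {
        fmt.Println("SetNode failed:", err)
        return
    }

    // AllocatableResource returns the cached int64 view populated by SetNode, so
    // predicates and priorities no longer touch node.Status.Allocatable at all.
    allocatable := nodeInfo.AllocatableResource()
    fmt.Println(allocatable.MilliCPU, allocatable.Memory) // 2000 4294967296

    // RemoveNode clears the cached values along with the node pointer.
    _ = nodeInfo.RemoveNode(node)
    fmt.Println(nodeInfo.AllocatableResource().MilliCPU) // 0
}
```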
