Skip to content

Commit

Permalink
Add pod condition PodScheduled to detect situation
Browse files Browse the repository at this point in the history
when scheduler tried to schedule a Pod, but failed.

Ref kubernetes#24404
  • Loading branch information
fgrzadkowski committed May 12, 2016
1 parent 26c99fe commit a80b179
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 6 deletions.
39 changes: 36 additions & 3 deletions pkg/api/resource_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package api

import (
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
)

// Returns string version of ResourceName.
Expand Down Expand Up @@ -80,12 +81,44 @@ func IsPodReadyConditionTrue(status PodStatus) bool {
// Extracts the pod ready condition from the given status and returns that.
// Returns nil if the condition is not present.
func GetPodReadyCondition(status PodStatus) *PodCondition {
_, condition := GetPodCondition(&status, PodReady)
return condition
}

func GetPodCondition(status *PodStatus, conditionType PodConditionType) (int, *PodCondition) {
for i, c := range status.Conditions {
if c.Type == PodReady {
return &status.Conditions[i]
if c.Type == conditionType {
return i, &status.Conditions[i]
}
}
return -1, nil
}

// Updates existing pod condition or creates a new one. Sets LastTransitionTime to now if the
// status has changed.
// Returns true if pod condition has changed or has been added.
func UpdatePodCondition(status *PodStatus, condition *PodCondition) bool {
condition.LastTransitionTime = unversioned.Now()
// Try to find this pod condition.
conditionIndex, oldCondition := GetPodCondition(status, condition.Type)

if oldCondition == nil {
// We are adding new pod condition.
status.Conditions = append(status.Conditions, *condition)
return true
} else {
// We are updating an existing condition, so we need to check if it has changed.
if condition.Status == oldCondition.Status {
condition.LastTransitionTime = oldCondition.LastTransitionTime
}
status.Conditions[conditionIndex] = *condition
// Return true if one of the fields have changed.
return condition.Status != oldCondition.Status ||
condition.Reason != oldCondition.Reason ||
condition.Message != oldCondition.Message ||
!condition.LastProbeTime.Equal(oldCondition.LastProbeTime) ||
!condition.LastTransitionTime.Equal(oldCondition.LastTransitionTime)
}
return nil
}

// IsNodeReady returns true if a node is ready; false otherwise.
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,8 @@ type PodConditionType string

// These are valid conditions of pod.
const (
// PodScheduled represents status of the scheduling process for this pod.
PodScheduled PodConditionType = "PodScheduled"
// PodReady means the pod is able to service requests and should be added to the
// load balancing pools of all matching services.
PodReady PodConditionType = "Ready"
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,8 @@ type PodConditionType string

// These are valid conditions of pod.
const (
// PodScheduled represents status of the scheduling process for this pod.
PodScheduled PodConditionType = "PodScheduled"
// PodReady means the pod is able to service requests and should be added to the
// load balancing pools of all matching services.
PodReady PodConditionType = "Ready"
Expand Down
10 changes: 10 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3286,6 +3286,16 @@ func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.P
s.Phase = GetPhase(spec, s.ContainerStatuses)
kl.probeManager.UpdatePodStatus(pod.UID, s)
s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))
// s (the PodStatus we are creating) will not have a PodScheduled condition yet, because converStatusToAPIStatus()
// does not create one. If the existing PodStatus has a PodScheduled condition, then copy it into s and make sure
// it is set to true. If the existing PodStatus does not have a PodScheduled condition, then create one that is set to true.
if _, oldPodScheduled := api.GetPodCondition(&pod.Status, api.PodScheduled); oldPodScheduled != nil {
s.Conditions = append(s.Conditions, *oldPodScheduled)
}
api.UpdatePodCondition(&pod.Status, &api.PodCondition{
Type: api.PodScheduled,
Status: api.ConditionTrue,
})

if !kl.standaloneMode {
hostIP, err := kl.GetHostIP()
Expand Down
4 changes: 4 additions & 0 deletions pkg/registry/pod/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin
for k, v := range annotations {
pod.Annotations[k] = v
}
api.UpdatePodCondition(&pod.Status, &api.PodCondition{
Type: api.PodScheduled,
Status: api.ConditionTrue,
})
finalPod = pod
return pod, nil
}))
Expand Down
20 changes: 17 additions & 3 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
Binder: &binder{f.Client},
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
Binder: &binder{f.Client},
PodConditionUpdater: &podConditionUpdater{f.Client},
NextPod: func() *api.Pod {
return f.getNextPod()
},
Expand Down Expand Up @@ -448,6 +449,19 @@ func (b *binder) Bind(binding *api.Binding) error {
// return b.Pods(binding.Namespace).Bind(binding)
}

type podConditionUpdater struct {
*client.Client
}

func (p *podConditionUpdater) Update(pod *api.Pod, condition *api.PodCondition) error {
glog.V(2).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
if api.UpdatePodCondition(&pod.Status, condition) {
_, err := p.Pods(pod.Namespace).UpdateStatus(pod)
return err
}
return nil
}

type clock interface {
Now() time.Time
}
Expand Down
22 changes: 22 additions & 0 deletions plugin/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Binder interface {
Bind(binding *api.Binding) error
}

type PodConditionUpdater interface {
Update(pod *api.Pod, podCondition *api.PodCondition) error
}

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
Expand All @@ -50,6 +54,10 @@ type Config struct {
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater

// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
Expand Down Expand Up @@ -92,6 +100,12 @@ func (s *Scheduler) scheduleOne() {
glog.V(1).Infof("Failed to schedule: %+v", pod)
s.config.Error(pod, err)
s.config.Recorder.Eventf(pod, api.EventTypeWarning, "FailedScheduling", "%v", err)
s.config.PodConditionUpdater.Update(pod, &api.PodCondition{
Type: api.PodScheduled,
Status: api.ConditionFalse,
Reason: "Unschedulable",
Message: err.Error(),
})
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
Expand Down Expand Up @@ -120,11 +134,19 @@ func (s *Scheduler) scheduleOne() {
}

bindingStart := time.Now()
// If binding succeded then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host.
err := s.config.Binder.Bind(b)
if err != nil {
glog.V(1).Infof("Failed to bind pod: %+v", err)
s.config.Error(pod, err)
s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err)
s.config.PodConditionUpdater.Update(pod, &api.PodCondition{
Type: api.PodScheduled,
Status: api.ConditionFalse,
Reason: "BindingRejected",
Message: err.Error(),
})
return
}
metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
Expand Down
7 changes: 7 additions & 0 deletions plugin/pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type fakeBinder struct {

func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) }

type fakePodConditionUpdater struct{}

func (fc fakePodConditionUpdater) Update(pod *api.Pod, podCondition *api.PodCondition) error {
return nil
}

func podWithID(id, desiredHost string) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{Name: id, SelfLink: testapi.Default.SelfLink("pods", id)},
Expand Down Expand Up @@ -128,6 +134,7 @@ func TestScheduler(t *testing.T) {
gotBinding = b
return item.injectBindError
}},
PodConditionUpdater: fakePodConditionUpdater{},
Error: func(p *api.Pod, err error) {
gotPod = p
gotError = err
Expand Down
6 changes: 6 additions & 0 deletions test/e2e/scheduler_predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ func getPodsScheduled(pods *api.PodList) (scheduledPods, notScheduledPods []api.
for _, pod := range pods.Items {
if !masterNodes.Has(pod.Spec.NodeName) {
if pod.Spec.NodeName != "" {
_, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled)
Expect(scheduledCondition != nil).To(Equal(true))
Expect(scheduledCondition.Status).To(Equal(api.ConditionTrue))
scheduledPods = append(scheduledPods, pod)
} else {
_, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled)
Expect(scheduledCondition != nil).To(Equal(true))
Expect(scheduledCondition.Status).To(Equal(api.ConditionFalse))
notScheduledPods = append(notScheduledPods, pod)
}
}
Expand Down

0 comments on commit a80b179

Please sign in to comment.