diff --git a/pkg/api/resource_helpers.go b/pkg/api/resource_helpers.go index 7a98a4c2f0701..3ebd80df90363 100644 --- a/pkg/api/resource_helpers.go +++ b/pkg/api/resource_helpers.go @@ -18,6 +18,7 @@ package api import ( "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" ) // Returns string version of ResourceName. @@ -80,12 +81,44 @@ func IsPodReadyConditionTrue(status PodStatus) bool { // Extracts the pod ready condition from the given status and returns that. // Returns nil if the condition is not present. func GetPodReadyCondition(status PodStatus) *PodCondition { + _, condition := GetPodCondition(&status, PodReady) + return condition +} + +func GetPodCondition(status *PodStatus, conditionType PodConditionType) (int, *PodCondition) { for i, c := range status.Conditions { - if c.Type == PodReady { - return &status.Conditions[i] + if c.Type == conditionType { + return i, &status.Conditions[i] + } + } + return -1, nil +} + +// Updates existing pod condition or creates a new one. Sets LastTransitionTime to now if the +// status has changed. +// Returns true if pod condition has changed or has been added. +func UpdatePodCondition(status *PodStatus, condition *PodCondition) bool { + condition.LastTransitionTime = unversioned.Now() + // Try to find this pod condition. + conditionIndex, oldCondition := GetPodCondition(status, condition.Type) + + if oldCondition == nil { + // We are adding new pod condition. + status.Conditions = append(status.Conditions, *condition) + return true + } else { + // We are updating an existing condition, so we need to check if it has changed. + if condition.Status == oldCondition.Status { + condition.LastTransitionTime = oldCondition.LastTransitionTime } + status.Conditions[conditionIndex] = *condition + // Return true if one of the fields have changed. + return condition.Status != oldCondition.Status || + condition.Reason != oldCondition.Reason || + condition.Message != oldCondition.Message || + !condition.LastProbeTime.Equal(oldCondition.LastProbeTime) || + !condition.LastTransitionTime.Equal(oldCondition.LastTransitionTime) } - return nil } // IsNodeReady returns true if a node is ready; false otherwise. diff --git a/pkg/api/types.go b/pkg/api/types.go index 4b6ec153b8966..436d8dc8a9b49 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1071,6 +1071,8 @@ type PodConditionType string // These are valid conditions of pod. const ( + // PodScheduled represents status of the scheduling process for this pod. + PodScheduled PodConditionType = "PodScheduled" // PodReady means the pod is able to service requests and should be added to the // load balancing pools of all matching services. PodReady PodConditionType = "Ready" diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index 00381d20e8314..551624b5da276 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -1301,6 +1301,8 @@ type PodConditionType string // These are valid conditions of pod. const ( + // PodScheduled represents status of the scheduling process for this pod. + PodScheduled PodConditionType = "PodScheduled" // PodReady means the pod is able to service requests and should be added to the // load balancing pools of all matching services. PodReady PodConditionType = "Ready" diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1973f523fe018..4625a12b17bf1 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -3351,6 +3351,16 @@ func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.P s.Phase = GetPhase(spec, s.ContainerStatuses) kl.probeManager.UpdatePodStatus(pod.UID, s) s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase)) + // s (the PodStatus we are creating) will not have a PodScheduled condition yet, because converStatusToAPIStatus() + // does not create one. If the existing PodStatus has a PodScheduled condition, then copy it into s and make sure + // it is set to true. If the existing PodStatus does not have a PodScheduled condition, then create one that is set to true. + if _, oldPodScheduled := api.GetPodCondition(&pod.Status, api.PodScheduled); oldPodScheduled != nil { + s.Conditions = append(s.Conditions, *oldPodScheduled) + } + api.UpdatePodCondition(&pod.Status, &api.PodCondition{ + Type: api.PodScheduled, + Status: api.ConditionTrue, + }) if !kl.standaloneMode { hostIP, err := kl.GetHostIP() diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 0a3e364127882..dda8c9af53e7c 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -166,6 +166,10 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin for k, v := range annotations { pod.Annotations[k] = v } + api.UpdatePodCondition(&pod.Status, &api.PodCondition{ + Type: api.PodScheduled, + Status: api.ConditionTrue, + }) finalPod = pod return pod, nil })) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index c4c5ccc908ddb..e8aa6c034013f 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -379,9 +379,10 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return &scheduler.Config{ SchedulerCache: f.schedulerCache, // The scheduler only needs to consider schedulable nodes. - NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), - Algorithm: algo, - Binder: &binder{f.Client}, + NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), + Algorithm: algo, + Binder: &binder{f.Client}, + PodConditionUpdater: &podConditionUpdater{f.Client}, NextPod: func() *api.Pod { return f.getNextPod() }, @@ -541,6 +542,19 @@ func (b *binder) Bind(binding *api.Binding) error { // return b.Pods(binding.Namespace).Bind(binding) } +type podConditionUpdater struct { + *client.Client +} + +func (p *podConditionUpdater) Update(pod *api.Pod, condition *api.PodCondition) error { + glog.V(2).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status) + if api.UpdatePodCondition(&pod.Status, condition) { + _, err := p.Pods(pod.Namespace).UpdateStatus(pod) + return err + } + return nil +} + type clock interface { Now() time.Time } diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index b9b6dd32021a9..236b6ae2f00bb 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -37,6 +37,10 @@ type Binder interface { Bind(binding *api.Binding) error } +type PodConditionUpdater interface { + Update(pod *api.Pod, podCondition *api.PodCondition) error +} + // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { @@ -50,6 +54,10 @@ type Config struct { NodeLister algorithm.NodeLister Algorithm algorithm.ScheduleAlgorithm Binder Binder + // PodConditionUpdater is used only in case of scheduling errors. If we succeed + // with scheduling, PodScheduled condition will be updated in apiserver in /bind + // handler so that binding and setting PodCondition it is atomic. + PodConditionUpdater PodConditionUpdater // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling @@ -92,6 +100,12 @@ func (s *Scheduler) scheduleOne() { glog.V(1).Infof("Failed to schedule: %+v", pod) s.config.Error(pod, err) s.config.Recorder.Eventf(pod, api.EventTypeWarning, "FailedScheduling", "%v", err) + s.config.PodConditionUpdater.Update(pod, &api.PodCondition{ + Type: api.PodScheduled, + Status: api.ConditionFalse, + Reason: "Unschedulable", + Message: err.Error(), + }) return } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start)) @@ -120,11 +134,19 @@ func (s *Scheduler) scheduleOne() { } bindingStart := time.Now() + // If binding succeded then PodScheduled condition will be updated in apiserver so that + // it's atomic with setting host. err := s.config.Binder.Bind(b) if err != nil { glog.V(1).Infof("Failed to bind pod: %+v", err) s.config.Error(pod, err) s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err) + s.config.PodConditionUpdater.Update(pod, &api.PodCondition{ + Type: api.PodScheduled, + Status: api.ConditionFalse, + Reason: "BindingRejected", + Message: err.Error(), + }) return } metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index deac028340834..066d5e8ec56f4 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -43,6 +43,12 @@ type fakeBinder struct { func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) } +type fakePodConditionUpdater struct{} + +func (fc fakePodConditionUpdater) Update(pod *api.Pod, podCondition *api.PodCondition) error { + return nil +} + func podWithID(id, desiredHost string) *api.Pod { return &api.Pod{ ObjectMeta: api.ObjectMeta{Name: id, SelfLink: testapi.Default.SelfLink("pods", id)}, @@ -128,6 +134,7 @@ func TestScheduler(t *testing.T) { gotBinding = b return item.injectBindError }}, + PodConditionUpdater: fakePodConditionUpdater{}, Error: func(p *api.Pod, err error) { gotPod = p gotError = err diff --git a/test/e2e/scheduler_predicates.go b/test/e2e/scheduler_predicates.go index 6f727a0d66a45..fba55636b0bb0 100644 --- a/test/e2e/scheduler_predicates.go +++ b/test/e2e/scheduler_predicates.go @@ -45,8 +45,14 @@ func getPodsScheduled(pods *api.PodList) (scheduledPods, notScheduledPods []api. for _, pod := range pods.Items { if !masterNodes.Has(pod.Spec.NodeName) { if pod.Spec.NodeName != "" { + _, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled) + Expect(scheduledCondition != nil).To(Equal(true)) + Expect(scheduledCondition.Status).To(Equal(api.ConditionTrue)) scheduledPods = append(scheduledPods, pod) } else { + _, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled) + Expect(scheduledCondition != nil).To(Equal(true)) + Expect(scheduledCondition.Status).To(Equal(api.ConditionFalse)) notScheduledPods = append(notScheduledPods, pod) } }