diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 9f3e002592971..4cf8e0555cc3e 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -24,6 +24,7 @@ import ( genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master/ports" + "k8s.io/kubernetes/pkg/registry/generic/registry" "github.com/spf13/pflag" ) @@ -83,4 +84,5 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.KubeletConfig.CAFile, "kubelet-certificate-authority", s.KubeletConfig.CAFile, "Path to a cert. file for the certificate authority.") // TODO: delete this flag as soon as we identify and fix all clients that send malformed updates, like #14126. fs.BoolVar(&validation.RepairMalformedUpdates, "repair-malformed-updates", validation.RepairMalformedUpdates, "If true, server will do its best to fix the update request to pass the validation, e.g., setting empty UID in update request to its existing value. This flag can be turned off after we fix all the clients that send malformed updates.") + fs.BoolVar(&registry.EnableGarbageCollector, "enable-garbage-collector", false, "Enables the generic garbage collector. MUST be synced with the corresponding flag of the kube-controller-manager.") } diff --git a/docs/admin/kube-apiserver.md b/docs/admin/kube-apiserver.md index e01d0b926767b..9a2443818bb91 100644 --- a/docs/admin/kube-apiserver.md +++ b/docs/admin/kube-apiserver.md @@ -73,6 +73,7 @@ kube-apiserver --cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled. --delete-collection-workers=1: Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup. --deserialization-cache-size=50000: Number of deserialized json objects to cache in memory. + --enable-garbage-collector[=false]: Enables the generic garbage collector. MUST be synced with the corresponding flag of the kube-controller-manager. 
--enable-swagger-ui[=false]: Enables swagger ui on the apiserver at /swagger-ui --etcd-cafile="": SSL Certificate Authority file used to secure etcd communication --etcd-certfile="": SSL certification file used to secure etcd communication diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index ef05ec48f73e2..1591f9e7581f7 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -111,6 +111,7 @@ enable-debugging-handlers enable-hostpath-provisioner enable-server enable-swagger-ui +enable-garbage-collector etcd-cafile etcd-certfile etcd-config diff --git a/pkg/api/helpers.go b/pkg/api/helpers.go index 7af5ed8af50d3..d173c1fb887ad 100644 --- a/pkg/api/helpers.go +++ b/pkg/api/helpers.go @@ -225,7 +225,9 @@ func IsServiceIPRequested(service *Service) bool { } var standardFinalizers = sets.NewString( - string(FinalizerKubernetes)) + string(FinalizerKubernetes), + FinalizerOrphan, +) func IsStandardFinalizerName(str string) bool { return standardFinalizers.Has(str) diff --git a/pkg/api/meta.go b/pkg/api/meta.go index d0cddcadb07d8..f3f84063dd883 100644 --- a/pkg/api/meta.go +++ b/pkg/api/meta.go @@ -90,6 +90,8 @@ func (meta *ObjectMeta) GetLabels() map[string]string { return m func (meta *ObjectMeta) SetLabels(labels map[string]string) { meta.Labels = labels } func (meta *ObjectMeta) GetAnnotations() map[string]string { return meta.Annotations } func (meta *ObjectMeta) SetAnnotations(annotations map[string]string) { meta.Annotations = annotations } +func (meta *ObjectMeta) GetFinalizers() []string { return meta.Finalizers } +func (meta *ObjectMeta) SetFinalizers(finalizers []string) { meta.Finalizers = finalizers } func (meta *ObjectMeta) GetOwnerReferences() []metatypes.OwnerReference { ret := make([]metatypes.OwnerReference, len(meta.OwnerReferences)) diff --git a/pkg/api/meta/interfaces.go b/pkg/api/meta/interfaces.go index 36d41e8bda79b..90d7501801e25 100644 --- a/pkg/api/meta/interfaces.go +++ b/pkg/api/meta/interfaces.go @@ -58,6 +58,8 @@ type Object interface { SetLabels(labels map[string]string) GetAnnotations() map[string]string SetAnnotations(annotations map[string]string) + GetFinalizers() []string + SetFinalizers(finalizers []string) GetOwnerReferences() []metatypes.OwnerReference SetOwnerReferences([]metatypes.OwnerReference) } diff --git a/pkg/api/meta/meta.go b/pkg/api/meta/meta.go index 37ba93bdd92f5..b9c8d69d02cb6 100644 --- a/pkg/api/meta/meta.go +++ b/pkg/api/meta/meta.go @@ -395,6 +395,7 @@ type genericAccessor struct { labels *map[string]string annotations *map[string]string ownerReferences reflect.Value + finalizers *[]string } func (a genericAccessor) GetNamespace() string { @@ -527,6 +528,17 @@ func (a genericAccessor) SetAnnotations(annotations map[string]string) { *a.annotations = annotations } +func (a genericAccessor) GetFinalizers() []string { + if a.finalizers == nil { + return nil + } + return *a.finalizers +} + +func (a genericAccessor) SetFinalizers(finalizers []string) { + *a.finalizers = finalizers +} + func (a genericAccessor) GetOwnerReferences() []metatypes.OwnerReference { var ret []metatypes.OwnerReference s := a.ownerReferences @@ -599,6 +611,9 @@ func extractFromObjectMeta(v reflect.Value, a *genericAccessor) error { if err := runtime.FieldPtr(v, "Annotations", &a.annotations); err != nil { return err } + if err := runtime.FieldPtr(v, "Finalizers", &a.finalizers); err != nil { + return err + } ownerReferences := v.FieldByName("OwnerReferences") if !ownerReferences.IsValid() { return 
fmt.Errorf("struct %#v lacks OwnerReferences type", v) diff --git a/pkg/api/meta/meta_test.go b/pkg/api/meta/meta_test.go index 83daec0845c33..73eeffb98e259 100644 --- a/pkg/api/meta/meta_test.go +++ b/pkg/api/meta/meta_test.go @@ -43,6 +43,10 @@ func TestAPIObjectMeta(t *testing.T) { SelfLink: "some/place/only/we/know", Labels: map[string]string{"foo": "bar"}, Annotations: map[string]string{"x": "y"}, + Finalizers: []string{ + "finalizer.1", + "finalizer.2", + }, }, } var _ meta.Object = &j.ObjectMeta @@ -72,6 +76,9 @@ func TestAPIObjectMeta(t *testing.T) { if e, a := "some/place/only/we/know", accessor.GetSelfLink(); e != a { t.Errorf("expected %v, got %v", e, a) } + if e, a := []string{"finalizer.1", "finalizer.2"}, accessor.GetFinalizers(); !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } typeAccessor, err := meta.TypeAccessor(j) if err != nil { @@ -92,6 +99,7 @@ func TestAPIObjectMeta(t *testing.T) { typeAccessor.SetKind("d") accessor.SetResourceVersion("2") accessor.SetSelfLink("google.com") + accessor.SetFinalizers([]string{"finalizer.3"}) // Prove that accessor changes the original object. if e, a := "baz", j.Namespace; e != a { @@ -118,6 +126,9 @@ func TestAPIObjectMeta(t *testing.T) { if e, a := "google.com", j.SelfLink; e != a { t.Errorf("expected %v, got %v", e, a) } + if e, a := []string{"finalizer.3"}, j.Finalizers; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } typeAccessor.SetAPIVersion("d") typeAccessor.SetKind("e") @@ -143,6 +154,7 @@ func TestGenericTypeMeta(t *testing.T) { Labels map[string]string `json:"labels,omitempty"` Annotations map[string]string `json:"annotations,omitempty"` OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"` + Finalizers []string `json:"finalizers,omitempty"` } type Object struct { TypeMeta `json:",inline"` @@ -159,6 +171,7 @@ func TestGenericTypeMeta(t *testing.T) { SelfLink: "some/place/only/we/know", Labels: map[string]string{"foo": "bar"}, Annotations: map[string]string{"x": "y"}, + Finalizers: []string{"finalizer.1", "finalizer.2"}, }, } accessor, err := meta.Accessor(&j) @@ -183,6 +196,9 @@ func TestGenericTypeMeta(t *testing.T) { if e, a := "some/place/only/we/know", accessor.GetSelfLink(); e != a { t.Errorf("expected %v, got %v", e, a) } + if e, a := []string{"finalizer.1", "finalizer.2"}, accessor.GetFinalizers(); !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } typeAccessor, err := meta.TypeAccessor(&j) if err != nil { @@ -203,6 +219,7 @@ func TestGenericTypeMeta(t *testing.T) { typeAccessor.SetKind("d") accessor.SetResourceVersion("2") accessor.SetSelfLink("google.com") + accessor.SetFinalizers([]string{"finalizer.3"}) // Prove that accessor changes the original object. 
if e, a := "baz", j.Namespace; e != a { @@ -229,6 +246,9 @@ func TestGenericTypeMeta(t *testing.T) { if e, a := "google.com", j.SelfLink; e != a { t.Errorf("expected %v, got %v", e, a) } + if e, a := []string{"finalizer.3"}, j.Finalizers; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } typeAccessor.SetAPIVersion("d") typeAccessor.SetKind("e") @@ -252,6 +272,7 @@ type InternalTypeMeta struct { APIVersion string `json:"apiVersion,omitempty"` Labels map[string]string `json:"labels,omitempty"` Annotations map[string]string `json:"annotations,omitempty"` + Finalizers []string `json:"finalizers,omitempty"` OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"` } @@ -435,6 +456,7 @@ func TestGenericObjectMeta(t *testing.T) { ResourceVersion string `json:"resourceVersion,omitempty"` Labels map[string]string `json:"labels,omitempty"` Annotations map[string]string `json:"annotations,omitempty"` + Finalizers []string `json:"finalizers,omitempty"` OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"` } type Object struct { @@ -455,6 +477,10 @@ func TestGenericObjectMeta(t *testing.T) { SelfLink: "some/place/only/we/know", Labels: map[string]string{"foo": "bar"}, Annotations: map[string]string{"a": "b"}, + Finalizers: []string{ + "finalizer.1", + "finalizer.2", + }, }, } accessor, err := meta.Accessor(&j) @@ -485,6 +511,9 @@ func TestGenericObjectMeta(t *testing.T) { if e, a := 1, len(accessor.GetAnnotations()); e != a { t.Errorf("expected %v, got %v", e, a) } + if e, a := []string{"finalizer.1", "finalizer.2"}, accessor.GetFinalizers(); !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } typeAccessor, err := meta.TypeAccessor(&j) if err != nil { @@ -507,6 +536,7 @@ func TestGenericObjectMeta(t *testing.T) { accessor.SetSelfLink("google.com") accessor.SetLabels(map[string]string{"other": "label"}) accessor.SetAnnotations(map[string]string{"c": "d"}) + accessor.SetFinalizers([]string{"finalizer.3"}) // Prove that accessor changes the original object. if e, a := "baz", j.Namespace; e != a { @@ -539,6 +569,9 @@ func TestGenericObjectMeta(t *testing.T) { if e, a := map[string]string{"c": "d"}, j.Annotations; !reflect.DeepEqual(e, a) { t.Errorf("expected %#v, got %#v", e, a) } + if e, a := []string{"finalizer.3"}, j.Finalizers; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } } func TestGenericListMeta(t *testing.T) { diff --git a/pkg/api/types.go b/pkg/api/types.go index 96d187d9cfc99..d05702bdf1333 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -2100,6 +2100,7 @@ type FinalizerName string // These are internal finalizer values to Kubernetes, must be qualified name unless defined here const ( FinalizerKubernetes FinalizerName = "kubernetes" + FinalizerOrphan string = "orphan" ) // NamespaceStatus is information about the current status of a Namespace. diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index d494f4ec1fbc5..d967ddf185f69 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -323,7 +323,9 @@ func ValidateObjectMeta(meta *api.ObjectMeta, requiresNamespace bool, nameFn Val allErrs = append(allErrs, unversionedvalidation.ValidateLabels(meta.Labels, fldPath.Child("labels"))...) allErrs = append(allErrs, ValidateAnnotations(meta.Annotations, fldPath.Child("annotations"))...) allErrs = append(allErrs, ValidateOwnerReferences(meta.OwnerReferences, fldPath.Child("ownerReferences"))...) 
- + for _, finalizer := range meta.Finalizers { + allErrs = append(allErrs, validateFinalizerName(finalizer, fldPath.Child("finalizers"))...) + } return allErrs } @@ -373,7 +375,6 @@ func ValidateObjectMetaUpdate(newMeta, oldMeta *api.ObjectMeta, fldPath *field.P allErrs = append(allErrs, ValidateImmutableField(newMeta.Namespace, oldMeta.Namespace, fldPath.Child("namespace"))...) allErrs = append(allErrs, ValidateImmutableField(newMeta.UID, oldMeta.UID, fldPath.Child("uid"))...) allErrs = append(allErrs, ValidateImmutableField(newMeta.CreationTimestamp, oldMeta.CreationTimestamp, fldPath.Child("creationTimestamp"))...) - allErrs = append(allErrs, ValidateImmutableField(newMeta.Finalizers, oldMeta.Finalizers, fldPath.Child("finalizers"))...) allErrs = append(allErrs, unversionedvalidation.ValidateLabels(newMeta.Labels, fldPath.Child("labels"))...) allErrs = append(allErrs, ValidateAnnotations(newMeta.Annotations, fldPath.Child("annotations"))...) diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 2da1d31327799..e493672729ca8 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -267,21 +267,6 @@ func TestValidateObjectMetaTrimsTrailingSlash(t *testing.T) { } } -// Ensure updating finalizers is disallowed -func TestValidateObjectMetaUpdateDisallowsUpdatingFinalizers(t *testing.T) { - errs := ValidateObjectMetaUpdate( - &api.ObjectMeta{Name: "test", ResourceVersion: "1", Finalizers: []string{"orphaning"}}, - &api.ObjectMeta{Name: "test", ResourceVersion: "1"}, - field.NewPath("field"), - ) - if len(errs) != 1 { - t.Fatalf("unexpected errors: %v", errs) - } - if !strings.Contains(errs[0].Error(), "field is immutable") { - t.Errorf("unexpected error message: %v", errs) - } -} - func TestValidateAnnotations(t *testing.T) { successCases := []map[string]string{ {"simple": "bar"}, diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index 800d952bb2ce4..3603c5f002836 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -84,7 +84,7 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyL // A note on the KeyLister used by the DeltaFIFO: It's main purpose is // to list keys that are "known", for the purpose of figuring out which // items have been deleted when Replace() or Delete() are called. The deleted -// objet will be included in the DeleteFinalStateUnknown markers. These objects +// object will be included in the DeleteFinalStateUnknown markers. These objects // could be stale. // // You may provide a function to compress deltas (e.g., represent a diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 8d8044f728035..91d1b228a2308 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -18,6 +18,7 @@ package garbagecollector import ( "fmt" + "sync" "time" "github.com/golang/glog" @@ -34,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" + utilerrors "k8s.io/kubernetes/pkg/util/errors" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -63,13 +65,27 @@ func (s objectReference) String() string { // GarbageCollector.processItem() reads the nodes, but it only reads the fields // that never get changed by Propagator.processEvent(). 
type node struct { - identity objectReference - dependents map[*node]struct{} + identity objectReference + // dependents will be read by the orphanFinalizer() routine, so it must be protected by a lock. + dependentsLock *sync.RWMutex + dependents map[*node]struct{} // When processing an Update event, we need to compare the updated // ownerReferences with the owners recorded in the graph. owners []metatypes.OwnerReference } +func (ownerNode *node) addDependent(dependent *node) { + ownerNode.dependentsLock.Lock() + defer ownerNode.dependentsLock.Unlock() + ownerNode.dependents[dependent] = struct{}{} +} + +func (ownerNode *node) deleteDependent(dependent *node) { + ownerNode.dependentsLock.Lock() + defer ownerNode.dependentsLock.Unlock() + delete(ownerNode.dependents, dependent) +} + type eventType int const ( @@ -85,11 +101,35 @@ type event struct { oldObj interface{} } +type concurrentUIDToNode struct { + *sync.RWMutex + uidToNode map[types.UID]*node +} + +func (m *concurrentUIDToNode) Write(node *node) { + m.Lock() + defer m.Unlock() + m.uidToNode[node.identity.UID] = node +} + +func (m *concurrentUIDToNode) Read(uid types.UID) (*node, bool) { + m.RLock() + defer m.RUnlock() + n, ok := m.uidToNode[uid] + return n, ok +} + +func (m *concurrentUIDToNode) Delete(uid types.UID) { + m.Lock() + defer m.Unlock() + delete(m.uidToNode, uid) +} + type Propagator struct { eventQueue *workqueue.Type // uidToNode is locked internally; besides the single-threaded // Propagator.processEvent(), it is also read by GraphHasUID and the // orphan-handling routines. - uidToNode map[types.UID]*node + uidToNode *concurrentUIDToNode gc *GarbageCollector } @@ -99,7 +139,7 @@ type Propagator struct { // processItem() will verify if the owner exists according to the API server. func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerReference) { for _, owner := range owners { - ownerNode, ok := p.uidToNode[owner.UID] + ownerNode, ok := p.uidToNode.Read(owner.UID) if !ok { // Create a "virtual" node in the graph for the owner if it doesn't // exist in the graph yet. Then enqueue the virtual node into the @@ -111,37 +151,38 @@ func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerRefer OwnerReference: owner, Namespace: n.identity.Namespace, }, - dependents: make(map[*node]struct{}), + dependentsLock: &sync.RWMutex{}, + dependents: make(map[*node]struct{}), } - p.uidToNode[ownerNode.identity.UID] = ownerNode + p.uidToNode.Write(ownerNode) p.gc.dirtyQueue.Add(ownerNode) } - ownerNode.dependents[n] = struct{}{} + ownerNode.addDependent(n) } } // insertNode inserts the node into p.uidToNode; then it finds all owners as listed // in n.owners, and adds the node to their dependents list. func (p *Propagator) insertNode(n *node) { - p.uidToNode[n.identity.UID] = n + p.uidToNode.Write(n) p.addDependentToOwners(n, n.owners) } // removeDependentFromOwners removes n from the owners' dependents lists. func (p *Propagator) removeDependentFromOwners(n *node, owners []metatypes.OwnerReference) { for _, owner := range owners { - ownerNode, ok := p.uidToNode[owner.UID] + ownerNode, ok := p.uidToNode.Read(owner.UID) if !ok { continue } - delete(ownerNode.dependents, n) + ownerNode.deleteDependent(n) } } // removeNode removes the node from p.uidToNode, then finds all // owners as listed in n.owners, and removes n from their dependents list. 
func (p *Propagator) removeNode(n *node) { - delete(p.uidToNode, n.identity.UID) + p.uidToNode.Delete(n.identity.UID) p.removeDependentFromOwners(n, n.owners) } @@ -171,6 +212,133 @@ func referencesDiffs(old []metatypes.OwnerReference, new []metatypes.OwnerRefere return added, removed } +func shouldOrphanDependents(e event, accessor meta.Object) bool { + // The delta_fifo may combine the creation and update of the object into one + // event, so we need to check AddEvent as well. + if e.oldObj == nil { + if accessor.GetDeletionTimestamp() == nil { + return false + } + } else { + oldAccessor, err := meta.Accessor(e.oldObj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err)) + return false + } + // ignore the event if it's not updating DeletionTimestamp from nil to non-nil. + if accessor.GetDeletionTimestamp() == nil || oldAccessor.GetDeletionTimestamp() != nil { + return false + } + } + finalizers := accessor.GetFinalizers() + for _, finalizer := range finalizers { + if finalizer == api.FinalizerOrphan { + return true + } + } + return false +} + +// dependents are copies of pointers to the owner's dependents; they don't need to be locked. +func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents []*node) error { + var errorsSlice []error + for _, dependent := range dependents { + // the dependent.identity.UID is used as precondition + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, owner.UID, dependent.identity.UID) + _, err := gc.patchObject(dependent.identity, []byte(deleteOwnerRefPatch)) + // note that if the target ownerReference doesn't exist in the + // dependent, strategic merge patch will NOT return an error. + if err != nil && !errors.IsNotFound(err) { + errorsSlice = append(errorsSlice, fmt.Errorf("orphaning %s failed with %v", dependent.identity, err)) + } + } + if len(errorsSlice) != 0 { + return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error()) + } + glog.V(6).Infof("successfully orphaned all dependents of owner %s", owner) + return nil +} + +// TODO: Use Patch once strategic merge patch supports deleting an entry from a +// slice of a base type. +func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error { + const retries = 5 + for count := 0; count < retries; count++ { + ownerObject, err := gc.getObject(owner.identity) + if err != nil { + return fmt.Errorf("cannot finalize owner %s because it cannot be retrieved. The garbage collector will retry later.", owner.identity) + } + accessor, err := meta.Accessor(ownerObject) + if err != nil { + return fmt.Errorf("cannot access the owner object: %v. The garbage collector will retry later.", err) + } + finalizers := accessor.GetFinalizers() + var newFinalizers []string + found := false + for _, f := range finalizers { + if f == api.FinalizerOrphan { + found = true + } else { + newFinalizers = append(newFinalizers, f) + } + } + if !found { + glog.V(6).Infof("the orphan finalizer is already removed from object %s", owner.identity) + return nil + } + // remove the orphan finalizer from the owner's finalizers list + ownerObject.SetFinalizers(newFinalizers) + _, err = gc.updateObject(owner.identity, ownerObject) + if err == nil { + return nil + } + if !errors.IsConflict(err) { + return fmt.Errorf("cannot update the finalizers of owner %s, with error: %v, tried %d times", owner.identity, err, count+1) + } + // retry if it's a conflict + glog.V(6).Infof("got conflict updating the owner object %s, tried %d times", owner.identity, count+1) + } + return fmt.Errorf("the update retry limit (%d) has been reached. The garbage collector will retry later for owner %v.", retries, owner.identity) +} + +// orphanFinalizer dequeues a node from the orphanQueue, then finds its dependents +// based on the graph maintained by the GC, then removes the owner from its +// dependents' OwnerReferences, and finally updates the owner to remove the +// "orphan" finalizer. The node is added back into the orphanQueue if any of +// these steps fail. +func (gc *GarbageCollector) orphanFinalizer() { + key, quit := gc.orphanQueue.Get() + if quit { + return + } + defer gc.orphanQueue.Done(key) + owner, ok := key.(*node) + if !ok { + utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", key)) + return + } + // we don't need to lock each element, because they never get updated + owner.dependentsLock.RLock() + dependents := make([]*node, 0, len(owner.dependents)) + for dependent := range owner.dependents { + dependents = append(dependents, dependent) + } + owner.dependentsLock.RUnlock() + + err := gc.orphanDependents(owner.identity, dependents) + if err != nil { + glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err) + gc.orphanQueue.Add(owner) + return + } + // update the owner, remove the orphan finalizer from its finalizers list + err = gc.removeOrphanFinalizer(owner) + if err != nil { + glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err) + gc.orphanQueue.Add(owner) + } +} + // processEvent dequeues an event from the eventQueue, updates the graph, and populates the dirtyQueue. 
func (p *Propagator) processEvent() { key, quit := p.eventQueue.Get() @@ -196,7 +364,7 @@ func (p *Propagator) processEvent() { } glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType) // Check if the node already exists - existingNode, found := p.uidToNode[accessor.GetUID()] + existingNode, found := p.uidToNode.Read(accessor.GetUID()) switch { case (event.eventType == addEvent || event.eventType == updateEvent) && !found: newNode := &node{ @@ -209,13 +377,24 @@ func (p *Propagator) processEvent() { }, Namespace: accessor.GetNamespace(), }, - dependents: make(map[*node]struct{}), - owners: accessor.GetOwnerReferences(), + dependentsLock: &sync.RWMutex{}, + dependents: make(map[*node]struct{}), + owners: accessor.GetOwnerReferences(), } p.insertNode(newNode) + // the underlying delta_fifo may combine a creation and an update into one event + if shouldOrphanDependents(event, accessor) { + glog.V(6).Infof("add %s to the orphanQueue", newNode.identity) + p.gc.orphanQueue.Add(newNode) + } case (event.eventType == addEvent || event.eventType == updateEvent) && found: - // TODO: finalizer: Check if ObjectMeta.DeletionTimestamp is updated from nil to non-nil - // We only need to add/remove owner refs for now + // caveat: if GC observes the creation of the dependents after the + // deletion of the owner, then the orphaning finalizer won't be effective. + if shouldOrphanDependents(event, accessor) { + glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity) + p.gc.orphanQueue.Add(existingNode) + } + // add/remove owner refs added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences()) if len(added) == 0 && len(removed) == 0 { glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event) @@ -234,6 +413,8 @@ func (p *Propagator) processEvent() { return } p.removeNode(existingNode) + existingNode.dependentsLock.RLock() + defer existingNode.dependentsLock.RUnlock() for dep := range existingNode.dependents { p.gc.dirtyQueue.Add(dep) } @@ -244,11 +425,12 @@ // removing ownerReferences from the dependents if the owner is deleted with // DeleteOptions.OrphanDependents=true. type GarbageCollector struct { - restMapper meta.RESTMapper - clientPool dynamic.ClientPool - dirtyQueue *workqueue.Type - monitors []monitor - propagator *Propagator + restMapper meta.RESTMapper + clientPool dynamic.ClientPool + dirtyQueue *workqueue.Type + orphanQueue *workqueue.Type + monitors []monitor + propagator *Propagator } func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource) (monitor, error) { @@ -320,15 +502,19 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{ func NewGarbageCollector(clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) { gc := &GarbageCollector{ - clientPool: clientPool, - dirtyQueue: workqueue.New(), + clientPool: clientPool, + dirtyQueue: workqueue.New(), + orphanQueue: workqueue.New(), // TODO: should use a dynamic RESTMapper built from the discovery results. 
restMapper: registered.RESTMapper(), } gc.propagator = &Propagator{ eventQueue: workqueue.New(), - uidToNode: make(map[types.UID]*node), - gc: gc, + uidToNode: &concurrentUIDToNode{ + RWMutex: &sync.RWMutex{}, + uidToNode: make(map[types.UID]*node), + }, + gc: gc, } for _, resource := range resources { if _, ok := ignoredResources[resource]; ok { @@ -396,6 +582,26 @@ func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructur return client.Resource(resource, item.Namespace).Get(item.Name) } +func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) { + fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) + client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) + if err != nil { + return nil, err + } + resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) + if err != nil { + return nil, err + } + return client.Resource(resource, item.Namespace).Update(obj) +} + +func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) { + fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind) + client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion()) + if err != nil { + return nil, err + } + resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) + if err != nil { + return nil, err + } + return client.Resource(resource, item.Namespace).Patch(item.Name, api.StrategicMergePatchType, patch) +} + func objectReferenceToUnstructured(ref objectReference) *runtime.Unstructured { ret := &runtime.Unstructured{} ret.SetKind(ref.Kind) @@ -479,17 +685,19 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go wait.Until(gc.worker, 0, stopCh) + go wait.Until(gc.orphanFinalizer, 0, stopCh) } <-stopCh glog.Infof("Shutting down garbage collector") gc.dirtyQueue.ShutDown() + gc.orphanQueue.ShutDown() gc.propagator.eventQueue.ShutDown() } // QueuesDrained returns whether the dirtyQueue, eventQueue, and orphanQueue are drained. It's -// useful for debugging. +// useful for debugging. Note that it doesn't guarantee the workers are idle. func (gc *GarbageCollector) QueuesDrained() bool { - return gc.dirtyQueue.Len() == 0 && gc.propagator.eventQueue.Len() == 0 + return gc.dirtyQueue.Len() == 0 && gc.propagator.eventQueue.Len() == 0 && gc.orphanQueue.Len() == 0 } // *FOR TEST USE ONLY* It's not safe to call this function when the GC is still @@ -498,7 +706,7 @@ func (gc *GarbageCollector) QueuesDrained() bool { // uidToNode graph. It's useful for debugging. 
func (gc *GarbageCollector) GraphHasUID(UIDs []types.UID) bool { for _, u := range UIDs { - if _, ok := gc.propagator.uidToNode[u]; ok { + if _, ok := gc.propagator.uidToNode.Read(u); ok { return true } } diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index a1a935858c87d..6c044a90c60e4 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -273,7 +273,10 @@ func TestProcessEvent(t *testing.T) { for _, scenario := range testScenarios { propagator := &Propagator{ eventQueue: workqueue.New(), - uidToNode: make(map[types.UID]*node), + uidToNode: &concurrentUIDToNode{ + RWMutex: &sync.RWMutex{}, + uidToNode: make(map[types.UID]*node), + }, gc: &GarbageCollector{ dirtyQueue: workqueue.New(), }, @@ -281,7 +284,36 @@ for i := 0; i < len(scenario.events); i++ { propagator.eventQueue.Add(scenario.events[i]) propagator.processEvent() - verifyGraphInvariants(scenario.name, propagator.uidToNode, t) + verifyGraphInvariants(scenario.name, propagator.uidToNode.uidToNode, t) } } } + +// TestDependentsRace relies on golang's data race detector to check whether +// there is a data race on the dependents field. +func TestDependentsRace(t *testing.T) { + clientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc) + podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}} + gc, err := NewGarbageCollector(clientPool, podResource) + if err != nil { + t.Fatal(err) + } + + const updates = 100 + ownerUID := types.UID("owner") + owner := &node{identity: objectReference{OwnerReference: metatypes.OwnerReference{UID: ownerUID}}, dependentsLock: &sync.RWMutex{}, dependents: make(map[*node]struct{})} + gc.propagator.uidToNode.Write(owner) + go func() { + for i := 0; i < updates; i++ { + dependent := &node{} + gc.propagator.addDependentToOwners(dependent, []metatypes.OwnerReference{{UID: ownerUID}}) + gc.propagator.removeDependentFromOwners(dependent, []metatypes.OwnerReference{{UID: ownerUID}}) + } + }() + go func() { + gc.orphanQueue.Add(owner) + for i := 0; i < updates; i++ { + gc.orphanFinalizer() + } + }() +} diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index 2a4979e7b7520..3571956400a3f 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -21,6 +21,7 @@ import ( "reflect" "strings" "sync" + "time" "k8s.io/kubernetes/pkg/api" kubeerr "k8s.io/kubernetes/pkg/api/errors" @@ -41,6 +42,10 @@ import ( "github.com/golang/glog" ) +// EnableGarbageCollector affects the handling of Update and Delete requests. It +// must be synced with the corresponding flag in kube-controller-manager. +var EnableGarbageCollector bool + // Store implements generic.Registry. // It's intended to be embeddable, so that you can implement any // non-generic functions if needed. @@ -235,6 +240,49 @@ func (e *Store) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err return out, nil } +// shouldDelete checks whether an Update removes all the object's finalizers; if so, +// it further checks whether the object's DeletionGracePeriodSeconds is 0, and if so +// returns true. 
+func (e *Store) shouldDelete(ctx api.Context, key string, obj, existing runtime.Object) bool { + if !EnableGarbageCollector { + return false + } + newMeta, err := api.ObjectMetaFor(obj) + if err != nil { + utilruntime.HandleError(err) + return false + } + oldMeta, err := api.ObjectMetaFor(existing) + if err != nil { + utilruntime.HandleError(err) + return false + } + return len(newMeta.Finalizers) == 0 && oldMeta.DeletionGracePeriodSeconds != nil && *oldMeta.DeletionGracePeriodSeconds == 0 +} + +func (e *Store) deleteForEmptyFinalizers(ctx api.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions) (runtime.Object, bool, error) { + out := e.NewFunc() + glog.V(6).Infof("going to delete %s from registry, triggered by update", name) + if err := e.Storage.Delete(ctx, key, out, preconditions); err != nil { + // Deletion is racy, i.e., there could be multiple update + // requests to remove all finalizers from the object, so we + // ignore the NotFound error. + if storage.IsNotFound(err) { + _, err := e.finalizeDelete(obj, true) + // clients are expecting an updated object if a PUT succeeded, + // but finalizeDelete returns an unversioned.Status, so return + // the object in the request instead. + return obj, false, err + } + return nil, false, storeerr.InterpretDeleteError(err, e.QualifiedResource, name) + } + _, err := e.finalizeDelete(out, true) + // clients are expecting an updated object if a PUT succeeded, but + // finalizeDelete returns an unversioned.Status, so return the object in + // the request instead. + return obj, false, err +} + // Update performs an atomic update and set of the object. Returns the result of the update // or an error. If the registry allows create-on-update, the create flow will be executed. // A bool is returned along with the object and any errors, to indicate object creation. 
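A minimal standalone sketch of the rule the new shouldDelete/deleteForEmptyFinalizers pair implements: an Update that leaves no finalizers on an object whose deletion grace period has already hit zero turns into a real storage delete. The types below are simplified stand-ins for illustration, not the patch's actual API:

package main

import "fmt"

type objectMeta struct {
	Finalizers                 []string
	DeletionGracePeriodSeconds *int64
}

// finalizerRemovalTriggersDelete mirrors Store.shouldDelete above: the update
// must remove every finalizer, and the existing object must already be marked
// for deletion with a zero grace period.
func finalizerRemovalTriggersDelete(updated, existing objectMeta) bool {
	gracePeriodZero := existing.DeletionGracePeriodSeconds != nil && *existing.DeletionGracePeriodSeconds == 0
	return len(updated.Finalizers) == 0 && gracePeriodZero
}

func main() {
	zero := int64(0)
	existing := objectMeta{Finalizers: []string{"orphan"}, DeletionGracePeriodSeconds: &zero}
	updated := objectMeta{} // the update removed the last finalizer
	fmt.Println(finalizerRemovalTriggersDelete(updated, existing)) // true
}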
@@ -255,7 +303,8 @@ func (e *Store) Update(ctx api.Context, name string, objInfo rest.UpdatedObjectI } out := e.NewFunc() - + // deleteObj is only used in case a deletion is carried out + var deleteObj runtime.Object err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { // Given the existing object, get the new object obj, err := objInfo.UpdatedObject(ctx, existing) @@ -321,6 +370,11 @@ func (e *Store) Update(ctx api.Context, name string, objInfo rest.UpdatedObjectI if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil { return nil, nil, err } + delete := e.shouldDelete(ctx, key, obj, existing) + if delete { + deleteObj = obj + return nil, nil, errEmptiedFinalizers + } ttl, err := e.calculateTTL(obj, res.TTL, true) if err != nil { return nil, nil, err @@ -332,6 +386,10 @@ }) if err != nil { + // delete the object + if err == errEmptiedFinalizers { + return e.deleteForEmptyFinalizers(ctx, name, key, deleteObj, storagePreconditions) + } if creating { err = storeerr.InterpretCreateError(err, e.QualifiedResource, name) err = rest.CheckGeneratedNameError(e.CreateStrategy, err, creatingObj) @@ -380,10 +438,174 @@ func (e *Store) Get(ctx api.Context, name string) (runtime.Object, error) { } var ( - errAlreadyDeleting = fmt.Errorf("abort delete") - errDeleteNow = fmt.Errorf("delete now") + errAlreadyDeleting = fmt.Errorf("abort delete") + errDeleteNow = fmt.Errorf("delete now") + errEmptiedFinalizers = fmt.Errorf("emptied finalizers") ) +// shouldUpdateFinalizers returns whether the object's finalizers need to be updated, and the desired list of finalizers +func shouldUpdateFinalizers(accessor meta.Object, options *api.DeleteOptions) (shouldUpdate bool, newFinalizers []string) { + if options == nil || options.OrphanDependents == nil { + return false, accessor.GetFinalizers() + } + shouldOrphan := *options.OrphanDependents + alreadyOrphan := false + finalizers := accessor.GetFinalizers() + newFinalizers = make([]string, 0, len(finalizers)) + for _, f := range finalizers { + if f == api.FinalizerOrphan { + alreadyOrphan = true + if !shouldOrphan { + continue + } + } + newFinalizers = append(newFinalizers, f) + } + if shouldOrphan && !alreadyOrphan { + newFinalizers = append(newFinalizers, api.FinalizerOrphan) + } + shouldUpdate = shouldOrphan != alreadyOrphan + return shouldUpdate, newFinalizers +} + +// markAsDeleting sets the obj's DeletionGracePeriodSeconds to 0, and sets the +// DeletionTimestamp to "now". Finalizing controllers watch for such updates and will +// finalize the object if their IDs are present in the object's Finalizers list. +func markAsDeleting(obj runtime.Object) (err error) { + objectMeta, kerr := api.ObjectMetaFor(obj) + if kerr != nil { + return kerr + } + now := unversioned.NewTime(time.Now()) + objectMeta.DeletionTimestamp = &now + var zero int64 = 0 + objectMeta.DeletionGracePeriodSeconds = &zero + return nil +} + +// This function needs to be kept in sync with updateForGracefulDeletionAndFinalizers. 
+func (e *Store) updateForGracefulDeletion(ctx api.Context, name, key string, options *api.DeleteOptions, preconditions storage.Preconditions, in runtime.Object) (err error, ignoreNotFound, deleteImmediately bool, out, lastExisting runtime.Object) { + lastGraceful := int64(0) + out = e.NewFunc() + err = e.Storage.GuaranteedUpdate( + ctx, key, out, false, &preconditions, + storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) { + graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options) + if err != nil { + return nil, err + } + if pendingGraceful { + return nil, errAlreadyDeleting + } + if !graceful { + return nil, errDeleteNow + } + lastGraceful = *options.GracePeriodSeconds + lastExisting = existing + return existing, nil + }), + ) + switch err { + case nil: + if lastGraceful > 0 { + return nil, false, false, out, lastExisting + } + // If we are here, the registry supports the grace period mechanism and + // we are intentionally deleting gracelessly. In this case, we may + // enter a race with other k8s components. If another component wins + // the race, the object will not be found, and we should tolerate + // the NotFound error. See + // https://github.com/kubernetes/kubernetes/issues/19403 for + // details. + return nil, true, true, out, lastExisting + case errDeleteNow: + // we've updated the object to have a zero grace period, or it's already at 0, so + // we should fall through and truly delete the object. + return nil, false, true, out, lastExisting + case errAlreadyDeleting: + out, err = e.finalizeDelete(in, true) + return err, false, false, out, lastExisting + default: + return storeerr.InterpretUpdateError(err, e.QualifiedResource, name), false, false, out, lastExisting + } +} + +// This function needs to be kept in sync with updateForGracefulDeletion. +func (e *Store) updateForGracefulDeletionAndFinalizers(ctx api.Context, name, key string, options *api.DeleteOptions, preconditions storage.Preconditions, in runtime.Object) (err error, ignoreNotFound, deleteImmediately bool, out, lastExisting runtime.Object) { + lastGraceful := int64(0) + var pendingFinalizers bool + out = e.NewFunc() + err = e.Storage.GuaranteedUpdate( + ctx, key, out, false, &preconditions, + storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) { + graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options) + if err != nil { + return nil, err + } + if pendingGraceful { + return nil, errAlreadyDeleting + } + + // Add/remove the orphan finalizer as the options dictate. + // Note that this occurs after checking pendingGraceful, so + // finalizers cannot be updated via DeleteOptions if deletion has + // started. 
+ existingAccessor, err := meta.Accessor(existing) + if err != nil { + return nil, err + } + shouldUpdate, newFinalizers := shouldUpdateFinalizers(existingAccessor, options) + if shouldUpdate { + existingAccessor.SetFinalizers(newFinalizers) + } + + if !graceful { + // set DeletionGracePeriodSeconds to 0 if the object has pending finalizers but does not support graceful deletion + pendingFinalizers = len(existingAccessor.GetFinalizers()) != 0 + if pendingFinalizers { + glog.V(6).Infof("update the DeletionTimestamp to \"now\" and GracePeriodSeconds to 0 for object %s, because it has pending finalizers", name) + err = markAsDeleting(existing) + if err != nil { + return nil, err + } + return existing, nil + } + return nil, errDeleteNow + } + lastGraceful = *options.GracePeriodSeconds + lastExisting = existing + return existing, nil + }), + ) + switch err { + case nil: + // If there are pending finalizers, we never delete the object immediately. + if pendingFinalizers { + return nil, false, false, out, lastExisting + } + if lastGraceful > 0 { + return nil, false, false, out, lastExisting + } + // If we are here, the registry supports the grace period mechanism and + // we are intentionally deleting gracelessly. In this case, we may + // enter a race with other k8s components. If another component wins + // the race, the object will not be found, and we should tolerate + // the NotFound error. See + // https://github.com/kubernetes/kubernetes/issues/19403 for + // details. + return nil, true, true, out, lastExisting + case errDeleteNow: + // we've updated the object to have a zero grace period, or it's already at 0, so + // we should fall through and truly delete the object. + return nil, false, true, out, lastExisting + case errAlreadyDeleting: + out, err = e.finalizeDelete(in, true) + return err, false, false, out, lastExisting + default: + return storeerr.InterpretUpdateError(err, e.QualifiedResource, name), false, false, out, lastExisting + } +} + // Delete removes the item from storage. 
func (e *Store) Delete(ctx api.Context, name string, options *api.DeleteOptions) (runtime.Object, error) { key, err := e.KeyFunc(ctx, name) @@ -395,7 +617,6 @@ func (e *Store) Delete(ctx api.Context, name string, options *api.DeleteOptions) if err := e.Storage.Get(ctx, key, obj, false); err != nil { return nil, storeerr.InterpretDeleteError(err, e.QualifiedResource, name) } - // support older consumers of delete by treating "nil" as delete immediately if options == nil { options = api.NewDeleteOptions(0) @@ -408,58 +629,39 @@ func (e *Store) Delete(ctx api.Context, name string, options *api.DeleteOptions) if err != nil { return nil, err } + // this means finalizers cannot be updated via DeleteOptions if a deletion is already pending if pendingGraceful { return e.finalizeDelete(obj, false) } - var ignoreNotFound bool = false - var lastExisting runtime.Object = nil - if graceful { - out := e.NewFunc() - lastGraceful := int64(0) - err := e.Storage.GuaranteedUpdate( - ctx, key, out, false, &preconditions, - storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) { - graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options) - if err != nil { - return nil, err - } - if pendingGraceful { - return nil, errAlreadyDeleting - } - if !graceful { - return nil, errDeleteNow - } - lastGraceful = *options.GracePeriodSeconds - lastExisting = existing - return existing, nil - }), - ) - switch err { - case nil: - if lastGraceful > 0 { - return out, nil - } - // If we are here, the registry supports grace period mechanism and - // we are intentionally delete gracelessly. In this case, we may - // enter a race with other k8s components. If other component wins - // the race, the object will not be found, and we should tolerate - // the NotFound error. See - // https://github.com/kubernetes/kubernetes/issues/19403 for - // details. - ignoreNotFound = true - // exit the switch and delete immediately - case errDeleteNow: - // we've updated the object to have a zero grace period, or it's already at 0, so - // we should fall through and truly delete the object. - case errAlreadyDeleting: - return e.finalizeDelete(obj, true) - default: - return nil, storeerr.InterpretUpdateError(err, e.QualifiedResource, name) + // check if obj has pending finalizers + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, kubeerr.NewInternalError(err) + } + pendingFinalizers := len(accessor.GetFinalizers()) != 0 + var ignoreNotFound bool + var deleteImmediately bool = true + var lastExisting, out runtime.Object + if !EnableGarbageCollector { + // TODO: remove the check on graceful, because we support no-op updates now. + if graceful { + err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletion(ctx, name, key, options, preconditions, obj) + } + } else { + shouldUpdateFinalizers, _ := shouldUpdateFinalizers(accessor, options) + // TODO: remove the check, because we support no-op updates now. + if graceful || pendingFinalizers || shouldUpdateFinalizers { + err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, obj) } } + // !deleteImmediately covers all cases where err != nil. We keep both to be future-proof. 
+ if !deleteImmediately || err != nil { + return out, err + } // delete immediately, or no graceful deletion supported - out := e.NewFunc() + glog.V(6).Infof("going to delete %s from registry", name) + out = e.NewFunc() if err := e.Storage.Delete(ctx, key, out, &preconditions); err != nil { // Please refer to the place where we set ignoreNotFound for the reason // why we ignore the NotFound error. diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index 67a62b1320d74..e26bec885d6ae 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -553,6 +553,259 @@ func TestStoreDelete(t *testing.T) { } } +func TestStoreHandleFinalizers(t *testing.T) { + EnableGarbageCollector = true + defer func() { EnableGarbageCollector = false }() + podWithFinalizer := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Finalizers: []string{"foo.com/x"}}, + Spec: api.PodSpec{NodeName: "machine"}, + } + + testContext := api.WithNamespace(api.NewContext(), "test") + server, registry := NewTestGenericStoreRegistry(t) + defer server.Terminate(t) + + // create pod + _, err := registry.Create(testContext, podWithFinalizer) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // deleting the object with nil delete options does not remove it, because it has a finalizer + _, err = registry.Delete(testContext, podWithFinalizer.Name, nil) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // the object should still exist + obj, err := registry.Get(testContext, podWithFinalizer.Name) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + podWithFinalizer, ok := obj.(*api.Pod) + if !ok { + t.Errorf("Unexpected object: %#v", obj) + } + if podWithFinalizer.ObjectMeta.DeletionTimestamp == nil { + t.Errorf("Expect the object to have DeletionTimestamp set, but got %#v", podWithFinalizer.ObjectMeta) + } + if podWithFinalizer.ObjectMeta.DeletionGracePeriodSeconds == nil || *podWithFinalizer.ObjectMeta.DeletionGracePeriodSeconds != 0 { + t.Errorf("Expect the object to have 0 DeletionGracePeriodSecond, but got %#v", podWithFinalizer.ObjectMeta) + } + + updatedPodWithFinalizer := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Finalizers: []string{"foo.com/x"}, ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion}, + Spec: api.PodSpec{NodeName: "machine"}, + } + _, _, err = registry.Update(testContext, updatedPodWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(updatedPodWithFinalizer, api.Scheme)) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // the object should still exist, because it still has a finalizer + obj, err = registry.Get(testContext, podWithFinalizer.Name) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + podWithFinalizer, ok = obj.(*api.Pod) + if !ok { + t.Errorf("Unexpected object: %#v", obj) + } + + podWithNoFinalizer := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: podWithFinalizer.ObjectMeta.ResourceVersion}, + Spec: api.PodSpec{NodeName: "anothermachine"}, + } + _, _, err = registry.Update(testContext, podWithFinalizer.ObjectMeta.Name, rest.DefaultUpdatedObjectInfo(podWithNoFinalizer, api.Scheme)) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + // the pod should be removed, because its finalizer is removed + _, err = registry.Get(testContext, podWithFinalizer.Name) + if !errors.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } +} + +func TestStoreDeleteWithOrphanDependents(t *testing.T) { 
EnableGarbageCollector = true + defer func() { EnableGarbageCollector = false }() + podWithOrphanFinalizer := func(name string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: name, Finalizers: []string{"foo.com/x", api.FinalizerOrphan, "bar.com/y"}}, + Spec: api.PodSpec{NodeName: "machine"}, + } + } + podWithOtherFinalizers := func(name string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: name, Finalizers: []string{"foo.com/x", "bar.com/y"}}, + Spec: api.PodSpec{NodeName: "machine"}, + } + } + podWithNoFinalizer := func(name string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: name}, + Spec: api.PodSpec{NodeName: "machine"}, + } + } + podWithOnlyOrphanFinalizer := func(name string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: name, Finalizers: []string{api.FinalizerOrphan}}, + Spec: api.PodSpec{NodeName: "machine"}, + } + } + trueVar, falseVar := true, false + orphanOptions := &api.DeleteOptions{OrphanDependents: &trueVar} + nonOrphanOptions := &api.DeleteOptions{OrphanDependents: &falseVar} + nilOrphanOptions := &api.DeleteOptions{} + + testcases := []struct { + pod *api.Pod + options *api.DeleteOptions + expectNotFound bool + updatedFinalizers []string + }{ + // cases run with DeleteOptions.OrphanDependents=true + { + podWithOrphanFinalizer("pod1"), + orphanOptions, + false, + []string{"foo.com/x", api.FinalizerOrphan, "bar.com/y"}, + }, + { + podWithOtherFinalizers("pod2"), + orphanOptions, + false, + []string{"foo.com/x", "bar.com/y", api.FinalizerOrphan}, + }, + { + podWithNoFinalizer("pod3"), + orphanOptions, + false, + []string{api.FinalizerOrphan}, + }, + { + podWithOnlyOrphanFinalizer("pod4"), + orphanOptions, + false, + []string{api.FinalizerOrphan}, + }, + // cases run with DeleteOptions.OrphanDependents=false + { + podWithOrphanFinalizer("pod5"), + nonOrphanOptions, + false, + []string{"foo.com/x", "bar.com/y"}, + }, + { + podWithOtherFinalizers("pod6"), + nonOrphanOptions, + false, + []string{"foo.com/x", "bar.com/y"}, + }, + { + podWithNoFinalizer("pod7"), + nonOrphanOptions, + true, + []string{}, + }, + { + podWithOnlyOrphanFinalizer("pod8"), + nonOrphanOptions, + true, + []string{}, + }, + // cases run with nil DeleteOptions, the finalizers are not updated. 
+ { + podWithOrphanFinalizer("pod9"), + nil, + false, + []string{"foo.com/x", api.FinalizerOrphan, "bar.com/y"}, + }, + { + podWithOtherFinalizers("pod10"), + nil, + false, + []string{"foo.com/x", "bar.com/y"}, + }, + { + podWithNoFinalizer("pod11"), + nil, + true, + []string{}, + }, + { + podWithOnlyOrphanFinalizer("pod12"), + nil, + false, + []string{api.FinalizerOrphan}, + }, + // cases run with non-nil DeleteOptions but nil OrphanDependents; the finalizers are not updated + { + podWithOrphanFinalizer("pod13"), + nilOrphanOptions, + false, + []string{"foo.com/x", api.FinalizerOrphan, "bar.com/y"}, + }, + { + podWithOtherFinalizers("pod14"), + nilOrphanOptions, + false, + []string{"foo.com/x", "bar.com/y"}, + }, + { + podWithNoFinalizer("pod15"), + nilOrphanOptions, + true, + []string{}, + }, + { + podWithOnlyOrphanFinalizer("pod16"), + nilOrphanOptions, + false, + []string{api.FinalizerOrphan}, + }, + } + + testContext := api.WithNamespace(api.NewContext(), "test") + server, registry := NewTestGenericStoreRegistry(t) + defer server.Terminate(t) + + for _, tc := range testcases { + // create pod + _, err := registry.Create(testContext, tc.pod) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = registry.Delete(testContext, tc.pod.Name, tc.options) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + obj, err := registry.Get(testContext, tc.pod.Name) + if tc.expectNotFound && (err == nil || !errors.IsNotFound(err)) { + t.Fatalf("Unexpected error: %v", err) + } + if !tc.expectNotFound && err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !tc.expectNotFound { + pod, ok := obj.(*api.Pod) + if !ok { + t.Fatalf("Expect the object to be a pod, but got %#v", obj) + } + if pod.ObjectMeta.DeletionTimestamp == nil { + t.Errorf("Expect the object to have DeletionTimestamp set, but got %#v", pod.ObjectMeta) + } + if pod.ObjectMeta.DeletionGracePeriodSeconds == nil || *pod.ObjectMeta.DeletionGracePeriodSeconds != 0 { + t.Errorf("Expect the object to have 0 DeletionGracePeriodSecond, but got %#v", pod.ObjectMeta) + } + if e, a := tc.updatedFinalizers, pod.ObjectMeta.Finalizers; !reflect.DeepEqual(e, a) { + t.Errorf("Expect object %s to have finalizers %v, got %v", pod.ObjectMeta.Name, e, a) + } + } + } +} + func TestStoreDeleteCollection(t *testing.T) { podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} podB := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} diff --git a/pkg/runtime/types.go b/pkg/runtime/types.go index d50f642890801..0c6d48afc4a45 100644 --- a/pkg/runtime/types.go +++ b/pkg/runtime/types.go @@ -153,6 +153,19 @@ func getNestedString(obj map[string]interface{}, fields ...string) string { return "" } +func getNestedSlice(obj map[string]interface{}, fields ...string) []string { + if m, ok := getNestedField(obj, fields...).([]interface{}); ok { + strSlice := make([]string, 0, len(m)) + for _, v := range m { + if str, ok := v.(string); ok { + strSlice = append(strSlice, str) + } + } + return strSlice + } + return nil +} + func getNestedMap(obj map[string]interface{}, fields ...string) map[string]string { if m, ok := getNestedField(obj, fields...).(map[string]interface{}); ok { strMap := make(map[string]string, len(m)) @@ -179,6 +192,14 @@ func setNestedField(obj map[string]interface{}, value interface{}, fields ...str m[fields[len(fields)-1]] = value } +func setNestedSlice(obj map[string]interface{}, value []string, fields ...string) { + m := make([]interface{}, 0, len(value)) + for _, v := range value { + m = append(m, v) + } + 
setNestedField(obj, m, fields...) +} + func setNestedMap(obj map[string]interface{}, value map[string]string, fields ...string) { m := make(map[string]interface{}, len(value)) for k, v := range value { @@ -194,6 +215,13 @@ func (u *Unstructured) setNestedField(value interface{}, fields ...string) { setNestedField(u.Object, value, fields...) } +func (u *Unstructured) setNestedSlice(value []string, fields ...string) { + if u.Object == nil { + u.Object = make(map[string]interface{}) + } + setNestedSlice(u.Object, value, fields...) +} + func (u *Unstructured) setNestedMap(value map[string]string, fields ...string) { if u.Object == nil { u.Object = make(map[string]interface{}) @@ -385,6 +413,14 @@ func (u *Unstructured) GroupVersionKind() unversioned.GroupVersionKind { return gvk } +func (u *Unstructured) GetFinalizers() []string { + return getNestedSlice(u.Object, "metadata", "finalizers") +} + +func (u *Unstructured) SetFinalizers(finalizers []string) { + u.setNestedSlice(finalizers, "metadata", "finalizers") +} + // UnstructuredList allows lists that do not have Golang structs // registered to be manipulated generically. This can be used to deal // with the API lists from a plug-in. diff --git a/pkg/runtime/unstructured_test.go b/pkg/runtime/unstructured_test.go index 9cdd8b89342b1..db87f78420b19 100644 --- a/pkg/runtime/unstructured_test.go +++ b/pkg/runtime/unstructured_test.go @@ -156,6 +156,10 @@ func TestUnstructuredGetters(t *testing.T) { "uid": "2", }, }, + "finalizers": []interface{}{ + "finalizer.1", + "finalizer.2", + }, }, }, } @@ -223,7 +227,10 @@ func TestUnstructuredGetters(t *testing.T) { }, } if got, want := refs, expectedOwnerReferences; !reflect.DeepEqual(got, want) { - t.Errorf("GetOwnerReference()=%v, want %v", got, want) + t.Errorf("GetOwnerReferences()=%v, want %v", got, want) + } + if got, want := unstruct.GetFinalizers(), []string{"finalizer.1", "finalizer.2"}; !reflect.DeepEqual(got, want) { + t.Errorf("GetFinalizers()=%v, want %v", got, want) } } @@ -263,6 +270,10 @@ func TestUnstructuredSetters(t *testing.T) { "uid": "2", }, }, + "finalizers": []interface{}{ + "finalizer.1", + "finalizer.2", + }, }, }, } @@ -295,6 +306,7 @@ func TestUnstructuredSetters(t *testing.T) { }, } unstruct.SetOwnerReferences(newOwnerReferences) + unstruct.SetFinalizers([]string{"finalizer.1", "finalizer.2"}) if !reflect.DeepEqual(unstruct, want) { t.Errorf("Wanted: \n%s\n Got:\n%s", want, unstruct) diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index e3f22cb849f01..89290e29abe56 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -129,7 +129,7 @@ type Interface interface { // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // retrying the update until success if there is index conflict. // Note that object passed to tryUpdate may change across invocations of tryUpdate() if - // other writers are simultaneously updating it, to tryUpdate() needs to take into account + // other writers are simultaneously updating it, so tryUpdate() needs to take into account // the current contents of the object when deciding how the update object should look. // If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false // or zero value in 'ptrToType' parameter otherwise. 
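The getNestedSlice/setNestedSlice helpers added above back the new Unstructured.GetFinalizers/SetFinalizers accessors: metadata.finalizers lives in the object map as []interface{} and is converted to []string on read. A minimal standalone sketch of the read path, assuming only the map shape used in the unstructured tests above:

package main

import "fmt"

// getFinalizers mirrors what Unstructured.GetFinalizers does via getNestedSlice:
// walk the object map to metadata.finalizers and convert the []interface{}
// into a []string, skipping non-string entries.
func getFinalizers(obj map[string]interface{}) []string {
	meta, ok := obj["metadata"].(map[string]interface{})
	if !ok {
		return nil
	}
	raw, ok := meta["finalizers"].([]interface{})
	if !ok {
		return nil
	}
	out := make([]string, 0, len(raw))
	for _, v := range raw {
		if s, ok := v.(string); ok {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	obj := map[string]interface{}{
		"metadata": map[string]interface{}{
			"finalizers": []interface{}{"orphan", "foo.com/x"},
		},
	}
	fmt.Println(getFinalizers(obj)) // [orphan foo.com/x]
}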
diff --git a/test/integration/garbage_collector_test.go b/test/integration/garbage_collector_test.go
index 07fa60a449c7b..e03266413033d 100644
--- a/test/integration/garbage_collector_test.go
+++ b/test/integration/garbage_collector_test.go
@@ -23,6 +23,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -36,11 +37,22 @@ import (
 	"k8s.io/kubernetes/pkg/client/typed/dynamic"
 	"k8s.io/kubernetes/pkg/controller/garbagecollector"
 	"k8s.io/kubernetes/pkg/master"
+	"k8s.io/kubernetes/pkg/registry/generic/registry"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/test/integration/framework"
 )
 
+func getOrphanOptions() *api.DeleteOptions {
+	var trueVar = true
+	return &api.DeleteOptions{OrphanDependents: &trueVar}
+}
+
+func getNonOrphanOptions() *api.DeleteOptions {
+	var falseVar = false
+	return &api.DeleteOptions{OrphanDependents: &falseVar}
+}
+
 const garbageCollectedPodName = "test.pod.1"
 const independentPodName = "test.pod.2"
 const oneValidOwnerPodName = "test.pod.3"
@@ -49,7 +61,9 @@ const remainingRCName = "test.rc.2"
 
 func newPod(podName string, ownerReferences []v1.OwnerReference) *v1.Pod {
 	for i := 0; i < len(ownerReferences); i++ {
-		ownerReferences[i].Kind = "ReplicationController"
+		if len(ownerReferences[i].Kind) == 0 {
+			ownerReferences[i].Kind = "ReplicationController"
+		}
 		ownerReferences[i].APIVersion = "v1"
 	}
 	return &v1.Pod{
@@ -139,6 +153,9 @@ func TestCascadingDeletion(t *testing.T) {
 	t.Log("This test is failing too much-- lavalamp removed it to stop the submit queue bleeding")
 	return
 	gc, clientSet := setup(t)
+	oldEnableGarbageCollector := registry.EnableGarbageCollector
+	registry.EnableGarbageCollector = true
+	defer func() { registry.EnableGarbageCollector = oldEnableGarbageCollector }()
 	rcClient := clientSet.Core().ReplicationControllers(framework.TestNS)
 	podClient := clientSet.Core().Pods(framework.TestNS)
 
@@ -238,6 +255,9 @@
 // doesn't exist. It verifies the GC will delete such an object.
 func TestCreateWithNonExisitentOwner(t *testing.T) {
 	gc, clientSet := setup(t)
+	oldEnableGarbageCollector := registry.EnableGarbageCollector
+	registry.EnableGarbageCollector = true
+	defer func() { registry.EnableGarbageCollector = oldEnableGarbageCollector }()
 	podClient := clientSet.Core().Pods(framework.TestNS)
 
 	pod := newPod(garbageCollectedPodName, []v1.OwnerReference{{UID: "doesn't matter", Name: toBeDeletedRCName}})
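The two option constructors above exist because DeleteOptions.OrphanDependents is a three-state *bool (nil means "unspecified"), and Go forbids taking the address of a literal, so a named local is needed. A minimal sketch of the pattern; DeleteOptions here is a pared-down stand-in for api.DeleteOptions, and boolPtr is a hypothetical helper, not part of the patch:

    package main

    import "fmt"

    // DeleteOptions stands in for api.DeleteOptions; only the relevant field is shown.
    type DeleteOptions struct {
        OrphanDependents *bool
    }

    // boolPtr works around the restriction that &true does not compile in Go,
    // which is why the patch declares trueVar/falseVar locals instead.
    func boolPtr(b bool) *bool { return &b }

    func main() {
        orphan := DeleteOptions{OrphanDependents: boolPtr(true)}    // orphan the dependents
        cascade := DeleteOptions{OrphanDependents: boolPtr(false)}  // cascade the deletion
        var unspecified *DeleteOptions                              // nil: server-side default applies
        fmt.Println(*orphan.OrphanDependents, *cascade.OrphanDependents, unspecified == nil)
    }

The pointer keeps "the caller said false" distinguishable from "the caller said nothing", which the tests below exercise deliberately.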
@@ -269,66 +289,90 @@
-func createRemoveRCsPods(t *testing.T, clientSet clientset.Interface, id int, wg *sync.WaitGroup, rcUIDs chan types.UID) {
+func setupRCsPods(t *testing.T, gc *garbagecollector.GarbageCollector, clientSet clientset.Interface, nameSuffix string, initialFinalizers []string, options *api.DeleteOptions, wg *sync.WaitGroup, rcUIDs chan types.UID) {
 	defer wg.Done()
 	rcClient := clientSet.Core().ReplicationControllers(framework.TestNS)
 	podClient := clientSet.Core().Pods(framework.TestNS)
 	// create rc.
-	rcName := toBeDeletedRCName + strconv.Itoa(id)
-	toBeDeletedRC, err := rcClient.Create(newOwnerRC(rcName))
+	rcName := "test.rc." + nameSuffix
+	rc := newOwnerRC(rcName)
+	rc.ObjectMeta.Finalizers = initialFinalizers
+	rc, err := rcClient.Create(rc)
 	if err != nil {
 		t.Fatalf("Failed to create replication controller: %v", err)
 	}
-	rcUIDs <- toBeDeletedRC.ObjectMeta.UID
-	// create pods. These pods should be cascadingly deleted.
+	rcUIDs <- rc.ObjectMeta.UID
+	// create pods.
+	var podUIDs []types.UID
 	for j := 0; j < 3; j++ {
-		podName := garbageCollectedPodName + strconv.Itoa(id) + "-" + strconv.Itoa(j)
-		pod := newPod(podName, []v1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: rcName}})
+		podName := "test.pod." + nameSuffix + "-" + strconv.Itoa(j)
+		pod := newPod(podName, []v1.OwnerReference{{UID: rc.ObjectMeta.UID, Name: rc.ObjectMeta.Name}})
 		_, err = podClient.Create(pod)
 		if err != nil {
 			t.Fatalf("Failed to create Pod: %v", err)
 		}
+		podUIDs = append(podUIDs, pod.ObjectMeta.UID)
+	}
+	orphan := (options != nil && options.OrphanDependents != nil && *options.OrphanDependents) || (options == nil && len(initialFinalizers) != 0 && initialFinalizers[0] == api.FinalizerOrphan)
+	// if we intend to orphan the pods, we need to wait for the gc to observe the
+	// creation of the pods; otherwise, if the deletion of the RC is observed before
+	// the creation of the pods, the pods will not be orphaned.
+	if orphan {
+		wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) { return gc.GraphHasUID(podUIDs), nil })
 	}
 	// delete the rc
-	if err := rcClient.Delete(rcName, nil); err != nil {
+	if err := rcClient.Delete(rc.ObjectMeta.Name, options); err != nil {
 		t.Fatalf("failed to delete replication controller: %v", err)
 	}
 }
 
-func allObjectsRemoved(clientSet clientset.Interface) (bool, error) {
+func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, rcNum, podNum int) (bool, error) {
 	rcClient := clientSet.Core().ReplicationControllers(framework.TestNS)
 	podClient := clientSet.Core().Pods(framework.TestNS)
 	pods, err := podClient.List(api.ListOptions{})
 	if err != nil {
 		return false, fmt.Errorf("Failed to list pods: %v", err)
 	}
-	if len(pods.Items) != 0 {
-		return false, nil
+	var ret = true
+	if len(pods.Items) != podNum {
+		ret = false
+		t.Logf("expect %d pods, got %d pods", podNum, len(pods.Items))
 	}
 	rcs, err := rcClient.List(api.ListOptions{})
 	if err != nil {
 		return false, fmt.Errorf("Failed to list replication controllers: %v", err)
 	}
-	if len(rcs.Items) != 0 {
-		return false, nil
+	if len(rcs.Items) != rcNum {
+		ret = false
+		t.Logf("expect %d RCs, got %d RCs", rcNum, len(rcs.Items))
 	}
-	return true, nil
+	return ret, nil
 }
 
 // This stress test the garbage collector
 func TestStressingCascadingDeletion(t *testing.T) {
 	t.Logf("starts garbage collector stress test")
 	gc, clientSet := setup(t)
+	oldEnableGarbageCollector := registry.EnableGarbageCollector
+	registry.EnableGarbageCollector = true
+	defer func() { registry.EnableGarbageCollector = oldEnableGarbageCollector }()
 	stopCh := make(chan struct{})
 	go gc.Run(5, stopCh)
 	defer close(stopCh)
 
-	const collections = 50
+	const collections = 30
 	var wg sync.WaitGroup
-	wg.Add(collections)
-	rcUIDs := make(chan types.UID, collections)
+	wg.Add(collections * 4)
+	rcUIDs := make(chan types.UID, collections*4)
 	for i := 0; i < collections; i++ {
-		go createRemoveRCsPods(t, clientSet, i, &wg, rcUIDs)
+		// rc is created with empty finalizers, deleted with nil delete options, pods will be deleted
+		go setupRCsPods(t, gc, clientSet, "collection1-"+strconv.Itoa(i), []string{}, nil, &wg, rcUIDs)
+		// rc is created with the orphan finalizer, deleted with nil options, pods will remain.
+		go setupRCsPods(t, gc, clientSet, "collection2-"+strconv.Itoa(i), []string{api.FinalizerOrphan}, nil, &wg, rcUIDs)
+		// rc is created with the orphan finalizer, deleted with DeleteOptions.OrphanDependents=false, pods will be deleted.
+		go setupRCsPods(t, gc, clientSet, "collection3-"+strconv.Itoa(i), []string{api.FinalizerOrphan}, getNonOrphanOptions(), &wg, rcUIDs)
+		// rc is created with empty finalizers, deleted with DeleteOptions.OrphanDependents=true, pods will remain.
+		go setupRCsPods(t, gc, clientSet, "collection4-"+strconv.Itoa(i), []string{}, getOrphanOptions(), &wg, rcUIDs)
 	}
 	wg.Wait()
 	t.Logf("all pods are created, all replications controllers are created then deleted")
@@ -339,14 +383,29 @@
 		t.Fatal(err)
 	}
 	t.Logf("garbage collector queues drained")
-	// wait for all replication controllers and pods to be deleted. This
-	// shouldn't take long, because the queues are already drained.
+	// wait for the RCs and Pods to reach the expected numbers. This shouldn't
+	// take long, because the queues are already drained.
 	if err := wait.Poll(5*time.Second, 30*time.Second, func() (bool, error) {
-		return allObjectsRemoved(clientSet)
+		podsInEachCollection := 3
+		// see the comments on the calls to setupRCsPods for details
+		remainingGroups := 2
+		return verifyRemainingObjects(t, clientSet, 0, collections*podsInEachCollection*remainingGroups)
 	}); err != nil {
 		t.Fatal(err)
 	}
-	t.Logf("all replication controllers and pods are deleted")
+	t.Logf("numbers of remaining replication controllers and pods are as expected")
+
+	// verify the remaining pods all belong to the collections whose pods were expected to be orphaned.
+	podClient := clientSet.Core().Pods(framework.TestNS)
+	pods, err := podClient.List(api.ListOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	for _, pod := range pods.Items {
+		if !strings.Contains(pod.ObjectMeta.Name, "collection2-") && !strings.Contains(pod.ObjectMeta.Name, "collection4-") {
+			t.Errorf("got unexpected remaining pod: %#v", pod)
+		}
+	}
 
 	// verify there is no node representing replication controllers in the gc's graph
 	uids := make([]types.UID, 0, collections)
@@ -358,3 +417,70 @@
 		t.Errorf("Expect all nodes representing replication controllers are removed from the Propagator's graph")
 	}
 }
+
+func TestOrphaning(t *testing.T) {
+	gc, clientSet := setup(t)
+	oldEnableGarbageCollector := registry.EnableGarbageCollector
+	registry.EnableGarbageCollector = true
+	defer func() { registry.EnableGarbageCollector = oldEnableGarbageCollector }()
+	podClient := clientSet.Core().Pods(framework.TestNS)
+	rcClient := clientSet.Core().ReplicationControllers(framework.TestNS)
+	// create the RC that will later be deleted with DeleteOptions.OrphanDependents=true
+	toBeDeletedRC := newOwnerRC(toBeDeletedRCName)
+	toBeDeletedRC, err := rcClient.Create(toBeDeletedRC)
+	if err != nil {
+		t.Fatalf("Failed to create replication controller: %v", err)
+	}
+
+	// these pods should be orphaned.
+	var podUIDs []types.UID
+	podsNum := 3
+	for i := 0; i < podsNum; i++ {
+		podName := garbageCollectedPodName + strconv.Itoa(i)
+		pod := newPod(podName, []v1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: toBeDeletedRCName}})
+		_, err = podClient.Create(pod)
+		if err != nil {
+			t.Fatalf("Failed to create Pod: %v", err)
+		}
+		podUIDs = append(podUIDs, pod.ObjectMeta.UID)
+	}
+	stopCh := make(chan struct{})
+	go gc.Run(5, stopCh)
+	defer close(stopCh)
+
+	// we need to wait for the gc to observe the creation of the pods; otherwise,
+	// if the deletion of the RC is observed before the creation of the pods, the
+	// pods will not be orphaned.
+	wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) { return gc.GraphHasUID(podUIDs), nil })
+
+	err = rcClient.Delete(toBeDeletedRCName, getOrphanOptions())
+	if err != nil {
+		t.Fatalf("Failed to gracefully delete the rc: %v", err)
+	}
+
+	// wait for the garbage collector to drain its queue
+	if err := wait.Poll(10*time.Second, 300*time.Second, func() (bool, error) {
+		return gc.QueuesDrained(), nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// verify the pods no longer have the deleted RC as an owner
+	pods, err := podClient.List(api.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list pods: %v", err)
+	}
+	if len(pods.Items) != podsNum {
+		t.Errorf("Expect %d pod(s), but got %#v", podsNum, pods)
+	}
+	for _, pod := range pods.Items {
+		if len(pod.ObjectMeta.OwnerReferences) != 0 {
+			t.Errorf("pod %s still has non-empty OwnerReferences: %v", pod.ObjectMeta.Name, pod.ObjectMeta.OwnerReferences)
+		}
+	}
+	// verify the toBeDeletedRC is deleted
+	rcs, err := rcClient.List(api.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list replication controllers: %v", err)
+	}
+	if len(rcs.Items) != 0 {
+		t.Errorf("Expect RCs to be deleted, but got %#v", rcs.Items)
+	}
+}
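Both TestOrphaning and setupRCsPods guard against the same race: if the garbage collector observes the owner's deletion before it has observed the dependents' creation, the orphaning logic has nothing to act on, so the tests poll gc.GraphHasUID before issuing the delete. A trimmed-down, self-contained analogue of that retry loop; poll here is a hypothetical miniature of wait.Poll, not the real implementation:

    package main

    import (
        "errors"
        "fmt"
        "sync/atomic"
        "time"
    )

    // poll re-runs condition every interval until it returns true, returns an
    // error, or the timeout elapses, roughly as wait.Poll is used in the tests.
    func poll(interval, timeout time.Duration, condition func() (bool, error)) error {
        deadline := time.Now().Add(timeout)
        for {
            done, err := condition()
            if err != nil {
                return err
            }
            if done {
                return nil
            }
            if time.Now().After(deadline) {
                return errors.New("timed out waiting for the condition")
            }
            time.Sleep(interval)
        }
    }

    func main() {
        var observed atomic.Bool
        go func() { time.Sleep(50 * time.Millisecond); observed.Store(true) }()
        // analogous to waiting for gc.GraphHasUID(podUIDs) before deleting the owner
        if err := poll(10*time.Millisecond, time.Second, func() (bool, error) { return observed.Load(), nil }); err != nil {
            fmt.Println("dependents never appeared in the graph:", err)
            return
        }
        fmt.Println("safe to delete the owner now")
    }

Note that the tests discard wait.Poll's return value at this call site; a timeout there would simply surface later, as pods that were deleted instead of orphaned.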