diff --git a/contrib/mesos/pkg/queue/historical.go b/contrib/mesos/pkg/queue/historical.go index 901dbb6ad8aaf..edd9419d531a9 100644 --- a/contrib/mesos/pkg/queue/historical.go +++ b/contrib/mesos/pkg/queue/historical.go @@ -377,6 +377,12 @@ func (f *HistoricalFIFO) merge(id string, obj UniqueCopyable) (notifications []E return } +// Resync will touch all objects to put them into the processing queue +func (f *HistoricalFIFO) Resync() error { + // Nothing to do + return nil +} + // NewHistorical returns a Store which can be used to queue up items to // process. If a non-nil Mux is provided, then modifications to the // the FIFO are delivered on a channel specific to this fifo. diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index e5dce16b61e0e..800d952bb2ce4 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -306,6 +306,10 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err func (f *DeltaFIFO) List() []interface{} { f.lock.RLock() defer f.lock.RUnlock() + return f.listLocked() +} + +func (f *DeltaFIFO) listLocked() []interface{} { list := make([]interface{}, 0, len(f.items)) for _, item := range f.items { // Copy item's slice so operations on this slice (delta @@ -452,6 +456,27 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { return nil } +// Resync will send a sync event for each item +func (f *DeltaFIFO) Resync() error { + f.lock.RLock() + defer f.lock.RUnlock() + for _, k := range f.knownObjects.ListKeys() { + obj, exists, err := f.knownObjects.GetByKey(k) + if err != nil { + glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, k) + continue + } else if !exists { + glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", k) + continue + } + + if err := f.queueActionLocked(Sync, obj); err != nil { + return fmt.Errorf("couldn't queue object: %v", err) + } + } + return nil +} + // A KeyListerGetter is anything that knows how to list its keys and look up by key. type KeyListerGetter interface { KeyLister diff --git a/pkg/client/cache/expiration_cache.go b/pkg/client/cache/expiration_cache.go index 964deda079507..ad8684e8c377b 100644 --- a/pkg/client/cache/expiration_cache.go +++ b/pkg/client/cache/expiration_cache.go @@ -146,6 +146,7 @@ func (c *ExpirationCache) ListKeys() []string { func (c *ExpirationCache) Add(obj interface{}) error { c.expirationLock.Lock() defer c.expirationLock.Unlock() + key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} @@ -191,6 +192,11 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er return nil } +// Resync will touch all objects to put them into the processing queue +func (c *ExpirationCache) Resync() error { + return c.cacheStorage.Resync() +} + // NewTTLStore creates and returns a ExpirationCache with a TTLPolicy func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store { return &ExpirationCache{ diff --git a/pkg/client/cache/fake_custom_store.go b/pkg/client/cache/fake_custom_store.go new file mode 100644 index 0000000000000..ccd69ef7bfdd3 --- /dev/null +++ b/pkg/client/cache/fake_custom_store.go @@ -0,0 +1,102 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +// FakeStore lets you define custom functions for store operations +type FakeCustomStore struct { + AddFunc func(obj interface{}) error + UpdateFunc func(obj interface{}) error + DeleteFunc func(obj interface{}) error + ListFunc func() []interface{} + ListKeysFunc func() []string + GetFunc func(obj interface{}) (item interface{}, exists bool, err error) + GetByKeyFunc func(key string) (item interface{}, exists bool, err error) + ReplaceFunc func(list []interface{}, resourceVerion string) error + ResyncFunc func() error +} + +// Add calls the custom Add function if defined +func (f *FakeCustomStore) Add(obj interface{}) error { + if f.AddFunc != nil { + return f.AddFunc(obj) + } + return nil +} + +// Update calls the custom Update function if defined +func (f *FakeCustomStore) Update(obj interface{}) error { + if f.UpdateFunc != nil { + return f.Update(obj) + } + return nil +} + +// Delete calls the custom Delete function if defined +func (f *FakeCustomStore) Delete(obj interface{}) error { + if f.DeleteFunc != nil { + return f.DeleteFunc(obj) + } + return nil +} + +// List calls the custom List function if defined +func (f *FakeCustomStore) List() []interface{} { + if f.ListFunc != nil { + return f.ListFunc() + } + return nil +} + +// ListKeys calls the custom ListKeys function if defined +func (f *FakeCustomStore) ListKeys() []string { + if f.ListKeysFunc != nil { + return f.ListKeysFunc() + } + return nil +} + +// Get calls the custom Get function if defined +func (f *FakeCustomStore) Get(obj interface{}) (item interface{}, exists bool, err error) { + if f.GetFunc != nil { + return f.GetFunc(obj) + } + return nil, false, nil +} + +// GetByKey calls the custom GetByKey function if defined +func (f *FakeCustomStore) GetByKey(key string) (item interface{}, exists bool, err error) { + if f.GetByKeyFunc != nil { + return f.GetByKeyFunc(key) + } + return nil, false, nil +} + +// Replace calls the custom Replace function if defined +func (f *FakeCustomStore) Replace(list []interface{}, resourceVersion string) error { + if f.ReplaceFunc != nil { + return f.ReplaceFunc(list, resourceVersion) + } + return nil +} + +// Resync calls the custom Resync function if defined +func (f *FakeCustomStore) Resync() error { + if f.ResyncFunc != nil { + return f.ResyncFunc() + } + return nil +} diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index d4076a326d812..f98bea6f44537 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -18,6 +18,8 @@ package cache import ( "sync" + + "k8s.io/kubernetes/pkg/util/sets" ) // Queue is exactly like a Store, but has a Pop() method too. @@ -241,6 +243,26 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { return nil } +// Resync will touch all objects to put them into the processing queue +func (f *FIFO) Resync() error { + f.lock.Lock() + defer f.lock.Unlock() + + inQueue := sets.NewString() + for _, id := range f.queue { + inQueue.Insert(id) + } + for id := range f.items { + if !inQueue.Has(id) { + f.queue = append(f.queue, id) + } + } + if len(f.queue) > 0 { + f.cond.Broadcast() + } + return nil +} + // NewFIFO returns a Store which can be used to queue up items to // process. func NewFIFO(keyFunc KeyFunc) *FIFO { diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index e4d467534d7cd..159f35f4d110c 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -337,15 +337,22 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } return nil } + if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil { if err != errorResyncRequested && err != errorStopRequested { glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) } - return nil + if err != errorResyncRequested { + return nil + } } if r.canForceResyncNow() { glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync) - return nil + if err := r.store.Resync(); err != nil { + return err + } + cleanup() + resyncCh, cleanup = r.resyncChan() } } } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 223769adea01b..8937ce17d9aa3 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -32,14 +32,14 @@ import ( type testLW struct { ListFunc func() (runtime.Object, error) - WatchFunc func(resourceVersion string) (watch.Interface, error) + WatchFunc func(options api.ListOptions) (watch.Interface, error) } func (t *testLW) List(options api.ListOptions) (runtime.Object, error) { return t.ListFunc() } func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) { - return t.WatchFunc(options.ResourceVersion) + return t.WatchFunc(options) } func TestCloseWatchChannelOnError(t *testing.T) { @@ -47,7 +47,7 @@ func TestCloseWatchChannelOnError(t *testing.T) { pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} fw := watch.NewFake() r.listerWatcher = &testLW{ - WatchFunc: func(rv string) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return fw, nil }, ListFunc: func() (runtime.Object, error) { @@ -73,7 +73,7 @@ func TestRunUntil(t *testing.T) { r := NewReflector(&testLW{}, &api.Pod{}, store, 0) fw := watch.NewFake() r.listerWatcher = &testLW{ - WatchFunc: func(rv string) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return fw, nil }, ListFunc: func() (runtime.Object, error) { @@ -191,19 +191,6 @@ func TestReflectorWatchHandler(t *testing.T) { } } -func TestReflectorWatchHandlerTimeout(t *testing.T) { - s := NewStore(MetaNamespaceKeyFunc) - g := NewReflector(&testLW{}, &api.Pod{}, s, 0) - fw := watch.NewFake() - var resumeRV string - exit := make(chan time.Time, 1) - exit <- time.Now() - err := g.watchHandler(fw, &resumeRV, exit, wait.NeverStop) - if err != errorResyncRequested { - t.Errorf("expected timeout error, but got %q", err) - } -} - func TestReflectorStopWatch(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, 0) @@ -225,7 +212,8 @@ func TestReflectorListAndWatch(t *testing.T) { // inject an error. expectedRVs := []string{"1", "3"} lw := &testLW{ - WatchFunc: func(rv string) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + rv := options.ResourceVersion fw := watch.NewFake() if e, a := expectedRVs[0], rv; e != a { t.Errorf("Expected rv %v, but got %v", e, a) @@ -340,7 +328,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { } watchRet, watchErr := item.events, item.watchErr lw := &testLW{ - WatchFunc: func(rv string) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { if watchErr != nil { return nil, watchErr } @@ -364,40 +352,30 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { } func TestReflectorResync(t *testing.T) { - s := NewStore(MetaNamespaceKeyFunc) - - currentTime := time.Time{} iteration := 0 + stopCh := make(chan struct{}) + s := &FakeCustomStore{ + ResyncFunc: func() error { + iteration++ + if iteration == 2 { + close(stopCh) + } + return nil + }, + } lw := &testLW{ - WatchFunc: func(rv string) (watch.Interface, error) { - if iteration == 0 { - // Move time, but do not force resync. - currentTime = currentTime.Add(30 * time.Second) - } else if iteration == 1 { - // Move time to force resync. - currentTime = currentTime.Add(28 * time.Second) - } else if iteration >= 2 { - t.Fatalf("should have forced resync earlier") - } - iteration++ + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { fw := watch.NewFake() - // Send something to the watcher to avoid "watch too short" errors. - go func() { - fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: strconv.Itoa(iteration)}}) - fw.Stop() - }() return fw, nil }, ListFunc: func() (runtime.Object, error) { return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "0"}}, nil }, } - resyncPeriod := time.Minute + resyncPeriod := 1 * time.Millisecond r := NewReflector(lw, &api.Pod{}, s, resyncPeriod) - r.now = func() time.Time { return currentTime } - - r.ListAndWatch(wait.NeverStop) + r.ListAndWatch(stopCh) if iteration != 2 { t.Errorf("exactly 2 iterations were expected, got: %v", iteration) } diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index a3b7c92ddc559..71115f2ce5dd7 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -44,6 +44,7 @@ type Store interface { // given list. Store takes ownership of the list, you should not reference // it after calling this function. Replace([]interface{}, string) error + Resync() error } // KeyFunc knows how to make a key from an object. Implementations should be deterministic. @@ -217,6 +218,11 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error { return nil } +// Resync touches all items in the store to force processing +func (c *cache) Resync() error { + return c.cacheStorage.Resync() +} + // NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc) Store { return &cache{ diff --git a/pkg/client/cache/thread_safe_store.go b/pkg/client/cache/thread_safe_store.go index 6cab6861a856d..11077e25b2eec 100644 --- a/pkg/client/cache/thread_safe_store.go +++ b/pkg/client/cache/thread_safe_store.go @@ -50,6 +50,7 @@ type ThreadSafeStore interface { // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error + Resync() error } // threadSafeMap implements ThreadSafeStore @@ -272,6 +273,11 @@ func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error { return nil } +func (c *threadSafeMap) Resync() error { + // Nothing to do + return nil +} + func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ items: map[string]interface{}{}, diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index fb132f8f3c1ee..b17aba4ab679e 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -29,10 +29,23 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" "github.com/google/gofuzz" ) +type testLW struct { + ListFunc func(options api.ListOptions) (runtime.Object, error) + WatchFunc func(options api.ListOptions) (watch.Interface, error) +} + +func (t *testLW) List(options api.ListOptions) (runtime.Object, error) { + return t.ListFunc(options) +} +func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) { + return t.WatchFunc(options) +} + func Example() { // source simulates an apiserver object endpoint. source := framework.NewFakeControllerSource() @@ -295,18 +308,15 @@ func TestUpdate(t *testing.T) { source := framework.NewFakeControllerSource() const ( - FROM = "from" - ADD_MISSED = "missed the add event" - TO = "to" + FROM = "from" + TO = "to" ) // These are the transitions we expect to see; because this is // asynchronous, there are a lot of valid possibilities. type pair struct{ from, to string } allowedTransitions := map[pair]bool{ - pair{FROM, TO}: true, - pair{FROM, ADD_MISSED}: true, - pair{ADD_MISSED, TO}: true, + pair{FROM, TO}: true, // Because a resync can happen when we've already observed one // of the above but before the item is deleted. @@ -337,21 +347,6 @@ func TestUpdate(t *testing.T) { source.Add(pod(name, FROM, false)) source.Modify(pod(name, TO, true)) }, - func(name string) { - name = "b-" + name - source.Add(pod(name, FROM, false)) - source.ModifyDropWatch(pod(name, TO, true)) - }, - func(name string) { - name = "c-" + name - source.AddDropWatch(pod(name, FROM, false)) - source.Modify(pod(name, ADD_MISSED, false)) - source.Modify(pod(name, TO, true)) - }, - func(name string) { - name = "d-" + name - source.Add(pod(name, FROM, true)) - }, } const threads = 3 @@ -362,10 +357,20 @@ func TestUpdate(t *testing.T) { // Make a controller that deletes things once it observes an update. // It calls Done() on the wait group on deletions so we can tell when // everything we've added has been deleted. + watchCh := make(chan struct{}) _, controller := framework.NewInformer( - source, + &testLW{ + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + watch, err := source.Watch(options) + close(watchCh) + return watch, err + }, + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return source.List(options) + }, + }, &api.Pod{}, - time.Millisecond*1, + 0, framework.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { o, n := oldObj.(*api.Pod), newObj.(*api.Pod) @@ -388,6 +393,7 @@ func TestUpdate(t *testing.T) { // all testDoneWG.Add() calls must happen before this point stop := make(chan struct{}) go controller.Run(stop) + <-watchCh // run every test a few times, in parallel var wg sync.WaitGroup diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 3b34479fa2ac8..3e5ce5d73003f 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -324,3 +324,8 @@ func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEven defer w.RUnlock() return w.GetAllEventsSinceThreadUnsafe(resourceVersion) } + +func (w *watchCache) Resync() error { + // Nothing to do + return nil +}