Skip to content

Commit

Permalink
Merge pull request kubernetes#24142 from rrati/controller-sync-interv…
Browse files Browse the repository at this point in the history
…al-23394

Automatic merge from submit-queue

Separated resync and relist functionality in reflector kubernetes#23394

controller-manager kubernetes#23394
  • Loading branch information
k8s-merge-robot committed May 19, 2016
2 parents d33fa39 + e388c13 commit 044d55e
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 67 deletions.
6 changes: 6 additions & 0 deletions contrib/mesos/pkg/queue/historical.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,12 @@ func (f *HistoricalFIFO) merge(id string, obj UniqueCopyable) (notifications []E
return
}

// Resync will touch all objects to put them into the processing queue
func (f *HistoricalFIFO) Resync() error {
// Nothing to do
return nil
}

// NewHistorical returns a Store which can be used to queue up items to
// process. If a non-nil Mux is provided, then modifications to the
// the FIFO are delivered on a channel specific to this fifo.
Expand Down
25 changes: 25 additions & 0 deletions pkg/client/cache/delta_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}

func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
// Copy item's slice so operations on this slice (delta
Expand Down Expand Up @@ -452,6 +456,27 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}

// Resync will send a sync event for each item
func (f *DeltaFIFO) Resync() error {
f.lock.RLock()
defer f.lock.RUnlock()
for _, k := range f.knownObjects.ListKeys() {
obj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, k)
continue
} else if !exists {
glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", k)
continue
}

if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
}
return nil
}

// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/cache/expiration_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (c *ExpirationCache) ListKeys() []string {
func (c *ExpirationCache) Add(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()

key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
Expand Down Expand Up @@ -191,6 +192,11 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
return nil
}

// Resync will touch all objects to put them into the processing queue
func (c *ExpirationCache) Resync() error {
return c.cacheStorage.Resync()
}

// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
return &ExpirationCache{
Expand Down
102 changes: 102 additions & 0 deletions pkg/client/cache/fake_custom_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

// FakeStore lets you define custom functions for store operations
type FakeCustomStore struct {
AddFunc func(obj interface{}) error
UpdateFunc func(obj interface{}) error
DeleteFunc func(obj interface{}) error
ListFunc func() []interface{}
ListKeysFunc func() []string
GetFunc func(obj interface{}) (item interface{}, exists bool, err error)
GetByKeyFunc func(key string) (item interface{}, exists bool, err error)
ReplaceFunc func(list []interface{}, resourceVerion string) error
ResyncFunc func() error
}

// Add calls the custom Add function if defined
func (f *FakeCustomStore) Add(obj interface{}) error {
if f.AddFunc != nil {
return f.AddFunc(obj)
}
return nil
}

// Update calls the custom Update function if defined
func (f *FakeCustomStore) Update(obj interface{}) error {
if f.UpdateFunc != nil {
return f.Update(obj)
}
return nil
}

// Delete calls the custom Delete function if defined
func (f *FakeCustomStore) Delete(obj interface{}) error {
if f.DeleteFunc != nil {
return f.DeleteFunc(obj)
}
return nil
}

// List calls the custom List function if defined
func (f *FakeCustomStore) List() []interface{} {
if f.ListFunc != nil {
return f.ListFunc()
}
return nil
}

// ListKeys calls the custom ListKeys function if defined
func (f *FakeCustomStore) ListKeys() []string {
if f.ListKeysFunc != nil {
return f.ListKeysFunc()
}
return nil
}

// Get calls the custom Get function if defined
func (f *FakeCustomStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
if f.GetFunc != nil {
return f.GetFunc(obj)
}
return nil, false, nil
}

// GetByKey calls the custom GetByKey function if defined
func (f *FakeCustomStore) GetByKey(key string) (item interface{}, exists bool, err error) {
if f.GetByKeyFunc != nil {
return f.GetByKeyFunc(key)
}
return nil, false, nil
}

// Replace calls the custom Replace function if defined
func (f *FakeCustomStore) Replace(list []interface{}, resourceVersion string) error {
if f.ReplaceFunc != nil {
return f.ReplaceFunc(list, resourceVersion)
}
return nil
}

// Resync calls the custom Resync function if defined
func (f *FakeCustomStore) Resync() error {
if f.ResyncFunc != nil {
return f.ResyncFunc()
}
return nil
}
22 changes: 22 additions & 0 deletions pkg/client/cache/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package cache

import (
"sync"

"k8s.io/kubernetes/pkg/util/sets"
)

// Queue is exactly like a Store, but has a Pop() method too.
Expand Down Expand Up @@ -241,6 +243,26 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
return nil
}

// Resync will touch all objects to put them into the processing queue
func (f *FIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()

inQueue := sets.NewString()
for _, id := range f.queue {
inQueue.Insert(id)
}
for id := range f.items {
if !inQueue.Has(id) {
f.queue = append(f.queue, id)
}
}
if len(f.queue) > 0 {
f.cond.Broadcast()
}
return nil
}

// NewFIFO returns a Store which can be used to queue up items to
// process.
func NewFIFO(keyFunc KeyFunc) *FIFO {
Expand Down
11 changes: 9 additions & 2 deletions pkg/client/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,22 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
return nil
}

if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
if err != errorResyncRequested && err != errorStopRequested {
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
if err != errorResyncRequested {
return nil
}
}
if r.canForceResyncNow() {
glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
return nil
if err := r.store.Resync(); err != nil {
return err
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}
}
Expand Down
62 changes: 20 additions & 42 deletions pkg/client/cache/reflector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,22 @@ import (

type testLW struct {
ListFunc func() (runtime.Object, error)
WatchFunc func(resourceVersion string) (watch.Interface, error)
WatchFunc func(options api.ListOptions) (watch.Interface, error)
}

func (t *testLW) List(options api.ListOptions) (runtime.Object, error) {
return t.ListFunc()
}
func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
return t.WatchFunc(options.ResourceVersion)
return t.WatchFunc(options)
}

func TestCloseWatchChannelOnError(t *testing.T) {
r := NewReflector(&testLW{}, &api.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
fw := watch.NewFake()
r.listerWatcher = &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
Expand All @@ -73,7 +73,7 @@ func TestRunUntil(t *testing.T) {
r := NewReflector(&testLW{}, &api.Pod{}, store, 0)
fw := watch.NewFake()
r.listerWatcher = &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
Expand Down Expand Up @@ -191,19 +191,6 @@ func TestReflectorWatchHandler(t *testing.T) {
}
}

func TestReflectorWatchHandlerTimeout(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
fw := watch.NewFake()
var resumeRV string
exit := make(chan time.Time, 1)
exit <- time.Now()
err := g.watchHandler(fw, &resumeRV, exit, wait.NeverStop)
if err != errorResyncRequested {
t.Errorf("expected timeout error, but got %q", err)
}
}

func TestReflectorStopWatch(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
Expand All @@ -225,7 +212,8 @@ func TestReflectorListAndWatch(t *testing.T) {
// inject an error.
expectedRVs := []string{"1", "3"}
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
rv := options.ResourceVersion
fw := watch.NewFake()
if e, a := expectedRVs[0], rv; e != a {
t.Errorf("Expected rv %v, but got %v", e, a)
Expand Down Expand Up @@ -340,7 +328,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
}
watchRet, watchErr := item.events, item.watchErr
lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
if watchErr != nil {
return nil, watchErr
}
Expand All @@ -364,40 +352,30 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
}

func TestReflectorResync(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)

currentTime := time.Time{}
iteration := 0
stopCh := make(chan struct{})
s := &FakeCustomStore{
ResyncFunc: func() error {
iteration++
if iteration == 2 {
close(stopCh)
}
return nil
},
}

lw := &testLW{
WatchFunc: func(rv string) (watch.Interface, error) {
if iteration == 0 {
// Move time, but do not force resync.
currentTime = currentTime.Add(30 * time.Second)
} else if iteration == 1 {
// Move time to force resync.
currentTime = currentTime.Add(28 * time.Second)
} else if iteration >= 2 {
t.Fatalf("should have forced resync earlier")
}
iteration++
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
// Send something to the watcher to avoid "watch too short" errors.
go func() {
fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: strconv.Itoa(iteration)}})
fw.Stop()
}()
return fw, nil
},
ListFunc: func() (runtime.Object, error) {
return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "0"}}, nil
},
}
resyncPeriod := time.Minute
resyncPeriod := 1 * time.Millisecond
r := NewReflector(lw, &api.Pod{}, s, resyncPeriod)
r.now = func() time.Time { return currentTime }

r.ListAndWatch(wait.NeverStop)
r.ListAndWatch(stopCh)
if iteration != 2 {
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Store interface {
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
Resync() error
}

// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
Expand Down Expand Up @@ -217,6 +218,11 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error {
return nil
}

// Resync touches all items in the store to force processing
func (c *cache) Resync() error {
return c.cacheStorage.Resync()
}

// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{
Expand Down
Loading

0 comments on commit 044d55e

Please sign in to comment.