Reflector watches a specified resource and causes all changes to be reflected in the given store.
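```go
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	name             string
	expectedTypeName string
	expectedType     reflect.Type
	expectedGVK      *schema.GroupVersionKind
	// store is the destination that is kept in sync with the watch source.
	store Store
	// listerWatcher is used to perform the lists and watches.
	listerWatcher          ListerWatcher
	backoffManager         wait.BackoffManager
	initConnBackoffManager wait.BackoffManager
	resyncPeriod           time.Duration
	// ShouldResync is invoked periodically; when it returns true, the store's Resync is invoked.
	ShouldResync    func() bool
	clock           clock.Clock
	paginatedResult bool
	// lastSyncResourceVersion is the resource version last observed when syncing with the store.
	lastSyncResourceVersion             string
	isLastSyncResourceVersionUnavailable bool
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion.
	lastSyncResourceVersionMutex sync.RWMutex
	WatchListPageSize            int64
	// watchErrorHandler is called when ListAndWatch returns an error (see Run).
	watchErrorHandler WatchErrorHandler
}
```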
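
The pairing of `lastSyncResourceVersion` with `lastSyncResourceVersionMutex` can be illustrated with a minimal sketch. `miniReflector` is a hypothetical stand-in that keeps only those two fields, and the accessor bodies are an assumption about how such a guard is typically written, not a copy of the client-go source.

```go
package main

import (
	"fmt"
	"sync"
)

// miniReflector is a hypothetical, stripped-down stand-in for Reflector.
// It keeps only the resource-version bookkeeping fields.
type miniReflector struct {
	lastSyncResourceVersion      string
	lastSyncResourceVersionMutex sync.RWMutex
}

// LastSyncResourceVersion reads the version under a read lock,
// so many readers can proceed at the same time.
func (r *miniReflector) LastSyncResourceVersion() string {
	r.lastSyncResourceVersionMutex.RLock()
	defer r.lastSyncResourceVersionMutex.RUnlock()
	return r.lastSyncResourceVersion
}

// setLastSyncResourceVersion writes the version under the exclusive lock.
func (r *miniReflector) setLastSyncResourceVersion(v string) {
	r.lastSyncResourceVersionMutex.Lock()
	defer r.lastSyncResourceVersionMutex.Unlock()
	r.lastSyncResourceVersion = v
}

func main() {
	r := &miniReflector{}

	// Readers run concurrently with the writer without a data race.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.LastSyncResourceVersion()
		}()
	}
	r.setLastSyncResourceVersion("42")
	wg.Wait()

	fmt.Println(r.LastSyncResourceVersion()) // prints "42"
}
```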
- `store`: a DeltaFIFO can be used as the store.
- The `reflector.ListAndWatch` function is called in `Run` (tools/cache/reflector.go#L223):

```go
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
```
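`wait.BackoffUntil`: BackoffUntil loops until the stop channel is closed, running f every duration given by the BackoffManager. If sliding is true, the period is computed after f runs. If it is false, the period includes the runtime of f. `Run` passes `true` for sliding, so each backoff period starts only after `ListAndWatch` has returned.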
- The `r.ListAndWatch` function calls:
  - Call the `list` func (reflector.go#357)
    - Get resourceVersion from v1.meta/ListMeta. From the API concepts: "The metadata.resourceVersion of a resource collection (the response to a list) identifies the resource version at which the collection was constructed."
    - Get all items and call `syncWith(items, resourceVersion)`
    - `syncWith` replaces the store's items with the given items: `r.store.Replace(found, resourceVersion)`
  - In a for loop, call `r.store.Resync()` periodically if resync is necessary.
  - In a for loop, call `listerwatcher.Watch`: `w, err := listerwatcher.Watch()`
    - Call the `watchHandler`: `watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)` -> `store.Add`, `store.Update`, `store.Delete`
  - Call TBD
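
The list-then-watch flow above can be sketched with the standard library alone. Everything in this example (`object`, `event`, `memStore`, `listAndWatch`) is a hypothetical stand-in for the client-go types. The list result is passed in directly, the watch is a plain channel, and the periodic resync, retries, and error handling are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// object and event are hypothetical stand-ins for API objects and watch events.
type object struct {
	Name            string
	ResourceVersion string
}

type eventType string

const (
	added    eventType = "ADDED"
	modified eventType = "MODIFIED"
	deleted  eventType = "DELETED"
)

type event struct {
	Type   eventType
	Object object
}

// memStore is a toy in-memory store keyed by object name.
type memStore struct {
	items map[string]object
}

func newMemStore() *memStore { return &memStore{items: map[string]object{}} }

// Replace drops the current contents and stores the listed items instead.
// The resourceVersion argument is accepted but unused in this toy store.
func (s *memStore) Replace(list []object, resourceVersion string) {
	s.items = make(map[string]object, len(list))
	for _, o := range list {
		s.items[o.Name] = o
	}
}

func (s *memStore) Add(o object)    { s.items[o.Name] = o }
func (s *memStore) Update(o object) { s.items[o.Name] = o }
func (s *memStore) Delete(o object) { delete(s.items, o.Name) }

// Names returns the stored object names in sorted order.
func (s *memStore) Names() []string {
	names := make([]string, 0, len(s.items))
	for n := range s.items {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// listAndWatch seeds the store from a list result, then applies watch events
// until the channel is closed. It returns the last resource version it saw.
func listAndWatch(s *memStore, items []object, listRV string, watch <-chan event) string {
	s.Replace(items, listRV)
	lastRV := listRV
	for ev := range watch {
		switch ev.Type {
		case added:
			s.Add(ev.Object)
		case modified:
			s.Update(ev.Object)
		case deleted:
			s.Delete(ev.Object)
		}
		// Remember how far the watch has progressed.
		lastRV = ev.Object.ResourceVersion
	}
	return lastRV
}

func main() {
	watch := make(chan event, 3)
	watch <- event{added, object{"c", "11"}}
	watch <- event{modified, object{"a", "12"}}
	watch <- event{deleted, object{"b", "13"}}
	close(watch)

	s := newMemStore()
	last := listAndWatch(s, []object{{"a", "8"}, {"b", "9"}}, "10", watch)
	fmt.Println(s.Names(), last) // prints "[a c] 13"
}
```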