diff --git a/cache/cache.go b/cache/cache.go
index c16c3b56e..b61ac07d6 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -17,32 +17,46 @@ limitations under the License.
 package cache
 
 import (
+	"cmp"
+	"slices"
+	"sort"
 	"sync"
 	"time"
 )
 
+// noExpiration is a sentinel value used to indicate no expiration time.
+// It is used instead of 0 so that items can be sorted by expiration time in ascending order.
+// 10 years is a good enough approximation for "no expiration" time for a cache.
+const noExpiration = time.Second * 86400 * 365 * 10 // 10 years
+
 // Cache[T] is a thread-safe in-memory key/object store.
 // It can be used to store objects with optional expiration.
 type Cache[T any] struct {
 	*cache[T]
-	// keyFunc is used to make the key for objects stored in and retrieved from items, and
+	// keyFunc is used to make the key for objects stored in and retrieved from the index, and
 	// should be deterministic.
 	keyFunc KeyFunc[T]
 }
 
-// Item is an item stored in the cache.
-type Item[T any] struct {
-	// Object is the item's object.
-	Object T
-	// Expiration is the item's expiration time.
-	Expiration int64
+// item is an item stored in the cache.
+type item[T any] struct {
+	key string
+	// object is the item's object.
+	object T
+	// expiration is the item's expiration time.
+	expiration int64
 }
 
 type cache[T any] struct {
-	// Items holds the elements in the cache.
-	Items map[string]Item[T]
-	// MaxItems is the maximum number of items the cache can hold.
-	MaxItems int
+	// index holds the cache items indexed by key.
+	index map[string]*item[T]
+	// items is the store of elements in the cache.
+	items []*item[T]
+	// sorted indicates whether the items are sorted by expiration time.
+	// It is initially true, and set to false when an item's expiration time is updated.
+	sorted bool
+	// capacity is the maximum number of items the cache can hold.
+	capacity int
 	metrics *cacheMetrics
 	labelsFunc GetLvsFunc[T]
 	janitor *janitor[T]
@@ -54,8 +68,8 @@ type cache[T any] struct {
 var _ Expirable[any] = &Cache[any]{}
 
 // New creates a new cache with the given configuration.
-func New[T any](maxItems int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
-	opt := cacheOptions[T]{}
+func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
+	opt := storeOptions[T]{}
 	for _, o := range opts {
 		err := o(&opt)
 		if err != nil {
@@ -66,9 +80,12 @@ func New[T any](maxItems int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
 		return nil, ErrNoRegisterer
 	}
 	c := &cache[T]{
-		Items: make(map[string]Item[T]),
-		MaxItems: maxItems,
-		metrics: newCacheMetrics(opt.registerer, opt.extraLabels...),
+		index: make(map[string]*item[T]),
+		items: make([]*item[T], 0, capacity),
+		sorted: true,
+		capacity: capacity,
+		metrics: newCacheMetrics(opt.registerer, opt.extraLabels...),
+		labelsFunc: opt.labelsFunc,
 		janitor: &janitor[T]{
 			interval: opt.interval,
 			stop: make(chan bool),
@@ -96,8 +113,8 @@ func (c *Cache[T]) Close() error {
 	return nil
 }
 
 // Add an item to the cache, existing items will not be overwritten.
 // To overwrite existing items, use Update.
 // If the cache is full, Add will return an error.
func (c *Cache[T]) Add(object T) error { key, err := c.keyFunc(object) @@ -108,20 +125,20 @@ func (c *Cache[T]) Add(object T) error { c.mu.Lock() if c.closed { c.mu.Unlock() - c.metrics.incCacheRequests("failed") + c.metrics.incCacheRequests(StatusFailure) return KeyError{object, ErrClosed} } - _, found := c.Items[key] + _, found := c.index[key] if found { c.mu.Unlock() - c.metrics.incCacheRequests("failed") + c.metrics.incCacheRequests(StatusFailure) return KeyError{object, ErrAlreadyExists} } - if c.MaxItems > 0 && len(c.Items) < c.MaxItems { + if c.capacity > 0 && len(c.index) < c.capacity { c.set(key, object) c.mu.Unlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) c.metrics.incCacheItems() return nil } @@ -140,21 +157,21 @@ func (c *Cache[T]) Update(object T) error { c.mu.Lock() if c.closed { c.mu.Unlock() - c.metrics.incCacheRequests("failed") + c.metrics.incCacheRequests(StatusFailure) return KeyError{object, ErrClosed} } - _, found := c.Items[key] + _, found := c.index[key] if found { c.set(key, object) c.mu.Unlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) return nil } - if c.MaxItems > 0 && len(c.Items) < c.MaxItems { + if c.capacity > 0 && len(c.index) < c.capacity { c.set(key, object) c.mu.Unlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) c.metrics.incCacheItems() return nil } @@ -163,11 +180,19 @@ func (c *Cache[T]) Update(object T) error { } func (c *cache[T]) set(key string, object T) { - var e int64 - c.Items[key] = Item[T]{ - Object: object, - Expiration: e, + item := item[T]{ + key: key, + object: object, + expiration: int64(noExpiration), + } + + if _, found := c.index[key]; found { + // item already exists, update it only + c.index[key] = &item + return } + c.index[key] = &item + c.items = append(c.items, &item) } // Get an item from the cache. Returns the item or nil, and a bool indicating @@ -176,7 +201,7 @@ func (c *Cache[T]) Get(object T) (item T, exists bool, err error) { var res T lvs := []string{} if c.labelsFunc != nil { - lvs, err = c.labelsFunc(object, len(c.metrics.getLabels())) + lvs, err = c.labelsFunc(object, len(c.metrics.getExtraLabels())) if err != nil { return res, false, KeyError{object, err} } @@ -200,7 +225,7 @@ func (c *Cache[T]) Get(object T) (item T, exists bool, err error) { // GetByKey returns the object for the given key. 
 func (c *Cache[T]) GetByKey(key string) (T, bool, error) {
 	var res T
-	items, found, err := c.get(key)
+	object, found, err := c.get(key)
 	if err != nil {
 		return res, false, err
 	}
@@ -210,7 +235,7 @@ func (c *Cache[T]) GetByKey(key string) (T, bool, error) {
 	}
 
 	c.metrics.incCacheEvents(CacheEventTypeHit)
-	return items, true, nil
+	return object, true, nil
 }
 
 func (c *cache[T]) get(key string) (T, bool, error) {
@@ -218,28 +243,29 @@ func (c *cache[T]) get(key string) (T, bool, error) {
 	c.mu.RLock()
 	if c.closed {
 		c.mu.RUnlock()
-		c.metrics.incCacheRequests("failed")
+		c.metrics.incCacheRequests(StatusFailure)
 		return res, false, ErrClosed
 	}
-	item, found := c.Items[key]
+	item, found := c.index[key]
 	if !found {
 		c.mu.RUnlock()
-		c.metrics.incCacheRequests("success")
+		c.metrics.incCacheRequests(StatusSuccess)
 		return res, false, nil
 	}
-	if item.Expiration > 0 {
-		if item.Expiration < time.Now().UnixNano() {
+	if item.expiration > 0 {
+		if item.expiration < time.Now().UnixNano() {
 			c.mu.RUnlock()
-			c.metrics.incCacheRequests("succes")
+			c.metrics.incCacheRequests(StatusSuccess)
 			return res, false, nil
 		}
 	}
 	c.mu.RUnlock()
-	c.metrics.incCacheRequests("success")
-	return item.Object, true, nil
+	c.metrics.incCacheRequests(StatusSuccess)
+	return item.object, true, nil
 }
 
 // Delete an item from the cache. Does nothing if the key is not in the cache.
+// It actually sets the item's expiration to now, so that it will be removed at the next cleanup.
 func (c *Cache[T]) Delete(object T) error {
 	key, err := c.keyFunc(object)
 	if err != nil {
@@ -248,19 +274,22 @@ func (c *Cache[T]) Delete(object T) error {
 	c.mu.Lock()
 	if c.closed {
 		c.mu.Unlock()
-		c.metrics.incCacheRequests("failed")
+		c.metrics.incCacheRequests(StatusFailure)
 		return KeyError{object, ErrClosed}
 	}
-	delete(c.Items, key)
+	if item, ok := c.index[key]; ok {
+		// set the item expiration to now
+		// so that it will be removed by the janitor
+		item.expiration = time.Now().UnixNano()
+	}
 	c.mu.Unlock()
-	c.metrics.incCacheRequests("success")
-	c.metrics.decCacheItems()
+	c.metrics.incCacheRequests(StatusSuccess)
 	return nil
 }
 
 // Clear all items from the cache.
 // This reallocates the underlying array holding the items,
 // so that the memory used by the items is reclaimed.
 // A closed cache cannot be cleared.
 func (c *cache[T]) Clear() {
 	c.mu.Lock()
@@ -268,7 +297,7 @@ func (c *cache[T]) Clear() {
 		c.mu.Unlock()
 		return
 	}
-	c.Items = make(map[string]Item[T])
+	c.index = make(map[string]*item[T])
+	c.items = make([]*item[T], 0, c.capacity)
 	c.mu.Unlock()
 }
 
@@ -278,18 +307,53 @@ func (c *cache[T]) ListKeys() []string {
 	c.mu.RLock()
 	if c.closed {
 		c.mu.RUnlock()
-		c.metrics.incCacheRequests("failed")
+		c.metrics.incCacheRequests(StatusFailure)
 		return nil
 	}
-	keys := make([]string, 0, len(c.Items))
-	for k := range c.Items {
+	keys := make([]string, 0, len(c.index))
+	for k := range c.index {
 		keys = append(keys, k)
 	}
 	c.mu.RUnlock()
-	c.metrics.incCacheRequests("success")
+	c.metrics.incCacheRequests(StatusSuccess)
 	return keys
 }
 
+// Resize resizes the cache and returns the number of items removed.
+func (c *cache[T]) Resize(size int) int {
+	overflow := len(c.items) - size
+	if overflow <= 0 {
+		c.metrics.incCacheRequests(StatusSuccess)
+		return 0
+	}
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		c.metrics.incCacheRequests(StatusFailure)
+		return 0
+	}
+
+	if !c.sorted {
+		// sort the slice of items by expiration time
+		slices.SortFunc(c.items, func(i, j *item[T]) int {
+			return cmp.Compare(i.expiration, j.expiration)
+		})
+		c.sorted = true
+	}
+
+	// remove the overflow items from the index
+	for _, v := range c.items[:overflow] {
+		delete(c.index, v.key)
+		c.metrics.incCacheEvictions()
+		c.metrics.decCacheItems()
+	}
+	// remove the overflow items from the slice
+	c.items = c.items[overflow:]
+	c.mu.Unlock()
+	c.metrics.incCacheRequests(StatusSuccess)
+	return overflow
+}
+
 // HasExpired returns true if the item has expired.
 func (c *Cache[T]) HasExpired(object T) (bool, error) {
 	key, err := c.keyFunc(object)
@@ -300,24 +364,24 @@ func (c *Cache[T]) HasExpired(object T) (bool, error) {
 	c.mu.RLock()
 	if c.closed {
 		c.mu.RUnlock()
-		c.metrics.incCacheRequests("failed")
+		c.metrics.incCacheRequests(StatusFailure)
 		return false, KeyError{object, ErrClosed}
 	}
-	item, ok := c.Items[key]
+	item, ok := c.index[key]
 	if !ok {
 		c.mu.RUnlock()
-		c.metrics.incCacheRequests("success")
+		c.metrics.incCacheRequests(StatusSuccess)
 		return true, nil
 	}
-	if item.Expiration > 0 {
-		if item.Expiration < time.Now().UnixNano() {
-			c.mu.RUnlock()
-			c.metrics.incCacheRequests("success")
-			return true, nil
-		}
+
+	if item.expiration < time.Now().UnixNano() {
+		c.mu.RUnlock()
+		c.metrics.incCacheRequests(StatusSuccess)
+		return true, nil
 	}
+
 	c.mu.RUnlock()
-	c.metrics.incCacheRequests("success")
+	c.metrics.incCacheRequests(StatusSuccess)
 	return false, nil
 }
 
@@ -332,16 +396,19 @@ func (c *Cache[T]) SetExpiration(object T, expiration time.Duration) error {
 	c.mu.Lock()
 	if c.closed {
 		c.mu.Unlock()
-		c.metrics.incCacheRequests("failed")
+		c.metrics.incCacheRequests(StatusFailure)
 		return KeyError{object, ErrClosed}
 	}
-	item, ok := c.Items[key]
+	item, ok := c.index[key]
 	if ok {
-		item.Expiration = time.Now().Add(expiration).UnixNano()
-		c.Items[key] = item
+		item.expiration = time.Now().Add(expiration).UnixNano()
+		// mark the items as not sorted
+		c.sorted = false
+		c.metrics.incCacheRequests(StatusSuccess)
+	} else {
+		c.metrics.incCacheRequests(StatusFailure)
 	}
 	c.mu.Unlock()
-	c.metrics.incCacheRequests("success")
 	return nil
 }
 
@@ -356,43 +423,59 @@ func (c *Cache[T]) GetExpiration(object T) (time.Duration, error) {
 	c.mu.RLock()
 	if c.closed {
 		c.mu.RUnlock()
-		c.metrics.incCacheRequests("failed")
+		c.metrics.incCacheRequests(StatusFailure)
 		return 0, KeyError{object, ErrClosed}
 	}
-	item, ok := c.Items[key]
+	item, ok := c.index[key]
 	if !ok {
 		c.mu.RUnlock()
-		c.metrics.incCacheRequests("success")
+		c.metrics.incCacheRequests(StatusSuccess)
 		return 0, KeyError{object, ErrNotFound}
 	}
-	if item.Expiration > 0 {
-		if item.Expiration < time.Now().UnixNano() {
+	if item.expiration > 0 {
+		if item.expiration < time.Now().UnixNano() {
 			c.mu.RUnlock()
-			c.metrics.incCacheRequests("success")
+			c.metrics.incCacheRequests(StatusSuccess)
 			return 0, nil
 		}
 	}
 	c.mu.RUnlock()
-	c.metrics.incCacheRequests("success")
-	return time.Duration(item.Expiration - time.Now().UnixNano()), nil
+	c.metrics.incCacheRequests(StatusSuccess)
+	return time.Duration(item.expiration - time.Now().UnixNano()), nil
 }
 
-// DeleteExpired deletes all expired items from the cache.
-func (c *cache[T]) DeleteExpired() {
+// deleteExpired deletes all expired items from the cache.
+// It is called by the janitor.
+func (c *cache[T]) deleteExpired() {
 	c.mu.Lock()
 	if c.closed {
 		c.mu.Unlock()
-		c.metrics.incCacheRequests("failed")
 		return
 	}
-	for k, v := range c.Items {
-		if v.Expiration > 0 && v.Expiration < time.Now().UnixNano() {
-			delete(c.Items, k)
-			c.metrics.incCacheEvictions()
-		}
+
+	if !c.sorted {
+		// sort the slice of items by expiration time
+		slices.SortFunc(c.items, func(i, j *item[T]) int {
+			return cmp.Compare(i.expiration, j.expiration)
+		})
+		c.sorted = true
+	}
+
+	t := time.Now().UnixNano()
+	index := sort.Search(len(c.items), func(i int) bool {
+		// smallest index with an expiration greater than t
+		return c.items[i].expiration > t
+	})
+
+	// remove the expired items from the index
+	for _, v := range c.items[:index] {
+		delete(c.index, v.key)
+		c.metrics.incCacheEvictions()
+		c.metrics.decCacheItems()
 	}
+	// remove the expired items from the slice
+	c.items = c.items[index:]
 	c.mu.Unlock()
-	c.metrics.incCacheRequests("success")
 }
 
 type janitor[T any] struct {
@@ -405,7 +488,7 @@ func (j *janitor[T]) run(c *cache[T]) {
 	for {
 		select {
 		case <-ticker.C:
-			c.DeleteExpired()
+			c.deleteExpired()
 		case <-j.stop:
 			ticker.Stop()
 			return
diff --git a/cache/cache_test.go b/cache/cache_test.go
index 2717c96e9..0b939ef49 100644
--- a/cache/cache_test.go
+++ b/cache/cache_test.go
@@ -18,7 +18,7 @@ package cache
 
 import (
 	"fmt"
-	"math/rand"
+	"math/rand/v2"
 	"sync"
 	"testing"
 	"time"
@@ -43,7 +43,8 @@ func TestCache(t *testing.T) {
 		g := NewWithT(t)
-		// create a cache that can hold 2 items and have no cleanup
+		// create a cache that can hold 3 items and clean up every second
 		cache, err := New(3, kc.MetaNamespaceKeyFunc,
-			WithMetricsRegisterer[any](prometheus.NewPedanticRegistry()))
+			WithMetricsRegisterer[any](prometheus.NewPedanticRegistry()),
+			WithCleanupInterval[any](1*time.Second))
 		g.Expect(err).ToNot(HaveOccurred())
 
 		obj := &testObject{
@@ -117,6 +118,14 @@ func TestCache(t *testing.T) {
 		g.Expect(err).ToNot(HaveOccurred())
 		g.Expect(found).To(BeTrue())
 		g.Expect(item).To(Equal(obj3))
+
+		// cleanup the cache
+		cache.Clear()
+		g.Expect(cache.ListKeys()).To(BeEmpty())
+
+		// close the cache
+		err = cache.Close()
+		g.Expect(err).ToNot(HaveOccurred())
 	})
 
 	t.Run("Add expiring keys", func(t *testing.T) {
@@ -168,6 +177,337 @@ func TestCache(t *testing.T) {
 	})
 }
 
+func Test_Cache_Add(t *testing.T) {
+	g := NewWithT(t)
+	reg := prometheus.NewPedanticRegistry()
+	cache, err := New[IdentifiableObject](1, IdentifiableObjectKeyFunc,
+		WithMetricsRegisterer[IdentifiableObject](reg),
+		WithCleanupInterval[IdentifiableObject](10*time.Millisecond))
+	g.Expect(err).ToNot(HaveOccurred())
+
+	// Add an object representing an expiring token
+	obj := IdentifiableObject{
+		ObjMetadata: object.ObjMetadata{
+			Namespace: "test-ns",
+			Name: "test",
+			GroupKind: schema.GroupKind{
+				Group: "test-group",
+				Kind: "TestObject",
+			},
+		},
+		Object: "test-token",
+	}
+	err = cache.Add(obj)
+	g.Expect(err).ToNot(HaveOccurred())
+	err = cache.SetExpiration(obj, 10*time.Millisecond)
+	g.Expect(err).ToNot(HaveOccurred())
+	g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))
+
+	// try adding the same object again
+	err = cache.Add(obj)
+	g.Expect(err).To(HaveOccurred())
+	g.Expect(err.Error()).To(ContainSubstring("already exists"))
+
+	// wait for the item to expire
+	time.Sleep(20 * time.Millisecond)
+	ok, err := cache.HasExpired(obj)
+	g.Expect(err).ToNot(HaveOccurred())
+	g.Expect(ok).To(BeTrue())
+
+	// add another object
+	obj.Name = "test2"
+	err =
cache.Add(obj) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test2_test-group_TestObject")) + + // validate metrics + validateMetrics(reg, ` + # HELP gotk_cache_evictions_total Total number of cache evictions. + # TYPE gotk_cache_evictions_total counter + gotk_cache_evictions_total 1 + # HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure. + # TYPE gotk_cache_requests_total counter + gotk_cache_requests_total{status="failure"} 1 + gotk_cache_requests_total{status="success"} 6 + # HELP gotk_cached_items Total number of items in the cache. + # TYPE gotk_cached_items gauge + gotk_cached_items 1 +`, t) +} + +func Test_Cache_Update(t *testing.T) { + g := NewWithT(t) + reg := prometheus.NewPedanticRegistry() + cache, err := New[IdentifiableObject](1, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](reg)) + g.Expect(err).ToNot(HaveOccurred()) + + // Add an object representing an expiring token + obj := IdentifiableObject{ + ObjMetadata: object.ObjMetadata{ + Namespace: "test-ns", + Name: "test", + GroupKind: schema.GroupKind{ + Group: "test-group", + Kind: "TestObject", + }, + }, + Object: "test-token", + } + err = cache.Add(obj) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject")) + + obj.Object = "test-token2" + err = cache.Update(obj) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject")) + g.Expect(cache.index["test-ns_test_test-group_TestObject"].object.Object).To(Equal("test-token2")) + + // validate metrics + validateMetrics(reg, ` + # HELP gotk_cache_evictions_total Total number of cache evictions. + # TYPE gotk_cache_evictions_total counter + gotk_cache_evictions_total 0 + # HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure. + # TYPE gotk_cache_requests_total counter + gotk_cache_requests_total{status="success"} 4 + # HELP gotk_cached_items Total number of items in the cache. + # TYPE gotk_cached_items gauge + gotk_cached_items 1 + `, t) +} + +func Test_Cache_Get(t *testing.T) { + g := NewWithT(t) + reg := prometheus.NewPedanticRegistry() + cache, err := New[IdentifiableObject](5, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](reg), + WithMetricsLabels[IdentifiableObject](IdentifiableObjectLabels, IdentifiableObjectLVSFunc)) + g.Expect(err).ToNot(HaveOccurred()) + + // Add an object representing an expiring token + obj := IdentifiableObject{ + ObjMetadata: object.ObjMetadata{ + Namespace: "test-ns", + Name: "test", + GroupKind: schema.GroupKind{ + Group: "test-group", + Kind: "TestObject", + }, + }, + Object: "test-token", + } + + _, found, err := cache.Get(obj) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(found).To(BeFalse()) + + err = cache.Add(obj) + g.Expect(err).ToNot(HaveOccurred()) + + item, found, err := cache.Get(obj) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(found).To(BeTrue()) + g.Expect(item).To(Equal(obj)) + + validateMetrics(reg, ` + # HELP gotk_cache_events_total Total number of cache retrieval events for a Gitops Toolkit resource reconciliation. + # TYPE gotk_cache_events_total counter + gotk_cache_events_total{event_type="cache_hit",kind="TestObject",name="test",namespace="test-ns"} 1 + gotk_cache_events_total{event_type="cache_miss",kind="TestObject",name="test",namespace="test-ns"} 1 + # HELP gotk_cache_evictions_total Total number of cache evictions. 
+ # TYPE gotk_cache_evictions_total counter + gotk_cache_evictions_total 0 + # HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure. + # TYPE gotk_cache_requests_total counter + gotk_cache_requests_total{status="success"} 3 + # HELP gotk_cached_items Total number of items in the cache. + # TYPE gotk_cached_items gauge + gotk_cached_items 1 +`, t) +} + +func Test_Cache_Delete(t *testing.T) { + g := NewWithT(t) + reg := prometheus.NewPedanticRegistry() + cache, err := New[IdentifiableObject](5, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](reg), + WithCleanupInterval[IdentifiableObject](1*time.Millisecond)) + g.Expect(err).ToNot(HaveOccurred()) + + // Add an object representing an expiring token + obj := IdentifiableObject{ + ObjMetadata: object.ObjMetadata{ + Namespace: "test-ns", + Name: "test", + GroupKind: schema.GroupKind{ + Group: "test-group", + Kind: "TestObject", + }, + }, + Object: "test-token", + } + + err = cache.Add(obj) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject")) + + err = cache.Delete(obj) + g.Expect(err).ToNot(HaveOccurred()) + + time.Sleep(5 * time.Millisecond) + g.Expect(cache.ListKeys()).To(BeEmpty()) + + validateMetrics(reg, ` + # HELP gotk_cache_evictions_total Total number of cache evictions. + # TYPE gotk_cache_evictions_total counter + gotk_cache_evictions_total 1 + # HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure. + # TYPE gotk_cache_requests_total counter + gotk_cache_requests_total{status="success"} 4 + # HELP gotk_cached_items Total number of items in the cache. + # TYPE gotk_cached_items gauge + gotk_cached_items 0 +`, t) +} + +func Test_Cache_deleteExpired(t *testing.T) { + type expiringItem struct { + object StoreObject[string] + expiration time.Duration + } + tests := []struct { + name string + items []expiringItem + nonExpiredKeys []string + }{ + { + name: "non expiring items", + items: []expiringItem{ + { + object: StoreObject[string]{ + Object: "test-token", + Key: "test", + }, + expiration: noExpiration, + }, + { + object: StoreObject[string]{ + Object: "test-token2", + Key: "test2", + }, + expiration: noExpiration, + }, + }, + nonExpiredKeys: []string{"test", "test2"}, + }, + { + name: "expiring items", + items: []expiringItem{ + { + object: StoreObject[string]{ + Object: "test-token", + Key: "test", + }, + expiration: 1 * time.Millisecond, + }, + { + object: StoreObject[string]{ + Object: "test-token2", + Key: "test2", + }, + expiration: 1 * time.Millisecond, + }, + }, + nonExpiredKeys: []string{}, + }, + { + name: "mixed items", + items: []expiringItem{ + { + object: StoreObject[string]{ + Object: "test-token", + Key: "test", + }, + expiration: 1 * time.Millisecond, + }, + { + object: StoreObject[string]{ + Object: "test-token2", + Key: "test2", + }, + expiration: noExpiration, + }, + { + object: StoreObject[string]{ + Object: "test-token3", + Key: "test3", + }, + expiration: 1 * time.Minute, + }, + }, + nonExpiredKeys: []string{"test2", "test3"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + reg := prometheus.NewPedanticRegistry() + cache, err := New[StoreObject[string]](5, StoreObjectKeyFunc, + WithMetricsRegisterer[StoreObject[string]](reg), + WithCleanupInterval[StoreObject[string]](1*time.Millisecond)) + g.Expect(err).ToNot(HaveOccurred()) + + for _, item := range tt.items { + err := cache.Add(item.object) + 
g.Expect(err).ToNot(HaveOccurred()) + if item.expiration != noExpiration { + err = cache.SetExpiration(item.object, item.expiration) + g.Expect(err).ToNot(HaveOccurred()) + } + } + + time.Sleep(5 * time.Millisecond) + keys := cache.ListKeys() + g.Expect(keys).To(ConsistOf(tt.nonExpiredKeys)) + }) + } +} + +func Test_Cache_Resize(t *testing.T) { + n := 100 + g := NewWithT(t) + reg := prometheus.NewPedanticRegistry() + cache, err := New[IdentifiableObject](n, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](reg), + WithCleanupInterval[IdentifiableObject](10*time.Millisecond)) + g.Expect(err).ToNot(HaveOccurred()) + + for i := range n { + obj := IdentifiableObject{ + ObjMetadata: object.ObjMetadata{ + Namespace: "test-ns", + Name: fmt.Sprintf("test-%d", i), + GroupKind: schema.GroupKind{ + Group: "test-group", + Kind: "TestObject", + }, + }, + Object: "test-token", + } + err = cache.Add(obj) + g.Expect(err).ToNot(HaveOccurred()) + err = cache.SetExpiration(obj, 10*time.Minute) + g.Expect(err).ToNot(HaveOccurred()) + } + + deleted := cache.Resize(10) + g.Expect(deleted).To(Equal(n - 10)) + g.Expect(cache.ListKeys()).To(HaveLen(10)) +} + func TestCache_Concurrent(t *testing.T) { const ( concurrency = 500 @@ -187,7 +527,7 @@ func TestCache_Concurrent(t *testing.T) { // simulate concurrent read and write for i := 0; i < concurrency; i++ { - key := rand.Intn(keysNum) + key := rand.IntN(keysNum) wg.Add(2) go func() { defer wg.Done() diff --git a/cache/lru.go b/cache/lru.go index 81cd6d303..e6c4ffb13 100644 --- a/cache/lru.go +++ b/cache/lru.go @@ -58,7 +58,7 @@ var _ Store[any] = &LRU[any]{} // NewLRU creates a new LRU cache with the given capacity and keyFunc. func NewLRU[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*LRU[T], error) { - opt := cacheOptions[T]{} + opt := storeOptions[T]{} for _, o := range opts { err := o(&opt) if err != nil { @@ -96,23 +96,23 @@ func (c *LRU[T]) Add(object T) error { _, ok := c.cache[key] c.mu.RUnlock() if ok { - c.metrics.incCacheRequests("failed") + c.metrics.incCacheRequests(StatusFailure) return KeyError{object, ErrAlreadyExists} } c.mu.Lock() evicted := c.add(&Node[T]{key: key, object: object}) c.mu.Unlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) if evicted { c.metrics.incCacheEvictions() + return nil } c.metrics.incCacheItems() return nil } -func (c *LRU[T]) add(node *Node[T]) bool { - evicted := false +func (c *LRU[T]) add(node *Node[T]) (evicted bool) { prev := c.tail.prev prev.addNext(node) c.tail.addPrev(node) @@ -123,22 +123,22 @@ func (c *LRU[T]) add(node *Node[T]) bool { if len(c.cache) > c.capacity { c.delete(c.head.next) - evicted = true + return true } - return evicted + return false } // Delete removes a node from the list func (c *LRU[T]) Delete(object T) error { key, err := c.keyFunc(object) if err != nil { - c.metrics.incCacheRequests("failed") + c.metrics.incCacheRequests(StatusFailure) return KeyError{object, err} } // if node is head or tail, do nothing if key == c.head.key || key == c.tail.key { - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) return nil } @@ -147,13 +147,13 @@ func (c *LRU[T]) Delete(object T) error { node, ok := c.cache[key] if !ok { c.mu.Unlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) return nil } c.delete(node) c.mu.Unlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) c.metrics.decCacheItems() return nil } @@ -169,14 
+169,14 @@ func (c *LRU[T]) Get(object T) (item T, exists bool, err error) { var res T lvs := []string{} if c.labelsFunc != nil { - lvs, err = c.labelsFunc(object, len(c.metrics.getLabels())) + lvs, err = c.labelsFunc(object, len(c.metrics.getExtraLabels())) if err != nil { return res, false, KeyError{object, err} } } key, err := c.keyFunc(object) if err != nil { - c.metrics.incCacheRequests("failed") + c.metrics.incCacheRequests(StatusFailure) return item, false, KeyError{object, err} } @@ -214,14 +214,13 @@ func (c *LRU[T]) get(key string) (item T, exists bool, err error) { node, ok := c.cache[key] if !ok { c.mu.Unlock() - c.metrics.incCacheRequests("success") - + c.metrics.incCacheRequests(StatusSuccess) return res, false, nil } c.delete(node) _ = c.add(node) c.mu.Unlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) return node.object, true, err } @@ -230,7 +229,7 @@ func (c *LRU[T]) get(key string) (item T, exists bool, err error) { func (c *LRU[T]) Update(object T) error { key, err := c.keyFunc(object) if err != nil { - c.metrics.incCacheRequests("failed") + c.metrics.incCacheRequests(StatusFailure) return KeyError{object, err} } @@ -240,7 +239,7 @@ func (c *LRU[T]) Update(object T) error { c.delete(node) _ = c.add(&Node[T]{key: key, object: object}) c.mu.Unlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) return nil } c.mu.Unlock() @@ -255,6 +254,24 @@ func (c *LRU[T]) ListKeys() []string { keys = append(keys, k) } c.mu.RUnlock() - c.metrics.incCacheRequests("success") + c.metrics.incCacheRequests(StatusSuccess) return keys } + +// Resize resizes the cache and returns the number of items removed. +func (c *LRU[T]) Resize(size int) int { + overflow := len(c.cache) - size + if overflow <= 0 { + c.metrics.incCacheRequests(StatusSuccess) + return 0 + } + c.mu.Lock() + c.capacity = size + for i := 0; i < overflow; i++ { + c.delete(c.head.next) + c.metrics.incCacheEvictions() + } + c.mu.Unlock() + c.metrics.incCacheRequests(StatusSuccess) + return overflow +} diff --git a/cache/lru_test.go b/cache/lru_test.go index 56f865950..25b3bc576 100644 --- a/cache/lru_test.go +++ b/cache/lru_test.go @@ -17,7 +17,8 @@ limitations under the License. package cache import ( - "math/rand" + "fmt" + "math/rand/v2" "sync" "testing" @@ -238,8 +239,9 @@ func Test_LRU(t *testing.T) { func Test_LRU_Add(t *testing.T) { g := NewWithT(t) + reg := prometheus.NewPedanticRegistry() cache, err := NewLRU[IdentifiableObject](1, IdentifiableObjectKeyFunc, - WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry())) + WithMetricsRegisterer[IdentifiableObject](reg)) g.Expect(err).ToNot(HaveOccurred()) // Add an object representing an expiring token @@ -263,16 +265,32 @@ func Test_LRU_Add(t *testing.T) { g.Expect(err).To(HaveOccurred()) g.Expect(err.Error()).To(ContainSubstring("already exists")) + // add another object obj.Name = "test2" err = cache.Add(obj) g.Expect(err).ToNot(HaveOccurred()) g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test2_test-group_TestObject")) + + // validate metrics + validateMetrics(reg, ` + # HELP gotk_cache_evictions_total Total number of cache evictions. + # TYPE gotk_cache_evictions_total counter + gotk_cache_evictions_total 1 + # HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure. 
+	# TYPE gotk_cache_requests_total counter
+	gotk_cache_requests_total{status="failure"} 1
+	gotk_cache_requests_total{status="success"} 4
+	# HELP gotk_cached_items Total number of items in the cache.
+	# TYPE gotk_cached_items gauge
+	gotk_cached_items 1
+`, t)
 }
 
 func Test_LRU_Update(t *testing.T) {
 	g := NewWithT(t)
+	reg := prometheus.NewPedanticRegistry()
 	cache, err := NewLRU[IdentifiableObject](1, IdentifiableObjectKeyFunc,
-		WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry()))
+		WithMetricsRegisterer[IdentifiableObject](reg))
 	g.Expect(err).ToNot(HaveOccurred())
 
 	// Add an object representing an expiring token
@@ -296,12 +314,27 @@ func Test_LRU_Update(t *testing.T) {
 	g.Expect(err).ToNot(HaveOccurred())
 	g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))
 	g.Expect(cache.cache["test-ns_test_test-group_TestObject"].object.Object).To(Equal("test-token2"))
+
+	// validate metrics
+	validateMetrics(reg, `
+	# HELP gotk_cache_evictions_total Total number of cache evictions.
+	# TYPE gotk_cache_evictions_total counter
+	gotk_cache_evictions_total 0
+	# HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure.
+	# TYPE gotk_cache_requests_total counter
+	gotk_cache_requests_total{status="success"} 4
+	# HELP gotk_cached_items Total number of items in the cache.
+	# TYPE gotk_cached_items gauge
+	gotk_cached_items 1
+	`, t)
 }
 
 func Test_LRU_Get(t *testing.T) {
 	g := NewWithT(t)
+	reg := prometheus.NewPedanticRegistry()
 	cache, err := NewLRU[IdentifiableObject](5, IdentifiableObjectKeyFunc,
-		WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry()))
+		WithMetricsRegisterer[IdentifiableObject](reg),
+		WithMetricsLabels[IdentifiableObject](IdentifiableObjectLabels, IdentifiableObjectLVSFunc))
 	g.Expect(err).ToNot(HaveOccurred())
 
 	// Add an object representing an expiring token
@@ -328,12 +361,29 @@ func Test_LRU_Get(t *testing.T) {
 	g.Expect(err).ToNot(HaveOccurred())
 	g.Expect(found).To(BeTrue())
 	g.Expect(item).To(Equal(obj))
+
+	validateMetrics(reg, `
+	# HELP gotk_cache_events_total Total number of cache retrieval events for a Gitops Toolkit resource reconciliation.
+	# TYPE gotk_cache_events_total counter
+	gotk_cache_events_total{event_type="cache_hit",kind="TestObject",name="test",namespace="test-ns"} 1
+	gotk_cache_events_total{event_type="cache_miss",kind="TestObject",name="test",namespace="test-ns"} 1
+	# HELP gotk_cache_evictions_total Total number of cache evictions.
+	# TYPE gotk_cache_evictions_total counter
+	gotk_cache_evictions_total 0
+	# HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure.
+	# TYPE gotk_cache_requests_total counter
+	gotk_cache_requests_total{status="success"} 3
+	# HELP gotk_cached_items Total number of items in the cache.
+	# TYPE gotk_cached_items gauge
+	gotk_cached_items 1
+`, t)
 }
 
 func Test_LRU_Delete(t *testing.T) {
 	g := NewWithT(t)
+	reg := prometheus.NewPedanticRegistry()
 	cache, err := NewLRU[IdentifiableObject](5, IdentifiableObjectKeyFunc,
-		WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry()))
+		WithMetricsRegisterer[IdentifiableObject](reg))
 	g.Expect(err).ToNot(HaveOccurred())
 
 	// Add an object representing an expiring token
@@ -355,6 +405,47 @@ func Test_LRU_Delete(t *testing.T) {
 	err = cache.Delete(obj)
 	g.Expect(err).ToNot(HaveOccurred())
 	g.Expect(cache.ListKeys()).To(BeEmpty())
+
+	validateMetrics(reg, `
+	# HELP gotk_cache_evictions_total Total number of cache evictions.
+ # TYPE gotk_cache_evictions_total counter + gotk_cache_evictions_total 0 + # HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure. + # TYPE gotk_cache_requests_total counter + gotk_cache_requests_total{status="success"} 3 + # HELP gotk_cached_items Total number of items in the cache. + # TYPE gotk_cached_items gauge + gotk_cached_items 0 +`, t) +} + +func Test_LRU_Resize(t *testing.T) { + n := 100 + g := NewWithT(t) + reg := prometheus.NewPedanticRegistry() + cache, err := NewLRU[IdentifiableObject](n, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](reg)) + g.Expect(err).ToNot(HaveOccurred()) + + for i := range n { + obj := IdentifiableObject{ + ObjMetadata: object.ObjMetadata{ + Namespace: "test-ns", + Name: fmt.Sprintf("test-%d", i), + GroupKind: schema.GroupKind{ + Group: "test-group", + Kind: "TestObject", + }, + }, + Object: "test-token", + } + err = cache.Add(obj) + g.Expect(err).ToNot(HaveOccurred()) + } + + deleted := cache.Resize(10) + g.Expect(deleted).To(Equal(n - 10)) + g.Expect(cache.ListKeys()).To(HaveLen(10)) } func TestLRU_Concurrent(t *testing.T) { @@ -375,7 +466,7 @@ func TestLRU_Concurrent(t *testing.T) { // simulate concurrent read and write for i := 0; i < concurrency; i++ { - key := rand.Intn(keysNum) + key := rand.IntN(keysNum) wg.Add(2) go func() { defer wg.Done() diff --git a/cache/metrics.go b/cache/metrics.go index 23e5a9e79..37d243406 100644 --- a/cache/metrics.go +++ b/cache/metrics.go @@ -28,6 +28,10 @@ const ( CacheEventTypeMiss = "cache_miss" // CacheEventTypeHit is the event type for cache hits. CacheEventTypeHit = "cache_hit" + // StatusSuccess is the status for successful cache requests. + StatusSuccess = "success" + // StatusFailure is the status for failed cache requests. + StatusFailure = "failure" ) type cacheMetrics struct { @@ -36,12 +40,12 @@ type cacheMetrics struct { cacheItemsGauge prometheus.Gauge cacheRequestsCounter *prometheus.CounterVec cacheEvictionCounter prometheus.Counter - labels []string + extraLabels []string } // newcacheMetrics returns a new cacheMetrics. -func newCacheMetrics(reg prometheus.Registerer, labels ...string) *cacheMetrics { - labels = append([]string{"event_type"}, labels...) +func newCacheMetrics(reg prometheus.Registerer, extraLabels ...string) *cacheMetrics { + labels := append([]string{"event_type"}, extraLabels...) return &cacheMetrics{ cacheEventsCounter: promauto.With(reg).NewCounterVec( prometheus.CounterOpts{ @@ -69,12 +73,12 @@ func newCacheMetrics(reg prometheus.Registerer, labels ...string) *cacheMetrics Help: "Total number of cache evictions.", }, ), - labels: labels, + extraLabels: extraLabels, } } -func (m *cacheMetrics) getLabels() []string { - return m.labels +func (m *cacheMetrics) getExtraLabels() []string { + return m.extraLabels } // collectors returns the metrics.Collector objects for the cacheMetrics. diff --git a/cache/metrics_test.go b/cache/metrics_test.go index 03f1e6420..5a2d90b8a 100644 --- a/cache/metrics_test.go +++ b/cache/metrics_test.go @@ -35,9 +35,9 @@ func TestCacheMetrics(t *testing.T) { m.incCacheEvents(CacheEventTypeHit, []string{"test", "test-ns", "TestObject"}...) m.incCacheEvents(CacheEventTypeMiss, []string{"test", "test-ns", "TestObject"}...) 
m.incCacheRequests("success") - m.incCacheRequests("failed") + m.incCacheRequests("failure") - err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` + validateMetrics(reg, ` # HELP gotk_cache_events_total Total number of cache retrieval events for a Gitops Toolkit resource reconciliation. # TYPE gotk_cache_events_total counter gotk_cache_events_total{event_type="cache_hit",kind="TestObject",name="test",namespace="test-ns"} 1 @@ -47,15 +47,20 @@ func TestCacheMetrics(t *testing.T) { gotk_cache_evictions_total 0 # HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure. # TYPE gotk_cache_requests_total counter - gotk_cache_requests_total{status="failed"} 1 + gotk_cache_requests_total{status="failure"} 1 gotk_cache_requests_total{status="success"} 1 # HELP gotk_cached_items Total number of items in the cache. # TYPE gotk_cached_items gauge gotk_cached_items 0 - `)) - g.Expect(err).ToNot(HaveOccurred()) + `, t) res, err := testutil.GatherAndLint(reg) g.Expect(err).ToNot(HaveOccurred()) g.Expect(res).To(BeEmpty()) } + +func validateMetrics(reg prometheus.Gatherer, expected string, t *testing.T) { + g := NewWithT(t) + err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expected)) + g.Expect(err).ToNot(HaveOccurred()) +} diff --git a/cache/store.go b/cache/store.go index ee39baf46..83dc477da 100644 --- a/cache/store.go +++ b/cache/store.go @@ -28,61 +28,45 @@ import ( // It is a geric version of the Kubernetes client-go cache.Store interface. // See https://pkg.go.dev/k8s.io/client-go/tools/cache#Store type Store[T any] interface { + // Add adds an object to the store. Add(object T) error + // Update updates an object in the store. Update(object T) error + // Delete deletes an object from the store. Delete(object T) error + // ListKeys returns a list of keys in the store. ListKeys() []string + // Get returns the object stored in the store. Get(object T) (item T, exists bool, err error) + // GetByKey returns the object stored in the store by key. GetByKey(key string) (item T, exists bool, err error) + // Resize resizes the store and returns the number of items removed. + Resize(int) int } // Expirable is an interface for a cache store that supports expiration. type Expirable[T any] interface { Store[T] + // SetExpiration sets the expiration time for the object. SetExpiration(object T, expiration time.Duration) error // GetExpiration returns the time.Duration until the objectect expires. GetExpiration(object T) (time.Duration, error) + // HasExpired returns true if the object has expired. HasExpired(object T) (bool, error) } -// KeyFunc knows how to make a key from an object. Implementations should be deterministic. -type KeyFunc[T any] func(object T) (string, error) - -// IdentifiableObject is a wrapper for an object with its identifying metadata. -type IdentifiableObject struct { - object.ObjMetadata - Object any -} - -// ExplicitKey can be passed to IdentifiableObjectFunc if you have the key for -// the objectect but not the objectect itself. -type ExplicitKey string - -// IdentifiableObjectKeyFunc is a convenient default KeyFunc which knows how to make -// keys from IdentifiableObject objects. 
-func IdentifiableObjectKeyFunc[T any](object T) (string, error) {
-	if key, ok := any(object).(ExplicitKey); ok {
-		return string(key), nil
-	}
-	n, ok := any(object).(IdentifiableObject)
-	if !ok {
-		return "", fmt.Errorf("object has no meta: %v", object)
-	}
-	return n.String(), nil
-}
-
-type cacheOptions[T any] struct {
+type storeOptions[T any] struct {
 	interval time.Duration
 	registerer prometheus.Registerer
 	extraLabels []string
 	labelsFunc GetLvsFunc[T]
 }
 
-type Options[T any] func(*cacheOptions[T]) error
+type Options[T any] func(*storeOptions[T]) error
 
 // WithMetricsLabels sets the extra labels for the cache metrics.
 func WithMetricsLabels[T any](labels []string, f GetLvsFunc[T]) Options[T] {
-	return func(o *cacheOptions[T]) error {
+	return func(o *storeOptions[T]) error {
 		if labels != nil && f == nil {
 			return fmt.Errorf("labelsFunc must be set if labels are provided")
 		}
@@ -94,15 +78,56 @@ func WithMetricsLabels[T any](labels []string, f GetLvsFunc[T]) Options[T] {
 
 // WithCleanupInterval sets the interval for the cache cleanup.
 func WithCleanupInterval[T any](interval time.Duration) Options[T] {
-	return func(o *cacheOptions[T]) error {
+	return func(o *storeOptions[T]) error {
 		o.interval = interval
 		return nil
 	}
 }
 
 func WithMetricsRegisterer[T any](r prometheus.Registerer) Options[T] {
-	return func(o *cacheOptions[T]) error {
+	return func(o *storeOptions[T]) error {
 		o.registerer = r
 		return nil
 	}
 }
+
+// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
+type KeyFunc[T any] func(object T) (string, error)
+
+// IdentifiableObject is a wrapper for an object with its identifying metadata.
+type IdentifiableObject struct {
+	object.ObjMetadata
+	// Object is the object that is being stored.
+	Object any
+}
+
+// ExplicitKey can be passed to IdentifiableObjectKeyFunc if you have the key for
+// the object but not the object itself.
+type ExplicitKey string
+
+// IdentifiableObjectKeyFunc is a convenient default KeyFunc which knows how to make
+// keys from IdentifiableObject objects.
+func IdentifiableObjectKeyFunc[T any](object T) (string, error) {
+	if key, ok := any(object).(ExplicitKey); ok {
+		return string(key), nil
+	}
+	n, ok := any(object).(IdentifiableObject)
+	if !ok {
+		return "", fmt.Errorf("object has no meta: %v", object)
+	}
+	return n.String(), nil
+}
+
+// StoreObject is a wrapper for an object with its identifying key.
+// It is used to store objects in a Store.
+type StoreObject[T any] struct {
+	// Object is the object that is being stored.
+	Object T
+	// Key is the key for the object.
+	Key string
+}
+
+// StoreObjectKeyFunc returns the key for a StoreObject.
+func StoreObjectKeyFunc[T any](object StoreObject[T]) (string, error) {
+	return object.Key, nil
+}
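
Usage note (illustrative only, not part of the diff): the sketch below shows how the reworked expirable store might be exercised end to end, covering Add, SetExpiration, the janitor-driven cleanup interval, and the new Resize method. The module path in the import and the main function are placeholders; every cache identifier used here comes from the change itself.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	cache "example.com/your/module/cache" // placeholder import path for the package in this diff
)

func main() {
	// A store of StoreObject[string] values with capacity 100, swept by the
	// janitor every second.
	store, err := cache.New[cache.StoreObject[string]](100, cache.StoreObjectKeyFunc[string],
		cache.WithMetricsRegisterer[cache.StoreObject[string]](prometheus.NewRegistry()),
		cache.WithCleanupInterval[cache.StoreObject[string]](time.Second))
	if err != nil {
		panic(err)
	}
	defer store.Close()

	for i := 0; i < 20; i++ {
		obj := cache.StoreObject[string]{Key: fmt.Sprintf("token-%d", i), Object: "secret"}
		if err := store.Add(obj); err != nil {
			panic(err)
		}
		// Give each entry a TTL; expired entries are removed by the janitor
		// on its next sweep, not at read time.
		if err := store.SetExpiration(obj, time.Duration(i+1)*time.Minute); err != nil {
			panic(err)
		}
	}

	// Shrink the store to 10 entries: items are kept sorted by expiration
	// time, so the entries closest to expiry are evicted first and the
	// number of removed items is returned.
	evicted := store.Resize(10)
	fmt.Println("evicted:", evicted) // 10
}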