From 21c60f7df1dcf810de6ed5ae309fcb6575c609e8 Mon Sep 17 00:00:00 2001 From: Soule BA Date: Tue, 30 Apr 2024 16:50:27 +0200 Subject: [PATCH] Add instrumentation for the cache implementations The cache instrumentation purpose is to be self contained. We want to avoid leaking any of the logic here in consuming code. Signed-off-by: Soule BA --- cache/cache.go | 88 ++++++++++++++++++++++++---- cache/cache_test.go | 12 +++- cache/errors.go | 1 + cache/go.mod | 1 - cache/go.sum | 4 -- cache/lru.go | 99 +++++++++++++++++++++++++------ cache/lru_test.go | 27 ++++++--- cache/metrics.go | 131 ++++++++++++++++++++++++++++++++---------- cache/metrics_test.go | 61 ++++++++++++++++++++ cache/store.go | 37 ++++++++++++ 10 files changed, 384 insertions(+), 77 deletions(-) create mode 100644 cache/metrics_test.go diff --git a/cache/cache.go b/cache/cache.go index e13a165fa..419d064f6 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -42,32 +42,46 @@ type cache[T any] struct { // Items holds the elements in the cache. Items map[string]Item[T] // MaxItems is the maximum number of items the cache can hold. - MaxItems int - mu sync.RWMutex - janitor *janitor[T] - closed bool + MaxItems int + metrics *cacheMetrics + labelsFunc GetLvsFunc[T] + janitor *janitor[T] + closed bool + + mu sync.RWMutex } var _ Expirable[any] = &Cache[any]{} // New creates a new cache with the given configuration. -func New[T any](maxItems int, keyFunc KeyFunc[T], interval time.Duration) *Cache[T] { +func New[T any](maxItems int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) { + opt := cacheOptions[T]{} + for _, o := range opts { + err := o(&opt) + if err != nil { + return nil, err + } + } + if opt.registerer == nil { + return nil, ErrNoRegisterer + } c := &cache[T]{ Items: make(map[string]Item[T]), MaxItems: maxItems, + metrics: newCacheMetrics(opt.registerer, opt.extraLabels...), janitor: &janitor[T]{ - interval: interval, + interval: opt.interval, stop: make(chan bool), }, } C := &Cache[T]{cache: c, keyFunc: keyFunc} - if interval > 0 { + if opt.interval > 0 { go c.janitor.run(c) } - return C + return C, nil } // Close closes the cache. It also stops the expiration eviction process. @@ -94,17 +108,21 @@ func (c *Cache[T]) Add(object T) error { c.mu.Lock() if c.closed { c.mu.Unlock() + c.metrics.incCacheRequests("failed") return KeyError{object, ErrClosed} } _, found := c.Items[key] if found { c.mu.Unlock() + c.metrics.incCacheRequests("failed") return KeyError{object, ErrAlreadyExists} } if c.MaxItems > 0 && len(c.Items) < c.MaxItems { c.set(key, object) c.mu.Unlock() + c.metrics.incCacheRequests("success") + c.metrics.incCacheItems() return nil } c.mu.Unlock() @@ -122,18 +140,22 @@ func (c *Cache[T]) Update(object T) error { c.mu.Lock() if c.closed { c.mu.Unlock() + c.metrics.incCacheRequests("failed") return KeyError{object, ErrClosed} } _, found := c.Items[key] if found { c.set(key, object) c.mu.Unlock() + c.metrics.incCacheRequests("success") return nil } if c.MaxItems > 0 && len(c.Items) < c.MaxItems { c.set(key, object) c.mu.Unlock() + c.metrics.incCacheRequests("success") + c.metrics.incCacheItems() return nil } c.mu.Unlock() @@ -150,8 +172,15 @@ func (c *cache[T]) set(key string, object T) { // Get an item from the cache. Returns the item or nil, and a bool indicating // whether the key was found. 
-func (c *Cache[T]) Get(object T) (T, bool, error) {
+func (c *Cache[T]) Get(object T) (item T, exists bool, err error) {
 	var res T
+	lvs := []string{}
+	if c.labelsFunc != nil {
+		lvs, err = c.labelsFunc(object, len(c.metrics.getLabels()))
+		if err != nil {
+			return res, false, KeyError{object, err}
+		}
+	}
 	key, err := c.keyFunc(object)
 	if err != nil {
 		return res, false, KeyError{object, err}
@@ -161,13 +190,26 @@ func (c *Cache[T]) Get(object T) (T, bool, error) {
 		return res, false, KeyError{object, err}
 	}
 	if !found {
+		c.metrics.incCacheEvents(CacheEventTypeMiss, lvs...)
 		return res, false, nil
 	}
+	c.metrics.incCacheEvents(CacheEventTypeHit, lvs...)
 	return item, true, nil
 }
 
 func (c *Cache[T]) GetByKey(key string) (T, bool, error) {
-	return c.get(key)
+	var res T
+	items, found, err := c.get(key)
+	if err != nil {
+		return res, false, err
+	}
+	if !found {
+		c.metrics.incCacheEvents(CacheEventTypeMiss)
+		return res, false, nil
+	}
+
+	c.metrics.incCacheEvents(CacheEventTypeHit)
+	return items, true, nil
 }
 
 func (c *cache[T]) get(key string) (T, bool, error) {
@@ -175,20 +217,24 @@ func (c *cache[T]) get(key string) (T, bool, error) {
 	c.mu.RLock()
 	if c.closed {
 		c.mu.RUnlock()
+		c.metrics.incCacheRequests("failed")
 		return res, false, ErrClosed
 	}
 	item, found := c.Items[key]
 	if !found {
 		c.mu.RUnlock()
+		c.metrics.incCacheRequests("success")
 		return res, false, nil
 	}
 	if item.Expiration > 0 {
 		if item.Expiration < time.Now().UnixNano() {
 			c.mu.RUnlock()
-			return res, false, ErrExpired
+			c.metrics.incCacheRequests("success")
+			return res, false, nil
 		}
 	}
 	c.mu.RUnlock()
+	c.metrics.incCacheRequests("success")
 	return item.Object, true, nil
 }
 
@@ -201,10 +247,13 @@ func (c *Cache[T]) Delete(object T) error {
 	c.mu.Lock()
 	if c.closed {
 		c.mu.Unlock()
+		c.metrics.incCacheRequests("failed")
 		return KeyError{object, ErrClosed}
 	}
 	delete(c.Items, key)
 	c.mu.Unlock()
+	c.metrics.incCacheRequests("success")
+	c.metrics.decCacheItems()
 	return nil
 }
 
@@ -228,6 +277,7 @@ func (c *cache[T]) ListKeys() []string {
 	c.mu.RLock()
 	if c.closed {
 		c.mu.RUnlock()
+		c.metrics.incCacheRequests("failed")
 		return nil
 	}
 	keys := make([]string, 0, len(c.Items))
@@ -235,6 +285,7 @@ func (c *cache[T]) ListKeys() []string {
 		keys = append(keys, k)
 	}
 	c.mu.RUnlock()
+	c.metrics.incCacheRequests("success")
 	return keys
 }
 
@@ -248,20 +299,24 @@ func (c *Cache[T]) HasExpired(object T) (bool, error) {
 	c.mu.RLock()
 	if c.closed {
 		c.mu.RUnlock()
+		c.metrics.incCacheRequests("failed")
 		return false, KeyError{object, ErrClosed}
 	}
 	item, ok := c.Items[key]
 	if !ok {
 		c.mu.RUnlock()
+		c.metrics.incCacheRequests("success")
 		return true, nil
 	}
 	if item.Expiration > 0 {
 		if item.Expiration < time.Now().UnixNano() {
 			c.mu.RUnlock()
+			c.metrics.incCacheRequests("success")
 			return true, nil
 		}
 	}
 	c.mu.RUnlock()
+	c.metrics.incCacheRequests("success")
 	return false, nil
 }
 
@@ -276,6 +331,7 @@ func (c *Cache[T]) SetExpiration(object T, expiration time.Duration) error {
 	c.mu.Lock()
 	if c.closed {
 		c.mu.Unlock()
+		c.metrics.incCacheRequests("failed")
 		return KeyError{object, ErrClosed}
 	}
 	item, ok := c.Items[key]
@@ -284,6 +340,7 @@ func (c *Cache[T]) SetExpiration(object T, expiration time.Duration) error {
 		c.Items[key] = item
 	}
 	c.mu.Unlock()
+	c.metrics.incCacheRequests("success")
 	return nil
 }
 
@@ -298,20 +355,24 @@ func (c *Cache[T]) GetExpiration(object T) (time.Duration, error) {
 	c.mu.RLock()
 	if c.closed {
 		c.mu.RUnlock()
+		c.metrics.incCacheRequests("failed")
 		return 0, KeyError{object, ErrClosed}
 	}
 	item, ok := c.Items[key]
 	if !ok {
 		c.mu.RUnlock()
+
c.metrics.incCacheRequests("success") return 0, KeyError{object, ErrNotFound} } if item.Expiration > 0 { if item.Expiration < time.Now().UnixNano() { c.mu.RUnlock() - return 0, KeyError{object, ErrExpired} + c.metrics.incCacheRequests("success") + return 0, nil } } c.mu.RUnlock() + c.metrics.incCacheRequests("success") return time.Duration(item.Expiration - time.Now().UnixNano()), nil } @@ -320,14 +381,17 @@ func (c *cache[T]) DeleteExpired() { c.mu.Lock() if c.closed { c.mu.Unlock() + c.metrics.incCacheRequests("failed") return } for k, v := range c.Items { if v.Expiration > 0 && v.Expiration < time.Now().UnixNano() { delete(c.Items, k) + c.metrics.incCacheEvictions() } } c.mu.Unlock() + c.metrics.incCacheRequests("success") } type janitor[T any] struct { diff --git a/cache/cache_test.go b/cache/cache_test.go index 5091d534a..f7b9f49c0 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -22,6 +22,7 @@ import ( "github.com/fluxcd/cli-utils/pkg/object" . "github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" kc "k8s.io/client-go/tools/cache" @@ -38,7 +39,9 @@ func TestCache(t *testing.T) { t.Run("Add and update keys", func(t *testing.T) { g := NewWithT(t) // create a cache that can hold 2 items and have no cleanup - cache := New(3, kc.MetaNamespaceKeyFunc, 0) + cache, err := New(3, kc.MetaNamespaceKeyFunc, + WithMetricsRegisterer[any](prometheus.NewPedanticRegistry())) + g.Expect(err).ToNot(HaveOccurred()) obj := &testObject{ TypeMeta: metav1.TypeMeta{ @@ -116,7 +119,10 @@ func TestCache(t *testing.T) { t.Run("Add expiring keys", func(t *testing.T) { g := NewWithT(t) // new cache with a cleanup interval of 1 second - cache := New[IdentifiableObject](2, IdentifiableObjectKeyFunc, 1*time.Second) + cache, err := New[IdentifiableObject](2, IdentifiableObjectKeyFunc, + WithCleanupInterval[IdentifiableObject](1*time.Second), + WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry())) + g.Expect(err).ToNot(HaveOccurred()) // Add an object representing an expiring token obj := IdentifiableObject{ @@ -135,7 +141,7 @@ func TestCache(t *testing.T) { }, } - err := cache.Add(obj) + err = cache.Add(obj) g.Expect(err).ToNot(HaveOccurred()) // set expiration time to 2 seconds diff --git a/cache/errors.go b/cache/errors.go index 0099a46cd..c797523f7 100644 --- a/cache/errors.go +++ b/cache/errors.go @@ -25,6 +25,7 @@ var ( ErrClosed = fmt.Errorf("cache closed") ErrFull = fmt.Errorf("cache full") ErrExpired = fmt.Errorf("key has expired") + ErrNoRegisterer = fmt.Errorf("no prometheus registerer provided") ) // KeyError will be returned any time a KeyFunc gives an error; it includes the object diff --git a/cache/go.mod b/cache/go.mod index 842079a60..58123a2ca 100644 --- a/cache/go.mod +++ b/cache/go.mod @@ -8,7 +8,6 @@ require ( github.com/prometheus/client_golang v1.19.0 k8s.io/apimachinery v0.30.0 k8s.io/client-go v0.30.0 - sigs.k8s.io/controller-runtime v0.18.0 ) require ( diff --git a/cache/go.sum b/cache/go.sum index 62b68a622..20f0be3e3 100644 --- a/cache/go.sum +++ b/cache/go.sum @@ -48,8 +48,6 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= 
-github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= -github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -285,8 +283,6 @@ k8s.io/kubectl v0.30.0 h1:xbPvzagbJ6RNYVMVuiHArC1grrV5vSmmIcSZuCdzRyk= k8s.io/kubectl v0.30.0/go.mod h1:zgolRw2MQXLPwmic2l/+iHs239L49fhSeICuMhQQXTI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -sigs.k8s.io/controller-runtime v0.18.0 h1:Z7jKuX784TQSUL1TIyeuF7j8KXZ4RtSX0YgtjKcSTME= -sigs.k8s.io/controller-runtime v0.18.0/go.mod h1:tuAt1+wbVsXIT8lPtk5RURxqAnq7xkpv2Mhttslg7Hw= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/kustomize/api v0.17.1 h1:MYJBOP/yQ3/5tp4/sf6HiiMfNNyO97LmtnirH9SLNr4= diff --git a/cache/lru.go b/cache/lru.go index 5830f6f23..81cd6d303 100644 --- a/cache/lru.go +++ b/cache/lru.go @@ -46,27 +46,41 @@ type LRU[T any] struct { capacity int // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. - keyFunc KeyFunc[T] - head *Node[T] - tail *Node[T] - mu sync.RWMutex + keyFunc KeyFunc[T] + metrics *cacheMetrics + labelsFunc GetLvsFunc[T] + head *Node[T] + tail *Node[T] + mu sync.RWMutex } var _ Store[any] = &LRU[any]{} // NewLRU creates a new LRU cache with the given capacity and keyFunc. 
-func NewLRU[T any](capacity int, keyFunc KeyFunc[T]) *LRU[T] { +func NewLRU[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*LRU[T], error) { + opt := cacheOptions[T]{} + for _, o := range opts { + err := o(&opt) + if err != nil { + return nil, err + } + } + if opt.registerer == nil { + return nil, ErrNoRegisterer + } head := &Node[T]{} tail := &Node[T]{} head.addNext(tail) tail.addPrev(head) return &LRU[T]{ - cache: make(map[string]*Node[T]), - keyFunc: keyFunc, - capacity: capacity, - head: head, - tail: tail, - } + cache: make(map[string]*Node[T]), + keyFunc: keyFunc, + metrics: newCacheMetrics(opt.registerer, opt.extraLabels...), + labelsFunc: opt.labelsFunc, + capacity: capacity, + head: head, + tail: tail, + }, nil } // Add adds a node to the end of the list @@ -82,16 +96,23 @@ func (c *LRU[T]) Add(object T) error { _, ok := c.cache[key] c.mu.RUnlock() if ok { + c.metrics.incCacheRequests("failed") return KeyError{object, ErrAlreadyExists} } c.mu.Lock() - c.add(&Node[T]{key: key, object: object}) + evicted := c.add(&Node[T]{key: key, object: object}) c.mu.Unlock() + c.metrics.incCacheRequests("success") + if evicted { + c.metrics.incCacheEvictions() + } + c.metrics.incCacheItems() return nil } -func (c *LRU[T]) add(node *Node[T]) { +func (c *LRU[T]) add(node *Node[T]) bool { + evicted := false prev := c.tail.prev prev.addNext(node) c.tail.addPrev(node) @@ -102,18 +123,22 @@ func (c *LRU[T]) add(node *Node[T]) { if len(c.cache) > c.capacity { c.delete(c.head.next) + evicted = true } + return evicted } // Delete removes a node from the list func (c *LRU[T]) Delete(object T) error { key, err := c.keyFunc(object) if err != nil { + c.metrics.incCacheRequests("failed") return KeyError{object, err} } // if node is head or tail, do nothing if key == c.head.key || key == c.tail.key { + c.metrics.incCacheRequests("success") return nil } @@ -122,11 +147,14 @@ func (c *LRU[T]) Delete(object T) error { node, ok := c.cache[key] if !ok { c.mu.Unlock() + c.metrics.incCacheRequests("success") return nil } c.delete(node) c.mu.Unlock() + c.metrics.incCacheRequests("success") + c.metrics.decCacheItems() return nil } @@ -138,17 +166,46 @@ func (c *LRU[T]) delete(node *Node[T]) { // Get returns the given object from the cache. // If the object is not in the cache, it returns false. func (c *LRU[T]) Get(object T) (item T, exists bool, err error) { + var res T + lvs := []string{} + if c.labelsFunc != nil { + lvs, err = c.labelsFunc(object, len(c.metrics.getLabels())) + if err != nil { + return res, false, KeyError{object, err} + } + } key, err := c.keyFunc(object) if err != nil { + c.metrics.incCacheRequests("failed") return item, false, KeyError{object, err} } - return c.get(key) + item, exists, err = c.get(key) + if err != nil { + return res, false, KeyError{object, err} + } + if !exists { + c.metrics.incCacheEvents(CacheEventTypeMiss, lvs...) + return res, false, nil + } + c.metrics.incCacheEvents(CacheEventTypeHit, lvs...) + return item, true, nil } // GetByKey returns the object for the given key. 
-func (c *LRU[T]) GetByKey(key string) (item T, exists bool, err error) { - return c.get(key) +func (c *LRU[T]) GetByKey(key string) (T, bool, error) { + var res T + item, found, err := c.get(key) + if err != nil { + return res, false, err + } + if !found { + c.metrics.incCacheEvents(CacheEventTypeMiss) + return res, false, nil + } + + c.metrics.incCacheEvents(CacheEventTypeHit) + return item, true, nil } func (c *LRU[T]) get(key string) (item T, exists bool, err error) { @@ -157,11 +214,14 @@ func (c *LRU[T]) get(key string) (item T, exists bool, err error) { node, ok := c.cache[key] if !ok { c.mu.Unlock() + c.metrics.incCacheRequests("success") + return res, false, nil } c.delete(node) - c.add(node) + _ = c.add(node) c.mu.Unlock() + c.metrics.incCacheRequests("success") return node.object, true, err } @@ -170,6 +230,7 @@ func (c *LRU[T]) get(key string) (item T, exists bool, err error) { func (c *LRU[T]) Update(object T) error { key, err := c.keyFunc(object) if err != nil { + c.metrics.incCacheRequests("failed") return KeyError{object, err} } @@ -177,8 +238,9 @@ func (c *LRU[T]) Update(object T) error { node, ok := c.cache[key] if ok { c.delete(node) - c.add(&Node[T]{key: key, object: object}) + _ = c.add(&Node[T]{key: key, object: object}) c.mu.Unlock() + c.metrics.incCacheRequests("success") return nil } c.mu.Unlock() @@ -193,5 +255,6 @@ func (c *LRU[T]) ListKeys() []string { keys = append(keys, k) } c.mu.RUnlock() + c.metrics.incCacheRequests("success") return keys } diff --git a/cache/lru_test.go b/cache/lru_test.go index 3e970f450..ac2263d0c 100644 --- a/cache/lru_test.go +++ b/cache/lru_test.go @@ -21,6 +21,7 @@ import ( "github.com/fluxcd/cli-utils/pkg/object" . "github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" kc "k8s.io/client-go/tools/cache" @@ -214,7 +215,9 @@ func Test_LRU(t *testing.T) { for _, v := range testCases { t.Run(v.name, func(t *testing.T) { g := NewWithT(t) - cache := NewLRU(5, kc.MetaNamespaceKeyFunc) + cache, err := NewLRU(5, kc.MetaNamespaceKeyFunc, + WithMetricsRegisterer[any](prometheus.NewPedanticRegistry())) + g.Expect(err).ToNot(HaveOccurred()) for _, input := range v.inputs { err := cache.Add(input) g.Expect(err).ToNot(HaveOccurred()) @@ -233,7 +236,9 @@ func Test_LRU(t *testing.T) { func Test_LRU_Add(t *testing.T) { g := NewWithT(t) - cache := NewLRU[IdentifiableObject](1, IdentifiableObjectKeyFunc) + cache, err := NewLRU[IdentifiableObject](1, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry())) + g.Expect(err).ToNot(HaveOccurred()) // Add an object representing an expiring token obj := IdentifiableObject{ @@ -247,7 +252,7 @@ func Test_LRU_Add(t *testing.T) { }, Object: "test-token", } - err := cache.Add(obj) + err = cache.Add(obj) g.Expect(err).ToNot(HaveOccurred()) g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject")) @@ -264,7 +269,9 @@ func Test_LRU_Add(t *testing.T) { func Test_LRU_Update(t *testing.T) { g := NewWithT(t) - cache := NewLRU[IdentifiableObject](1, IdentifiableObjectKeyFunc) + cache, err := NewLRU[IdentifiableObject](1, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry())) + g.Expect(err).ToNot(HaveOccurred()) // Add an object representing an expiring token obj := IdentifiableObject{ @@ -278,7 +285,7 @@ func Test_LRU_Update(t *testing.T) { }, Object: "test-token", } - err := cache.Add(obj) + err = 
cache.Add(obj) g.Expect(err).ToNot(HaveOccurred()) g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject")) @@ -291,7 +298,9 @@ func Test_LRU_Update(t *testing.T) { func Test_LRU_Get(t *testing.T) { g := NewWithT(t) - cache := NewLRU[IdentifiableObject](5, IdentifiableObjectKeyFunc) + cache, err := NewLRU[IdentifiableObject](5, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry())) + g.Expect(err).ToNot(HaveOccurred()) // Add an object representing an expiring token obj := IdentifiableObject{ @@ -321,7 +330,9 @@ func Test_LRU_Get(t *testing.T) { func Test_LRU_Delete(t *testing.T) { g := NewWithT(t) - cache := NewLRU[IdentifiableObject](5, IdentifiableObjectKeyFunc) + cache, err := NewLRU[IdentifiableObject](5, IdentifiableObjectKeyFunc, + WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry())) + g.Expect(err).ToNot(HaveOccurred()) // Add an object representing an expiring token obj := IdentifiableObject{ @@ -336,7 +347,7 @@ func Test_LRU_Delete(t *testing.T) { Object: "test-token", } - err := cache.Add(obj) + err = cache.Add(obj) g.Expect(err).ToNot(HaveOccurred()) err = cache.Delete(obj) diff --git a/cache/metrics.go b/cache/metrics.go index 3d1bb8e2b..23e5a9e79 100644 --- a/cache/metrics.go +++ b/cache/metrics.go @@ -17,8 +17,10 @@ limitations under the License. package cache import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" - "sigs.k8s.io/controller-runtime/pkg/metrics" + "github.com/prometheus/client_golang/prometheus/promauto" ) const ( @@ -28,54 +30,121 @@ const ( CacheEventTypeHit = "cache_hit" ) -// CacheRecorder is a recorder for cache events. -type CacheRecorder struct { +type cacheMetrics struct { // cacheEventsCounter is a counter for cache events. - cacheEventsCounter *prometheus.CounterVec + cacheEventsCounter *prometheus.CounterVec + cacheItemsGauge prometheus.Gauge + cacheRequestsCounter *prometheus.CounterVec + cacheEvictionCounter prometheus.Counter + labels []string } -// NewCacheRecorder returns a new CacheRecorder. -// The configured labels are: event_type, name, namespace. -// The event_type is one of: -// - "miss" -// - "hit" -// - "update" -// -// The name is the name of the reconciled resource. -// The namespace is the namespace of the reconciled resource. -func NewCacheRecorder() *CacheRecorder { - return &CacheRecorder{ - cacheEventsCounter: prometheus.NewCounterVec( +// newcacheMetrics returns a new cacheMetrics. +func newCacheMetrics(reg prometheus.Registerer, labels ...string) *cacheMetrics { + labels = append([]string{"event_type"}, labels...) 
+	return &cacheMetrics{
+		cacheEventsCounter: promauto.With(reg).NewCounterVec(
 			prometheus.CounterOpts{
 				Name: "gotk_cache_events_total",
 				Help: "Total number of cache retrieval events for a Gitops Toolkit resource reconciliation.",
 			},
-			[]string{"event_type", "name", "namespace"},
+			labels,
+		),
+		cacheItemsGauge: promauto.With(reg).NewGauge(
+			prometheus.GaugeOpts{
+				Name: "gotk_cached_items",
+				Help: "Total number of items in the cache.",
+			},
+		),
+		cacheRequestsCounter: promauto.With(reg).NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "gotk_cache_requests_total",
+				Help: "Total number of cache requests partitioned by success or failure.",
+			},
+			[]string{"status"},
+		),
+		cacheEvictionCounter: promauto.With(reg).NewCounter(
+			prometheus.CounterOpts{
+				Name: "gotk_cache_evictions_total",
+				Help: "Total number of cache evictions.",
+			},
 		),
+		labels: labels,
 	}
 }
 
-// Collectors returns the metrics.Collector objects for the CacheRecorder.
-func (r *CacheRecorder) Collectors() []prometheus.Collector {
+func (m *cacheMetrics) getLabels() []string {
+	return m.labels
+}
+
+// collectors returns the metrics.Collector objects for the cacheMetrics.
+func (m *cacheMetrics) collectors() []prometheus.Collector {
 	return []prometheus.Collector{
-		r.cacheEventsCounter,
+		m.cacheEventsCounter,
+		m.cacheItemsGauge,
+		m.cacheRequestsCounter,
+		m.cacheEvictionCounter,
 	}
 }
 
-// IncCacheEventCount increment by 1 the cache event count for the given event type, name and namespace.
-func (r *CacheRecorder) IncCacheEvents(event, name, namespace string) {
-	r.cacheEventsCounter.WithLabelValues(event, name, namespace).Inc()
+// incCacheEvents increments by 1 the cache event count for the given event type and label values.
+func (m *cacheMetrics) incCacheEvents(event string, lvs ...string) {
+	lvs = append([]string{event}, lvs...)
+	m.cacheEventsCounter.WithLabelValues(lvs...).Inc()
 }
 
-// DeleteCacheEvent deletes the cache event metric.
-func (r *CacheRecorder) DeleteCacheEvent(event, name, namespace string) {
-	r.cacheEventsCounter.DeleteLabelValues(event, name, namespace)
+// deleteCacheEvent deletes the cache event metric.
+func (m *cacheMetrics) deleteCacheEvent(event string, lvs ...string) {
+	lvs = append([]string{event}, lvs...)
+	m.cacheEventsCounter.DeleteLabelValues(lvs...)
 }
 
-// MustMakeMetrics creates a new CacheRecorder, and registers the metrics collectors in the controller-runtime metrics registry.
-func MustMakeMetrics() *CacheRecorder {
-	r := NewCacheRecorder()
-	metrics.Registry.MustRegister(r.Collectors()...)
+// setCachedItems sets the number of cached items.
+func (m *cacheMetrics) setCachedItems(value float64) {
+	m.cacheItemsGauge.Set(value)
+}
+
+// incCacheItems increments the number of cached items by 1.
+func (m *cacheMetrics) incCacheItems() {
+	m.cacheItemsGauge.Inc()
+}
+
+// decCacheItems decrements the number of cached items by 1.
+func (m *cacheMetrics) decCacheItems() {
+	m.cacheItemsGauge.Dec()
+}
+
+// incCacheRequests increments the cache request count for the given status.
+func (m *cacheMetrics) incCacheRequests(status string) {
+	m.cacheRequestsCounter.WithLabelValues(status).Inc()
+}
+
+// incCacheEvictions increments the cache eviction count by 1.
+func (m *cacheMetrics) incCacheEvictions() {
+	m.cacheEvictionCounter.Inc()
+}
+
+// MustMakeMetrics registers the metrics collectors in the given registerer.
+func MustMakeMetrics(r prometheus.Registerer, m *cacheMetrics) {
+	r.MustRegister(m.collectors()...)
+}
+
+// IdentifiableObjectLabels are the labels for an IdentifiableObject.
+var IdentifiableObjectLabels []string = []string{"name", "namespace", "kind"}
+
+// GetLvsFunc is a function that returns the label values for a metric.
+type GetLvsFunc[T any] func(obj T, cardinality int) ([]string, error)
+
+// IdentifiableObjectLVSFunc returns the label values for a metric for an IdentifiableObject.
+func IdentifiableObjectLVSFunc[T any](object T, cardinality int) ([]string, error) {
+	n, ok := any(object).(IdentifiableObject)
+	if !ok {
+		return nil, fmt.Errorf("object is not an IdentifiableObject")
+	}
+	lvs := []string{n.Name, n.Namespace, n.GroupKind.Kind}
+	if len(lvs) != cardinality {
+		return nil, fmt.Errorf("expected cardinality %d, got %d", cardinality, len(lvs))
+	}
 
-	return r
+	return lvs, nil
 }
diff --git a/cache/metrics_test.go b/cache/metrics_test.go
new file mode 100644
index 000000000..03f1e6420
--- /dev/null
+++ b/cache/metrics_test.go
@@ -0,0 +1,61 @@
+/*
+Copyright 2024 The Flux authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"bytes"
+	"testing"
+
+	. "github.com/onsi/gomega"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+)
+
+func TestCacheMetrics(t *testing.T) {
+	g := NewWithT(t)
+	reg := prometheus.NewPedanticRegistry()
+	m := newCacheMetrics(reg, IdentifiableObjectLabels...)
+	g.Expect(m).ToNot(BeNil())
+
+	// CounterVec is a collection of counters and is not exported until it has counters in it.
+	m.incCacheEvents(CacheEventTypeHit, []string{"test", "test-ns", "TestObject"}...)
+	m.incCacheEvents(CacheEventTypeMiss, []string{"test", "test-ns", "TestObject"}...)
+	m.incCacheRequests("success")
+	m.incCacheRequests("failed")
+
+	err := testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+	# HELP gotk_cache_events_total Total number of cache retrieval events for a Gitops Toolkit resource reconciliation.
+	# TYPE gotk_cache_events_total counter
+	gotk_cache_events_total{event_type="cache_hit",kind="TestObject",name="test",namespace="test-ns"} 1
+	gotk_cache_events_total{event_type="cache_miss",kind="TestObject",name="test",namespace="test-ns"} 1
+	# HELP gotk_cache_evictions_total Total number of cache evictions.
+	# TYPE gotk_cache_evictions_total counter
+	gotk_cache_evictions_total 0
+	# HELP gotk_cache_requests_total Total number of cache requests partitioned by success or failure.
+	# TYPE gotk_cache_requests_total counter
+	gotk_cache_requests_total{status="failed"} 1
+	gotk_cache_requests_total{status="success"} 1
+	# HELP gotk_cached_items Total number of items in the cache.
+ # TYPE gotk_cached_items gauge + gotk_cached_items 0 + `)) + g.Expect(err).ToNot(HaveOccurred()) + + res, err := testutil.GatherAndLint(reg) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(res).To(BeEmpty()) +} diff --git a/cache/store.go b/cache/store.go index 93b4ed95c..ee39baf46 100644 --- a/cache/store.go +++ b/cache/store.go @@ -21,6 +21,7 @@ import ( "time" "github.com/fluxcd/cli-utils/pkg/object" + "github.com/prometheus/client_golang/prometheus" ) // Store is an interface for a cache store. @@ -69,3 +70,39 @@ func IdentifiableObjectKeyFunc[T any](object T) (string, error) { } return n.String(), nil } + +type cacheOptions[T any] struct { + interval time.Duration + registerer prometheus.Registerer + extraLabels []string + labelsFunc GetLvsFunc[T] +} + +type Options[T any] func(*cacheOptions[T]) error + +// WithMetricsLabels sets the extra labels for the cache metrics. +func WithMetricsLabels[T any](labels []string, f GetLvsFunc[T]) Options[T] { + return func(o *cacheOptions[T]) error { + if labels != nil && f == nil { + return fmt.Errorf("labelsFunc must be set if labels are provided") + } + o.extraLabels = labels + o.labelsFunc = f + return nil + } +} + +// WithCleanupInterval sets the interval for the cache cleanup. +func WithCleanupInterval[T any](interval time.Duration) Options[T] { + return func(o *cacheOptions[T]) error { + o.interval = interval + return nil + } +} + +func WithMetricsRegisterer[T any](r prometheus.Registerer) Options[T] { + return func(o *cacheOptions[T]) error { + o.registerer = r + return nil + } +}
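
Not part of the diff: a minimal consumer sketch of the options API introduced above, for review context. The import path github.com/fluxcd/pkg/cache, the registry choice, the capacity and the object values are illustrative assumptions; the option constructors, key func and metric names are the ones added by this patch (New now returns an error and fails with ErrNoRegisterer when WithMetricsRegisterer is omitted).

package main

import (
	"fmt"
	"time"

	"github.com/fluxcd/cli-utils/pkg/object"
	"github.com/fluxcd/pkg/cache" // assumed module path
	"github.com/prometheus/client_golang/prometheus"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// A registerer is mandatory; without it New returns ErrNoRegisterer.
	reg := prometheus.NewRegistry()

	// Expirable cache with a 1 minute janitor interval; gotk_cache_events_total,
	// gotk_cache_requests_total, gotk_cached_items and gotk_cache_evictions_total
	// are registered on reg by the constructor.
	c, err := cache.New[cache.IdentifiableObject](10, cache.IdentifiableObjectKeyFunc,
		cache.WithMetricsRegisterer[cache.IdentifiableObject](reg),
		cache.WithCleanupInterval[cache.IdentifiableObject](1*time.Minute))
	if err != nil {
		panic(err)
	}

	obj := cache.IdentifiableObject{
		ObjMetadata: object.ObjMetadata{
			Namespace: "demo-ns",
			Name:      "demo",
			GroupKind: schema.GroupKind{Group: "demo-group", Kind: "DemoObject"},
		},
		Object: "demo-token", // value cached for this object, illustrative
	}

	// Add counts a request and bumps gotk_cached_items; Get records a
	// cache_hit or cache_miss event.
	if err := c.Add(obj); err != nil {
		panic(err)
	}
	_, found, _ := c.Get(obj)
	fmt.Println("cached:", found)
}

NewLRU accepts the same options; WithMetricsLabels(IdentifiableObjectLabels, IdentifiableObjectLVSFunc[...]) can additionally attach name/namespace/kind label values to the hit/miss counter, as exercised in metrics_test.go above.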