Skip to content

Commit

Permalink
Add instrumentation for the cache implementations
Browse files Browse the repository at this point in the history
The cache instrumentation purpose is to be self contained. We want to
avoid leaking any of the logic here in consuming code.

Signed-off-by: Soule BA <[email protected]>
  • Loading branch information
souleb committed Apr 30, 2024
1 parent 64822d5 commit 21c60f7
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 77 deletions.
88 changes: 76 additions & 12 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,46 @@ type cache[T any] struct {
// Items holds the elements in the cache.
Items map[string]Item[T]
// MaxItems is the maximum number of items the cache can hold.
MaxItems int
mu sync.RWMutex
janitor *janitor[T]
closed bool
MaxItems int
metrics *cacheMetrics
labelsFunc GetLvsFunc[T]
janitor *janitor[T]
closed bool

mu sync.RWMutex
}

var _ Expirable[any] = &Cache[any]{}

// New creates a new cache with the given configuration.
func New[T any](maxItems int, keyFunc KeyFunc[T], interval time.Duration) *Cache[T] {
func New[T any](maxItems int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
opt := cacheOptions[T]{}
for _, o := range opts {
err := o(&opt)
if err != nil {
return nil, err
}
}
if opt.registerer == nil {
return nil, ErrNoRegisterer
}
c := &cache[T]{
Items: make(map[string]Item[T]),
MaxItems: maxItems,
metrics: newCacheMetrics(opt.registerer, opt.extraLabels...),
janitor: &janitor[T]{
interval: interval,
interval: opt.interval,
stop: make(chan bool),
},
}

C := &Cache[T]{cache: c, keyFunc: keyFunc}

if interval > 0 {
if opt.interval > 0 {
go c.janitor.run(c)
}

return C
return C, nil
}

// Close closes the cache. It also stops the expiration eviction process.
Expand All @@ -94,17 +108,21 @@ func (c *Cache[T]) Add(object T) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
c.metrics.incCacheRequests("failed")
return KeyError{object, ErrClosed}
}
_, found := c.Items[key]
if found {
c.mu.Unlock()
c.metrics.incCacheRequests("failed")
return KeyError{object, ErrAlreadyExists}
}

if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
c.set(key, object)
c.mu.Unlock()
c.metrics.incCacheRequests("success")
c.metrics.incCacheItems()
return nil
}
c.mu.Unlock()
Expand All @@ -122,18 +140,22 @@ func (c *Cache[T]) Update(object T) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
c.metrics.incCacheRequests("failed")
return KeyError{object, ErrClosed}
}
_, found := c.Items[key]
if found {
c.set(key, object)
c.mu.Unlock()
c.metrics.incCacheRequests("success")
return nil
}

if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
c.set(key, object)
c.mu.Unlock()
c.metrics.incCacheRequests("success")
c.metrics.incCacheItems()
return nil
}
c.mu.Unlock()
Expand All @@ -150,8 +172,15 @@ func (c *cache[T]) set(key string, object T) {

// Get an item from the cache. Returns the item or nil, and a bool indicating
// whether the key was found.
func (c *Cache[T]) Get(object T) (T, bool, error) {
func (c *Cache[T]) Get(object T) (item T, exists bool, err error) {
var res T
lvs := []string{}
if c.labelsFunc != nil {
lvs, err = c.labelsFunc(object, len(c.metrics.getLabels()))
if err != nil {
return res, false, KeyError{object, err}
}
}
key, err := c.keyFunc(object)
if err != nil {
return res, false, KeyError{object, err}
Expand All @@ -161,34 +190,51 @@ func (c *Cache[T]) Get(object T) (T, bool, error) {
return res, false, KeyError{object, err}
}
if !found {
c.metrics.incCacheEvents(CacheEventTypeMiss, lvs...)
return res, false, nil
}
c.metrics.incCacheEvents(CacheEventTypeHit, lvs...)
return item, true, nil
}

func (c *Cache[T]) GetByKey(key string) (T, bool, error) {
return c.get(key)
var res T
items, found, err := c.get(key)
if err != nil {
return res, false, err
}
if !found {
c.metrics.incCacheEvents(CacheEventTypeMiss)
return res, false, nil
}

c.metrics.incCacheEvents(CacheEventTypeHit)
return items, true, nil
}

func (c *cache[T]) get(key string) (T, bool, error) {
var res T
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
c.metrics.incCacheRequests("failed")
return res, false, ErrClosed
}
item, found := c.Items[key]
if !found {
c.mu.RUnlock()
c.metrics.incCacheRequests("success")
return res, false, nil
}
if item.Expiration > 0 {
if item.Expiration < time.Now().UnixNano() {
c.mu.RUnlock()
return res, false, ErrExpired
c.metrics.incCacheRequests("succes")
return res, false, nil
}
}
c.mu.RUnlock()
c.metrics.incCacheRequests("success")
return item.Object, true, nil
}

Expand All @@ -201,10 +247,13 @@ func (c *Cache[T]) Delete(object T) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
c.metrics.incCacheRequests("failed")
return KeyError{object, ErrClosed}
}
delete(c.Items, key)
c.mu.Unlock()
c.metrics.incCacheRequests("success")
c.metrics.decCacheItems()
return nil
}

Expand All @@ -228,13 +277,15 @@ func (c *cache[T]) ListKeys() []string {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
c.metrics.incCacheRequests("failed")
return nil
}
keys := make([]string, 0, len(c.Items))
for k := range c.Items {
keys = append(keys, k)
}
c.mu.RUnlock()
c.metrics.incCacheRequests("success")
return keys
}

Expand All @@ -248,20 +299,24 @@ func (c *Cache[T]) HasExpired(object T) (bool, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
c.metrics.incCacheRequests("failed")
return false, KeyError{object, ErrClosed}
}
item, ok := c.Items[key]
if !ok {
c.mu.RUnlock()
c.metrics.incCacheRequests("success")
return true, nil
}
if item.Expiration > 0 {
if item.Expiration < time.Now().UnixNano() {
c.mu.RUnlock()
c.metrics.incCacheRequests("success")
return true, nil
}
}
c.mu.RUnlock()
c.metrics.incCacheRequests("success")
return false, nil
}

Expand All @@ -276,6 +331,7 @@ func (c *Cache[T]) SetExpiration(object T, expiration time.Duration) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
c.metrics.incCacheRequests("failed")
return KeyError{object, ErrClosed}
}
item, ok := c.Items[key]
Expand All @@ -284,6 +340,7 @@ func (c *Cache[T]) SetExpiration(object T, expiration time.Duration) error {
c.Items[key] = item
}
c.mu.Unlock()
c.metrics.incCacheRequests("success")
return nil
}

Expand All @@ -298,20 +355,24 @@ func (c *Cache[T]) GetExpiration(object T) (time.Duration, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
c.metrics.incCacheRequests("failed")
return 0, KeyError{object, ErrClosed}
}
item, ok := c.Items[key]
if !ok {
c.mu.RUnlock()
c.metrics.incCacheRequests("success")
return 0, KeyError{object, ErrNotFound}
}
if item.Expiration > 0 {
if item.Expiration < time.Now().UnixNano() {
c.mu.RUnlock()
return 0, KeyError{object, ErrExpired}
c.metrics.incCacheRequests("success")
return 0, nil
}
}
c.mu.RUnlock()
c.metrics.incCacheRequests("success")
return time.Duration(item.Expiration - time.Now().UnixNano()), nil
}

Expand All @@ -320,14 +381,17 @@ func (c *cache[T]) DeleteExpired() {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
c.metrics.incCacheRequests("failed")
return
}
for k, v := range c.Items {
if v.Expiration > 0 && v.Expiration < time.Now().UnixNano() {
delete(c.Items, k)
c.metrics.incCacheEvictions()
}
}
c.mu.Unlock()
c.metrics.incCacheRequests("success")
}

type janitor[T any] struct {
Expand Down
12 changes: 9 additions & 3 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/fluxcd/cli-utils/pkg/object"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
kc "k8s.io/client-go/tools/cache"
Expand All @@ -38,7 +39,9 @@ func TestCache(t *testing.T) {
t.Run("Add and update keys", func(t *testing.T) {
g := NewWithT(t)
// create a cache that can hold 2 items and have no cleanup
cache := New(3, kc.MetaNamespaceKeyFunc, 0)
cache, err := New(3, kc.MetaNamespaceKeyFunc,
WithMetricsRegisterer[any](prometheus.NewPedanticRegistry()))
g.Expect(err).ToNot(HaveOccurred())

obj := &testObject{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -116,7 +119,10 @@ func TestCache(t *testing.T) {
t.Run("Add expiring keys", func(t *testing.T) {
g := NewWithT(t)
// new cache with a cleanup interval of 1 second
cache := New[IdentifiableObject](2, IdentifiableObjectKeyFunc, 1*time.Second)
cache, err := New[IdentifiableObject](2, IdentifiableObjectKeyFunc,
WithCleanupInterval[IdentifiableObject](1*time.Second),
WithMetricsRegisterer[IdentifiableObject](prometheus.NewPedanticRegistry()))
g.Expect(err).ToNot(HaveOccurred())

// Add an object representing an expiring token
obj := IdentifiableObject{
Expand All @@ -135,7 +141,7 @@ func TestCache(t *testing.T) {
},
}

err := cache.Add(obj)
err = cache.Add(obj)
g.Expect(err).ToNot(HaveOccurred())

// set expiration time to 2 seconds
Expand Down
1 change: 1 addition & 0 deletions cache/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
ErrClosed = fmt.Errorf("cache closed")
ErrFull = fmt.Errorf("cache full")
ErrExpired = fmt.Errorf("key has expired")
ErrNoRegisterer = fmt.Errorf("no prometheus registerer provided")
)

// KeyError will be returned any time a KeyFunc gives an error; it includes the object
Expand Down
1 change: 0 additions & 1 deletion cache/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/prometheus/client_golang v1.19.0
k8s.io/apimachinery v0.30.0
k8s.io/client-go v0.30.0
sigs.k8s.io/controller-runtime v0.18.0
)

require (
Expand Down
4 changes: 0 additions & 4 deletions cache/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -285,8 +283,6 @@ k8s.io/kubectl v0.30.0 h1:xbPvzagbJ6RNYVMVuiHArC1grrV5vSmmIcSZuCdzRyk=
k8s.io/kubectl v0.30.0/go.mod h1:zgolRw2MQXLPwmic2l/+iHs239L49fhSeICuMhQQXTI=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/controller-runtime v0.18.0 h1:Z7jKuX784TQSUL1TIyeuF7j8KXZ4RtSX0YgtjKcSTME=
sigs.k8s.io/controller-runtime v0.18.0/go.mod h1:tuAt1+wbVsXIT8lPtk5RURxqAnq7xkpv2Mhttslg7Hw=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/kustomize/api v0.17.1 h1:MYJBOP/yQ3/5tp4/sf6HiiMfNNyO97LmtnirH9SLNr4=
Expand Down
Loading

0 comments on commit 21c60f7

Please sign in to comment.