Skip to content

Commit

Permalink
Simplify the cache
Browse files Browse the repository at this point in the history
- Remove dependency on kubernetes.
- Simplify the Store interface to only have Get, Set and Delete.
- Add new RecordCacheEvent() method in the cache implementations for
  recording the cache events, hit or miss, along with labels of the
  associated flux object being reconciled. The labels name, namespace
  and kind for this metric are not required to be configured when
  creating the cache. They are predefined in the created metrics
  recorder, similar to the runtime metrics recorder for the rest of
  flux. RecordCacheEvent() has to be called by the caller of cache
  operations explicitly along with the label information of the
  associated object being reconciled. The cache no longer has the
  knowledge of the object being reconciled, decoupled.
- With the decoupled cache and reconciling object for metrics, the
  storeOptions no longer need to have the generic type defined.
  Simplifying the usage.
- Add new DeleteCacheEvent() method in the cache implementations for
  deleting the cache events, hit or miss, which are associated with the
  object being reconciled. When the reconciling object is deleted, these
  metrics associated with the object must also be deleted.
- Updated all the tests to use simple string cache.
- Get() now returns a pointer instead of a separate exists boolean. If
  the pointer is nil, the item is not found in the cache.
- Get() takes a key and returns a cached item for it. GetByKey() is
  removed as Get() does the same.

Signed-off-by: Sunny <[email protected]>
  • Loading branch information
darkowlzz committed Nov 7, 2024
1 parent ac1007b commit 9e291d1
Show file tree
Hide file tree
Showing 10 changed files with 415 additions and 1,201 deletions.
167 changes: 54 additions & 113 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,17 @@ const (
defaultInterval = time.Minute
)

// Cache[T] is a thread-safe in-memory key/object store.
// It can be used to store objects with optional expiration.
// A function to extract the key from the object must be provided.
// Use the New function to create a new cache that is ready to use.
// Cache[T] is a thread-safe in-memory key/value store.
// It can be used to store items with optional expiration.
type Cache[T any] struct {
*cache[T]
// keyFunc is used to make the key for objects stored in and retrieved from index, and
// should be deterministic.
keyFunc KeyFunc[T]
}

// item is an item stored in the cache.
type item[T any] struct {
key string
// object is the item's object.
object T
// value is the item's value.
value T
// expiresAt is the item's expiration time.
expiresAt time.Time
}
Expand All @@ -61,41 +56,39 @@ type cache[T any] struct {
// It is initially true, and set to false when the items are not sorted.
sorted bool
// capacity is the maximum number of index the cache can hold.
capacity int
metrics *cacheMetrics
labelsFunc GetLvsFunc[T]
janitor *janitor[T]
closed bool
capacity int
metrics *cacheMetrics
janitor *janitor[T]
closed bool

mu sync.RWMutex
}

var _ Expirable[any] = &Cache[any]{}

// New creates a new cache with the given configuration.
func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
func New[T any](capacity int, opts ...Options) (*Cache[T], error) {
opt, err := makeOptions(opts...)
if err != nil {
return nil, fmt.Errorf("failed to apply options: %w", err)
}

c := &cache[T]{
index: make(map[string]*item[T]),
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
labelsFunc: opt.labelsFunc,
index: make(map[string]*item[T]),
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
janitor: &janitor[T]{
interval: opt.interval,
stop: make(chan bool),
},
}

if opt.registerer != nil {
c.metrics = newCacheMetrics(opt.registerer, opt.extraLabels...)
c.metrics = newCacheMetrics(opt.registerer)
}

C := &Cache[T]{cache: c, keyFunc: keyFunc}
C := &Cache[T]{cache: c}

if opt.interval > 0 {
go c.janitor.run(c)
Expand All @@ -104,8 +97,8 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
return C, nil
}

func makeOptions[T any](opts ...Options[T]) (*storeOptions[T], error) {
opt := storeOptions[T]{}
func makeOptions(opts ...Options) (*storeOptions, error) {
opt := storeOptions{}
for _, o := range opts {
err := o(&opt)
if err != nil {
Expand All @@ -131,14 +124,8 @@ func (c *Cache[T]) Close() error {
}

// Set an item in the cache, existing index will be overwritten.
// If the cache is full, Add will return an error.
func (c *Cache[T]) Set(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}

// If the cache is full, an error is returned.
func (c *Cache[T]) Set(key string, value T) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand All @@ -147,14 +134,14 @@ func (c *Cache[T]) Set(object T) error {
}
_, found := c.index[key]
if found {
c.set(key, object)
c.set(key, value)
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
return nil
}

if c.capacity > 0 && len(c.index) < c.capacity {
c.set(key, object)
c.set(key, value)
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
recordItemIncrement(c.metrics)
Expand All @@ -165,10 +152,10 @@ func (c *Cache[T]) Set(object T) error {
return ErrCacheFull
}

func (c *cache[T]) set(key string, object T) {
func (c *cache[T]) set(key string, value T) {
item := item[T]{
key: key,
object: object,
value: value,
expiresAt: time.Now().Add(noExpiration),
}

Expand All @@ -181,86 +168,41 @@ func (c *cache[T]) set(key string, object T) {
c.items = append(c.items, &item)
}

// Get an item from the cache. Returns the item or nil, and a bool indicating
// whether the key was found.
func (c *Cache[T]) Get(object T) (item T, exists bool, err error) {
var res T
lvs := []string{}
if c.labelsFunc != nil {
lvs, err = c.labelsFunc(object, len(c.metrics.getExtraLabels()))
if err != nil {
recordRequest(c.metrics, StatusFailure)
return res, false, &CacheError{Reason: ErrInvalidLabels, Err: err}
}
}
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return res, false, &CacheError{Reason: ErrInvalidKey, Err: err}
}
item, found, err := c.get(key)
if err != nil {
return res, false, err
}
if !found {
recordEvent(c.metrics, CacheEventTypeMiss, lvs...)
return res, false, nil
}
recordEvent(c.metrics, CacheEventTypeHit, lvs...)
return item, true, nil
}

// GetByKey returns the object for the given key.
func (c *Cache[T]) GetByKey(key string) (T, bool, error) {
var res T
index, found, err := c.get(key)
if err != nil {
return res, false, err
}
if !found {
recordEvent(c.metrics, CacheEventTypeMiss)
return res, false, nil
}

recordEvent(c.metrics, CacheEventTypeHit)
return index, true, nil
}

func (c *cache[T]) get(key string) (T, bool, error) {
var res T
// Get returns a pointer to an item in the cache for the given key. If no item
// is found, it's a nil pointer.
// The caller can record cache hit or miss based on the result with
// Cache.RecordCacheEvent().
func (c *Cache[T]) Get(key string) (*T, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
recordRequest(c.metrics, StatusFailure)
return res, false, ErrCacheClosed
return nil, ErrCacheClosed
}
item, found := c.index[key]
if !found {
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return res, false, nil
return nil, nil
}
if !item.expiresAt.IsZero() {
if item.expiresAt.Compare(time.Now()) < 0 {
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return res, false, nil
return nil, nil
}
}
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return item.object, true, nil
// Copy the value to prevent writes to the cached item.
r := item.value
return &r, nil
}

// Delete an item from the cache. Does nothing if the key is not in the cache.
// It actually sets the item expiration to `now“, so that it will be deleted at
// the cleanup.
func (c *Cache[T]) Delete(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}
func (c *Cache[T]) Delete(key string) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand Down Expand Up @@ -355,13 +297,7 @@ func (c *cache[T]) Resize(size int) (int, error) {
}

// HasExpired returns true if the item has expired.
func (c *Cache[T]) HasExpired(object T) (bool, error) {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return false, &CacheError{Reason: ErrInvalidKey, Err: err}
}

func (c *Cache[T]) HasExpired(key string) (bool, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
Expand All @@ -387,13 +323,7 @@ func (c *Cache[T]) HasExpired(object T) (bool, error) {
}

// SetExpiration sets the expiration for the given key.
func (c *Cache[T]) SetExpiration(object T, expiration time.Time) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}

func (c *Cache[T]) SetExpiration(key string, expiration time.Time) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand All @@ -417,12 +347,7 @@ func (c *Cache[T]) SetExpiration(object T, expiration time.Time) error {
// GetExpiration returns the expiration for the given key.
// Returns zero if the key is not in the cache or the item
// has already expired.
func (c *Cache[T]) GetExpiration(object T) (time.Time, error) {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return time.Time{}, &CacheError{Reason: ErrInvalidKey, Err: err}
}
func (c *Cache[T]) GetExpiration(key string) (time.Time, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
Expand Down Expand Up @@ -481,6 +406,22 @@ func (c *cache[T]) deleteExpired() {
c.mu.Unlock()
}

// RecordCacheEvent records a cache event (cache_miss or cache_hit) with kind,
// name and namespace of the associated object being reconciled.
func (c *Cache[T]) RecordCacheEvent(event, kind, name, namespace string) {
if c.metrics != nil {
c.metrics.incCacheEvents(event, kind, name, namespace)
}
}

// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for
// the associated object being reconciled, given their kind, name and namespace.
func (c *Cache[T]) DeleteCacheEvent(event, kind, name, namespace string) {
if c.metrics != nil {
c.metrics.deleteCacheEvent(event, kind, name, namespace)
}
}

type janitor[T any] struct {
interval time.Duration
stop chan bool
Expand Down
Loading

0 comments on commit 9e291d1

Please sign in to comment.