Skip to content

Commit

Permalink
Implement cache cleaner with object finalizer
Browse files Browse the repository at this point in the history
  • Loading branch information
motoki317 committed May 3, 2022
1 parent 289adfa commit 3b52584
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 27 deletions.
10 changes: 10 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type backend[K comparable, V any] interface {
Set(key K, v V)
// Delete the value for key.
Delete(key K)
// DeleteIf deletes all values that match the predicate.
DeleteIf(predicate func(key K, value V) bool)
// Purge all values.
Purge()
}
Expand All @@ -37,6 +39,14 @@ func (m mapBackend[K, V]) Delete(key K) {
delete(m, key)
}

func (m mapBackend[K, V]) DeleteIf(predicate func(key K, value V) bool) {
for k, v := range m {
if predicate(k, v) {
delete(m, k)
}
}
}

func (m mapBackend[K, V]) Purge() {
// This form is optimized by the Go-compiler; it calls faster internal mapclear() instead of looping, and avoids
// allocating new memory.
Expand Down
84 changes: 72 additions & 12 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sc
import (
"context"
"errors"
"runtime"
"sync"
"time"
)
Expand Down Expand Up @@ -58,14 +59,55 @@ func New[K comparable, V any](replaceFn replaceFunc[K, V], freshFor, ttl time.Du
return nil, errors.New("unknown cache backend")
}

return &Cache[K, V]{
values: b,
calls: make(map[K]*call[V]),
fn: replaceFn,
freshFor: freshFor,
ttl: ttl,
strictCoalescing: config.enableStrictCoalescing,
}, nil
c := &Cache[K, V]{
cache: &cache[K, V]{
values: b,
calls: make(map[K]*call[V]),
fn: replaceFn,
freshFor: freshFor,
ttl: ttl,
strictCoalescing: config.enableStrictCoalescing,
},
}

if config.cleanupInterval > 0 {
closer := make(chan struct{})
c.cl = newCleaner(c.cache, config.cleanupInterval, closer)
runtime.SetFinalizer(c, stopCleaner[K, V])
}

return c, nil
}

type cleaner[K comparable, V any] struct {
ticker *time.Ticker
closer chan struct{}
c *cache[K, V]
}

func newCleaner[K comparable, V any](c *cache[K, V], interval time.Duration, closer chan struct{}) *cleaner[K, V] {
cl := &cleaner[K, V]{
ticker: time.NewTicker(interval),
closer: closer,
c: c,
}
go cl.run()
return cl
}

func (c *cleaner[K, V]) run() {
for {
select {
case <-c.ticker.C:
c.c.cleanup()
case <-c.closer:
return
}
}
}

func stopCleaner[K comparable, V any](c *Cache[K, V]) {
c.cl.closer <- struct{}{}
}

// Cache represents a single cache instance.
Expand All @@ -75,6 +117,14 @@ func New[K comparable, V any](replaceFn replaceFunc[K, V], freshFor, ttl time.Du
// Notice that Cache doesn't have Set(key K, value V) method - this is intentional. Users are expected to delegate
// the cache replacement logic to Cache by simply calling Get.
type Cache[K comparable, V any] struct {
*cache[K, V]
cl *cleaner[K, V]
}

// cache is the actual cache instance.
// See https://github.com/patrickmn/go-cache/blob/46f407853014144407b6c2ec7ccc76bf67958d93/cache.go#L1115
// for the reason Cache and cache is separate.
type cache[K comparable, V any] struct {
values backend[K, value[V]]
calls map[K]*call[V]
mu sync.Mutex // mu protects values and calls
Expand All @@ -89,7 +139,7 @@ type Cache[K comparable, V any] struct {
// May return a stale item (older than freshFor, but younger than ttl) while a single goroutine is launched
// in the background to update the cache.
// Returns an error as it is if replaceFn returns an error.
func (c *Cache[K, V]) Get(ctx context.Context, key K) (V, error) {
func (c *cache[K, V]) Get(ctx context.Context, key K) (V, error) {
// Record time as soon as Get is called *before acquiring the lock* - this maximizes the reuse of values
t0 := time.Now()
c.mu.Lock()
Expand Down Expand Up @@ -145,7 +195,7 @@ retry:
// Forget instructs the cache to forget about the key.
// Corresponding item will be deleted, ongoing cache replacement results (if any) will not be added to the cache,
// and any future Get calls will immediately retrieve a new item.
func (c *Cache[K, V]) Forget(key K) {
func (c *cache[K, V]) Forget(key K) {
c.mu.Lock()
if ca, ok := c.calls[key]; ok {
ca.forgotten = true
Expand All @@ -158,7 +208,7 @@ func (c *Cache[K, V]) Forget(key K) {
// Purge instructs the cache to delete all values, and Forget about all ongoing calls.
// Note that frequently calling Purge will worsen the cache performance.
// If you only need to Forget about a specific key, use Forget instead.
func (c *Cache[K, V]) Purge() {
func (c *cache[K, V]) Purge() {
c.mu.Lock()
for _, cl := range c.calls {
cl.forgotten = true
Expand All @@ -168,7 +218,7 @@ func (c *Cache[K, V]) Purge() {
c.mu.Unlock()
}

func (c *Cache[K, V]) set(ctx context.Context, cl *call[V], key K) {
func (c *cache[K, V]) set(ctx context.Context, cl *call[V], key K) {
// Record time *just before* fn() is called - this maximizes the reuse of values
cl.val.t = time.Now()
cl.val.v, cl.err = c.fn(ctx, key)
Expand All @@ -184,3 +234,13 @@ func (c *Cache[K, V]) set(ctx context.Context, cl *call[V], key K) {
c.mu.Unlock()
cl.wg.Done()
}

// cleanup cleans up expired items from the cache, freeing memory.
func (c *cache[K, V]) cleanup() {
c.mu.Lock()
now := time.Now() // Record time after acquiring the lock to maximize freeing of expired items
c.values.DeleteIf(func(key K, value value[V]) bool {
return value.isExpired(now, c.ttl)
})
c.mu.Unlock()
}
62 changes: 62 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sc
import (
"context"
"errors"
"runtime"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -910,3 +911,64 @@ func TestCache_ZeroTimeCache(t *testing.T) {
})
}
}

// TestCleaningCache tests caches with cleaner option, which will clean up expired items on a regular interval.
func TestCleaningCache(t *testing.T) {
t.Parallel()

for _, c := range allCaches(10) {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()

var cnt int64
replaceFn := func(ctx context.Context, key string) (string, error) {
atomic.AddInt64(&cnt, 1)
return "value-" + key, nil
}
cache, err := New(replaceFn, 700*time.Millisecond, 1000*time.Millisecond, append(c.cacheOpts, WithCleanupInterval(300*time.Millisecond))...)
assert.NoError(t, err)

// t=0ms, cache the value
v, err := cache.Get(context.Background(), "k1")
assert.NoError(t, err)
assert.Equal(t, "value-k1", v)
assert.EqualValues(t, 1, atomic.LoadInt64(&cnt))

time.Sleep(400 * time.Millisecond)
// t=400ms, value is still cached and fresh
v, err = cache.Get(context.Background(), "k1")
assert.NoError(t, err)
assert.Equal(t, "value-k1", v)
assert.EqualValues(t, 1, atomic.LoadInt64(&cnt))

time.Sleep(1 * time.Second)
// t=1400ms, expired value is automatically removed from the cache, freeing memory
// although, this has no effect if viewed from the public interface of Cache
v, err = cache.Get(context.Background(), "k1")
assert.NoError(t, err)
assert.Equal(t, "value-k1", v)
assert.EqualValues(t, 2, atomic.LoadInt64(&cnt))
})
}
}

// TestCleaningCacheFinalizer tests that cache finalizers to stop cleaner is working.
// Since there's not really a good way of ensuring call to the finalizer, this just increases the test coverage.
func TestCleaningCacheFinalizer(t *testing.T) {
t.Parallel()

for _, c := range allCaches(10) {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()

replaceFn := func(_ context.Context, _ struct{}) (string, error) { return "", nil }
c, err := New(replaceFn, time.Hour, time.Hour, append(c.cacheOpts, WithCleanupInterval(time.Second))...)
assert.NoError(t, err)

_, _ = c.Get(context.Background(), struct{}{})
runtime.GC() // finalizer is called and cleaner is stopped
})
}
}
18 changes: 18 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package sc

import (
"time"
)

// CacheOption represents a single cache option.
// See other package-level functions which return CacheOption for more details.
type CacheOption func(c *cacheConfig)
Expand All @@ -8,6 +12,7 @@ type cacheConfig struct {
enableStrictCoalescing bool
backend cacheBackendType
capacity int
cleanupInterval time.Duration
}

type cacheBackendType int
Expand All @@ -23,6 +28,7 @@ func defaultConfig() cacheConfig {
enableStrictCoalescing: false,
backend: cacheBackendMap,
capacity: 0,
cleanupInterval: 0,
}
}

Expand Down Expand Up @@ -72,3 +78,15 @@ func EnableStrictCoalescing() CacheOption {
c.enableStrictCoalescing = true
}
}

// WithCleanupInterval specifies cleanup interval of expired items.
//
// Note that by default, a cache will be initialized without a cleaner.
// Try tuning your cache size (and using non-map backend) before using this option.
// Using cleanup interval on a cache with many items may decrease the through-put,
// since the cleaner has to take a lock to iterate through all items.
func WithCleanupInterval(interval time.Duration) CacheOption {
return func(c *cacheConfig) {
c.cleanupInterval = interval
}
}
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ module github.com/motoki317/sc

go 1.18

require (
github.com/motoki317/lru v0.0.3
github.com/stretchr/testify v1.7.1
)
require github.com/stretchr/testify v1.7.1

require (
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/motoki317/lru v0.0.3 h1:+WETs/rojxz4y4KVAQM1syZLVQilL/BD1Lxb2uSSQ1A=
github.com/motoki317/lru v0.0.3/go.mod h1:KWwfc+XCJ4hn+RM4f1+017dyzXWYkHWj6nouX3bU6PM=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
9 changes: 9 additions & 0 deletions lru/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ func (c *Cache[K, V]) Delete(key K) bool {
return true
}

// DeleteIf deletes all elements that match the predicate.
func (c *Cache[K, V]) DeleteIf(predicate func(key K, value V) bool) {
for k, v := range c.items {
if predicate(k, v.Value.value) {
c.deleteElement(v)
}
}
}

// DeleteOldest deletes the oldest item from the cache.
func (c *Cache[K, V]) DeleteOldest() (key K, value V, ok bool) {
if e := c.ll.Back(); e != nil {
Expand Down
35 changes: 29 additions & 6 deletions lru/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestCapacity(t *testing.T) {
}
}

func TestGet(t *testing.T) {
func TestCache_Get(t *testing.T) {
t.Run("missing", func(t *testing.T) {
lru := lru.New[int, int]()

Expand All @@ -56,7 +56,7 @@ func TestGet(t *testing.T) {
})
}

func TestPeek(t *testing.T) {
func TestCache_Peek(t *testing.T) {
t.Run("missing", func(t *testing.T) {
lru := lru.New[int, int]()

Expand All @@ -75,7 +75,7 @@ func TestPeek(t *testing.T) {
})
}

func TestSet(t *testing.T) {
func TestCache_Set(t *testing.T) {
t.Run("missing", func(t *testing.T) {
lru := lru.New[int, int]()

Expand All @@ -97,7 +97,7 @@ func TestSet(t *testing.T) {
})
}

func TestDelete(t *testing.T) {
func TestCache_Delete(t *testing.T) {
t.Run("missing", func(t *testing.T) {
lru := lru.New[int, int]()

Expand All @@ -118,7 +118,30 @@ func TestDelete(t *testing.T) {
})
}

func TestDeleteOldest(t *testing.T) {
func TestCache_DeleteIf(t *testing.T) {
lru := lru.New[int, int]()

lru.Set(1, 10)
lru.Set(2, 10)
lru.Set(3, 10)
lru.Set(4, 10)

lru.DeleteIf(func(key int, value int) bool {
return key%2 == 0
})

require.Equal(t, 2, lru.Len())
_, ok := lru.Peek(1)
require.True(t, ok)
_, ok = lru.Peek(2)
require.False(t, ok)
_, ok = lru.Peek(3)
require.True(t, ok)
_, ok = lru.Peek(4)
require.False(t, ok)
}

func TestCache_DeleteOldest(t *testing.T) {
t.Run("missing", func(t *testing.T) {
lru := lru.New[int, int]()

Expand All @@ -145,7 +168,7 @@ func TestDeleteOldest(t *testing.T) {
})
}

func TestFlush(t *testing.T) {
func TestCache_Flush(t *testing.T) {
lru := lru.New[int, int]()

key, value := 1, 100
Expand Down
2 changes: 1 addition & 1 deletion sc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func strictCaches(cap int) []testCase {
return Map(nonStrictCaches(cap), func(t testCase, _ int) testCase {
return testCase{
name: "strict " + t.name,
cacheOpts: append(append([]CacheOption{}, t.cacheOpts...), EnableStrictCoalescing()),
cacheOpts: append(t.cacheOpts, EnableStrictCoalescing()),
}
})
}
Expand Down
Loading

0 comments on commit 3b52584

Please sign in to comment.