Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot committed Nov 6, 2024
1 parent 49429c9 commit a5e8997
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 32 deletions.
38 changes: 19 additions & 19 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,17 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
return c.result(promise)
}

func (c *BlocksPostingsForMatchersCache) result(promise *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
return func(ctx context.Context) (index.Postings, error) {
ids, err := promise.result(ctx)
return index.NewListPostings(ids), err
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ce.done:
if ctx.Err() != nil {
return nil, ctx.Err()
}
return index.NewListPostings(ce.v), ce.err
}
}
}

Expand Down Expand Up @@ -327,9 +334,15 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
c.expire()
}

// If is cached but is expired, lets try to replace the cache value
if ok && loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) {
if c.cachedValues.CompareAndSwap(k, loaded, r) {
if ok {

// If the promise is already in the cache, lets wait it to fetch the data.
select {
case <-loaded.(*cacheEntryPromise[V]).done:
}

// If is cached but is expired, lets try to replace the cache value.
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
Expand Down Expand Up @@ -404,19 +417,6 @@ type cacheEntryPromise[V any] struct {
err error
}

func (ce *cacheEntryPromise[V]) result(ctx context.Context) (V, error) {
select {
case <-ctx.Done():
return ce.v, ctx.Err()
case <-ce.done:
if ctx.Err() != nil {
return ce.v, ctx.Err()
}

return ce.v, ce.err
}
}

func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool {
ts := ce.ts
r := now.Sub(ts)
Expand Down
17 changes: 4 additions & 13 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tsdb

import (
"context"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -54,9 +53,7 @@ func TestFifoCacheDisabled(t *testing.T) {
return 1, 0, nil
})
require.False(t, loaded)
v, err := old.result(context.Background())
require.NoError(t, err)
require.Equal(t, 1, v)
require.Equal(t, 1, old.v)
require.False(t, cache.contains("key1"))
}

Expand Down Expand Up @@ -101,17 +98,13 @@ func TestFifoCacheExpire(t *testing.T) {
return 1, 8, nil
})
require.False(t, loaded)
v, err := p.result(context.Background())
require.NoError(t, err)
require.Equal(t, 1, v)
require.Equal(t, 1, p.v)
require.True(t, cache.contains(key))
p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) {
return 1, 0, nil
})
require.True(t, loaded)
v, err = p.result(context.Background())
require.NoError(t, err)
require.Equal(t, 1, v)
require.Equal(t, 1, p.v)
}

totalCacheSize := 0
Expand All @@ -137,10 +130,8 @@ func TestFifoCacheExpire(t *testing.T) {
return 2, 18, nil
})
require.False(t, loaded)
v, err := p.result(context.Background())
require.NoError(t, err)
// New value
require.Equal(t, 2, v)
require.Equal(t, 2, p.v)
// Total Size Updated
require.Equal(t, originalSize+10, cache.cachedBytes)
}
Expand Down

0 comments on commit a5e8997

Please sign in to comment.