Skip to content

Commit

Permalink
Fix race on chunks multilevel cache + Optimize to avoid refetching al…
Browse files Browse the repository at this point in the history
…ready found keys. (#6312)

* Creating a test to show the race on the multilevel cache

Signed-off-by: alanprot <[email protected]>

* fix the race problem

* Only fetch keys that were not found on the previous cache

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Nov 5, 2024
1 parent c25b18d commit 661f47b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 8 deletions.
17 changes: 15 additions & 2 deletions pkg/storage/tsdb/multilevel_chunk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[str
timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues())
defer timer.ObserveDuration()

missingKeys := keys
hits := map[string][]byte{}
backfillItems := make([]map[string][]byte, len(m.caches)-1)

Expand All @@ -108,13 +109,25 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[str
if ctx.Err() != nil {
return nil
}
if data := c.Fetch(ctx, keys); len(data) > 0 {
if data := c.Fetch(ctx, missingKeys); len(data) > 0 {
for k, d := range data {
hits[k] = d
}

if i > 0 && len(hits) > 0 {
backfillItems[i-1] = hits
// let's fetch only the missing keys
m := missingKeys[:0]
for _, key := range missingKeys {
if _, ok := hits[key]; !ok {
m = append(m, key)
}
}

missingKeys = m

for k, b := range hits {
backfillItems[i-1][k] = b
}
}

if len(hits) == len(keys) {
Expand Down
60 changes: 54 additions & 6 deletions pkg/storage/tsdb/multilevel_chunk_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/cache"
)

func Test_MultiLevelChunkCacheStore(t *testing.T) {
Expand Down Expand Up @@ -72,6 +74,43 @@ func Test_MultiLevelChunkCacheStore(t *testing.T) {
}
}

// Test_MultiLevelChunkCacheFetchRace verifies that the map returned by
// Fetch can be mutated by the caller while the asynchronous backfill is
// still in flight, without tripping the race detector.
func Test_MultiLevelChunkCacheFetchRace(t *testing.T) {
	reg := prometheus.NewRegistry()

	cfg := MultiLevelChunkCacheConfig{
		MaxAsyncConcurrency: 10,
		MaxAsyncBufferSize:  100000,
		MaxBackfillItems:    10000,
		BackFillTTL:         time.Hour * 24,
	}

	// Second-level cache holds key1 and key2.
	secondLevel := newMockChunkCache("m1", map[string][]byte{
		"key1": []byte("value1"),
		"key2": []byte("value2"),
	})

	// First-level in-memory cache holds key2 and key3, so the multilevel
	// fetch has to combine hits from both levels and backfill key1.
	firstLevel, err := cache.NewInMemoryCacheWithConfig("test", log.NewNopLogger(), reg, cache.InMemoryCacheConfig{MaxSize: 10 * 1024, MaxItemSize: 1024})
	require.NoError(t, err)

	firstLevel.Store(map[string][]byte{
		"key2": []byte("value2"),
		"key3": []byte("value3"),
	}, time.Minute)

	mlCache := newMultiLevelChunkCache("chunk-cache", cfg, reg, firstLevel, secondLevel)

	hits := mlCache.Fetch(context.Background(), []string{"key1", "key2", "key3", "key4"})
	require.Equal(t, 3, len(hits))

	// Mutating the returned map must not race with the background backfill.
	delete(hits, "key1")

	// Stop waits for the async processor, so the test only ends once the
	// backfill work has completed.
	mlCache.(*multiLevelChunkCache).backfillProcessor.Stop()
}

func Test_MultiLevelChunkCacheFetch(t *testing.T) {
cfg := MultiLevelChunkCacheConfig{
MaxAsyncConcurrency: 10,
Expand All @@ -81,12 +120,14 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) {
}

testCases := map[string]struct {
m1ExistingData map[string][]byte
m2ExistingData map[string][]byte
expectedM1Data map[string][]byte
expectedM2Data map[string][]byte
expectedFetchedData map[string][]byte
fetchKeys []string
m1ExistingData map[string][]byte
m2ExistingData map[string][]byte
expectedM1Data map[string][]byte
expectedM2Data map[string][]byte
expectedFetchedData map[string][]byte
expectedM1FetchedKeys []string
expectedM2FetchedKeys []string
fetchKeys []string
}{
"fetched data should be union of m1, m2 and 'key2' and `key3' should be backfilled to m1": {
m1ExistingData: map[string][]byte{
Expand All @@ -96,6 +137,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) {
"key2": []byte("value2"),
"key3": []byte("value3"),
},
expectedM1FetchedKeys: []string{"key1", "key2", "key3"},
expectedM2FetchedKeys: []string{"key2", "key3"},
expectedM1Data: map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
Expand All @@ -119,6 +162,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) {
m2ExistingData: map[string][]byte{
"key2": []byte("value2"),
},
expectedM1FetchedKeys: []string{"key1", "key2", "key3"},
expectedM2FetchedKeys: []string{"key2", "key3"},
expectedM1Data: map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
Expand Down Expand Up @@ -157,6 +202,8 @@ type mockChunkCache struct {
mu sync.Mutex
name string
data map[string][]byte

fetchedKeys []string
}

func newMockChunkCache(name string, data map[string][]byte) *mockChunkCache {
Expand All @@ -180,6 +227,7 @@ func (m *mockChunkCache) Fetch(_ context.Context, keys []string) map[string][]by
h := map[string][]byte{}

for _, k := range keys {
m.fetchedKeys = append(m.fetchedKeys, k)
if _, ok := m.data[k]; ok {
h[k] = m.data[k]
}
Expand Down

0 comments on commit 661f47b

Please sign in to comment.