From 661f47b80313a27a6e86a27b77b42d8d7b77e17b Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 4 Nov 2024 23:09:08 -0800 Subject: [PATCH] Fix race on chunks multilevel cache + Optimize to avoid refetching already found keys. (#6312) * Creating a test to show the race on the multilevel cache Signed-off-by: alanprot * fix the race problem * Only fetch keys that were not found on the previous cache Signed-off-by: alanprot --------- Signed-off-by: alanprot --- pkg/storage/tsdb/multilevel_chunk_cache.go | 17 +++++- .../tsdb/multilevel_chunk_cache_test.go | 60 +++++++++++++++++-- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/pkg/storage/tsdb/multilevel_chunk_cache.go b/pkg/storage/tsdb/multilevel_chunk_cache.go index e1b0f5bc20..8daa2f56ce 100644 --- a/pkg/storage/tsdb/multilevel_chunk_cache.go +++ b/pkg/storage/tsdb/multilevel_chunk_cache.go @@ -98,6 +98,7 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[str timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues()) defer timer.ObserveDuration() + missingKeys := keys hits := map[string][]byte{} backfillItems := make([]map[string][]byte, len(m.caches)-1) @@ -108,13 +109,25 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[str if ctx.Err() != nil { return nil } - if data := c.Fetch(ctx, keys); len(data) > 0 { + if data := c.Fetch(ctx, missingKeys); len(data) > 0 { for k, d := range data { hits[k] = d } if i > 0 && len(hits) > 0 { + // let's fetch only the missing keys + m := missingKeys[:0] + for _, key := range missingKeys { + if _, ok := hits[key]; !ok { + m = append(m, key) + } + } + + missingKeys = m + + for k, b := range hits { + backfillItems[i-1][k] = b + } } if len(hits) == len(keys) { diff --git a/pkg/storage/tsdb/multilevel_chunk_cache_test.go b/pkg/storage/tsdb/multilevel_chunk_cache_test.go index c72c1f3a55..4b50b8fd02 100644 --- a/pkg/storage/tsdb/multilevel_chunk_cache_test.go +++ 
b/pkg/storage/tsdb/multilevel_chunk_cache_test.go @@ -6,8 +6,10 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/cache" ) func Test_MultiLevelChunkCacheStore(t *testing.T) { @@ -72,6 +74,43 @@ func Test_MultiLevelChunkCacheStore(t *testing.T) { } } +func Test_MultiLevelChunkCacheFetchRace(t *testing.T) { + cfg := MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 10, + MaxAsyncBufferSize: 100000, + MaxBackfillItems: 10000, + BackFillTTL: time.Hour * 24, + } + reg := prometheus.NewRegistry() + + m1 := newMockChunkCache("m1", map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + }) + + inMemory, err := cache.NewInMemoryCacheWithConfig("test", log.NewNopLogger(), reg, cache.InMemoryCacheConfig{MaxSize: 10 * 1024, MaxItemSize: 1024}) + require.NoError(t, err) + + inMemory.Store(map[string][]byte{ + "key2": []byte("value2"), + "key3": []byte("value3"), + }, time.Minute) + + c := newMultiLevelChunkCache("chunk-cache", cfg, reg, inMemory, m1) + + hits := c.Fetch(context.Background(), []string{"key1", "key2", "key3", "key4"}) + + require.Equal(t, 3, len(hits)) + + // We should be able to change the returned values without any race problem + delete(hits, "key1") + + mlc := c.(*multiLevelChunkCache) + // Wait until async operation finishes. 
+ mlc.backfillProcessor.Stop() + +} + func Test_MultiLevelChunkCacheFetch(t *testing.T) { cfg := MultiLevelChunkCacheConfig{ MaxAsyncConcurrency: 10, @@ -81,12 +120,14 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { } testCases := map[string]struct { - m1ExistingData map[string][]byte - m2ExistingData map[string][]byte - expectedM1Data map[string][]byte - expectedM2Data map[string][]byte - expectedFetchedData map[string][]byte - fetchKeys []string + m1ExistingData map[string][]byte + m2ExistingData map[string][]byte + expectedM1Data map[string][]byte + expectedM2Data map[string][]byte + expectedFetchedData map[string][]byte + expectedM1FetchedKeys []string + expectedM2FetchedKeys []string + fetchKeys []string }{ "fetched data should be union of m1, m2 and 'key2' and `key3' should be backfilled to m1": { m1ExistingData: map[string][]byte{ @@ -96,6 +137,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { "key2": []byte("value2"), "key3": []byte("value3"), }, + expectedM1FetchedKeys: []string{"key1", "key2", "key3"}, + expectedM2FetchedKeys: []string{"key2", "key3"}, expectedM1Data: map[string][]byte{ "key1": []byte("value1"), "key2": []byte("value2"), @@ -119,6 +162,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { m2ExistingData: map[string][]byte{ "key2": []byte("value2"), }, + expectedM1FetchedKeys: []string{"key1", "key2", "key3"}, + expectedM2FetchedKeys: []string{"key2", "key3"}, expectedM1Data: map[string][]byte{ "key1": []byte("value1"), "key2": []byte("value2"), @@ -157,6 +202,8 @@ type mockChunkCache struct { mu sync.Mutex name string data map[string][]byte + + fetchedKeys []string } func newMockChunkCache(name string, data map[string][]byte) *mockChunkCache { @@ -180,6 +227,7 @@ func (m *mockChunkCache) Fetch(_ context.Context, keys []string) map[string][]by h := map[string][]byte{} for _, k := range keys { + m.fetchedKeys = append(m.fetchedKeys, k) if _, ok := m.data[k]; ok { h[k] = m.data[k] }