From 29c560e8af3329f55de9ece9088a101b5491022b Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Thu, 3 Oct 2024 16:17:05 -0400 Subject: [PATCH] Prevent stale results from being cached using cache "locks" Proof-of-concept for discussion as part of: https://github.com/grafana/mimir/pull/9434 https://github.com/grafana/dskit/pull/591 Signed-off-by: Nick Pillitteri --- go.mod | 2 + go.sum | 4 +- .../tsdb/bucketcache/caching_bucket.go | 61 +++++++++++++++--- .../github.com/grafana/dskit/cache/cache.go | 13 ++++ .../github.com/grafana/dskit/cache/client.go | 62 +++++++++++++----- .../grafana/dskit/cache/compression.go | 8 +++ vendor/github.com/grafana/dskit/cache/lru.go | 35 +++++++++++ .../grafana/dskit/cache/memcached_client.go | 63 ++++++++++++++++--- vendor/github.com/grafana/dskit/cache/mock.go | 29 +++++++++ .../grafana/dskit/cache/redis_client.go | 29 ++++++++- .../github.com/grafana/dskit/cache/tracing.go | 8 +++ .../grafana/dskit/cache/versioned.go | 8 +++ vendor/modules.txt | 3 +- 13 files changed, 287 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index bff6dc9956..d2ec959572 100644 --- a/go.mod +++ b/go.mod @@ -281,6 +281,8 @@ require ( sigs.k8s.io/yaml v1.4.0 // indirect ) +replace github.com/grafana/dskit => github.com/grafana/dskit v0.0.0-20241003174309-ebcbf7befc39 + // Using a fork of Prometheus with Mimir-specific changes. replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240925112120-6046bf43c9b2 diff --git a/go.sum b/go.sum index 1a8eaadaab..ad74380be0 100644 --- a/go.sum +++ b/go.sum @@ -1250,8 +1250,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b h1:UO4mv94pG1kzKCgBKh20TXdACBCAK2vYjV3Q2MlcpEQ= github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4= -github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319 h1:KACpOOTqA4WqyyKF2fFPQFiaSOpZdOT5f5gg0qkPLiU= -github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20241003174309-ebcbf7befc39 h1:LE4TKRG0oqPe23NZXhXlEiWzZeth1Dpn9kQgPWPRW2c= +github.com/grafana/dskit v0.0.0-20241003174309-ebcbf7befc39/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4 h1:7/CJa4ilczGHLjULGJFxRFAGsnaN33YIJEqpm45TUYs= diff --git a/pkg/storage/tsdb/bucketcache/caching_bucket.go b/pkg/storage/tsdb/bucketcache/caching_bucket.go index 3495f69d5a..855cb7563b 100644 --- a/pkg/storage/tsdb/bucketcache/caching_bucket.go +++ b/pkg/storage/tsdb/bucketcache/caching_bucket.go @@ -37,6 +37,9 @@ const ( memoryPoolContextKey contextKey = 0 cacheLookupEnabledContextKey contextKey = 1 + + invalidationLockTTL = 15 * time.Second + invalidationLockContent = "locked" ) var errObjNotFound = errors.Errorf("object not found") @@ -159,10 +162,27 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin return cb, nil } -// invalidate invalidates content, existence, and attribute caches for the given object. -// Note that this is best-effort and errors invalidating the cache are ignored. -func (cb *CachingBucket) invalidate(ctx context.Context, name string) { +// TODO: Docs +func (cb *CachingBucket) invalidatePre(ctx context.Context, name string) (*getConfig, *attributesConfig) { _, getCfg := cb.cfg.findGetConfig(name) + if getCfg != nil && getCfg.invalidateOnMutation { + contentLockKey := cachingKeyContentLock(cb.bucketID, name) + // TODO: How do we handle this error? Retries? + _ = getCfg.cache.Set(ctx, contentLockKey, []byte(invalidationLockContent), invalidationLockTTL) + } + + _, attrCfg := cb.cfg.findAttributesConfig(name) + if attrCfg != nil && attrCfg.invalidateOnMutation { + attrLockKey := cachingKeyAttributes(cb.bucketID, name) + // TODO: How do we handle this error? Retries? + _ = attrCfg.cache.Set(ctx, attrLockKey, []byte(invalidationLockContent), invalidationLockTTL) + } + + return getCfg, attrCfg +} + +// TODO: Docs +func (cb *CachingBucket) invalidatePost(ctx context.Context, name string, getCfg *getConfig, attrCfg *attributesConfig) { if getCfg != nil && getCfg.invalidateOnMutation { // Get config includes an embedded Exists config and the Get() method // caches if an object exists or doesn't. Because of that, we invalidate @@ -175,7 +195,6 @@ func (cb *CachingBucket) invalidate(ctx context.Context, name string) { _ = getCfg.cache.Delete(ctx, existsKey) } - _, attrCfg := cb.cfg.findAttributesConfig(name) if attrCfg != nil && attrCfg.invalidateOnMutation { attrKey := cachingKeyAttributes(cb.bucketID, name) _ = attrCfg.cache.Delete(ctx, attrKey) @@ -183,18 +202,20 @@ func (cb *CachingBucket) invalidate(ctx context.Context, name string) { } func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error { + getCfg, attrCfg := cb.invalidatePre(ctx, name) err := cb.Bucket.Upload(ctx, name, r) if err == nil { - cb.invalidate(ctx, name) + cb.invalidatePost(ctx, name, getCfg, attrCfg) } return err } func (cb *CachingBucket) Delete(ctx context.Context, name string) error { + getCfg, attrCfg := cb.invalidatePre(ctx, name) err := cb.Bucket.Delete(ctx, name) if err == nil { - cb.invalidate(ctx, name) + cb.invalidatePost(ctx, name, getCfg, attrCfg) } return err @@ -321,6 +342,7 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e return cb.Bucket.Get(ctx, name) } + contentLockKey := cachingKeyContentLock(cb.bucketID, name) contentKey := cachingKeyContent(cb.bucketID, name) existsKey := cachingKeyExists(cb.bucketID, name) @@ -384,6 +406,7 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e buf: new(bytes.Buffer), startTime: getTime, ttl: cfg.contentTTL, + lockKey: contentLockKey, cacheKey: contentKey, maxSize: cfg.maxCacheableSize, }, nil @@ -416,6 +439,7 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore. } func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) { + lockKey := cachingKeyAttributesLock(cb.bucketID, name) key := cachingKeyAttributes(cb.bucketID, name) // Lookup the cache. @@ -441,7 +465,13 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName str } if raw, err := json.Marshal(attrs); err == nil { - cache.SetMultiAsync(map[string][]byte{key: raw}, ttl) + // Attempt to add a "lock" key to the cache if it does not already exist. Only cache this + // content when we were able to insert the lock key meaning this object isn't being updated + // by another request. + addErr := cache.Add(ctx, lockKey, []byte(invalidationLockContent), invalidationLockTTL) + if addErr == nil { + cache.SetMultiAsync(map[string][]byte{key: raw}, ttl) + } } else { level.Warn(cb.logger).Log("msg", "failed to encode cached Attributes result", "key", key, "err", err) } @@ -641,6 +671,10 @@ func cachingKeyAttributes(bucketID, name string) string { return composeCachingKey("attrs", bucketID, name) } +func cachingKeyAttributesLock(bucketID, name string) string { + return composeCachingKey("attrs", bucketID, name, "lock") +} + func cachingKeyObjectSubrange(bucketID, name string, start, end int64) string { return composeCachingKey("subrange", bucketID, name, strconv.FormatInt(start, 10), strconv.FormatInt(end, 10)) } @@ -663,6 +697,10 @@ func cachingKeyContent(bucketID, name string) string { return composeCachingKey("content", bucketID, name) } +func cachingKeyContentLock(bucketID, name string) string { + return composeCachingKey("content", bucketID, name, "lock") +} + func composeCachingKey(op, bucketID string, values ...string) string { // Estimate size. estimatedSize := len(op) + len(bucketID) + (2 + len(values)) @@ -774,6 +812,7 @@ type getReader struct { startTime time.Time ttl time.Duration cacheKey string + lockKey string maxSize int } @@ -797,7 +836,13 @@ func (g *getReader) Read(p []byte) (n int, err error) { if errors.Is(err, io.EOF) && g.buf != nil { remainingTTL := g.ttl - time.Since(g.startTime) if remainingTTL > 0 { - g.c.SetMultiAsync(map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL) + // Attempt to add a "lock" key to the cache if it does not already exist. Only cache this + // content when we were able to insert the lock key meaning this object isn't being updated + // by another request. + addErr := g.c.Add(context.Background(), g.lockKey, []byte(invalidationLockContent), invalidationLockTTL) + if addErr == nil { + g.c.SetMultiAsync(map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL) + } } // Clear reference, to avoid doing another Store on next read. g.buf = nil diff --git a/vendor/github.com/grafana/dskit/cache/cache.go b/vendor/github.com/grafana/dskit/cache/cache.go index 78c8ea966d..2a30914393 100644 --- a/vendor/github.com/grafana/dskit/cache/cache.go +++ b/vendor/github.com/grafana/dskit/cache/cache.go @@ -13,6 +13,11 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +var ( + ErrNotStored = errors.New("item not stored") + ErrInvalidTTL = errors.New("invalid TTL") +) + // Cache is a high level interface to interact with a cache. type Cache interface { // GetMulti fetches multiple keys at once from a cache. In case of error, @@ -28,6 +33,14 @@ type Cache interface { // any underlying async operations fail, the errors will be tracked/logged. SetMultiAsync(data map[string][]byte, ttl time.Duration) + // Set stores a key and value into a cache. + Set(ctx context.Context, key string, value []byte, ttl time.Duration) error + + // Add stores a key and value into a cache only if it does not already exist. If the + // item was not stored because an entry already exists in the cache, ErrNotStored will + // be returned. + Add(ctx context.Context, key string, value []byte, ttl time.Duration) error + // Delete deletes a key from a cache. This is a synchronous operation. If an asynchronous // set operation for key is still pending to be processed, it will wait for it to complete // before performing deletion. diff --git a/vendor/github.com/grafana/dskit/cache/client.go b/vendor/github.com/grafana/dskit/cache/client.go index 033d2add60..1d3c87c56e 100644 --- a/vendor/github.com/grafana/dskit/cache/client.go +++ b/vendor/github.com/grafana/dskit/cache/client.go @@ -17,6 +17,7 @@ import ( // Common functionality shared between the Memcached and Redis Cache implementations const ( + opAdd = "add" opSet = "set" opGetMulti = "getmulti" opDelete = "delete" @@ -29,6 +30,8 @@ const ( reasonMaxItemSize = "max-item-size" reasonAsyncBufferFull = "async-buffer-full" reasonMalformedKey = "malformed-key" + reasonInvalidTTL = "invalid-ttl" + reasonNotStored = "not-stored" reasonConnectTimeout = "connect-timeout" reasonTimeout = "request-timeout" reasonServerError = "server-error" @@ -85,10 +88,12 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics { Name: "operation_failures_total", Help: "Total number of operations against cache that failed.", }, []string{"operation", "reason"}) - for _, op := range []string{opGetMulti, opSet, opDelete, opIncrement, opFlush, opTouch, opCompareAndSwap} { + for _, op := range []string{opGetMulti, opAdd, opSet, opDelete, opIncrement, opFlush, opTouch, opCompareAndSwap} { cm.failures.WithLabelValues(op, reasonConnectTimeout) cm.failures.WithLabelValues(op, reasonTimeout) cm.failures.WithLabelValues(op, reasonMalformedKey) + cm.failures.WithLabelValues(op, reasonInvalidTTL) + cm.failures.WithLabelValues(op, reasonNotStored) cm.failures.WithLabelValues(op, reasonServerError) cm.failures.WithLabelValues(op, reasonNetworkError) cm.failures.WithLabelValues(op, reasonOther) @@ -99,6 +104,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics { Help: "Total number of operations against cache that have been skipped.", }, []string{"operation", "reason"}) cm.skipped.WithLabelValues(opGetMulti, reasonMaxItemSize) + cm.skipped.WithLabelValues(opAdd, reasonMaxItemSize) cm.skipped.WithLabelValues(opSet, reasonMaxItemSize) cm.skipped.WithLabelValues(opSet, reasonAsyncBufferFull) @@ -112,6 +118,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics { NativeHistogramMinResetDuration: time.Hour, }, []string{"operation"}) cm.duration.WithLabelValues(opGetMulti) + cm.duration.WithLabelValues(opAdd) cm.duration.WithLabelValues(opSet) cm.duration.WithLabelValues(opDelete) cm.duration.WithLabelValues(opIncrement) @@ -129,6 +136,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics { []string{"operation"}, ) cm.dataSize.WithLabelValues(opGetMulti) + cm.dataSize.WithLabelValues(opAdd) cm.dataSize.WithLabelValues(opSet) cm.dataSize.WithLabelValues(opCompareAndSwap) @@ -172,22 +180,12 @@ func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f fun } err := c.asyncQueue.submit(func() { - start := time.Now() - c.metrics.operations.WithLabelValues(opSet).Inc() - - err := f(key, value, ttl) - if err != nil { - level.Debug(c.logger).Log( - "msg", "failed to store item to cache", - "key", key, - "sizeBytes", len(value), - "err", err, - ) - c.trackError(opSet, err) - } - - c.metrics.dataSize.WithLabelValues(opSet).Observe(float64(len(value))) - c.metrics.duration.WithLabelValues(opSet).Observe(time.Since(start).Seconds()) + // Because this operation is executed in a separate goroutine: We run the operation without + // a context (it is expected to keep running no matter what happens) and we don't return the + // error (it will be tracked via metrics instead of being returned to the caller). + _ = c.storeOperation(context.Background(), key, value, ttl, opSet, func(_ context.Context, key string, value []byte, ttl time.Duration) error { + return f(key, value, ttl) + }) }) if err != nil { @@ -196,6 +194,32 @@ func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f fun } } +func (c *baseClient) storeOperation(ctx context.Context, key string, value []byte, ttl time.Duration, operation string, f func(ctx context.Context, key string, value []byte, ttl time.Duration) error) error { + if c.maxItemSize > 0 && uint64(len(value)) > c.maxItemSize { + c.metrics.skipped.WithLabelValues(operation, reasonMaxItemSize).Inc() + return nil + } + + start := time.Now() + c.metrics.operations.WithLabelValues(operation).Inc() + + err := f(ctx, key, value, ttl) + if err != nil { + level.Debug(c.logger).Log( + "msg", "failed to store item to cache", + "operation", operation, + "key", key, + "sizeBytes", len(value), + "err", err, + ) + c.trackError(operation, err) + } + + c.metrics.dataSize.WithLabelValues(operation).Observe(float64(len(value))) + c.metrics.duration.WithLabelValues(operation).Observe(time.Since(start).Seconds()) + return err +} + // wait submits an async task and blocks until it completes. This can be used during // tests to ensure that async "sets" have completed before attempting to read them. func (c *baseClient) wait() error { @@ -255,6 +279,10 @@ func (c *baseClient) trackError(op string, err error) { } else { c.metrics.failures.WithLabelValues(op, reasonNetworkError).Inc() } + case errors.Is(err, ErrNotStored): + c.metrics.failures.WithLabelValues(op, reasonNotStored).Inc() + case errors.Is(err, ErrInvalidTTL): + c.metrics.failures.WithLabelValues(op, reasonInvalidTTL).Inc() case errors.Is(err, memcache.ErrMalformedKey): c.metrics.failures.WithLabelValues(op, reasonMalformedKey).Inc() case errors.Is(err, memcache.ErrServerError): diff --git a/vendor/github.com/grafana/dskit/cache/compression.go b/vendor/github.com/grafana/dskit/cache/compression.go index 4146d25977..460ac5849b 100644 --- a/vendor/github.com/grafana/dskit/cache/compression.go +++ b/vendor/github.com/grafana/dskit/cache/compression.go @@ -85,6 +85,14 @@ func (s *SnappyCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) { s.next.SetMultiAsync(encoded, ttl) } +func (s *SnappyCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return s.next.Set(ctx, key, snappy.Encode(nil, value), ttl) +} + +func (s *SnappyCache) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return s.next.Add(ctx, key, snappy.Encode(nil, value), ttl) +} + // GetMulti implements Cache. func (s *SnappyCache) GetMulti(ctx context.Context, keys []string, opts ...Option) map[string][]byte { found := s.next.GetMulti(ctx, keys, opts...) diff --git a/vendor/github.com/grafana/dskit/cache/lru.go b/vendor/github.com/grafana/dskit/cache/lru.go index b75e5a4e27..c0b7a032f8 100644 --- a/vendor/github.com/grafana/dskit/cache/lru.go +++ b/vendor/github.com/grafana/dskit/cache/lru.go @@ -103,6 +103,41 @@ func (l *LRUCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) { } } +func (l *LRUCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error { + err := l.c.Set(ctx, key, value, ttl) + + l.mtx.Lock() + defer l.mtx.Unlock() + + expires := time.Now().Add(ttl) + l.lru.Add(key, &Item{ + Data: value, + ExpiresAt: expires, + }) + + return err +} + +func (l *LRUCache) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error { + err := l.c.Add(ctx, key, value, ttl) + + // When a caller uses the Add method, the presence of absence of an entry in the cache + // has significance. In order to maintain the semantics of that, we only add an entry to + // the LRU when it was able to be successfully added to the shared cache. + if err == nil { + l.mtx.Lock() + defer l.mtx.Unlock() + + expires := time.Now().Add(ttl) + l.lru.Add(key, &Item{ + Data: value, + ExpiresAt: expires, + }) + } + + return err +} + func (l *LRUCache) GetMulti(ctx context.Context, keys []string, opts ...Option) (result map[string][]byte) { l.requests.Add(float64(len(keys))) l.mtx.Lock() diff --git a/vendor/github.com/grafana/dskit/cache/memcached_client.go b/vendor/github.com/grafana/dskit/cache/memcached_client.go index a22f803544..0c4e9e6c50 100644 --- a/vendor/github.com/grafana/dskit/cache/memcached_client.go +++ b/vendor/github.com/grafana/dskit/cache/memcached_client.go @@ -28,6 +28,7 @@ import ( const ( dnsProviderUpdateInterval = 30 * time.Second + maxTTL = 30 * 24 * time.Hour ) var ( @@ -43,6 +44,7 @@ var ( type memcachedClientBackend interface { GetMulti(keys []string, opts ...memcache.Option) (map[string]*memcache.Item, error) Set(item *memcache.Item) error + Add(item *memcache.Item) error Delete(key string) error Decrement(key string, delta uint64) (uint64, error) Increment(key string, delta uint64) (uint64, error) @@ -322,14 +324,47 @@ func (c *MemcachedClient) SetAsync(key string, value []byte, ttl time.Duration) c.setAsync(key, value, ttl, c.setSingleItem) } +func (c *MemcachedClient) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return c.storeOperation(ctx, key, value, ttl, opSet, func(ctx context.Context, key string, value []byte, ttl time.Duration) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + return c.setSingleItem(key, value, ttl) + } + }) +} + +func (c *MemcachedClient) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return c.storeOperation(ctx, key, value, ttl, opAdd, func(ctx context.Context, key string, value []byte, ttl time.Duration) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + ttlSeconds, ok := toSeconds(ttl) + if !ok { + return fmt.Errorf("%w: for set operation on %s %s", ErrInvalidTTL, key, ttl) + } + + err := c.client.Add(&memcache.Item{ + Key: key, + Value: value, + Expiration: ttlSeconds, + }) + + if errors.Is(err, memcache.ErrNotStored) { + return fmt.Errorf("%w: for add operation on %s", ErrNotStored, key) + } + + return err + } + }) +} + func (c *MemcachedClient) setSingleItem(key string, value []byte, ttl time.Duration) error { - ttlSeconds := int32(ttl.Seconds()) - // If a TTL of exactly 0 is passed, we honor it and pass it to Memcached which will - // interpret it as an infinite TTL. However, if we get a non-zero TTL that is truncated - // to 0 seconds, we discard the update since the caller didn't intend to set an infinite - // TTL. - if ttl != 0 && ttlSeconds <= 0 { - return nil + ttlSeconds, ok := toSeconds(ttl) + if !ok { + return fmt.Errorf("%w: for set operation on %s %s", ErrInvalidTTL, key, ttl) } return c.client.Set(&memcache.Item{ @@ -339,6 +374,20 @@ func (c *MemcachedClient) setSingleItem(key string, value []byte, ttl time.Durat }) } +// TODO: Docs +func toSeconds(d time.Duration) (int32, bool) { + if d > maxTTL { + return 0, false + } + + secs := int32(d.Seconds()) + if d != 0 && secs <= 0 { + return 0, false + } + + return secs, true +} + func toMemcacheOptions(opts ...Option) []memcache.Option { if len(opts) == 0 { return nil diff --git a/vendor/github.com/grafana/dskit/cache/mock.go b/vendor/github.com/grafana/dskit/cache/mock.go index b06123272d..4a5dae962d 100644 --- a/vendor/github.com/grafana/dskit/cache/mock.go +++ b/vendor/github.com/grafana/dskit/cache/mock.go @@ -43,6 +43,25 @@ func (m *MockCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) { } } +func (m *MockCache) Set(_ context.Context, key string, value []byte, ttl time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() + m.cache[key] = Item{Data: value, ExpiresAt: time.Now().Add(ttl)} + return nil +} + +func (m *MockCache) Add(_ context.Context, key string, value []byte, ttl time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.cache[key]; ok { + return ErrNotStored + } + + m.cache[key] = Item{Data: value, ExpiresAt: time.Now().Add(ttl)} + return nil +} + func (m *MockCache) GetMulti(_ context.Context, keys []string, _ ...Option) map[string][]byte { m.mu.Lock() defer m.mu.Unlock() @@ -121,6 +140,16 @@ func (m *InstrumentedMockCache) SetMultiAsync(data map[string][]byte, ttl time.D m.cache.SetMultiAsync(data, ttl) } +func (m *InstrumentedMockCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error { + m.storeCount.Inc() + return m.cache.Set(ctx, key, value, ttl) +} + +func (m *InstrumentedMockCache) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error { + m.storeCount.Inc() + return m.cache.Add(ctx, key, value, ttl) +} + func (m *InstrumentedMockCache) GetMulti(ctx context.Context, keys []string, opts ...Option) map[string][]byte { m.fetchCount.Inc() return m.cache.GetMulti(ctx, keys, opts...) diff --git a/vendor/github.com/grafana/dskit/cache/redis_client.go b/vendor/github.com/grafana/dskit/cache/redis_client.go index cd9efb4c12..a0feb4e156 100644 --- a/vendor/github.com/grafana/dskit/cache/redis_client.go +++ b/vendor/github.com/grafana/dskit/cache/redis_client.go @@ -226,7 +226,7 @@ func NewRedisClient(logger log.Logger, name string, config RedisClientConfig, re return c, nil } -// SetMultiAsync implements RemoteCacheClient. +// SetMultiAsync implements Cache. func (c *RedisClient) SetMultiAsync(data map[string][]byte, ttl time.Duration) { c.setMultiAsync(data, ttl, func(key string, value []byte, ttl time.Duration) error { _, err := c.client.Set(context.Background(), key, value, ttl).Result() @@ -234,7 +234,7 @@ func (c *RedisClient) SetMultiAsync(data map[string][]byte, ttl time.Duration) { }) } -// SetAsync implements RemoteCacheClient. +// SetAsync implements Cache. func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) { c.setAsync(key, value, ttl, func(key string, buf []byte, ttl time.Duration) error { _, err := c.client.Set(context.Background(), key, buf, ttl).Result() @@ -242,7 +242,30 @@ func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) { }) } -// GetMulti implements RemoteCacheClient. +// Set implements Cache. +func (c *RedisClient) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return c.storeOperation(ctx, key, value, ttl, opSet, func(ctx context.Context, key string, value []byte, ttl time.Duration) error { + _, err := c.client.Set(ctx, key, value, ttl).Result() + return err + }) +} + +// Add implements Cache. +func (c *RedisClient) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return c.storeOperation(ctx, key, value, ttl, opAdd, func(ctx context.Context, key string, value []byte, ttl time.Duration) error { + stored, err := c.client.SetNX(ctx, key, value, ttl).Result() + if err != nil { + return err + } + if !stored { + return fmt.Errorf("%w: for Set NX operation on %s", ErrNotStored, key) + } + + return nil + }) +} + +// GetMulti implements Cache. func (c *RedisClient) GetMulti(ctx context.Context, keys []string, _ ...Option) map[string][]byte { if len(keys) == 0 { return nil diff --git a/vendor/github.com/grafana/dskit/cache/tracing.go b/vendor/github.com/grafana/dskit/cache/tracing.go index 68f29d140d..148c708f8a 100644 --- a/vendor/github.com/grafana/dskit/cache/tracing.go +++ b/vendor/github.com/grafana/dskit/cache/tracing.go @@ -31,6 +31,14 @@ func (t *SpanlessTracingCache) SetMultiAsync(data map[string][]byte, ttl time.Du t.next.SetMultiAsync(data, ttl) } +func (t *SpanlessTracingCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return t.next.Set(ctx, key, value, ttl) +} + +func (t *SpanlessTracingCache) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return t.next.Add(ctx, key, value, ttl) +} + func (t *SpanlessTracingCache) GetMulti(ctx context.Context, keys []string, opts ...Option) (result map[string][]byte) { var ( bytes int diff --git a/vendor/github.com/grafana/dskit/cache/versioned.go b/vendor/github.com/grafana/dskit/cache/versioned.go index 782cd85972..a4eef5170c 100644 --- a/vendor/github.com/grafana/dskit/cache/versioned.go +++ b/vendor/github.com/grafana/dskit/cache/versioned.go @@ -36,6 +36,14 @@ func (c *Versioned) SetMultiAsync(data map[string][]byte, ttl time.Duration) { c.cache.SetMultiAsync(versioned, ttl) } +func (c *Versioned) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return c.cache.Set(ctx, c.addVersion(key), value, ttl) +} + +func (c *Versioned) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error { + return c.cache.Add(ctx, c.addVersion(key), value, ttl) +} + func (c *Versioned) GetMulti(ctx context.Context, keys []string, opts ...Option) map[string][]byte { versionedKeys := make([]string, len(keys)) for i, k := range keys { diff --git a/vendor/modules.txt b/vendor/modules.txt index ceb9e3fdc8..f8fdae384e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -611,7 +611,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319 +# github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319 => github.com/grafana/dskit v0.0.0-20241003174309-ebcbf7befc39 ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast @@ -1664,6 +1664,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 +# github.com/grafana/dskit => github.com/grafana/dskit v0.0.0-20241003174309-ebcbf7befc39 # github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240925112120-6046bf43c9b2 # github.com/prometheus/client_golang => github.com/prometheus/client_golang v1.19.1 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe