Prevent stale results from being cached using cache "locks"
Proof-of-concept for discussion as part of:

#9434

grafana/dskit#591

Signed-off-by: Nick Pillitteri <[email protected]>
56quarters committed Oct 3, 2024
1 parent d6abaff commit 29c560e
Showing 13 changed files with 287 additions and 38 deletions.
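
Before the per-file diffs, a minimal sketch of the locking idea this commit implements, assuming a cache client with set, delete, and add-if-absent operations. The names and types below are illustrative only; they are not the Mimir or dskit types used in the actual change.

package lockexample

import (
	"context"
	"time"
)

const lockTTL = 15 * time.Second

// cacheClient is a stand-in for the cache interface used by this change.
// Add must fail when the key already exists.
type cacheClient interface {
	Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
	Add(ctx context.Context, key string, value []byte, ttl time.Duration) error
	Delete(ctx context.Context, key string) error
}

// mutate sets a short-lived lock key before writing and deletes the cached entry
// afterwards, so concurrent readers cannot re-populate the cache with stale data.
func mutate(ctx context.Context, c cacheClient, dataKey, lockKey string, write func() error) error {
	_ = c.Set(ctx, lockKey, []byte("locked"), lockTTL) // best-effort, matching the change below
	if err := write(); err != nil {
		return err
	}
	_ = c.Delete(ctx, dataKey) // also best-effort; errors invalidating are ignored
	return nil
}

// fill caches a value read from object storage only if the lock key could be added,
// i.e. no mutation of the same object is in flight.
func fill(ctx context.Context, c cacheClient, dataKey, lockKey string, value []byte, ttl time.Duration) {
	if err := c.Add(ctx, lockKey, []byte("locked"), lockTTL); err == nil {
		_ = c.Set(ctx, dataKey, value, ttl)
	}
}
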
2 changes: 2 additions & 0 deletions go.mod
@@ -281,6 +281,8 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/grafana/dskit => github.com/grafana/dskit v0.0.0-20241003174309-ebcbf7befc39

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240925112120-6046bf43c9b2

4 changes: 2 additions & 2 deletions go.sum
@@ -1250,8 +1250,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b h1:UO4mv94pG1kzKCgBKh20TXdACBCAK2vYjV3Q2MlcpEQ=
github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4=
github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319 h1:KACpOOTqA4WqyyKF2fFPQFiaSOpZdOT5f5gg0qkPLiU=
github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/dskit v0.0.0-20241003174309-ebcbf7befc39 h1:LE4TKRG0oqPe23NZXhXlEiWzZeth1Dpn9kQgPWPRW2c=
github.com/grafana/dskit v0.0.0-20241003174309-ebcbf7befc39/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4 h1:7/CJa4ilczGHLjULGJFxRFAGsnaN33YIJEqpm45TUYs=
61 changes: 53 additions & 8 deletions pkg/storage/tsdb/bucketcache/caching_bucket.go
@@ -37,6 +37,9 @@ const (

memoryPoolContextKey contextKey = 0
cacheLookupEnabledContextKey contextKey = 1

invalidationLockTTL = 15 * time.Second
invalidationLockContent = "locked"
)

var errObjNotFound = errors.Errorf("object not found")
@@ -159,10 +162,27 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin
return cb, nil
}

// invalidate invalidates content, existence, and attribute caches for the given object.
// Note that this is best-effort and errors invalidating the cache are ignored.
func (cb *CachingBucket) invalidate(ctx context.Context, name string) {
// invalidatePre writes short-lived "lock" entries for the content and attribute cache keys of
// the given object before a mutation. While a lock entry exists, readers skip caching results
// for this object, so stale data cannot be stored during the mutation. Lock writes are
// best-effort and errors are ignored. The matching get and attributes configs are returned so
// that invalidatePost can delete the corresponding cache entries once the mutation succeeds.
func (cb *CachingBucket) invalidatePre(ctx context.Context, name string) (*getConfig, *attributesConfig) {
_, getCfg := cb.cfg.findGetConfig(name)
if getCfg != nil && getCfg.invalidateOnMutation {
contentLockKey := cachingKeyContentLock(cb.bucketID, name)
// TODO: How do we handle this error? Retries?
_ = getCfg.cache.Set(ctx, contentLockKey, []byte(invalidationLockContent), invalidationLockTTL)
}

_, attrCfg := cb.cfg.findAttributesConfig(name)
if attrCfg != nil && attrCfg.invalidateOnMutation {
attrLockKey := cachingKeyAttributesLock(cb.bucketID, name)
// TODO: How do we handle this error? Retries?
_ = attrCfg.cache.Set(ctx, attrLockKey, []byte(invalidationLockContent), invalidationLockTTL)
}

return getCfg, attrCfg
}

// invalidatePost deletes the cached content, existence, and attribute entries for the given
// object after a successful mutation. Like invalidatePre, this is best-effort and errors
// invalidating the cache are ignored.
func (cb *CachingBucket) invalidatePost(ctx context.Context, name string, getCfg *getConfig, attrCfg *attributesConfig) {
if getCfg != nil && getCfg.invalidateOnMutation {
// Get config includes an embedded Exists config and the Get() method
// caches if an object exists or doesn't. Because of that, we invalidate
@@ -175,26 +195,27 @@ func (cb *CachingBucket) invalidate(ctx context.Context, name string) {
_ = getCfg.cache.Delete(ctx, existsKey)
}

_, attrCfg := cb.cfg.findAttributesConfig(name)
if attrCfg != nil && attrCfg.invalidateOnMutation {
attrKey := cachingKeyAttributes(cb.bucketID, name)
_ = attrCfg.cache.Delete(ctx, attrKey)
}
}

func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
getCfg, attrCfg := cb.invalidatePre(ctx, name)
err := cb.Bucket.Upload(ctx, name, r)
if err == nil {
cb.invalidate(ctx, name)
cb.invalidatePost(ctx, name, getCfg, attrCfg)
}

return err
}

func (cb *CachingBucket) Delete(ctx context.Context, name string) error {
getCfg, attrCfg := cb.invalidatePre(ctx, name)
err := cb.Bucket.Delete(ctx, name)
if err == nil {
cb.invalidate(ctx, name)
cb.invalidatePost(ctx, name, getCfg, attrCfg)
}

return err
@@ -321,6 +342,7 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
return cb.Bucket.Get(ctx, name)
}

contentLockKey := cachingKeyContentLock(cb.bucketID, name)
contentKey := cachingKeyContent(cb.bucketID, name)
existsKey := cachingKeyExists(cb.bucketID, name)

@@ -384,6 +406,7 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
buf: new(bytes.Buffer),
startTime: getTime,
ttl: cfg.contentTTL,
lockKey: contentLockKey,
cacheKey: contentKey,
maxSize: cfg.maxCacheableSize,
}, nil
@@ -416,6 +439,7 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.
}

func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
lockKey := cachingKeyAttributesLock(cb.bucketID, name)
key := cachingKeyAttributes(cb.bucketID, name)

// Lookup the cache.
@@ -441,7 +465,13 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName str
}

if raw, err := json.Marshal(attrs); err == nil {
cache.SetMultiAsync(map[string][]byte{key: raw}, ttl)
// Attempt to add a "lock" key to the cache if it does not already exist. Only cache these
// attributes if we were able to insert the lock key, meaning this object isn't currently
// being modified by another request.
addErr := cache.Add(ctx, lockKey, []byte(invalidationLockContent), invalidationLockTTL)
if addErr == nil {
cache.SetMultiAsync(map[string][]byte{key: raw}, ttl)
}
} else {
level.Warn(cb.logger).Log("msg", "failed to encode cached Attributes result", "key", key, "err", err)
}
@@ -641,6 +671,10 @@ func cachingKeyAttributes(bucketID, name string) string {
return composeCachingKey("attrs", bucketID, name)
}

func cachingKeyAttributesLock(bucketID, name string) string {
return composeCachingKey("attrs", bucketID, name, "lock")
}

func cachingKeyObjectSubrange(bucketID, name string, start, end int64) string {
return composeCachingKey("subrange", bucketID, name, strconv.FormatInt(start, 10), strconv.FormatInt(end, 10))
}
@@ -663,6 +697,10 @@ func cachingKeyContent(bucketID, name string) string {
return composeCachingKey("content", bucketID, name)
}

func cachingKeyContentLock(bucketID, name string) string {
return composeCachingKey("content", bucketID, name, "lock")
}

func composeCachingKey(op, bucketID string, values ...string) string {
// Estimate size.
estimatedSize := len(op) + len(bucketID) + (2 + len(values))
@@ -774,6 +812,7 @@ type getReader struct {
startTime time.Time
ttl time.Duration
cacheKey string
lockKey string
maxSize int
}

@@ -797,7 +836,13 @@ func (g *getReader) Read(p []byte) (n int, err error) {
if errors.Is(err, io.EOF) && g.buf != nil {
remainingTTL := g.ttl - time.Since(g.startTime)
if remainingTTL > 0 {
g.c.SetMultiAsync(map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL)
// Attempt to add a "lock" key to the cache if it does not already exist. Only cache this
// content if we were able to insert the lock key, meaning this object isn't currently
// being modified by another request.
addErr := g.c.Add(context.Background(), g.lockKey, []byte(invalidationLockContent), invalidationLockTTL)
if addErr == nil {
g.c.SetMultiAsync(map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL)
}
}
// Clear reference, to avoid doing another Store on next read.
g.buf = nil
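
To make the race concrete, here is an illustrative timeline (the object name is hypothetical and this is not code from the change). Without the lock, request A would write the pre-upload bytes back into the cache at T3 and keep serving stale content until the regular content TTL expired.

// T0: request A calls Get("block/index") and starts streaming the old object bytes.
// T1: request B calls Upload("block/index"); invalidatePre sets the content and attribute
//     lock keys with a 15-second TTL.
// T2: the upload completes; invalidatePost deletes the cached content, exists, and
//     attributes entries.
// T3: request A reaches EOF and tries to cache the bytes it read at T0. Its Add() on the
//     lock key fails because the key still exists, so the stale content is never written
//     back. Once the lock TTL expires, later reads cache fresh results as usual.
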
13 changes: 13 additions & 0 deletions vendor/github.com/grafana/dskit/cache/cache.go

Some generated files are not rendered by default.

62 changes: 45 additions & 17 deletions vendor/github.com/grafana/dskit/cache/client.go

Some generated files are not rendered by default.

8 changes: 8 additions & 0 deletions vendor/github.com/grafana/dskit/cache/compression.go

Some generated files are not rendered by default.

35 changes: 35 additions & 0 deletions vendor/github.com/grafana/dskit/cache/lru.go

Some generated files are not rendered by default.
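
The vendored dskit diffs above are not rendered. Judging only from the call sites in caching_bucket.go, the relevant addition is an add-if-absent operation roughly like the sketch below; the real interface in grafana/dskit#591 has more methods, and its names, options, and error values may differ.

package cachesketch

import (
	"context"
	"time"
)

// lockingCache is the subset of cache behaviour assumed by this change, inferred from the
// call sites in caching_bucket.go rather than copied from dskit.
type lockingCache interface {
	// SetMultiAsync and Delete are used for the data keys (content, exists, attributes).
	SetMultiAsync(data map[string][]byte, ttl time.Duration)
	Delete(ctx context.Context, key string) error

	// Set writes the short-lived lock key before a mutation.
	Set(ctx context.Context, key string, value []byte, ttl time.Duration) error

	// Add stores the value only if the key does not already exist and returns an error when it
	// does; readers use this to detect an in-flight mutation and skip caching.
	Add(ctx context.Context, key string, value []byte, ttl time.Duration) error
}
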
