diff --git a/development/mimir-microservices-mode/docker-compose.jsonnet b/development/mimir-microservices-mode/docker-compose.jsonnet index 5a099617ef..b4f5838bd6 100644 --- a/development/mimir-microservices-mode/docker-compose.jsonnet +++ b/development/mimir-microservices-mode/docker-compose.jsonnet @@ -294,7 +294,7 @@ std.manifestYamlDoc({ memcached:: { memcached: { - image: 'memcached:1.6.19-alpine', + image: 'memcached:1.6.28-alpine', ports: [ '11211:11211', ], @@ -303,7 +303,7 @@ std.manifestYamlDoc({ memcached_exporter:: { 'memcached-exporter': { - image: 'prom/memcached-exporter:v0.6.0', + image: 'prom/memcached-exporter:v0.14.4', command: ['--memcached.address=memcached:11211', '--web.listen-address=0.0.0.0:9150'], }, }, diff --git a/development/mimir-microservices-mode/docker-compose.yml b/development/mimir-microservices-mode/docker-compose.yml index b548bbc479..44f9806172 100644 --- a/development/mimir-microservices-mode/docker-compose.yml +++ b/development/mimir-microservices-mode/docker-compose.yml @@ -275,14 +275,14 @@ "ports": - "9900:9900" "memcached": - "image": "memcached:1.6.19-alpine" + "image": "memcached:1.6.28-alpine" "ports": - "11211:11211" "memcached-exporter": "command": - "--memcached.address=memcached:11211" - "--web.listen-address=0.0.0.0:9150" - "image": "prom/memcached-exporter:v0.6.0" + "image": "prom/memcached-exporter:v0.14.4" "minio": "command": - "server" diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index a778a05ac3..96556d55fc 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -723,8 +723,7 @@ type Mimir struct { QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader QueryFrontendCodec querymiddleware.Codec Ruler *ruler.Ruler - RulerDirectStorage rulestore.RuleStore - RulerCachedStorage rulestore.RuleStore + RulerStorage rulestore.RuleStore Alertmanager *alertmanager.MultitenantAlertmanager Compactor *compactor.MultitenantCompactor StoreGateway *storegateway.StoreGateway diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 7d9050259d..96d0065336 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -830,13 +830,12 @@ func (t *Mimir) initRulerStorage() (serv services.Service, err error) { // we do accept stale data for about a polling interval (2 intervals in the worst // case scenario due to the jitter applied). cacheTTL := t.Cfg.Ruler.PollInterval - - t.RulerDirectStorage, t.RulerCachedStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer) + t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer) return } func (t *Mimir) initRuler() (serv services.Service, err error) { - if t.RulerDirectStorage == nil { + if t.RulerStorage == nil { level.Info(util_log.Logger).Log("msg", "The ruler storage has not been configured. Not starting the ruler.") return nil, nil } @@ -939,8 +938,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) { manager, t.Registerer, util_log.Logger, - t.RulerDirectStorage, - t.RulerCachedStorage, + t.RulerStorage, t.Overrides, ) if err != nil { @@ -951,7 +949,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) { t.API.RegisterRuler(t.Ruler) // Expose HTTP configuration and prometheus-compatible Ruler APIs - t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerDirectStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler) + t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler) return t.Ruler, nil } diff --git a/pkg/mimir/modules_test.go b/pkg/mimir/modules_test.go index a726a8609f..c8ecc7b8da 100644 --- a/pkg/mimir/modules_test.go +++ b/pkg/mimir/modules_test.go @@ -159,11 +159,9 @@ func TestMimir_InitRulerStorage(t *testing.T) { require.NoError(t, err) if testData.expectedInit { - assert.NotNil(t, mimir.RulerDirectStorage) - assert.NotNil(t, mimir.RulerCachedStorage) + assert.NotNil(t, mimir.RulerStorage) } else { - assert.Nil(t, mimir.RulerDirectStorage) - assert.Nil(t, mimir.RulerCachedStorage) + assert.Nil(t, mimir.RulerStorage) } }) } diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index 05f02a91f9..1d205c421b 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -473,7 +473,9 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) { } level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace) - rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace) + // Disable any caching when getting list of all rule groups since listing results + // are cached and not invalidated and this API is expected to be strongly consistent. + rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace, rulestore.WithCacheDisabled()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -606,7 +608,9 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) { // Only list rule groups when enforcing a max number of groups for this tenant and namespace. if a.ruler.IsMaxRuleGroupsLimited(userID, namespace) { - rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "") + // Disable any caching when getting list of all rule groups since listing results + // are cached and not invalidated and we need the most up-to-date number. + rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled()) if err != nil { level.Error(logger).Log("msg", "unable to fetch current rule groups for validation", "err", err.Error(), "user", userID) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index e66d247a71..d27c95ef26 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -163,7 +163,7 @@ func TestRuler_ListRules(t *testing.T) { store.setMissingRuleGroups(tc.missingRules) r := prepareRuler(t, cfg, store, withStart()) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) router := mux.NewRouter() router.Path("/prometheus/config/v1/rules").Methods("GET").HandlerFunc(a.ListRules) @@ -936,7 +936,7 @@ func TestRuler_PrometheusRules(t *testing.T) { return len(rls.Groups) }) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+tc.queryParams, nil, userID) w := httptest.NewRecorder() @@ -993,7 +993,7 @@ func TestRuler_PrometheusAlerts(t *testing.T) { return len(rls.Groups) }) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/alerts", nil, "user1") w := httptest.NewRecorder() @@ -1172,7 +1172,7 @@ rules: reg := prometheus.NewPedanticRegistry() r := prepareRuler(t, rulerCfg, newMockRuleStore(make(map[string]rulespb.RuleGroupList)), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg)) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) router := mux.NewRouter() router.Path("/prometheus/config/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup) @@ -1207,6 +1207,92 @@ rules: } } +func TestAPI_CreateRuleGroupWithCaching(t *testing.T) { + // Configure the ruler to only sync the rules based on notifications upon API changes. + cfg := defaultRulerConfig(t) + cfg.PollInterval = time.Hour + cfg.OutboundSyncQueuePollInterval = 100 * time.Millisecond + cfg.InboundSyncQueuePollInterval = 100 * time.Millisecond + + const successResponse = `{"status":"success","data":null,"errorType":"","error":""}` + + ruleGroupVersion1 := `name: group1 +interval: 15s +rules: + - record: up_rule + expr: up + - alert: up_alert + expr: up < 1 +` + ruleGroupVersion2 := `name: group1 +interval: 15s +rules: + - record: up_rule + expr: up + - alert: up_alert + expr: up <= 1 +` + + mockCache, store := newInMemoryRuleStore(t) + + reg := prometheus.NewPedanticRegistry() + // Set rule group limits since this performs a list call to count the current number of rule groups + // and we're testing if the API layer is correctly telling the rule store not to serve cached results. + r := prepareRuler(t, cfg, store, withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg), withLimits(validation.MockOverrides(func(defaults *validation.Limits, _ map[string]*validation.Limits) { + defaults.RulerMaxRuleGroupsPerTenant = 2 + defaults.RulerMaxRulesPerRuleGroup = 2 + }))) + a := NewAPI(r, r.store, log.NewNopLogger()) + + router := mux.NewRouter() + router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodGet).HandlerFunc(a.GetRuleGroup) + router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodPost).HandlerFunc(a.CreateRuleGroup) + + // Pre-condition check: the ruler should have run the initial rules sync. + verifySyncRulesMetric(t, reg, 1, 0) + + // Store the initial version of the rule group + req := requestFor(t, http.MethodPost, "https://localhost:8080/prometheus/config/v1/rules/namespace1", strings.NewReader(ruleGroupVersion1), "user1") + w := httptest.NewRecorder() + router.ServeHTTP(w, req) + assert.Equal(t, http.StatusAccepted, w.Code) + assert.Equal(t, successResponse, w.Body.String()) + // Invalidation of exists and content + assert.Equal(t, 2, mockCache.CountDeleteCalls()) + + verifySyncRulesMetric(t, reg, 1, 1) + + // Fetch it back and ensure the content is what we expect even though content can be cached + req = requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/config/v1/rules/namespace1/group1", nil, "user1") + w = httptest.NewRecorder() + router.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, ruleGroupVersion1, w.Body.String()) + // Iter from initial sync, get, iter from sync + assert.Equal(t, 3, mockCache.CountFetchCalls()) + + // Store a new version of the group that is slightly different + req = requestFor(t, http.MethodPost, "https://localhost:8080/prometheus/config/v1/rules/namespace1", strings.NewReader(ruleGroupVersion2), "user1") + w = httptest.NewRecorder() + router.ServeHTTP(w, req) + assert.Equal(t, http.StatusAccepted, w.Code) + assert.Equal(t, successResponse, w.Body.String()) + // Invalidating exists and content again + assert.Equal(t, 4, mockCache.CountDeleteCalls()) + + verifySyncRulesMetric(t, reg, 1, 2) + + // Fetch it back and ensure content is updated to the new version meaning the cache was invalidated + req = requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/config/v1/rules/namespace1/group1", nil, "user1") + w = httptest.NewRecorder() + router.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, ruleGroupVersion2, w.Body.String()) + // Iter from initial sync, get, iter from sync, another get, iter from sync + assert.Equal(t, 5, mockCache.CountFetchCalls()) + +} + func TestAPI_DeleteNamespace(t *testing.T) { // Configure the ruler to only sync the rules based on notifications upon API changes. cfg := defaultRulerConfig(t) @@ -1237,7 +1323,7 @@ func TestAPI_DeleteNamespace(t *testing.T) { reg := prometheus.NewPedanticRegistry() r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg)) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) router := mux.NewRouter() router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodDelete).HandlerFunc(a.DeleteNamespace) @@ -1294,7 +1380,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) { reg := prometheus.NewPedanticRegistry() r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg)) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) router := mux.NewRouter() router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodDelete).HandlerFunc(a.DeleteRuleGroup) @@ -1336,7 +1422,7 @@ func TestRuler_LimitsPerGroup(t *testing.T) { defaults.RulerMaxRulesPerRuleGroup = 1 }))) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) tc := []struct { name string @@ -1389,7 +1475,7 @@ func TestRuler_RulerGroupLimits(t *testing.T) { defaults.RulerMaxRulesPerRuleGroup = 1 }))) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) tc := []struct { name string @@ -1449,7 +1535,7 @@ func TestRuler_RulerGroupLimitsDisabled(t *testing.T) { defaults.RulerMaxRulesPerRuleGroup = 0 }))) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) tc := []struct { name string @@ -1551,7 +1637,7 @@ func TestAPIRoutesCorrectlyHandleInvalidOrgID(t *testing.T) { r := prepareRuler(t, cfg, newMockRuleStore(map[string]rulespb.RuleGroupList{}), withStart()) - a := NewAPI(r, r.directStore, log.NewNopLogger()) + a := NewAPI(r, r.store, log.NewNopLogger()) router := mux.NewRouter() router.Path("/api/v1/rules").Methods(http.MethodGet).HandlerFunc(a.PrometheusRules) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 9ccc2a00f2..9c8cddf168 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -38,7 +38,6 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/ruler/rulespb" "github.com/grafana/mimir/pkg/ruler/rulestore" - "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/grpcencoding/s2" util_log "github.com/grafana/mimir/pkg/util/log" @@ -314,13 +313,12 @@ type MultiTenantManager interface { type Ruler struct { services.Service - cfg Config - lifecycler *ring.BasicLifecycler - ring *ring.Ring - directStore rulestore.RuleStore - cachedStore rulestore.RuleStore - manager MultiTenantManager - limits RulesLimits + cfg Config + lifecycler *ring.BasicLifecycler + ring *ring.Ring + store rulestore.RuleStore + manager MultiTenantManager + limits RulesLimits metrics *rulerMetrics @@ -346,20 +344,14 @@ type Ruler struct { } // NewRuler creates a new ruler from a distributor and chunk store. -func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, directStore, cachedStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) { - // If the cached store is not configured, just fallback to the direct one. - if cachedStore == nil { - cachedStore = directStore - } - - return newRuler(cfg, manager, reg, logger, directStore, cachedStore, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg)) +func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, store rulestore.RuleStore, limits RulesLimits) (*Ruler, error) { + return newRuler(cfg, manager, reg, logger, store, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg)) } -func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, directStore, cachedStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) { +func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, store rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) { ruler := &Ruler{ cfg: cfg, - directStore: directStore, - cachedStore: cachedStore, + store: store, manager: manager, registry: reg, logger: logger, @@ -633,7 +625,7 @@ func (r *Ruler) syncRules(ctx context.Context, userIDs []string, reason rulesSyn func (r *Ruler) loadRuleGroupsToSync(ctx context.Context, configs map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) { // Load rule groups. start := time.Now() - missing, err := r.directStore.LoadRuleGroups(ctx, configs) + missing, err := r.store.LoadRuleGroups(ctx, configs) r.metrics.loadRuleGroups.Observe(time.Since(start).Seconds()) if err != nil { @@ -660,7 +652,12 @@ func (r *Ruler) listRuleGroupsToSyncForAllUsers(ctx context.Context, reason rule // In order to reduce API calls to the object storage among all ruler replicas, // we support lookup of stale data for a short period. - users, err := r.cachedStore.ListAllUsers(bucketcache.WithCacheLookupEnabled(ctx, cacheLookupEnabled)) + var opts []rulestore.Option + if !cacheLookupEnabled { + opts = append(opts, rulestore.WithCacheDisabled()) + } + + users, err := r.store.ListAllUsers(ctx, opts...) if err != nil { return nil, errors.Wrap(err, "unable to list users of ruler") } @@ -711,11 +708,16 @@ func (r *Ruler) listRuleGroupsToSyncForUsers(ctx context.Context, userIDs []stri concurrency = len(userRings) } + var opts []rulestore.Option + if !cacheLookupEnabled { + opts = append(opts, rulestore.WithCacheDisabled()) + } + g, gctx := errgroup.WithContext(ctx) for i := 0; i < concurrency; i++ { g.Go(func() error { for userID := range userCh { - groups, err := r.cachedStore.ListRuleGroupsForUserAndNamespace(bucketcache.WithCacheLookupEnabled(gctx, cacheLookupEnabled), userID, "") + groups, err := r.store.ListRuleGroupsForUserAndNamespace(gctx, userID, "", opts...) if err != nil { return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID) } @@ -1224,7 +1226,7 @@ func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Reque return } - err = r.directStore.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups. + err = r.store.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups. if err != nil && !errors.Is(err, rulestore.ErrGroupNamespaceNotFound) { respondServerError(logger, w, err.Error()) return @@ -1238,8 +1240,8 @@ func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Reque func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) { logger := util_log.WithContext(req.Context(), r.logger) - - userIDs, err := r.directStore.ListAllUsers(req.Context()) + // Disable caching when getting a list of users since this API is expected to be strongly consistent. + userIDs, err := r.store.ListAllUsers(req.Context(), rulestore.WithCacheDisabled()) if err != nil { level.Error(logger).Log("msg", errListAllUser, "err", err) http.Error(w, fmt.Sprintf("%s: %s", errListAllUser, err.Error()), http.StatusInternalServerError) @@ -1255,12 +1257,14 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) { }() err = concurrency.ForEachUser(req.Context(), userIDs, fetchRulesConcurrency, func(ctx context.Context, userID string) error { - rg, err := r.directStore.ListRuleGroupsForUserAndNamespace(ctx, userID, "") + // Disable any caching when getting list of all rule groups since listing results + // are cached and not invalidated and this API is expected to be strongly consistent. + rg, err := r.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled()) if err != nil { return errors.Wrapf(err, "failed to fetch ruler config for user %s", userID) } userRules := map[string]rulespb.RuleGroupList{userID: rg} - if missing, err := r.directStore.LoadRuleGroups(ctx, userRules); err != nil { + if missing, err := r.store.LoadRuleGroups(ctx, userRules); err != nil { return errors.Wrapf(err, "failed to load ruler config for user %s", userID) } else if len(missing) > 0 { // This API is expected to be strongly consistent, so it's an error if any rule group was missing. diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 6a9533ad1b..a10c4005f7 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -54,12 +54,12 @@ import ( "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/storage/bucket/filesystem" "github.com/grafana/mimir/pkg/util" - util_test "github.com/grafana/mimir/pkg/util/test" + utiltest "github.com/grafana/mimir/pkg/util/test" "github.com/grafana/mimir/pkg/util/validation" ) func TestMain(m *testing.M) { - util_test.VerifyNoLeakTestMain(m) + utiltest.VerifyNoLeakTestMain(m) } func defaultRulerConfig(t testing.TB) Config { @@ -210,7 +210,7 @@ func prepareRuler(t *testing.T, cfg Config, storage rulestore.RuleStore, opts .. options := applyPrepareOptions(t, cfg.Ring.Common.InstanceID, opts...) manager := prepareRulerManager(t, cfg, opts...) - ruler, err := newRuler(cfg, manager, options.registerer, options.logger, storage, storage, options.limits, newMockClientsPool(cfg, options.logger, options.registerer, options.rulerAddrMap)) + ruler, err := newRuler(cfg, manager, options.registerer, options.logger, storage, options.limits, newMockClientsPool(cfg, options.logger, options.registerer, options.rulerAddrMap)) require.NoError(t, err) if options.rulerAddrAutoMap { @@ -1571,7 +1571,7 @@ func verifyExpectedDeletedRuleGroupsForUser(t *testing.T, r *Ruler, userID strin ctx := context.Background() t.Run("ListRuleGroupsForUserAndNamespace()", func(t *testing.T) { - list, err := r.directStore.ListRuleGroupsForUserAndNamespace(ctx, userID, "") + list, err := r.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "") require.NoError(t, err) if expectedDeleted { diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go index dfc87128ef..4a75cb2f48 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go @@ -24,6 +24,8 @@ import ( "github.com/grafana/mimir/pkg/ruler/rulespb" "github.com/grafana/mimir/pkg/ruler/rulestore" "github.com/grafana/mimir/pkg/storage/bucket" + "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" + "github.com/grafana/mimir/pkg/util/spanlogger" ) const ( @@ -57,13 +59,13 @@ func NewBucketRuleStore(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProv } // getRuleGroup loads and return a rules group. If existing rule group is supplied, it is Reset and reused. If nil, new RuleGroupDesc is allocated. -func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, rg *rulespb.RuleGroupDesc) (*rulespb.RuleGroupDesc, error) { +func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, rg *rulespb.RuleGroupDesc, spanlog *spanlogger.SpanLogger) (*rulespb.RuleGroupDesc, error) { userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider) objectKey := getRuleGroupObjectKey(namespace, groupName) reader, err := userBucket.Get(ctx, objectKey) if userBucket.IsObjNotFoundErr(err) { - level.Debug(b.logger).Log("msg", "rule group does not exist", "user", userID, "key", objectKey) + spanlog.DebugLog("msg", "rule group does not exist", "user", userID, "key", objectKey) return nil, rulestore.ErrGroupNotFound } @@ -92,7 +94,15 @@ func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, g } // ListAllUsers implements rules.RuleStore. -func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) { +func (b *BucketRuleStore) ListAllUsers(ctx context.Context, opts ...rulestore.Option) ([]string, error) { + logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.ListAllUsers") + defer logger.Finish() + + options := rulestore.CollectOptions(opts...) + if options.DisableCache { + ctx = bucketcache.WithCacheLookupEnabled(ctx, false) + } + var users []string err := b.bucket.Iter(ctx, "", func(user string) error { users = append(users, strings.TrimSuffix(user, objstore.DirDelim)) @@ -106,11 +116,18 @@ func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) { } // ListRuleGroupsForUserAndNamespace implements rules.RuleStore. -func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) { - userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider) +func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string, opts ...rulestore.Option) (rulespb.RuleGroupList, error) { + logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.ListRuleGroupsForUserAndNamespace") + defer logger.Finish() + userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider) groupList := rulespb.RuleGroupList{} + options := rulestore.CollectOptions(opts...) + if options.DisableCache { + ctx = bucketcache.WithCacheLookupEnabled(ctx, false) + } + // The prefix to list objects depends on whether the namespace has been // specified in the request. prefix := "" @@ -121,7 +138,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, err := userBucket.Iter(ctx, prefix, func(key string) error { namespace, group, err := parseRuleGroupObjectKey(key) if err != nil { - level.Warn(b.logger).Log("msg", "invalid rule group object key found while listing rule groups", "user", userID, "key", key, "err", err) + level.Warn(logger).Log("msg", "invalid rule group object key found while listing rule groups", "user", userID, "key", key, "err", err) // Do not fail just because of a spurious item in the bucket. return nil @@ -143,6 +160,9 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, // LoadRuleGroups implements rules.RuleStore. func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) (missing rulespb.RuleGroupList, err error) { + logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.LoadRuleGroups") + defer logger.Finish() + var ( ch = make(chan *rulespb.RuleGroupDesc) missingMx sync.Mutex @@ -161,7 +181,7 @@ func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[s } // Reuse group pointer from the map. - loadedGroup, err := b.getRuleGroup(gCtx, user, namespace, groupName, inputGroup) + loadedGroup, err := b.getRuleGroup(gCtx, user, namespace, groupName, inputGroup, logger) switch { case errors.Is(err, rulestore.ErrGroupNotFound): @@ -202,11 +222,17 @@ outer: // GetRuleGroup implements rules.RuleStore. func (b *BucketRuleStore) GetRuleGroup(ctx context.Context, userID string, namespace string, group string) (*rulespb.RuleGroupDesc, error) { - return b.getRuleGroup(ctx, userID, namespace, group, nil) + logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.GetRuleGroup") + defer logger.Finish() + + return b.getRuleGroup(ctx, userID, namespace, group, nil, logger) } // SetRuleGroup implements rules.RuleStore. func (b *BucketRuleStore) SetRuleGroup(ctx context.Context, userID string, namespace string, group *rulespb.RuleGroupDesc) error { + logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.SetRuleGroup") + defer logger.Finish() + userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider) data, err := proto.Marshal(group) if err != nil { @@ -218,6 +244,9 @@ func (b *BucketRuleStore) SetRuleGroup(ctx context.Context, userID string, names // DeleteRuleGroup implements rules.RuleStore. func (b *BucketRuleStore) DeleteRuleGroup(ctx context.Context, userID string, namespace string, group string) error { + logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.DeleteRuleGroup") + defer logger.Finish() + userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider) err := userBucket.Delete(ctx, getRuleGroupObjectKey(namespace, group)) if b.bucket.IsObjNotFoundErr(err) { @@ -228,7 +257,12 @@ func (b *BucketRuleStore) DeleteRuleGroup(ctx context.Context, userID string, na // DeleteNamespace implements rules.RuleStore. func (b *BucketRuleStore) DeleteNamespace(ctx context.Context, userID string, namespace string) error { - ruleGroupList, err := b.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace) + logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.DeleteNamespace") + defer logger.Finish() + + // Disable caching when listing all rule groups for a user since listing entries are not + // invalidated in the cache when rule groups are modified and we need to delete everything. + ruleGroupList, err := b.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace, rulestore.WithCacheDisabled()) if err != nil { return err } @@ -243,10 +277,10 @@ func (b *BucketRuleStore) DeleteNamespace(ctx context.Context, userID string, na return err } objectKey := getRuleGroupObjectKey(rg.Namespace, rg.Name) - level.Debug(b.logger).Log("msg", "deleting rule group", "user", userID, "namespace", namespace, "key", objectKey) + logger.DebugLog("msg", "deleting rule group", "user", userID, "namespace", namespace, "key", objectKey) err = userBucket.Delete(ctx, objectKey) if err != nil { - level.Error(b.logger).Log("msg", "unable to delete rule group from namespace", "user", userID, "namespace", namespace, "key", objectKey, "err", err) + level.Error(logger).Log("msg", "unable to delete rule group from namespace", "user", userID, "namespace", namespace, "key", objectKey, "err", err) return err } } diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go index b34aae4f3f..a1251892b5 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go @@ -13,7 +13,9 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/cache" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/rulefmt" "github.com/stretchr/testify/assert" @@ -24,6 +26,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/ruler/rulespb" "github.com/grafana/mimir/pkg/ruler/rulestore" + "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" ) type testGroup struct { @@ -443,3 +446,153 @@ func (mb mockBucket) Iter(_ context.Context, _ string, f func(string) error, _ . } return nil } + +func TestCachingAndInvalidation(t *testing.T) { + fixtureGroups := []testGroup{ + {user: "user1", namespace: "hello", ruleGroup: rulefmt.RuleGroup{Name: "first testGroup"}}, + {user: "user1", namespace: "hello", ruleGroup: rulefmt.RuleGroup{Name: "second testGroup"}}, + {user: "user1", namespace: "world", ruleGroup: rulefmt.RuleGroup{Name: "another namespace testGroup"}}, + {user: "user2", namespace: "+-!@#$%. ", ruleGroup: rulefmt.RuleGroup{Name: "different user"}}, + } + + setup := func(t *testing.T) (*cache.InstrumentedMockCache, *BucketRuleStore) { + iterCodec := &bucketcache.JSONIterCodec{} + baseClient := objstore.NewInMemBucket() + mockCache := cache.NewInstrumentedMockCache() + + cacheCfg := bucketcache.NewCachingBucketConfig() + cacheCfg.CacheIter("rule-iter", mockCache, matchAll, time.Minute, iterCodec) + cacheCfg.CacheGet("rule-groups", mockCache, matchAll, 1024^2, time.Minute, time.Minute, time.Minute, true) + cacheClient, err := bucketcache.NewCachingBucket("rule-store", baseClient, cacheCfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) + require.NoError(t, err) + + ruleStore := NewBucketRuleStore(cacheClient, nil, log.NewNopLogger()) + + for _, g := range fixtureGroups { + desc := rulespb.ToProto(g.user, g.namespace, g.ruleGroup) + require.NoError(t, ruleStore.SetRuleGroup(context.Background(), g.user, g.namespace, desc)) + } + + return mockCache, ruleStore + } + + t.Run("list users with cache", func(t *testing.T) { + mockCache, ruleStore := setup(t) + users, err := ruleStore.ListAllUsers(context.Background()) + + require.NoError(t, err) + require.Equal(t, []string{"user1", "user2"}, users) + + require.Equal(t, 1, mockCache.CountStoreCalls()) + require.Equal(t, 1, mockCache.CountFetchCalls()) + }) + + t.Run("list users no cache", func(t *testing.T) { + mockCache, rs := setup(t) + users, err := rs.ListAllUsers(context.Background(), rulestore.WithCacheDisabled()) + + require.NoError(t, err) + require.Equal(t, []string{"user1", "user2"}, users) + + require.Equal(t, 1, mockCache.CountStoreCalls()) + require.Equal(t, 0, mockCache.CountFetchCalls()) + }) + + t.Run("list rule groups with cache", func(t *testing.T) { + mockCache, ruleStore := setup(t) + groups, err := ruleStore.ListRuleGroupsForUserAndNamespace(context.Background(), "user1", "") + + require.NoError(t, err) + require.Equal(t, rulespb.RuleGroupList{ + { + Name: "first testGroup", + User: "user1", + Namespace: "hello", + }, + { + Name: "second testGroup", + User: "user1", + Namespace: "hello", + }, + { + Name: "another namespace testGroup", + User: "user1", + Namespace: "world", + }, + }, groups) + + require.Equal(t, 1, mockCache.CountStoreCalls()) + require.Equal(t, 1, mockCache.CountFetchCalls()) + }) + + t.Run("list rule groups no cache", func(t *testing.T) { + mockCache, ruleStore := setup(t) + groups, err := ruleStore.ListRuleGroupsForUserAndNamespace(context.Background(), "user1", "", rulestore.WithCacheDisabled()) + + require.NoError(t, err) + require.Equal(t, rulespb.RuleGroupList{ + { + Name: "first testGroup", + User: "user1", + Namespace: "hello", + }, + { + Name: "second testGroup", + User: "user1", + Namespace: "hello", + }, + { + Name: "another namespace testGroup", + User: "user1", + Namespace: "world", + }, + }, groups) + + require.Equal(t, 1, mockCache.CountStoreCalls()) + require.Equal(t, 0, mockCache.CountFetchCalls()) + }) + + t.Run("get rule group from cache", func(t *testing.T) { + mockCache, ruleStore := setup(t) + group, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup") + + require.NoError(t, err) + require.NotNil(t, group) + + require.Equal(t, 2, mockCache.CountStoreCalls()) + require.Equal(t, 1, mockCache.CountFetchCalls()) + }) + + t.Run("get rule groups after invalidation", func(t *testing.T) { + mockCache, ruleStore := setup(t) + group, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup") + + require.NoError(t, err) + require.NotNil(t, group) + require.Zero(t, group.QueryOffset) + + require.Equal(t, 2, mockCache.CountStoreCalls()) + require.Equal(t, 1, mockCache.CountFetchCalls()) + + origDeletes := mockCache.CountDeleteCalls() + group.QueryOffset = 42 * time.Second + require.NoError(t, ruleStore.SetRuleGroup(context.Background(), group.User, group.Namespace, group)) + + require.Equal(t, 2, mockCache.CountStoreCalls()) + require.Equal(t, 1, mockCache.CountFetchCalls()) + require.Equal(t, 2, mockCache.CountDeleteCalls()-origDeletes) + + modifiedGroup, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup") + require.NoError(t, err) + require.NotNil(t, modifiedGroup) + require.Equal(t, 42*time.Second, modifiedGroup.QueryOffset) + + require.Equal(t, 4, mockCache.CountStoreCalls()) + require.Equal(t, 2, mockCache.CountFetchCalls()) + require.Equal(t, 2, mockCache.CountDeleteCalls()-origDeletes) + }) +} + +func matchAll(string) bool { + return true +} diff --git a/pkg/ruler/rulestore/local/local.go b/pkg/ruler/rulestore/local/local.go index ff849c2d56..acdde11031 100644 --- a/pkg/ruler/rulestore/local/local.go +++ b/pkg/ruler/rulestore/local/local.go @@ -37,7 +37,7 @@ func NewLocalRulesClient(cfg rulestore.LocalStoreConfig, loader promRules.GroupL }, nil } -func (l *Client) ListAllUsers(_ context.Context) ([]string, error) { +func (l *Client) ListAllUsers(_ context.Context, _ ...rulestore.Option) ([]string, error) { root := l.cfg.Directory infos, err := os.ReadDir(root) if err != nil { @@ -69,7 +69,7 @@ func (l *Client) ListAllUsers(_ context.Context) ([]string, error) { } // ListRuleGroupsForUserAndNamespace implements rules.RuleStore. This method also loads the rules. -func (l *Client) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) { +func (l *Client) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string, _ ...rulestore.Option) (rulespb.RuleGroupList, error) { if namespace == "" { return l.loadAllRulesGroupsForUser(ctx, userID) } diff --git a/pkg/ruler/rulestore/store.go b/pkg/ruler/rulestore/store.go index 50f0ec60d2..06e753b453 100644 --- a/pkg/ruler/rulestore/store.go +++ b/pkg/ruler/rulestore/store.go @@ -21,18 +21,44 @@ var ( ErrUserNotFound = errors.New("no rule groups found for user") ) +// Options are per-call options that can be used to modify the behavior of RuleStore methods. +type Options struct { + DisableCache bool +} + +// CollectOptions applies one or more Option callbacks to produce an Options struct. +func CollectOptions(opts ...Option) *Options { + o := &Options{} + for _, opt := range opts { + opt(o) + } + + return o +} + +// Option is a callback the modifies per-call options for RuleStore methods. +type Option func(opts *Options) + +// WithCacheDisabled returns an Option callback to disable any caching used +// by a RuleStore method call. +func WithCacheDisabled() Option { + return func(opts *Options) { + opts.DisableCache = true + } +} + // RuleStore is used to store and retrieve rules. // Methods starting with "List" prefix may return partially loaded groups: with only group Name, Namespace and User fields set. // To make sure that rules within each group are loaded, client must use LoadRuleGroups method. type RuleStore interface { // ListAllUsers returns all users with rule groups configured. - ListAllUsers(ctx context.Context) ([]string, error) + ListAllUsers(ctx context.Context, opts ...Option) ([]string, error) // ListRuleGroupsForUserAndNamespace returns all the active rule groups for a user from given namespace. // It *MUST* populate fields User, Namespace, Name of all rule groups. // It *MAY* populate the actual rules. // If namespace is empty, groups from all namespaces are returned. - ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) + ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string, opts ...Option) (rulespb.RuleGroupList, error) // LoadRuleGroups loads rules for each rule group in the map. // diff --git a/pkg/ruler/storage.go b/pkg/ruler/storage.go index 8a9398beaa..13dc1a32be 100644 --- a/pkg/ruler/storage.go +++ b/pkg/ruler/storage.go @@ -7,6 +7,7 @@ package ruler import ( "context" + "strings" "time" "github.com/go-kit/log" @@ -25,14 +26,14 @@ import ( ) // NewRuleStore returns a rule store backend client based on the provided cfg. -func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader promRules.GroupLoader, cacheTTL time.Duration, logger log.Logger, reg prometheus.Registerer) (directStore, cachedStore rulestore.RuleStore, _ error) { +func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader promRules.GroupLoader, cacheTTL time.Duration, logger log.Logger, reg prometheus.Registerer) (store rulestore.RuleStore, _ error) { if cfg.Backend == rulestore.BackendLocal { store, err := local.NewLocalRulesClient(cfg.Local, loader) if err != nil { - return nil, nil, err + return nil, err } - return store, store, nil + return store, nil } if cfg.Backend == bucket.Filesystem { @@ -41,18 +42,15 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket. directBucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, reg) if err != nil { - return nil, nil, err + return nil, err } cachedBucketClient, err := wrapBucketWithCache(directBucketClient, cfg, cacheTTL, logger, reg) if err != nil { - return nil, nil, err + return nil, err } - directStore = bucketclient.NewBucketRuleStore(directBucketClient, cfgProvider, logger) - cachedStore = bucketclient.NewBucketRuleStore(cachedBucketClient, cfgProvider, logger) - - return directStore, cachedStore, nil + return bucketclient.NewBucketRuleStore(cachedBucketClient, cfgProvider, logger), nil } func wrapBucketWithCache(bkt objstore.Bucket, cfg rulestore.Config, cacheTTL time.Duration, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { @@ -85,3 +83,7 @@ func wrapBucketWithCache(bkt objstore.Bucket, cfg rulestore.Config, cacheTTL tim func isNotTenantsDir(name string) bool { return name != "" } + +func isRuleGroup(name string) bool { + return strings.HasPrefix(name, "rules/") +} diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go index 230aaa4410..34552ebe66 100644 --- a/pkg/ruler/store_mock_test.go +++ b/pkg/ruler/store_mock_test.go @@ -10,10 +10,19 @@ import ( "encoding/base64" "fmt" "sync" + "testing" "time" + "github.com/go-kit/log" + "github.com/grafana/dskit/cache" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/grafana/mimir/pkg/ruler/rulespb" "github.com/grafana/mimir/pkg/ruler/rulestore" + "github.com/grafana/mimir/pkg/ruler/rulestore/bucketclient" + "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" ) var ( @@ -41,6 +50,19 @@ var ( } ) +func newInMemoryRuleStore(t *testing.T) (*cache.InstrumentedMockCache, *bucketclient.BucketRuleStore) { + bkt := objstore.NewInMemBucket() + mockCache := cache.NewInstrumentedMockCache() + cfg := bucketcache.NewCachingBucketConfig() + cfg.CacheIter("iter", mockCache, isNotTenantsDir, time.Minute, &bucketcache.JSONIterCodec{}) + cfg.CacheGet("rules", mockCache, isRuleGroup, 1024^2, time.Minute, time.Minute, time.Minute, true) + + cachingBkt, err := bucketcache.NewCachingBucket("rules", bkt, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) + require.NoError(t, err) + + return mockCache, bucketclient.NewBucketRuleStore(cachingBkt, nil, log.NewNopLogger()) +} + type mockRuleStore struct { rules map[string]rulespb.RuleGroupList missingRules rulespb.RuleGroupList @@ -61,7 +83,7 @@ func (m *mockRuleStore) setMissingRuleGroups(missing rulespb.RuleGroupList) { m.mtx.Unlock() } -func (m *mockRuleStore) ListAllUsers(_ context.Context) ([]string, error) { +func (m *mockRuleStore) ListAllUsers(_ context.Context, _ ...rulestore.Option) ([]string, error) { m.mtx.Lock() defer m.mtx.Unlock() @@ -72,7 +94,7 @@ func (m *mockRuleStore) ListAllUsers(_ context.Context) ([]string, error) { return result, nil } -func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, userID, namespace string) (rulespb.RuleGroupList, error) { +func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, userID, namespace string, _ ...rulestore.Option) (rulespb.RuleGroupList, error) { m.mtx.Lock() defer m.mtx.Unlock()