Skip to content

Commit

Permalink
Unify direct and caching RuleStores in ruler
Browse files Browse the repository at this point in the history
Instead of using two different `RuleStore` implementations within the Ruler,
use a single caching implementation and selectively disable caching when
required.

This change removes the "direct" `RuleStore` implementation from the Ruler's
gRPC and HTTP API layers. Instead, the caching implementation is used for all
calls. In cases where caching returning stale results would not be acceptable,
the caching is disabled _just_ for that call.

This allows rule group contents to be safety cached with the understanding
that it is safe to cache them because they will correctly invalidated when
deleted or modified.

Part of #9386
  • Loading branch information
56quarters committed Sep 26, 2024
1 parent 38dcd7a commit f7039db
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 84 deletions.
4 changes: 2 additions & 2 deletions development/mimir-microservices-mode/docker-compose.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ std.manifestYamlDoc({

memcached:: {
memcached: {
image: 'memcached:1.6.19-alpine',
image: 'memcached:1.6.28-alpine',
ports: [
'11211:11211',
],
Expand All @@ -303,7 +303,7 @@ std.manifestYamlDoc({

memcached_exporter:: {
'memcached-exporter': {
image: 'prom/memcached-exporter:v0.6.0',
image: 'prom/memcached-exporter:v0.14.4',
command: ['--memcached.address=memcached:11211', '--web.listen-address=0.0.0.0:9150'],
},
},
Expand Down
4 changes: 2 additions & 2 deletions development/mimir-microservices-mode/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,14 @@
"ports":
- "9900:9900"
"memcached":
"image": "memcached:1.6.19-alpine"
"image": "memcached:1.6.28-alpine"
"ports":
- "11211:11211"
"memcached-exporter":
"command":
- "--memcached.address=memcached:11211"
- "--web.listen-address=0.0.0.0:9150"
"image": "prom/memcached-exporter:v0.6.0"
"image": "prom/memcached-exporter:v0.14.4"
"minio":
"command":
- "server"
Expand Down
3 changes: 1 addition & 2 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,7 @@ type Mimir struct {
QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
RulerDirectStorage rulestore.RuleStore
RulerCachedStorage rulestore.RuleStore
RulerStorage rulestore.RuleStore
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.MultitenantCompactor
StoreGateway *storegateway.StoreGateway
Expand Down
10 changes: 4 additions & 6 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,13 +830,12 @@ func (t *Mimir) initRulerStorage() (serv services.Service, err error) {
// we do accept stale data for about a polling interval (2 intervals in the worst
// case scenario due to the jitter applied).
cacheTTL := t.Cfg.Ruler.PollInterval

t.RulerDirectStorage, t.RulerCachedStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
return
}

func (t *Mimir) initRuler() (serv services.Service, err error) {
if t.RulerDirectStorage == nil {
if t.RulerStorage == nil {
level.Info(util_log.Logger).Log("msg", "The ruler storage has not been configured. Not starting the ruler.")
return nil, nil
}
Expand Down Expand Up @@ -939,8 +938,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
manager,
t.Registerer,
util_log.Logger,
t.RulerDirectStorage,
t.RulerCachedStorage,
t.RulerStorage,
t.Overrides,
)
if err != nil {
Expand All @@ -951,7 +949,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
t.API.RegisterRuler(t.Ruler)

// Expose HTTP configuration and prometheus-compatible Ruler APIs
t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerDirectStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)
t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)

return t.Ruler, nil
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/mimir/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,9 @@ func TestMimir_InitRulerStorage(t *testing.T) {
require.NoError(t, err)

if testData.expectedInit {
assert.NotNil(t, mimir.RulerDirectStorage)
assert.NotNil(t, mimir.RulerCachedStorage)
assert.NotNil(t, mimir.RulerStorage)
} else {
assert.Nil(t, mimir.RulerDirectStorage)
assert.Nil(t, mimir.RulerCachedStorage)
assert.Nil(t, mimir.RulerStorage)
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) {
}

level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace)
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace)
// Disable any caching when getting list of all rule groups since listing results
// are cached and not invalidated and this API is expected to be strongly consistent.
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace, rulestore.WithCacheDisabled())
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -606,7 +608,9 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {

// Only list rule groups when enforcing a max number of groups for this tenant and namespace.
if a.ruler.IsMaxRuleGroupsLimited(userID, namespace) {
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
// Disable any caching when getting list of all rule groups since listing results
// are cached and not invalidated and we need the most up-to-date number.
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled())
if err != nil {
level.Error(logger).Log("msg", "unable to fetch current rule groups for validation", "err", err.Error(), "user", userID)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
20 changes: 10 additions & 10 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestRuler_ListRules(t *testing.T) {
store.setMissingRuleGroups(tc.missingRules)

r := prepareRuler(t, cfg, store, withStart())
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules").Methods("GET").HandlerFunc(a.ListRules)
Expand Down Expand Up @@ -936,7 +936,7 @@ func TestRuler_PrometheusRules(t *testing.T) {
return len(rls.Groups)
})

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+tc.queryParams, nil, userID)
w := httptest.NewRecorder()
Expand Down Expand Up @@ -993,7 +993,7 @@ func TestRuler_PrometheusAlerts(t *testing.T) {
return len(rls.Groups)
})

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/alerts", nil, "user1")
w := httptest.NewRecorder()
Expand Down Expand Up @@ -1172,7 +1172,7 @@ rules:

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, rulerCfg, newMockRuleStore(make(map[string]rulespb.RuleGroupList)), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup)
Expand Down Expand Up @@ -1237,7 +1237,7 @@ func TestAPI_DeleteNamespace(t *testing.T) {

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodDelete).HandlerFunc(a.DeleteNamespace)
Expand Down Expand Up @@ -1294,7 +1294,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) {

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodDelete).HandlerFunc(a.DeleteRuleGroup)
Expand Down Expand Up @@ -1336,7 +1336,7 @@ func TestRuler_LimitsPerGroup(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 1
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1389,7 +1389,7 @@ func TestRuler_RulerGroupLimits(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 1
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1449,7 +1449,7 @@ func TestRuler_RulerGroupLimitsDisabled(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 0
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1551,7 +1551,7 @@ func TestAPIRoutesCorrectlyHandleInvalidOrgID(t *testing.T) {

r := prepareRuler(t, cfg, newMockRuleStore(map[string]rulespb.RuleGroupList{}), withStart())

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/api/v1/rules").Methods(http.MethodGet).HandlerFunc(a.PrometheusRules)
Expand Down
56 changes: 30 additions & 26 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/ruler/rulespb"
"github.com/grafana/mimir/pkg/ruler/rulestore"
"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/grpcencoding/s2"
util_log "github.com/grafana/mimir/pkg/util/log"
Expand Down Expand Up @@ -314,13 +313,12 @@ type MultiTenantManager interface {
type Ruler struct {
services.Service

cfg Config
lifecycler *ring.BasicLifecycler
ring *ring.Ring
directStore rulestore.RuleStore
cachedStore rulestore.RuleStore
manager MultiTenantManager
limits RulesLimits
cfg Config
lifecycler *ring.BasicLifecycler
ring *ring.Ring
store rulestore.RuleStore
manager MultiTenantManager
limits RulesLimits

metrics *rulerMetrics

Expand All @@ -346,20 +344,14 @@ type Ruler struct {
}

// NewRuler creates a new ruler from a distributor and chunk store.
func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, directStore, cachedStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
// If the cached store is not configured, just fallback to the direct one.
if cachedStore == nil {
cachedStore = directStore
}

return newRuler(cfg, manager, reg, logger, directStore, cachedStore, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg))
func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, store rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
return newRuler(cfg, manager, reg, logger, store, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg))
}

func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, directStore, cachedStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, store rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
ruler := &Ruler{
cfg: cfg,
directStore: directStore,
cachedStore: cachedStore,
store: store,
manager: manager,
registry: reg,
logger: logger,
Expand Down Expand Up @@ -633,7 +625,7 @@ func (r *Ruler) syncRules(ctx context.Context, userIDs []string, reason rulesSyn
func (r *Ruler) loadRuleGroupsToSync(ctx context.Context, configs map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) {
// Load rule groups.
start := time.Now()
missing, err := r.directStore.LoadRuleGroups(ctx, configs)
missing, err := r.store.LoadRuleGroups(ctx, configs)
r.metrics.loadRuleGroups.Observe(time.Since(start).Seconds())

if err != nil {
Expand All @@ -660,7 +652,12 @@ func (r *Ruler) listRuleGroupsToSyncForAllUsers(ctx context.Context, reason rule

// In order to reduce API calls to the object storage among all ruler replicas,
// we support lookup of stale data for a short period.
users, err := r.cachedStore.ListAllUsers(bucketcache.WithCacheLookupEnabled(ctx, cacheLookupEnabled))
var opts []rulestore.Option
if !cacheLookupEnabled {
opts = append(opts, rulestore.WithCacheDisabled())
}

users, err := r.store.ListAllUsers(ctx, opts...)
if err != nil {
return nil, errors.Wrap(err, "unable to list users of ruler")
}
Expand Down Expand Up @@ -711,11 +708,16 @@ func (r *Ruler) listRuleGroupsToSyncForUsers(ctx context.Context, userIDs []stri
concurrency = len(userRings)
}

var opts []rulestore.Option
if !cacheLookupEnabled {
opts = append(opts, rulestore.WithCacheDisabled())
}

g, gctx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
g.Go(func() error {
for userID := range userCh {
groups, err := r.cachedStore.ListRuleGroupsForUserAndNamespace(bucketcache.WithCacheLookupEnabled(gctx, cacheLookupEnabled), userID, "")
groups, err := r.store.ListRuleGroupsForUserAndNamespace(gctx, userID, "", opts...)
if err != nil {
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}
Expand Down Expand Up @@ -1224,7 +1226,7 @@ func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Reque
return
}

err = r.directStore.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups.
err = r.store.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups.
if err != nil && !errors.Is(err, rulestore.ErrGroupNamespaceNotFound) {
respondServerError(logger, w, err.Error())
return
Expand All @@ -1238,8 +1240,8 @@ func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Reque

func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
logger := util_log.WithContext(req.Context(), r.logger)

userIDs, err := r.directStore.ListAllUsers(req.Context())
// Disable caching when getting a list of users since this API is expected to be strongly consistent.
userIDs, err := r.store.ListAllUsers(req.Context(), rulestore.WithCacheDisabled())
if err != nil {
level.Error(logger).Log("msg", errListAllUser, "err", err)
http.Error(w, fmt.Sprintf("%s: %s", errListAllUser, err.Error()), http.StatusInternalServerError)
Expand All @@ -1255,12 +1257,14 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
}()

err = concurrency.ForEachUser(req.Context(), userIDs, fetchRulesConcurrency, func(ctx context.Context, userID string) error {
rg, err := r.directStore.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
// Disable any caching when getting list of all rule groups since listing results
// are cached and not invalidated and this API is expected to be strongly consistent.
rg, err := r.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled())
if err != nil {
return errors.Wrapf(err, "failed to fetch ruler config for user %s", userID)
}
userRules := map[string]rulespb.RuleGroupList{userID: rg}
if missing, err := r.directStore.LoadRuleGroups(ctx, userRules); err != nil {
if missing, err := r.store.LoadRuleGroups(ctx, userRules); err != nil {
return errors.Wrapf(err, "failed to load ruler config for user %s", userID)
} else if len(missing) > 0 {
// This API is expected to be strongly consistent, so it's an error if any rule group was missing.
Expand Down
8 changes: 4 additions & 4 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ import (
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/bucket/filesystem"
"github.com/grafana/mimir/pkg/util"
util_test "github.com/grafana/mimir/pkg/util/test"
utiltest "github.com/grafana/mimir/pkg/util/test"
"github.com/grafana/mimir/pkg/util/validation"
)

func TestMain(m *testing.M) {
util_test.VerifyNoLeakTestMain(m)
utiltest.VerifyNoLeakTestMain(m)
}

func defaultRulerConfig(t testing.TB) Config {
Expand Down Expand Up @@ -210,7 +210,7 @@ func prepareRuler(t *testing.T, cfg Config, storage rulestore.RuleStore, opts ..
options := applyPrepareOptions(t, cfg.Ring.Common.InstanceID, opts...)
manager := prepareRulerManager(t, cfg, opts...)

ruler, err := newRuler(cfg, manager, options.registerer, options.logger, storage, storage, options.limits, newMockClientsPool(cfg, options.logger, options.registerer, options.rulerAddrMap))
ruler, err := newRuler(cfg, manager, options.registerer, options.logger, storage, options.limits, newMockClientsPool(cfg, options.logger, options.registerer, options.rulerAddrMap))
require.NoError(t, err)

if options.rulerAddrAutoMap {
Expand Down Expand Up @@ -1571,7 +1571,7 @@ func verifyExpectedDeletedRuleGroupsForUser(t *testing.T, r *Ruler, userID strin
ctx := context.Background()

t.Run("ListRuleGroupsForUserAndNamespace()", func(t *testing.T) {
list, err := r.directStore.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
list, err := r.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
require.NoError(t, err)

if expectedDeleted {
Expand Down
Loading

0 comments on commit f7039db

Please sign in to comment.