Skip to content

Commit

Permalink
Unify direct and caching RuleStores in ruler
Browse files Browse the repository at this point in the history
Instead of using two different `RuleStore` implementations within the Ruler,
use a single caching implementation and selectively disable caching when
required.

This change removes the "direct" `RuleStore` implementation from the Ruler's
gRPC and HTTP API layers. Instead, the caching implementation is used for all
calls. In cases where returning stale cached results would not be acceptable,
the caching is disabled _just_ for that call.

This allows rule group contents to be safely cached with the understanding
that it is safe to cache them because they will be correctly invalidated when
deleted or modified.

Part of #9386

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters committed Oct 3, 2024
1 parent 42dd5e2 commit d6abaff
Show file tree
Hide file tree
Showing 15 changed files with 410 additions and 84 deletions.
4 changes: 2 additions & 2 deletions development/mimir-microservices-mode/docker-compose.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ std.manifestYamlDoc({

memcached:: {
memcached: {
image: 'memcached:1.6.19-alpine',
image: 'memcached:1.6.28-alpine',
ports: [
'11211:11211',
],
Expand All @@ -303,7 +303,7 @@ std.manifestYamlDoc({

memcached_exporter:: {
'memcached-exporter': {
image: 'prom/memcached-exporter:v0.6.0',
image: 'prom/memcached-exporter:v0.14.4',
command: ['--memcached.address=memcached:11211', '--web.listen-address=0.0.0.0:9150'],
},
},
Expand Down
4 changes: 2 additions & 2 deletions development/mimir-microservices-mode/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,14 @@
"ports":
- "9900:9900"
"memcached":
"image": "memcached:1.6.19-alpine"
"image": "memcached:1.6.28-alpine"
"ports":
- "11211:11211"
"memcached-exporter":
"command":
- "--memcached.address=memcached:11211"
- "--web.listen-address=0.0.0.0:9150"
"image": "prom/memcached-exporter:v0.6.0"
"image": "prom/memcached-exporter:v0.14.4"
"minio":
"command":
- "server"
Expand Down
3 changes: 1 addition & 2 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,7 @@ type Mimir struct {
QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
RulerDirectStorage rulestore.RuleStore
RulerCachedStorage rulestore.RuleStore
RulerStorage rulestore.RuleStore
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.MultitenantCompactor
StoreGateway *storegateway.StoreGateway
Expand Down
10 changes: 4 additions & 6 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,13 +830,12 @@ func (t *Mimir) initRulerStorage() (serv services.Service, err error) {
// we do accept stale data for about a polling interval (2 intervals in the worst
// case scenario due to the jitter applied).
cacheTTL := t.Cfg.Ruler.PollInterval

t.RulerDirectStorage, t.RulerCachedStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
return
}

func (t *Mimir) initRuler() (serv services.Service, err error) {
if t.RulerDirectStorage == nil {
if t.RulerStorage == nil {
level.Info(util_log.Logger).Log("msg", "The ruler storage has not been configured. Not starting the ruler.")
return nil, nil
}
Expand Down Expand Up @@ -939,8 +938,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
manager,
t.Registerer,
util_log.Logger,
t.RulerDirectStorage,
t.RulerCachedStorage,
t.RulerStorage,
t.Overrides,
)
if err != nil {
Expand All @@ -951,7 +949,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
t.API.RegisterRuler(t.Ruler)

// Expose HTTP configuration and prometheus-compatible Ruler APIs
t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerDirectStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)
t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)

return t.Ruler, nil
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/mimir/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,9 @@ func TestMimir_InitRulerStorage(t *testing.T) {
require.NoError(t, err)

if testData.expectedInit {
assert.NotNil(t, mimir.RulerDirectStorage)
assert.NotNil(t, mimir.RulerCachedStorage)
assert.NotNil(t, mimir.RulerStorage)
} else {
assert.Nil(t, mimir.RulerDirectStorage)
assert.Nil(t, mimir.RulerCachedStorage)
assert.Nil(t, mimir.RulerStorage)
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) {
}

level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace)
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace)
// Disable any caching when getting list of all rule groups since listing results
// are cached and not invalidated and this API is expected to be strongly consistent.
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace, rulestore.WithCacheDisabled())
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -606,7 +608,9 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {

// Only list rule groups when enforcing a max number of groups for this tenant and namespace.
if a.ruler.IsMaxRuleGroupsLimited(userID, namespace) {
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
// Disable any caching when getting list of all rule groups since listing results
// are cached and not invalidated and we need the most up-to-date number.
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled())
if err != nil {
level.Error(logger).Log("msg", "unable to fetch current rule groups for validation", "err", err.Error(), "user", userID)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
106 changes: 96 additions & 10 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestRuler_ListRules(t *testing.T) {
store.setMissingRuleGroups(tc.missingRules)

r := prepareRuler(t, cfg, store, withStart())
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules").Methods("GET").HandlerFunc(a.ListRules)
Expand Down Expand Up @@ -936,7 +936,7 @@ func TestRuler_PrometheusRules(t *testing.T) {
return len(rls.Groups)
})

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+tc.queryParams, nil, userID)
w := httptest.NewRecorder()
Expand Down Expand Up @@ -993,7 +993,7 @@ func TestRuler_PrometheusAlerts(t *testing.T) {
return len(rls.Groups)
})

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/alerts", nil, "user1")
w := httptest.NewRecorder()
Expand Down Expand Up @@ -1172,7 +1172,7 @@ rules:

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, rulerCfg, newMockRuleStore(make(map[string]rulespb.RuleGroupList)), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup)
Expand Down Expand Up @@ -1207,6 +1207,92 @@ rules:
}
}

// TestAPI_CreateRuleGroupWithCaching drives the CreateRuleGroup and GetRuleGroup
// HTTP handlers against a rule store created together with a mock cache. It
// stores a rule group, reads it back, stores a modified version under the same
// name, and reads it back again, asserting after each step both the HTTP
// response and the cumulative number of delete/fetch calls recorded by the mock
// cache. The second read must return the second version of the group.
//
// NOTE(review): the expected call counts are cumulative and depend on the exact
// order of the requests below; reordering steps will change them.
func TestAPI_CreateRuleGroupWithCaching(t *testing.T) {
// Configure the ruler to only sync the rules based on notifications upon API changes.
cfg := defaultRulerConfig(t)
cfg.PollInterval = time.Hour
cfg.OutboundSyncQueuePollInterval = 100 * time.Millisecond
cfg.InboundSyncQueuePollInterval = 100 * time.Millisecond

// Body the create endpoint is expected to return on success.
const successResponse = `{"status":"success","data":null,"errorType":"","error":""}`

// The two versions share the same group name and differ only in the alert
// expression (`up < 1` vs `up <= 1`), so a stale read is detectable by
// comparing response bodies.
ruleGroupVersion1 := `name: group1
interval: 15s
rules:
- record: up_rule
expr: up
- alert: up_alert
expr: up < 1
`
ruleGroupVersion2 := `name: group1
interval: 15s
rules:
- record: up_rule
expr: up
- alert: up_alert
expr: up <= 1
`

// newInMemoryRuleStore is defined elsewhere; presumably the returned store
// reads and writes through mockCache, which is what the call-count assertions
// below rely on — confirm against the helper.
mockCache, store := newInMemoryRuleStore(t)

reg := prometheus.NewPedanticRegistry()
// Set rule group limits since this performs a list call to count the current number of rule groups
// and we're testing if the API layer is correctly telling the rule store not to serve cached results.
r := prepareRuler(t, cfg, store, withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg), withLimits(validation.MockOverrides(func(defaults *validation.Limits, _ map[string]*validation.Limits) {
defaults.RulerMaxRuleGroupsPerTenant = 2
defaults.RulerMaxRulesPerRuleGroup = 2
})))
a := NewAPI(r, r.store, log.NewNopLogger())

// Only the two routes exercised by this test are registered.
router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodGet).HandlerFunc(a.GetRuleGroup)
router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodPost).HandlerFunc(a.CreateRuleGroup)

// Pre-condition check: the ruler should have run the initial rules sync.
verifySyncRulesMetric(t, reg, 1, 0)

// Store the initial version of the rule group
req := requestFor(t, http.MethodPost, "https://localhost:8080/prometheus/config/v1/rules/namespace1", strings.NewReader(ruleGroupVersion1), "user1")
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusAccepted, w.Code)
assert.Equal(t, successResponse, w.Body.String())
// Invalidation of exists and content
assert.Equal(t, 2, mockCache.CountDeleteCalls())

// The last argument goes from 0 to 1 after the first write; presumably it
// counts syncs triggered by API changes — confirm against verifySyncRulesMetric.
verifySyncRulesMetric(t, reg, 1, 1)

// Fetch it back and ensure the content is what we expect even though content can be cached
req = requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/config/v1/rules/namespace1/group1", nil, "user1")
w = httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, ruleGroupVersion1, w.Body.String())
// Iter from initial sync, get, iter from sync
assert.Equal(t, 3, mockCache.CountFetchCalls())

// Store a new version of the group that is slightly different
req = requestFor(t, http.MethodPost, "https://localhost:8080/prometheus/config/v1/rules/namespace1", strings.NewReader(ruleGroupVersion2), "user1")
w = httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusAccepted, w.Code)
assert.Equal(t, successResponse, w.Body.String())
// Invalidating exists and content again
assert.Equal(t, 4, mockCache.CountDeleteCalls())

verifySyncRulesMetric(t, reg, 1, 2)

// Fetch it back and ensure content is updated to the new version meaning the cache was invalidated
req = requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/config/v1/rules/namespace1/group1", nil, "user1")
w = httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, ruleGroupVersion2, w.Body.String())
// Iter from initial sync, get, iter from sync, another get, iter from sync
assert.Equal(t, 5, mockCache.CountFetchCalls())

}

func TestAPI_DeleteNamespace(t *testing.T) {
// Configure the ruler to only sync the rules based on notifications upon API changes.
cfg := defaultRulerConfig(t)
Expand Down Expand Up @@ -1237,7 +1323,7 @@ func TestAPI_DeleteNamespace(t *testing.T) {

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodDelete).HandlerFunc(a.DeleteNamespace)
Expand Down Expand Up @@ -1294,7 +1380,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) {

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodDelete).HandlerFunc(a.DeleteRuleGroup)
Expand Down Expand Up @@ -1336,7 +1422,7 @@ func TestRuler_LimitsPerGroup(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 1
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1389,7 +1475,7 @@ func TestRuler_RulerGroupLimits(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 1
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1449,7 +1535,7 @@ func TestRuler_RulerGroupLimitsDisabled(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 0
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1551,7 +1637,7 @@ func TestAPIRoutesCorrectlyHandleInvalidOrgID(t *testing.T) {

r := prepareRuler(t, cfg, newMockRuleStore(map[string]rulespb.RuleGroupList{}), withStart())

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/api/v1/rules").Methods(http.MethodGet).HandlerFunc(a.PrometheusRules)
Expand Down
Loading

0 comments on commit d6abaff

Please sign in to comment.