From 6a49e3b95378a9d9b554c8140165c6dbb9bd5d9f Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 15 Sep 2023 03:30:54 -0700 Subject: [PATCH] Fail to load singe rulegroup cause rulers to halt the sync of all rulegroups. (#5563) * Not fail to load remaining rules when a single ruler fail Signed-off-by: Alan Protasio * changelog Signed-off-by: Alan Protasio * comments Signed-off-by: Alan Protasio --------- Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + pkg/ruler/api.go | 2 +- pkg/ruler/api_test.go | 18 +-- pkg/ruler/lifecycle_test.go | 8 +- pkg/ruler/ruler.go | 9 +- pkg/ruler/ruler_test.go | 106 +++++++++++++++--- .../rulestore/bucketclient/bucket_client.go | 36 +++++- .../bucketclient/bucket_client_test.go | 38 ++++++- pkg/ruler/rulestore/configdb/store.go | 4 +- pkg/ruler/rulestore/local/local.go | 4 +- pkg/ruler/rulestore/store.go | 7 +- pkg/ruler/store_mock_test.go | 23 ++-- 12 files changed, 201 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f212edcc7..c5a0b8b039 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,7 @@ * [BUGFIX] Query Frontend: Handle context error before decoding and merging responses. #5499 * [BUGFIX] DDBKV: When no change detected in ring, retry the CAS until there is change. #5502 * [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540 +* [BUGFIX] Ruler: Fix bug on ruler where a failure to load a single RuleGroup would prevent rulers to sync all RuleGroup. #5563 ## 1.15.1 2023-04-26 diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index ecbbc94d58..48debd328a 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -450,7 +450,7 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) { return } - err = a.store.LoadRuleGroups(req.Context(), map[string]rulespb.RuleGroupList{userID: rgs}) + _, err = a.store.LoadRuleGroups(req.Context(), map[string]rulespb.RuleGroupList{userID: rgs}) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 24fbe0b832..0edc67450c 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -20,7 +20,7 @@ import ( ) func TestRuler_rules(t *testing.T) { - store := newMockRuleStore(mockRules) + store := newMockRuleStore(mockRules, nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -76,7 +76,7 @@ func TestRuler_rules(t *testing.T) { } func TestRuler_rules_special_characters(t *testing.T) { - store := newMockRuleStore(mockSpecialCharRules) + store := newMockRuleStore(mockSpecialCharRules, nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -131,7 +131,7 @@ func TestRuler_rules_special_characters(t *testing.T) { } func TestRuler_rules_limit(t *testing.T) { - store := newMockRuleStore(mockRulesLimit) + store := newMockRuleStore(mockRulesLimit, nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -185,7 +185,7 @@ func TestRuler_rules_limit(t *testing.T) { require.Equal(t, string(expectedResponse), string(body)) } func TestRuler_alerts(t *testing.T) { - store := newMockRuleStore(mockRules) + store := newMockRuleStore(mockRules, nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -220,7 +220,7 @@ func TestRuler_alerts(t *testing.T) { } func TestRuler_Create(t *testing.T) { - store := newMockRuleStore(make(map[string]rulespb.RuleGroupList)) + store := newMockRuleStore(make(map[string]rulespb.RuleGroupList), nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -310,7 +310,7 @@ rules: } func TestRuler_DeleteNamespace(t *testing.T) { - store := newMockRuleStore(mockRulesNamespaces) + store := newMockRuleStore(mockRulesNamespaces, nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -348,7 +348,7 @@ func TestRuler_DeleteNamespace(t *testing.T) { } func TestRuler_LimitsPerGroup(t *testing.T) { - store := newMockRuleStore(make(map[string]rulespb.RuleGroupList)) + store := newMockRuleStore(make(map[string]rulespb.RuleGroupList), nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -402,7 +402,7 @@ rules: } func TestRuler_RulerGroupLimits(t *testing.T) { - store := newMockRuleStore(make(map[string]rulespb.RuleGroupList)) + store := newMockRuleStore(make(map[string]rulespb.RuleGroupList), nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -463,7 +463,7 @@ rules: } func TestRuler_ProtoToRuleGroupYamlConvertion(t *testing.T) { - store := newMockRuleStore(make(map[string]rulespb.RuleGroupList)) + store := newMockRuleStore(make(map[string]rulespb.RuleGroupList), nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) diff --git a/pkg/ruler/lifecycle_test.go b/pkg/ruler/lifecycle_test.go index d4a062df1b..b9494264d6 100644 --- a/pkg/ruler/lifecycle_test.go +++ b/pkg/ruler/lifecycle_test.go @@ -20,10 +20,10 @@ import ( func TestRulerShutdown(t *testing.T) { ctx := context.Background() - store := newMockRuleStore(mockRules) + store := newMockRuleStore(mockRules, nil) config := defaultRulerConfig(t) - r := buildRuler(t, config, nil, store, nil) + r, _ := buildRuler(t, config, nil, store, nil) r.cfg.EnableSharding = true ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) @@ -55,9 +55,9 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) { const heartbeatTimeout = time.Minute ctx := context.Background() - store := newMockRuleStore(mockRules) + store := newMockRuleStore(mockRules, nil) config := defaultRulerConfig(t) - r := buildRuler(t, config, nil, store, nil) + r, _ := buildRuler(t, config, nil, store, nil) r.cfg.EnableSharding = true r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond r.cfg.Ring.HeartbeatTimeout = heartbeatTimeout diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index e90c9b7e2c..b676913b78 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -519,14 +519,13 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) { return } - err = r.store.LoadRuleGroups(ctx, configs) + loadedConfigs, err := r.store.LoadRuleGroups(ctx, configs) if err != nil { - level.Error(r.logger).Log("msg", "unable to load rules owned by this ruler", "err", err) - return + level.Warn(r.logger).Log("msg", "failed to load some rules owned by this ruler", "count", len(configs)-len(loadedConfigs), "err", err) } // This will also delete local group files for users that are no longer in 'configs' map. - r.manager.SyncRuleGroups(ctx, configs) + r.manager.SyncRuleGroups(ctx, loadedConfigs) } func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) { @@ -983,7 +982,7 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) { return errors.Wrapf(err, "failed to fetch ruler config for user %s", userID) } userRules := map[string]rulespb.RuleGroupList{userID: rg} - if err := r.store.LoadRuleGroups(ctx, userRules); err != nil { + if userRules, err = r.store.LoadRuleGroups(ctx, userRules); err != nil { return errors.Wrapf(err, "failed to load ruler config for user %s", userID) } data := map[string]map[string][]rulefmt.RuleGroup{userID: userRules[userID].Formatted()} diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 4d89b9d693..32734cf751 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -7,6 +7,7 @@ import ( "math/rand" "net/http" "net/http/httptest" + "net/url" "os" "reflect" "sort" @@ -18,8 +19,6 @@ import ( "github.com/thanos-io/objstore" - "github.com/cortexproject/cortex/pkg/ruler/rulestore/bucketclient" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gorilla/mux" @@ -48,10 +47,12 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" + "github.com/cortexproject/cortex/pkg/ruler/rulestore/bucketclient" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -218,7 +219,7 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer } } -func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler) *Ruler { +func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler) (*Ruler, *DefaultMultiTenantManager) { engine, queryable, pusher, logger, overrides, reg := testSetup(t, querierTestConfig) managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg) @@ -235,11 +236,11 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes newMockClientsPool(rulerConfig, logger, reg, rulerAddrMap), ) require.NoError(t, err) - return ruler + return ruler, manager } func newTestRuler(t *testing.T, rulerConfig Config, store rulestore.RuleStore, querierTestConfig *querier.TestConfig) *Ruler { - ruler := buildRuler(t, rulerConfig, querierTestConfig, store, nil) + ruler, _ := buildRuler(t, rulerConfig, querierTestConfig, store, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler)) // Ensure all rules are loaded before usage @@ -293,7 +294,7 @@ func TestNotifierSendsUserIDHeader(t *testing.T) { } func TestRuler_Rules(t *testing.T) { - store := newMockRuleStore(mockRules) + store := newMockRuleStore(mockRules, nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -552,7 +553,7 @@ func TestGetRules(t *testing.T) { rulerAddrMap := map[string]*Ruler{} createRuler := func(id string) *Ruler { - store := newMockRuleStore(allRulesByUser) + store := newMockRuleStore(allRulesByUser, nil) cfg := defaultRulerConfig(t) cfg.ShardingStrategy = tc.shardingStrategy @@ -566,7 +567,7 @@ func TestGetRules(t *testing.T) { }, } - r := buildRuler(t, cfg, nil, store, rulerAddrMap) + r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap) r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize} rulerAddrMap[id] = r if r.ring != nil { @@ -1053,7 +1054,7 @@ func TestSharding(t *testing.T) { t.Cleanup(func() { assert.NoError(t, closer.Close()) }) setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler { - store := newMockRuleStore(allRules) + store := newMockRuleStore(allRules, nil) cfg := Config{ EnableSharding: tc.sharding, ShardingStrategy: tc.shardingStrategy, @@ -1071,7 +1072,7 @@ func TestSharding(t *testing.T) { DisabledTenants: tc.disabledUsers, } - r := buildRuler(t, cfg, nil, store, nil) + r, _ := buildRuler(t, cfg, nil, store, nil) r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize} if forceRing != nil { @@ -1159,6 +1160,81 @@ func sortTokens(tokens []uint32) []uint32 { return tokens } +func Test_LoadPartialGroups(t *testing.T) { + const ( + user1 = "user1" + user2 = "user2" + user3 = "user3" + ) + + const ( + ruler1 = "ruler-1" + ruler1Host = "1.1.1.1" + ruler1Port = 9999 + ) + + user1Group1 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "first", Interval: time.Minute} + user1Group2 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "second", Interval: time.Minute} + user2Group1 := &rulespb.RuleGroupDesc{User: user2, Namespace: "namespace", Name: "first", Interval: time.Minute} + user3Group1 := &rulespb.RuleGroupDesc{User: user3, Namespace: "namespace", Name: "first", Interval: time.Minute} + + allRules := map[string]rulespb.RuleGroupList{ + user1: {user1Group1, user1Group2}, + user2: {user2Group1}, + user3: {user3Group1}, + } + + kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + store := newMockRuleStore(allRules, map[string]error{user1: fmt.Errorf("test")}) + u, _ := url.Parse("") + cfg := Config{ + EnableSharding: true, + ExternalURL: flagext.URLValue{URL: u}, + PollInterval: time.Millisecond * 100, + RingCheckPeriod: time.Minute, + ShardingStrategy: util.ShardingStrategyShuffle, + Ring: RingConfig{ + InstanceID: ruler1, + InstanceAddr: ruler1Host, + InstancePort: ruler1Port, + KVStore: kv.Config{ + Mock: kvStore, + }, + HeartbeatTimeout: 1 * time.Minute, + }, + FlushCheckPeriod: 0, + } + + r1, manager := buildRuler(t, cfg, nil, store, nil) + r1.limits = ruleLimits{evalDelay: 0, tenantShard: 1} + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r1)) + t.Cleanup(r1.StopAsync) + + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + d.AddIngester(ruler1, fmt.Sprintf("%v:%v", ruler1Host, ruler1Port), "", []uint32{0}, ring.ACTIVE, time.Now()) + return d, true, nil + }) + + require.NoError(t, err) + + test.Poll(t, time.Second*5, true, func() interface{} { + return len(r1.manager.GetRules(user2)) > 0 && + len(r1.manager.GetRules(user3)) > 0 + }) + + returned, err := r1.listRules(context.Background()) + require.NoError(t, err) + require.Equal(t, returned, allRules) + require.Equal(t, 2, len(manager.userManagers)) +} + func TestDeleteTenantRuleGroups(t *testing.T) { ruleGroups := []ruleGroupKey{ {user: "userA", namespace: "namespace", group: "group"}, @@ -1267,7 +1343,7 @@ type ruleGroupKey struct { } func TestRuler_ListAllRules(t *testing.T) { - store := newMockRuleStore(mockRules) + store := newMockRuleStore(mockRules, nil) cfg := defaultRulerConfig(t) r := newTestRuler(t, cfg, store, nil) @@ -1400,7 +1476,7 @@ func TestRecoverAlertsPostOutage(t *testing.T) { } // NEXT, set up ruler config with outage tolerance = 1hr - store := newMockRuleStore(mockRules) + store := newMockRuleStore(mockRules, nil) rulerCfg := defaultRulerConfig(t) rulerCfg.OutageTolerance, _ = time.ParseDuration("1h") @@ -1434,7 +1510,7 @@ func TestRecoverAlertsPostOutage(t *testing.T) { } // create a ruler but don't start it. instead, we'll evaluate the rule groups manually. - r := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil) + r, _ := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil) r.syncRules(context.Background(), rulerSyncReasonInitial) // assert initial state of rule group @@ -1638,7 +1714,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) { t.Cleanup(func() { assert.NoError(t, closer.Close()) }) setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler { - store := newMockRuleStore(tc.rules) + store := newMockRuleStore(tc.rules, nil) cfg := Config{ EnableSharding: tc.sharding, ShardingStrategy: tc.shardingStrategy, @@ -1654,7 +1730,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) { FlushCheckPeriod: 0, } - r := buildRuler(t, cfg, nil, store, nil) + r, _ := buildRuler(t, cfg, nil, store, nil) r.limits = ruleLimits{evalDelay: 0, tenantShard: 3, disabledRuleGroups: tc.disabledRuleGroups} if forceRing != nil { diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go index 21f47f5570..ae9dd2c05f 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "strings" + "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -18,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/util/multierror" ) const ( @@ -61,6 +63,11 @@ func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, g return nil, rulestore.ErrGroupNotFound } + if userBucket.IsAccessDeniedErr(err) { + level.Debug(b.logger).Log("msg", "permission denied when loading group", "user", userID, "key", objectKey) + return nil, rulestore.ErrAccessDenied + } + if err != nil { return nil, errors.Wrapf(err, "failed to get rule group %s", objectKey) } @@ -165,8 +172,11 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, } // LoadRuleGroups implements rules.RuleStore. -func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error { +func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) { ch := make(chan *rulespb.RuleGroupDesc) + loadedGroups := make(map[string]rulespb.RuleGroupList, len(groupsToLoad)) + errs := multierror.MultiError{} + m := sync.Mutex{} // Given we store one file per rule group. With this, we create a pool of workers that will // download all rule groups in parallel. We limit the number of workers to avoid a @@ -177,17 +187,29 @@ func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[s for gr := range ch { user, namespace, group := gr.GetUser(), gr.GetNamespace(), gr.GetName() if user == "" || namespace == "" || group == "" { - return fmt.Errorf("invalid rule group: user=%q, namespace=%q, group=%q", user, namespace, group) + m.Lock() + errs.Add(fmt.Errorf("invalid rule group: user=%q, namespace=%q, group=%q", user, namespace, group)) + m.Unlock() + continue } gr, err := b.getRuleGroup(gCtx, user, namespace, group, gr) // reuse group pointer from the map. if err != nil { - return errors.Wrapf(err, "get rule group user=%q, namespace=%q, name=%q", user, namespace, group) + m.Lock() + errs.Add(errors.Wrapf(err, "get rule group user=%q, namespace=%q, name=%q", user, namespace, group)) + m.Unlock() + continue } if user != gr.User || namespace != gr.Namespace || group != gr.Name { - return fmt.Errorf("mismatch between requested rule group and loaded rule group, requested: user=%q, namespace=%q, group=%q, loaded: user=%q, namespace=%q, group=%q", user, namespace, group, gr.User, gr.Namespace, gr.Name) + m.Lock() + errs.Add(fmt.Errorf("mismatch between requested rule group and loaded rule group, requested: user=%q, namespace=%q, group=%q, loaded: user=%q, namespace=%q, group=%q", user, namespace, group, gr.User, gr.Namespace, gr.Name)) + m.Unlock() + continue } + m.Lock() + loadedGroups[user] = append(loadedGroups[user], gr) + m.Unlock() } return nil @@ -210,7 +232,11 @@ outer: } close(ch) - return g.Wait() + if e := g.Wait(); e != nil { + return loadedGroups, e + } + + return loadedGroups, errs.Err() } // GetRuleGroup implements rules.RuleStore. diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go index 899176c901..d371acd42f 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go @@ -19,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" + "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) type testGroup struct { @@ -101,6 +102,35 @@ func TestListRules(t *testing.T) { }) } +func TestLoadPartialRules(t *testing.T) { + bucketClient := objstore.NewInMemBucket() + mockedBucketClient := &testutil.MockBucketFailure{Bucket: bucketClient, GetFailures: map[string]error{}} + bucketStore := NewBucketRuleStore(mockedBucketClient, nil, log.NewNopLogger()) + + groups := []testGroup{ + {user: "user1", namespace: "hello", ruleGroup: rulefmt.RuleGroup{Name: "second testGroup", Interval: model.Duration(2 * time.Minute)}}, + {user: "user2", namespace: "+-!@#$%. ", ruleGroup: rulefmt.RuleGroup{Name: "different user", Interval: model.Duration(5 * time.Minute)}}, + {user: "user3", namespace: "+-!@#$%. ", ruleGroup: rulefmt.RuleGroup{Name: "different user", Interval: model.Duration(5 * time.Minute)}}, + } + + for _, g := range groups { + desc := rulespb.ToProto(g.user, g.namespace, g.ruleGroup) + require.NoError(t, bucketStore.SetRuleGroup(context.Background(), g.user, g.namespace, desc)) + } + allGroups, err := bucketStore.ListAllRuleGroups(context.Background()) + require.NoError(t, err) + + loadedGroups, err := bucketStore.LoadRuleGroups(context.Background(), allGroups) + require.NoError(t, err) + require.Equal(t, 3, len(loadedGroups)) + + // Fail user1 + mockedBucketClient.GetFailures["rules/user2"] = testutil.ErrKeyAccessDeniedError + loadedGroups, err = bucketStore.LoadRuleGroups(context.Background(), allGroups) + require.ErrorContains(t, err, "access denied") + require.Equal(t, 2, len(loadedGroups)) +} + func TestLoadRules(t *testing.T) { runForEachRuleStore(t, func(t *testing.T, rs rulestore.RuleStore, _ interface{}) { groups := []testGroup{ @@ -134,7 +164,7 @@ func TestLoadRules(t *testing.T) { }, allGroupsMap["user2"]) } - err = rs.LoadRuleGroups(context.Background(), allGroupsMap) + allGroupsMap, err = rs.LoadRuleGroups(context.Background(), allGroupsMap) require.NoError(t, err) // After load, rules are loaded. @@ -160,11 +190,13 @@ func TestLoadRules(t *testing.T) { // Loading group with mismatched info fails. require.NoError(t, rs.SetRuleGroup(context.Background(), "user1", "hello", &rulespb.RuleGroupDesc{User: "user2", Namespace: "world", Name: "first testGroup"})) - require.EqualError(t, rs.LoadRuleGroups(context.Background(), allGroupsMap), "mismatch between requested rule group and loaded rule group, requested: user=\"user1\", namespace=\"hello\", group=\"first testGroup\", loaded: user=\"user2\", namespace=\"world\", group=\"first testGroup\"") + _, err = rs.LoadRuleGroups(context.Background(), allGroupsMap) + require.EqualError(t, err, "mismatch between requested rule group and loaded rule group, requested: user=\"user1\", namespace=\"hello\", group=\"first testGroup\", loaded: user=\"user2\", namespace=\"world\", group=\"first testGroup\"") // Load with missing rule groups fails. require.NoError(t, rs.DeleteRuleGroup(context.Background(), "user1", "hello", "first testGroup")) - require.EqualError(t, rs.LoadRuleGroups(context.Background(), allGroupsMap), "get rule group user=\"user2\", namespace=\"world\", name=\"first testGroup\": group does not exist") + _, err = rs.LoadRuleGroups(context.Background(), allGroupsMap) + require.EqualError(t, err, "get rule group user=\"user2\", namespace=\"world\", name=\"first testGroup\": group does not exist") }) } diff --git a/pkg/ruler/rulestore/configdb/store.go b/pkg/ruler/rulestore/configdb/store.go index 3400773128..f29b126116 100644 --- a/pkg/ruler/rulestore/configdb/store.go +++ b/pkg/ruler/rulestore/configdb/store.go @@ -110,9 +110,9 @@ func (c *ConfigRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, return list, nil } -func (c *ConfigRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error { +func (c *ConfigRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) { // Since ConfigRuleStore already Loads the rules in the List methods, there is nothing left to do here. - return nil + return groupsToLoad, nil } // GetRuleGroup is not implemented diff --git a/pkg/ruler/rulestore/local/local.go b/pkg/ruler/rulestore/local/local.go index 131ad3fa0d..333687e6e5 100644 --- a/pkg/ruler/rulestore/local/local.go +++ b/pkg/ruler/rulestore/local/local.go @@ -103,9 +103,9 @@ func (l *Client) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID s return l.loadAllRulesGroupsForUser(ctx, userID) } -func (l *Client) LoadRuleGroups(_ context.Context, _ map[string]rulespb.RuleGroupList) error { +func (l *Client) LoadRuleGroups(_ context.Context, load map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) { // This Client already loads the rules in its List methods, there is nothing left to do here. - return nil + return load, nil } // GetRuleGroup implements RuleStore diff --git a/pkg/ruler/rulestore/store.go b/pkg/ruler/rulestore/store.go index d8b97ed05a..b247ebe281 100644 --- a/pkg/ruler/rulestore/store.go +++ b/pkg/ruler/rulestore/store.go @@ -10,6 +10,8 @@ import ( var ( // ErrGroupNotFound is returned if a rule group does not exist ErrGroupNotFound = errors.New("group does not exist") + // ErrAccessDenied is returned access denied error was returned when trying to laod the group + ErrAccessDenied = errors.New("access denied") // ErrGroupNamespaceNotFound is returned if a namespace does not exist ErrGroupNamespaceNotFound = errors.New("group namespace does not exist") // ErrUserNotFound is returned if the user does not currently exist @@ -30,10 +32,11 @@ type RuleStore interface { // If namespace is empty, groups from all namespaces are returned. ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) - // LoadRuleGroups loads rules for each rule group in the map. + // LoadRuleGroups try to load rules for each rule group in the map. + // Returns a map with the loaded groups and any eventual error occurred while loading the groups // Parameter with groups to load *MUST* be coming from one of the List methods. // Reason is that some implementations don't do anything, since their List method already loads the rules. - LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error + LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) GetRuleGroup(ctx context.Context, userID, namespace, group string) (*rulespb.RuleGroupDesc, error) SetRuleGroup(ctx context.Context, userID, namespace string, group *rulespb.RuleGroupDesc) error diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go index 2f1ad78a3f..da3707e840 100644 --- a/pkg/ruler/store_mock_test.go +++ b/pkg/ruler/store_mock_test.go @@ -12,8 +12,9 @@ import ( ) type mockRuleStore struct { - rules map[string]rulespb.RuleGroupList - mtx sync.Mutex + rules map[string]rulespb.RuleGroupList + errorMap map[string]error + mtx sync.Mutex } var ( @@ -133,9 +134,10 @@ var ( } ) -func newMockRuleStore(rules map[string]rulespb.RuleGroupList) *mockRuleStore { +func newMockRuleStore(rules map[string]rulespb.RuleGroupList, errorMap map[string]error) *mockRuleStore { return &mockRuleStore{ - rules: rules, + rules: rules, + errorMap: errorMap, } } @@ -189,9 +191,11 @@ func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, use return result, nil } -func (m *mockRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error { +func (m *mockRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) { m.mtx.Lock() defer m.mtx.Unlock() + result := make(map[string]rulespb.RuleGroupList, len(groupsToLoad)) + var err error gm := make(map[string]*rulespb.RuleGroupDesc) for _, gs := range m.rules { @@ -205,15 +209,20 @@ func (m *mockRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[str for _, gs := range groupsToLoad { for _, gr := range gs { user, namespace, name := gr.GetUser(), gr.GetNamespace(), gr.GetName() + if e, ok := m.errorMap[user]; ok { + err = e + continue + } key := user + delim + base64.URLEncoding.EncodeToString([]byte(namespace)) + delim + base64.URLEncoding.EncodeToString([]byte(name)) mgr, ok := gm[key] if !ok { - return fmt.Errorf("failed to get rule group user %s", gr.GetUser()) + return nil, fmt.Errorf("failed to get rule group user %s", gr.GetUser()) } *gr = *mgr + result[user] = append(result[user], gr) } } - return nil + return result, err } func (m *mockRuleStore) GetRuleGroup(_ context.Context, userID string, namespace string, group string) (*rulespb.RuleGroupDesc, error) {