Skip to content

Commit

Permalink
Failure to load a single rule group causes rulers to halt the sync of all rul…
Browse files Browse the repository at this point in the history
…egroups. (#5563)

* Do not fail to load the remaining rule groups when a single rule group fails to load

Signed-off-by: Alan Protasio <[email protected]>

* changelog

Signed-off-by: Alan Protasio <[email protected]>

* comments

Signed-off-by: Alan Protasio <[email protected]>

---------

Signed-off-by: Alan Protasio <[email protected]>
  • Loading branch information
alanprot authored Sep 15, 2023
1 parent a897070 commit 6a49e3b
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
* [BUGFIX] Query Frontend: Handle context error before decoding and merging responses. #5499
* [BUGFIX] DDBKV: When no change detected in ring, retry the CAS until there is change. #5502
* [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540
* [BUGFIX] Ruler: Fix bug in the ruler where a failure to load a single RuleGroup would prevent rulers from syncing all RuleGroups. #5563

## 1.15.1 2023-04-26

Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) {
return
}

err = a.store.LoadRuleGroups(req.Context(), map[string]rulespb.RuleGroupList{userID: rgs})
_, err = a.store.LoadRuleGroups(req.Context(), map[string]rulespb.RuleGroupList{userID: rgs})
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down
18 changes: 9 additions & 9 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

func TestRuler_rules(t *testing.T) {
store := newMockRuleStore(mockRules)
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestRuler_rules(t *testing.T) {
}

func TestRuler_rules_special_characters(t *testing.T) {
store := newMockRuleStore(mockSpecialCharRules)
store := newMockRuleStore(mockSpecialCharRules, nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestRuler_rules_special_characters(t *testing.T) {
}

func TestRuler_rules_limit(t *testing.T) {
store := newMockRuleStore(mockRulesLimit)
store := newMockRuleStore(mockRulesLimit, nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestRuler_rules_limit(t *testing.T) {
require.Equal(t, string(expectedResponse), string(body))
}
func TestRuler_alerts(t *testing.T) {
store := newMockRuleStore(mockRules)
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestRuler_alerts(t *testing.T) {
}

func TestRuler_Create(t *testing.T) {
store := newMockRuleStore(make(map[string]rulespb.RuleGroupList))
store := newMockRuleStore(make(map[string]rulespb.RuleGroupList), nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -310,7 +310,7 @@ rules:
}

func TestRuler_DeleteNamespace(t *testing.T) {
store := newMockRuleStore(mockRulesNamespaces)
store := newMockRuleStore(mockRulesNamespaces, nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestRuler_DeleteNamespace(t *testing.T) {
}

func TestRuler_LimitsPerGroup(t *testing.T) {
store := newMockRuleStore(make(map[string]rulespb.RuleGroupList))
store := newMockRuleStore(make(map[string]rulespb.RuleGroupList), nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -402,7 +402,7 @@ rules:
}

func TestRuler_RulerGroupLimits(t *testing.T) {
store := newMockRuleStore(make(map[string]rulespb.RuleGroupList))
store := newMockRuleStore(make(map[string]rulespb.RuleGroupList), nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -463,7 +463,7 @@ rules:
}

func TestRuler_ProtoToRuleGroupYamlConvertion(t *testing.T) {
store := newMockRuleStore(make(map[string]rulespb.RuleGroupList))
store := newMockRuleStore(make(map[string]rulespb.RuleGroupList), nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down
8 changes: 4 additions & 4 deletions pkg/ruler/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
func TestRulerShutdown(t *testing.T) {
ctx := context.Background()

store := newMockRuleStore(mockRules)
store := newMockRuleStore(mockRules, nil)
config := defaultRulerConfig(t)

r := buildRuler(t, config, nil, store, nil)
r, _ := buildRuler(t, config, nil, store, nil)

r.cfg.EnableSharding = true
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
Expand Down Expand Up @@ -55,9 +55,9 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
const heartbeatTimeout = time.Minute

ctx := context.Background()
store := newMockRuleStore(mockRules)
store := newMockRuleStore(mockRules, nil)
config := defaultRulerConfig(t)
r := buildRuler(t, config, nil, store, nil)
r, _ := buildRuler(t, config, nil, store, nil)
r.cfg.EnableSharding = true
r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond
r.cfg.Ring.HeartbeatTimeout = heartbeatTimeout
Expand Down
9 changes: 4 additions & 5 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,13 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
return
}

err = r.store.LoadRuleGroups(ctx, configs)
loadedConfigs, err := r.store.LoadRuleGroups(ctx, configs)
if err != nil {
level.Error(r.logger).Log("msg", "unable to load rules owned by this ruler", "err", err)
return
level.Warn(r.logger).Log("msg", "failed to load some rules owned by this ruler", "count", len(configs)-len(loadedConfigs), "err", err)
}

// This will also delete local group files for users that are no longer in 'configs' map.
r.manager.SyncRuleGroups(ctx, configs)
r.manager.SyncRuleGroups(ctx, loadedConfigs)
}

func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) {
Expand Down Expand Up @@ -983,7 +982,7 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
return errors.Wrapf(err, "failed to fetch ruler config for user %s", userID)
}
userRules := map[string]rulespb.RuleGroupList{userID: rg}
if err := r.store.LoadRuleGroups(ctx, userRules); err != nil {
if userRules, err = r.store.LoadRuleGroups(ctx, userRules); err != nil {
return errors.Wrapf(err, "failed to load ruler config for user %s", userID)
}
data := map[string]map[string][]rulefmt.RuleGroup{userID: userRules[userID].Formatted()}
Expand Down
106 changes: 91 additions & 15 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"sort"
Expand All @@ -18,8 +19,6 @@ import (

"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/ruler/rulestore/bucketclient"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -48,10 +47,12 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/ruler/rulestore/bucketclient"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
"github.com/cortexproject/cortex/pkg/util/validation"
)

Expand Down Expand Up @@ -218,7 +219,7 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer
}
}

func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler) *Ruler {
func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler) (*Ruler, *DefaultMultiTenantManager) {
engine, queryable, pusher, logger, overrides, reg := testSetup(t, querierTestConfig)

managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg)
Expand All @@ -235,11 +236,11 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes
newMockClientsPool(rulerConfig, logger, reg, rulerAddrMap),
)
require.NoError(t, err)
return ruler
return ruler, manager
}

func newTestRuler(t *testing.T, rulerConfig Config, store rulestore.RuleStore, querierTestConfig *querier.TestConfig) *Ruler {
ruler := buildRuler(t, rulerConfig, querierTestConfig, store, nil)
ruler, _ := buildRuler(t, rulerConfig, querierTestConfig, store, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))

// Ensure all rules are loaded before usage
Expand Down Expand Up @@ -293,7 +294,7 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
}

func TestRuler_Rules(t *testing.T) {
store := newMockRuleStore(mockRules)
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -552,7 +553,7 @@ func TestGetRules(t *testing.T) {
rulerAddrMap := map[string]*Ruler{}

createRuler := func(id string) *Ruler {
store := newMockRuleStore(allRulesByUser)
store := newMockRuleStore(allRulesByUser, nil)
cfg := defaultRulerConfig(t)

cfg.ShardingStrategy = tc.shardingStrategy
Expand All @@ -566,7 +567,7 @@ func TestGetRules(t *testing.T) {
},
}

r := buildRuler(t, cfg, nil, store, rulerAddrMap)
r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
rulerAddrMap[id] = r
if r.ring != nil {
Expand Down Expand Up @@ -1053,7 +1054,7 @@ func TestSharding(t *testing.T) {
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler {
store := newMockRuleStore(allRules)
store := newMockRuleStore(allRules, nil)
cfg := Config{
EnableSharding: tc.sharding,
ShardingStrategy: tc.shardingStrategy,
Expand All @@ -1071,7 +1072,7 @@ func TestSharding(t *testing.T) {
DisabledTenants: tc.disabledUsers,
}

r := buildRuler(t, cfg, nil, store, nil)
r, _ := buildRuler(t, cfg, nil, store, nil)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}

if forceRing != nil {
Expand Down Expand Up @@ -1159,6 +1160,81 @@ func sortTokens(tokens []uint32) []uint32 {
return tokens
}

// Test_LoadPartialGroups verifies that when the rule store fails to load the
// rule groups of one tenant, the ruler still syncs and evaluates the rule
// groups of every other tenant, and that listRules continues to report the
// full (unloaded) set of discovered groups.
func Test_LoadPartialGroups(t *testing.T) {
	const (
		user1 = "user1"
		user2 = "user2"
		user3 = "user3"
	)

	const (
		ruler1     = "ruler-1"
		ruler1Host = "1.1.1.1"
		ruler1Port = 9999
	)

	user1Group1 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "first", Interval: time.Minute}
	user1Group2 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "second", Interval: time.Minute}
	user2Group1 := &rulespb.RuleGroupDesc{User: user2, Namespace: "namespace", Name: "first", Interval: time.Minute}
	user3Group1 := &rulespb.RuleGroupDesc{User: user3, Namespace: "namespace", Name: "first", Interval: time.Minute}

	allRules := map[string]rulespb.RuleGroupList{
		user1: {user1Group1, user1Group2},
		user2: {user2Group1},
		user3: {user3Group1},
	}

	kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })

	// Make loading user1's groups fail; user2 and user3 must still be synced.
	store := newMockRuleStore(allRules, map[string]error{user1: fmt.Errorf("test")})
	u, err := url.Parse("")
	require.NoError(t, err)
	cfg := Config{
		EnableSharding:   true,
		ExternalURL:      flagext.URLValue{URL: u},
		PollInterval:     time.Millisecond * 100,
		RingCheckPeriod:  time.Minute,
		ShardingStrategy: util.ShardingStrategyShuffle,
		Ring: RingConfig{
			InstanceID:   ruler1,
			InstanceAddr: ruler1Host,
			InstancePort: ruler1Port,
			KVStore: kv.Config{
				Mock: kvStore,
			},
			HeartbeatTimeout: 1 * time.Minute,
		},
		FlushCheckPeriod: 0,
	}

	r1, manager := buildRuler(t, cfg, nil, store, nil)
	r1.limits = ruleLimits{evalDelay: 0, tenantShard: 1}

	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r1))
	// Stop AND wait for termination so the ruler's goroutines don't leak past
	// this test (StopAsync alone would return before shutdown completes).
	t.Cleanup(func() {
		assert.NoError(t, services.StopAndAwaitTerminated(context.Background(), r1))
	})

	// Register the single ruler in the ring so it owns all tenants.
	err = kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
		d, _ := in.(*ring.Desc)
		if d == nil {
			d = ring.NewDesc()
		}
		d.AddIngester(ruler1, fmt.Sprintf("%v:%v", ruler1Host, ruler1Port), "", []uint32{0}, ring.ACTIVE, time.Now())
		return d, true, nil
	})
	require.NoError(t, err)

	// Wait until the sync loop has loaded the healthy tenants' rules.
	test.Poll(t, time.Second*5, true, func() interface{} {
		return len(r1.manager.GetRules(user2)) > 0 &&
			len(r1.manager.GetRules(user3)) > 0
	})

	// listRules reports every discovered group, including user1's unloadable ones.
	returned, err := r1.listRules(context.Background())
	require.NoError(t, err)
	// require.Equal takes (t, expected, actual).
	require.Equal(t, allRules, returned)
	// Only user2 and user3 got a manager; user1's load failure must not create one.
	require.Equal(t, 2, len(manager.userManagers))
}

func TestDeleteTenantRuleGroups(t *testing.T) {
ruleGroups := []ruleGroupKey{
{user: "userA", namespace: "namespace", group: "group"},
Expand Down Expand Up @@ -1267,7 +1343,7 @@ type ruleGroupKey struct {
}

func TestRuler_ListAllRules(t *testing.T) {
store := newMockRuleStore(mockRules)
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
Expand Down Expand Up @@ -1400,7 +1476,7 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
}

// NEXT, set up ruler config with outage tolerance = 1hr
store := newMockRuleStore(mockRules)
store := newMockRuleStore(mockRules, nil)
rulerCfg := defaultRulerConfig(t)
rulerCfg.OutageTolerance, _ = time.ParseDuration("1h")

Expand Down Expand Up @@ -1434,7 +1510,7 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
}

// create a ruler but don't start it. instead, we'll evaluate the rule groups manually.
r := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil)
r, _ := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil)
r.syncRules(context.Background(), rulerSyncReasonInitial)

// assert initial state of rule group
Expand Down Expand Up @@ -1638,7 +1714,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler {
store := newMockRuleStore(tc.rules)
store := newMockRuleStore(tc.rules, nil)
cfg := Config{
EnableSharding: tc.sharding,
ShardingStrategy: tc.shardingStrategy,
Expand All @@ -1654,7 +1730,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
FlushCheckPeriod: 0,
}

r := buildRuler(t, cfg, nil, store, nil)
r, _ := buildRuler(t, cfg, nil, store, nil)
r.limits = ruleLimits{evalDelay: 0, tenantShard: 3, disabledRuleGroups: tc.disabledRuleGroups}

if forceRing != nil {
Expand Down
Loading

0 comments on commit 6a49e3b

Please sign in to comment.