diff --git a/pkg/ring/basic_lifecycler.go b/pkg/ring/basic_lifecycler.go index 19b3e9cb40..5fa6f463c0 100644 --- a/pkg/ring/basic_lifecycler.go +++ b/pkg/ring/basic_lifecycler.go @@ -469,20 +469,22 @@ func (l *BasicLifecycler) heartbeat(ctx context.Context) { // changeState of the instance within the ring. This function is guaranteed // to be called within the lifecycler main goroutine. func (l *BasicLifecycler) changeState(ctx context.Context, state InstanceState) error { + // Capture the state before the update so both log lines below report the same "from" value. + oldState := l.GetState() err := l.updateInstance(ctx, func(_ *Desc, i *InstanceDesc) bool { // No-op if the state hasn't changed. if i.State == state { return false } - i.State = state return true }) if err != nil { - level.Warn(l.logger).Log("msg", "failed to change instance state in the ring", "from", l.GetState(), "to", state, "err", err) + level.Warn(l.logger).Log("msg", "failed to change instance state in the ring", "from", oldState, "to", state, "ring", l.ringName, "err", err) + } else { + level.Info(l.logger).Log("msg", "successfully changed instance state in the ring", "from", oldState, "to", state, "ring", l.ringName) } - return err } diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index b231014ed7..aad67332ef 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -930,7 +930,7 @@ func NewOp(healthyStates []InstanceState, shouldExtendReplicaSet func(s Instance } if shouldExtendReplicaSet != nil { - for _, s := range []InstanceState{ACTIVE, LEAVING, PENDING, JOINING, LEAVING, LEFT} { + for _, s := range []InstanceState{ACTIVE, LEAVING, PENDING, JOINING, LEFT} { if shouldExtendReplicaSet(s) { op |= (0x10000 << s) } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 29799acc45..7476eaadbe 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -878,7 +878,7 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest ring = r.ring.ShuffleShard(userID, shardSize) } - rulers, err := ring.GetReplicationSetForOperation(RingOp) + rulers, err :=
ring.GetReplicationSetForOperation(ListRuleRingOp) if err != nil { return nil, err } diff --git a/pkg/ruler/ruler_ring.go b/pkg/ruler/ruler_ring.go index 1f5422f060..a2078d56db 100644 --- a/pkg/ruler/ruler_ring.go +++ b/pkg/ruler/ruler_ring.go @@ -26,6 +26,13 @@ var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceS return s != ring.ACTIVE }) +// ListRuleRingOp is the operation used for getting rule groups from rulers. +var ListRuleRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.LEAVING}, func(s ring.InstanceState) bool { + // A LEAVING ruler does not get any rule groups, but if it were excluded, listing rules would fail for lack of healthy instances, + // so LEAVING is still considered healthy. The replica set used for list-rule calls is also extended whenever an instance in the shard is not ACTIVE. + return s != ring.ACTIVE +}) + // RingConfig masks the ring lifecycler config which contains // many options not really required by the rulers ring. This config // is used to strip down the config to the minimum, and avoid confusion diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index aa3d5e0e02..35e6fdb16d 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -340,6 +340,8 @@ func TestGetRules(t *testing.T) { shuffleShardSize int rulesRequest RulesRequest expectedCount map[string]int + rulerStateMap map[string]ring.InstanceState + expectedError error } ruleMap := rulesMap{ @@ -433,6 +435,24 @@ func TestGetRules(t *testing.T) { }, } + rulerStateMapAllActive := map[string]ring.InstanceState{ + "ruler1": ring.ACTIVE, + "ruler2": ring.ACTIVE, + "ruler3": ring.ACTIVE, + } + + rulerStateMapOneLeaving := map[string]ring.InstanceState{ + "ruler1": ring.ACTIVE, + "ruler2": ring.LEAVING, + "ruler3": ring.ACTIVE, + } + + rulerStateMapOnePending := map[string]ring.InstanceState{ + "ruler1": ring.ACTIVE, + "ruler2": ring.PENDING, + "ruler3": ring.ACTIVE, + } + expectedRules := expectedRulesMap{ "ruler1":
map[string]rulespb.RuleGroupList{ "user1": { @@ -472,6 +492,7 @@ func TestGetRules(t *testing.T) { rulesRequest: RulesRequest{ Type: alertingRuleFilter, }, + rulerStateMap: rulerStateMapAllActive, expectedCount: map[string]int{ "user1": 2, "user2": 4, @@ -481,6 +502,7 @@ func TestGetRules(t *testing.T) { "Default Sharding with No Filter": { sharding: true, shardingStrategy: util.ShardingStrategyDefault, + rulerStateMap: rulerStateMapAllActive, expectedCount: map[string]int{ "user1": 5, "user2": 9, @@ -491,6 +513,7 @@ func TestGetRules(t *testing.T) { sharding: true, shuffleShardSize: 2, shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapAllActive, rulesRequest: RulesRequest{ Type: recordingRuleFilter, }, @@ -507,6 +530,7 @@ func TestGetRules(t *testing.T) { rulesRequest: RulesRequest{ RuleGroupNames: []string{"third"}, }, + rulerStateMap: rulerStateMapAllActive, expectedCount: map[string]int{ "user1": 2, "user2": 1, @@ -517,6 +541,7 @@ func TestGetRules(t *testing.T) { sharding: true, shuffleShardSize: 2, shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapAllActive, rulesRequest: RulesRequest{ RuleGroupNames: []string{"second", "third"}, Type: recordingRuleFilter, @@ -531,6 +556,7 @@ func TestGetRules(t *testing.T) { sharding: true, shuffleShardSize: 2, shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapAllActive, rulesRequest: RulesRequest{ Type: alertingRuleFilter, Files: []string{"latency-test"}, @@ -541,6 +567,30 @@ func TestGetRules(t *testing.T) { "user3": 1, }, }, + "Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in LEAVING state": { + sharding: true, + shuffleShardSize: 2, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapOneLeaving, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + }, + "Shuffle Sharding and ShardSize = 2 with 
Rule Type Filter and one ruler is in Pending state": { + sharding: true, + shuffleShardSize: 2, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapOnePending, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedError: ring.ErrTooManyUnhealthyInstances, + }, } for name, tc := range testCases { @@ -593,7 +643,7 @@ func TestGetRules(t *testing.T) { d = ring.NewDesc() } for rID, tokens := range allTokensByRuler { - d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now()) + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, tc.rulerStateMap[rID], time.Now()) } return d, true, nil }) @@ -616,7 +666,12 @@ func TestGetRules(t *testing.T) { ctx := user.InjectOrgID(context.Background(), u) forEachRuler(func(_ string, r *Ruler) { ruleStateDescriptions, err := r.GetRules(ctx, tc.rulesRequest) + if tc.expectedError != nil { + // Assert on the error returned by GetRules (not on the expectation itself); nothing is counted for a failed call. + require.ErrorIs(t, err, tc.expectedError) + return + } require.NoError(t, err) rct := 0 for _, ruleStateDesc := range ruleStateDescriptions { rct += len(ruleStateDesc.ActiveRules)