Skip to content

Commit

Permalink
Add new ListRuleRingOp to ruler and add more logs to the changeState …
Browse files Browse the repository at this point in the history
…in basic lifecycler (#5707)

* considere leaving ruler as healthy when listrules

Signed-off-by: Wen Xu <[email protected]>

* add log to basic lifecycler changeState

Signed-off-by: Wen Xu <[email protected]>

* add unit test for cases when one ruler in leaving and case when one ruler is in pending

Signed-off-by: Wen Xu <[email protected]>

---------

Signed-off-by: Wen Xu <[email protected]>
  • Loading branch information
wenxu1024 authored Jan 11, 2024
1 parent 35c178c commit 30023dc
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 7 deletions.
7 changes: 4 additions & 3 deletions pkg/ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,20 +469,21 @@ func (l *BasicLifecycler) heartbeat(ctx context.Context) {
// changeState of the instance within the ring. This function is guaranteed
// to be called within the lifecycler main goroutine.
func (l *BasicLifecycler) changeState(ctx context.Context, state InstanceState) error {
oldState := l.GetState()
err := l.updateInstance(ctx, func(_ *Desc, i *InstanceDesc) bool {
// No-op if the state hasn't changed.
if i.State == state {
return false
}

i.State = state
return true
})

if err != nil {
level.Warn(l.logger).Log("msg", "failed to change instance state in the ring", "from", l.GetState(), "to", state, "err", err)
level.Info(l.logger).Log("msg", "failed to change instance state in the ring", "from", oldState, "to", state, "ring", l.ringName, "err", err)
} else {
level.Info(l.logger).Log("msg", "successfully changed instance state from", "old_state", oldState, "to", state, "ring", l.ringName)
}

return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ func NewOp(healthyStates []InstanceState, shouldExtendReplicaSet func(s Instance
}

if shouldExtendReplicaSet != nil {
for _, s := range []InstanceState{ACTIVE, LEAVING, PENDING, JOINING, LEAVING, LEFT} {
for _, s := range []InstanceState{ACTIVE, LEAVING, PENDING, JOINING, LEFT} {
if shouldExtendReplicaSet(s) {
op |= (0x10000 << s)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest
ring = r.ring.ShuffleShard(userID, shardSize)
}

rulers, err := ring.GetReplicationSetForOperation(RingOp)
rulers, err := ring.GetReplicationSetForOperation(ListRuleRingOp)
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/ruler/ruler_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceS
return s != ring.ACTIVE
})

// ListRuleRingOp is the operation used for getting rule groups from rulers.
var ListRuleRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.LEAVING}, func(s ring.InstanceState) bool {
// Although LEAVING ruler does not get any rule groups. If it is excluded, list rule will fail because not enough healthy instance.
// So we still consider LEAVING as healthy. We also want to extend the listRule calls when the instance in the shard is not ACTIVE
return s != ring.ACTIVE
})

// RingConfig masks the ring lifecycler config which contains
// many options not really required by the rulers ring. This config
// is used to strip down the config to the minimum, and avoid confusion
Expand Down
59 changes: 57 additions & 2 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ func TestGetRules(t *testing.T) {
shuffleShardSize int
rulesRequest RulesRequest
expectedCount map[string]int
rulerStateMap map[string]ring.InstanceState
expectedError error
}

ruleMap := rulesMap{
Expand Down Expand Up @@ -433,6 +435,24 @@ func TestGetRules(t *testing.T) {
},
}

rulerStateMapAllActive := map[string]ring.InstanceState{
"ruler1": ring.ACTIVE,
"ruler2": ring.ACTIVE,
"ruler3": ring.ACTIVE,
}

rulerStateMapOneLeaving := map[string]ring.InstanceState{
"ruler1": ring.ACTIVE,
"ruler2": ring.LEAVING,
"ruler3": ring.ACTIVE,
}

rulerStateMapOnePending := map[string]ring.InstanceState{
"ruler1": ring.ACTIVE,
"ruler2": ring.PENDING,
"ruler3": ring.ACTIVE,
}

expectedRules := expectedRulesMap{
"ruler1": map[string]rulespb.RuleGroupList{
"user1": {
Expand Down Expand Up @@ -472,6 +492,7 @@ func TestGetRules(t *testing.T) {
rulesRequest: RulesRequest{
Type: alertingRuleFilter,
},
rulerStateMap: rulerStateMapAllActive,
expectedCount: map[string]int{
"user1": 2,
"user2": 4,
Expand All @@ -481,6 +502,7 @@ func TestGetRules(t *testing.T) {
"Default Sharding with No Filter": {
sharding: true,
shardingStrategy: util.ShardingStrategyDefault,
rulerStateMap: rulerStateMapAllActive,
expectedCount: map[string]int{
"user1": 5,
"user2": 9,
Expand All @@ -491,6 +513,7 @@ func TestGetRules(t *testing.T) {
sharding: true,
shuffleShardSize: 2,
shardingStrategy: util.ShardingStrategyShuffle,
rulerStateMap: rulerStateMapAllActive,
rulesRequest: RulesRequest{
Type: recordingRuleFilter,
},
Expand All @@ -507,6 +530,7 @@ func TestGetRules(t *testing.T) {
rulesRequest: RulesRequest{
RuleGroupNames: []string{"third"},
},
rulerStateMap: rulerStateMapAllActive,
expectedCount: map[string]int{
"user1": 2,
"user2": 1,
Expand All @@ -517,6 +541,7 @@ func TestGetRules(t *testing.T) {
sharding: true,
shuffleShardSize: 2,
shardingStrategy: util.ShardingStrategyShuffle,
rulerStateMap: rulerStateMapAllActive,
rulesRequest: RulesRequest{
RuleGroupNames: []string{"second", "third"},
Type: recordingRuleFilter,
Expand All @@ -531,6 +556,7 @@ func TestGetRules(t *testing.T) {
sharding: true,
shuffleShardSize: 2,
shardingStrategy: util.ShardingStrategyShuffle,
rulerStateMap: rulerStateMapAllActive,
rulesRequest: RulesRequest{
Type: alertingRuleFilter,
Files: []string{"latency-test"},
Expand All @@ -541,6 +567,30 @@ func TestGetRules(t *testing.T) {
"user3": 1,
},
},
"Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in LEAVING state": {
sharding: true,
shuffleShardSize: 2,
shardingStrategy: util.ShardingStrategyShuffle,
rulerStateMap: rulerStateMapOneLeaving,
rulesRequest: RulesRequest{
Type: recordingRuleFilter,
},
expectedCount: map[string]int{
"user1": 3,
"user2": 5,
"user3": 1,
},
},
"Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in Pending state": {
sharding: true,
shuffleShardSize: 2,
shardingStrategy: util.ShardingStrategyShuffle,
rulerStateMap: rulerStateMapOnePending,
rulesRequest: RulesRequest{
Type: recordingRuleFilter,
},
expectedError: ring.ErrTooManyUnhealthyInstances,
},
}

for name, tc := range testCases {
Expand Down Expand Up @@ -593,7 +643,7 @@ func TestGetRules(t *testing.T) {
d = ring.NewDesc()
}
for rID, tokens := range allTokensByRuler {
d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now())
d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, tc.rulerStateMap[rID], time.Now())
}
return d, true, nil
})
Expand All @@ -616,7 +666,12 @@ func TestGetRules(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), u)
forEachRuler(func(_ string, r *Ruler) {
ruleStateDescriptions, err := r.GetRules(ctx, tc.rulesRequest)
require.NoError(t, err)
if tc.expectedError != nil {
require.Error(t, tc.expectedError)
return
} else {
require.NoError(t, err)
}
rct := 0
for _, ruleStateDesc := range ruleStateDescriptions {
rct += len(ruleStateDesc.ActiveRules)
Expand Down

0 comments on commit 30023dc

Please sign in to comment.