Adding Test for getShardedRules call (cortexproject#4449)
Signed-off-by: Alan Protasio <[email protected]>
alanprot authored Sep 2, 2021
1 parent 32b1b40 commit 03f926b
Showing 5 changed files with 259 additions and 14 deletions.
27 changes: 25 additions & 2 deletions pkg/ruler/client_pool.go
@@ -3,6 +3,8 @@ package ruler
import (
"time"

"github.com/grafana/dskit/services"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -14,7 +16,26 @@ import (
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) *client.Pool {
// ClientsPool is the interface used to get the client from the pool for a specified address.
type ClientsPool interface {
services.Service
// GetClientFor returns the ruler client for the given address.
GetClientFor(addr string) (RulerClient, error)
}

type rulerClientsPool struct {
*client.Pool
}

func (p *rulerClientsPool) GetClientFor(addr string) (RulerClient, error) {
c, err := p.Pool.GetClientFor(addr)
if err != nil {
return nil, err
}
return c.(RulerClient), nil
}

func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) ClientsPool {
// We prefer sane defaults instead of exposing further config options.
poolCfg := client.PoolConfig{
CheckInterval: time.Minute,
@@ -27,7 +48,9 @@ func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prom
Help: "The current number of ruler clients in the pool.",
})

return client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger)
return &rulerClientsPool{
client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger),
}
}

func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
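
Side note (not part of the commit): a minimal caller sketch of what the new ClientsPool interface buys. GetClientFor now returns a typed RulerClient, so callers such as getShardedRules no longer need an interface type assertion before calling Rules. The fetchAndCountRules helper below is hypothetical, and the Groups field on RulesResponse is assumed from the ruler proto.

```go
package example

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/ruler"
)

// fetchAndCountRules is a hypothetical helper: it asks the pool for the typed
// RulerClient of one replica and fetches that replica's rule groups over gRPC.
func fetchAndCountRules(ctx context.Context, pool ruler.ClientsPool, addr string) (int, error) {
	c, err := pool.GetClientFor(addr)
	if err != nil {
		return 0, fmt.Errorf("get client for ruler %s: %w", addr, err)
	}
	resp, err := c.Rules(ctx, &ruler.RulesRequest{})
	if err != nil {
		return 0, fmt.Errorf("fetch rules from ruler %s: %w", addr, err)
	}
	// Groups is assumed to be the repeated group-state field on RulesResponse.
	return len(resp.Groups), nil
}
```

Under the hood, the concrete rulerClientsPool still delegates to the embedded ring client.Pool and performs the RulerClient assertion once, in one place.
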
4 changes: 2 additions & 2 deletions pkg/ruler/lifecycle_test.go
@@ -24,7 +24,7 @@ func TestRulerShutdown(t *testing.T) {
config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules))
defer cleanup()

r, rcleanup := newRuler(t, config)
r, rcleanup := buildRuler(t, config, nil)
defer rcleanup()

r.cfg.EnableSharding = true
@@ -59,7 +59,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
ctx := context.Background()
config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules))
defer cleanup()
r, rcleanup := newRuler(t, config)
r, rcleanup := buildRuler(t, config, nil)
defer rcleanup()
r.cfg.EnableSharding = true
r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond
13 changes: 8 additions & 5 deletions pkg/ruler/ruler.go
@@ -29,7 +29,6 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/tenant"
@@ -237,7 +236,7 @@ type Ruler struct {
subservicesWatcher *services.FailureWatcher

// Pool of clients used to connect to other ruler replicas.
clientsPool *ring_client.Pool
clientsPool ClientsPool

ringCheckErrors prometheus.Counter
rulerSync *prometheus.CounterVec
@@ -250,14 +249,18 @@ type Ruler struct {

// NewRuler creates a new ruler from a distributor and chunk store.
func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
return newRuler(cfg, manager, reg, logger, ruleStore, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg))
}

func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
ruler := &Ruler{
cfg: cfg,
store: ruleStore,
manager: manager,
registry: reg,
logger: logger,
limits: limits,
clientsPool: newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
clientsPool: clientPool,
allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),

ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -763,12 +766,12 @@ func (r *Ruler) getShardedRules(ctx context.Context) ([]*GroupStateDesc, error)
err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
addr := job.(string)

grpcClient, err := r.clientsPool.GetClientFor(addr)
rulerClient, err := r.clientsPool.GetClientFor(addr)
if err != nil {
return errors.Wrapf(err, "unable to get client for ruler %s", addr)
}

newGrps, err := grpcClient.(RulerClient).Rules(ctx, &RulesRequest{})
newGrps, err := rulerClient.Rules(ctx, &RulesRequest{})
if err != nil {
return errors.Wrapf(err, "unable to retrieve rules from ruler %s", addr)
}
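
Side note (not part of the commit): the NewRuler/newRuler split above is the injection seam the new test relies on. Production code keeps calling the exported NewRuler, which wires in the real gRPC-backed pool, while tests construct the ruler through the unexported newRuler and hand it any ClientsPool. A condensed, hypothetical stub is sketched below; the real test double is mockRulerClientsPool in ruler_test.go further down.

```go
package ruler

// stubClientsPool is a hypothetical fixed-response ClientsPool for tests:
// every address resolves to the same in-process RulerClient.
type stubClientsPool struct {
	ClientsPool // embedded so the stub still satisfies services.Service
	client      RulerClient
}

// GetClientFor ignores the address and always returns the stubbed client.
func (s *stubClientsPool) GetClientFor(addr string) (RulerClient, error) {
	return s.client, nil
}
```
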
227 changes: 222 additions & 5 deletions pkg/ruler/ruler_test.go
@@ -15,6 +15,10 @@ import (
"testing"
"time"

"go.uber.org/atomic"

"google.golang.org/grpc"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
@@ -135,7 +139,45 @@ func newManager(t *testing.T, cfg Config) (*DefaultMultiTenantManager, func()) {
return manager, cleanup
}

func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
type mockRulerClientsPool struct {
ClientsPool
cfg Config
rulerAddrMap map[string]*Ruler
numberOfCalls atomic.Int32
}

type mockRulerClient struct {
ruler *Ruler
numberOfCalls *atomic.Int32
}

func (c *mockRulerClient) Rules(ctx context.Context, in *RulesRequest, _ ...grpc.CallOption) (*RulesResponse, error) {
c.numberOfCalls.Inc()
return c.ruler.Rules(ctx, in)
}

func (p *mockRulerClientsPool) GetClientFor(addr string) (RulerClient, error) {
for _, r := range p.rulerAddrMap {
if r.lifecycler.GetInstanceAddr() == addr {
return &mockRulerClient{
ruler: r,
numberOfCalls: &p.numberOfCalls,
}, nil
}
}

return nil, fmt.Errorf("unable to find ruler for addr %s", addr)
}

func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer, rulerAddrMap map[string]*Ruler) *mockRulerClientsPool {
return &mockRulerClientsPool{
ClientsPool: newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
cfg: cfg,
rulerAddrMap: rulerAddrMap,
}
}

func buildRuler(t *testing.T, cfg Config, rulerAddrMap map[string]*Ruler) (*Ruler, func()) {
engine, noopQueryable, pusher, logger, overrides, cleanup := testSetup(t, cfg)
storage, err := NewLegacyRuleStore(cfg.StoreConfig, promRules.FileLoader{}, log.NewNopLogger())
require.NoError(t, err)
@@ -145,21 +187,21 @@ func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
manager, err := NewDefaultMultiTenantManager(cfg, managerFactory, reg, log.NewNopLogger())
require.NoError(t, err)

ruler, err := NewRuler(
ruler, err := newRuler(
cfg,
manager,
reg,
logger,
storage,
overrides,
newMockClientsPool(cfg, logger, reg, rulerAddrMap),
)
require.NoError(t, err)

return ruler, cleanup
}

func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) {
ruler, cleanup := newRuler(t, cfg)
ruler, cleanup := buildRuler(t, cfg, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))

// Ensure all rules are loaded before usage
@@ -252,6 +294,171 @@ func compareRuleGroupDescToStateDesc(t *testing.T, expected *rulespb.RuleGroupDe
}
}

func TestGetRules(t *testing.T) {
// ruler ID -> (user ID -> list of groups).
type expectedRulesMap map[string]map[string]rulespb.RuleGroupList

type testCase struct {
sharding bool
shardingStrategy string
shuffleShardSize int
}

expectedRules := expectedRulesMap{
"ruler1": map[string]rulespb.RuleGroupList{
"user1": {
&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "first", Interval: 10 * time.Second},
&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "second", Interval: 10 * time.Second},
},
"user2": {
&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
},
},
"ruler2": map[string]rulespb.RuleGroupList{
"user1": {
&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
},
"user2": {
&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "first", Interval: 10 * time.Second},
&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "second", Interval: 10 * time.Second},
},
},
"ruler3": map[string]rulespb.RuleGroupList{
"user3": {
&rulespb.RuleGroupDesc{User: "user3", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
},
"user2": {
&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "forth", Interval: 10 * time.Second},
&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "fifty", Interval: 10 * time.Second},
},
},
}

testCases := map[string]testCase{
"No Sharding": {
sharding: false,
},
"Default Sharding": {
sharding: true,
shardingStrategy: util.ShardingStrategyDefault,
},
"Shuffle Sharding and ShardSize = 2": {
sharding: true,
shuffleShardSize: 2,
shardingStrategy: util.ShardingStrategyShuffle,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) })
allRulesByUser := map[string]rulespb.RuleGroupList{}
allRulesByRuler := map[string]rulespb.RuleGroupList{}
allTokensByRuler := map[string][]uint32{}
rulerAddrMap := map[string]*Ruler{}

createRuler := func(id string) *Ruler {
cfg, cleanUp := defaultRulerConfig(t, newMockRuleStore(allRulesByUser))
t.Cleanup(cleanUp)

cfg.ShardingStrategy = tc.shardingStrategy
cfg.EnableSharding = tc.sharding

cfg.Ring = RingConfig{
InstanceID: id,
InstanceAddr: id,
KVStore: kv.Config{
Mock: kvStore,
},
}

r, cleanUp := buildRuler(t, cfg, rulerAddrMap)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
t.Cleanup(cleanUp)
rulerAddrMap[id] = r
if r.ring != nil {
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring))
t.Cleanup(r.ring.StopAsync)
}
return r
}

for rID, r := range expectedRules {
createRuler(rID)
for user, rules := range r {
allRulesByUser[user] = append(allRulesByUser[user], rules...)
allRulesByRuler[rID] = append(allRulesByRuler[rID], rules...)
allTokensByRuler[rID] = generateTokenForGroups(rules, 1)
}
}

if tc.sharding {
err := kvStore.CAS(context.Background(), ring.RulerRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
d, _ := in.(*ring.Desc)
if d == nil {
d = ring.NewDesc()
}
for rID, tokens := range allTokensByRuler {
d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now())
}
return d, true, nil
})
require.NoError(t, err)
// Wait a bit to make sure ruler's ring is updated.
time.Sleep(100 * time.Millisecond)
}

forEachRuler := func(f func(rID string, r *Ruler)) {
for rID, r := range rulerAddrMap {
f(rID, r)
}
}

// Sync Rules
forEachRuler(func(_ string, r *Ruler) {
r.syncRules(context.Background(), rulerSyncReasonInitial)
})

for u := range allRulesByUser {
ctx := user.InjectOrgID(context.Background(), u)
forEachRuler(func(_ string, r *Ruler) {
rules, err := r.GetRules(ctx)
require.NoError(t, err)
require.Equal(t, len(allRulesByUser[u]), len(rules))
if tc.sharding {
mockPoolLClient := r.clientsPool.(*mockRulerClientsPool)

// Right now we are calling all rulers even with shuffle sharding
require.Equal(t, int32(len(rulerAddrMap)), mockPoolLClient.numberOfCalls.Load())
mockPoolLClient.numberOfCalls.Store(0)
}
})
}

totalLoadedRules := 0
totalConfiguredRules := 0

forEachRuler(func(rID string, r *Ruler) {
localRules, err := r.listRules(context.Background())
require.NoError(t, err)
for _, rules := range localRules {
totalLoadedRules += len(rules)
}
totalConfiguredRules += len(allRulesByRuler[rID])
})

if tc.sharding {
require.Equal(t, totalConfiguredRules, totalLoadedRules)
} else {
// Not sharding means that all rules will be loaded on all rulers
numberOfRulers := len(rulerAddrMap)
require.Equal(t, totalConfiguredRules*numberOfRulers, totalLoadedRules)
}
})
}
}

func TestSharding(t *testing.T) {
const (
user1 = "user1"
@@ -666,7 +873,7 @@ func TestSharding(t *testing.T) {
DisabledTenants: tc.disabledUsers,
}

r, cleanup := newRuler(t, cfg)
r, cleanup := buildRuler(t, cfg, nil)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
t.Cleanup(cleanup)

@@ -814,6 +1021,16 @@ func TestDeleteTenantRuleGroups(t *testing.T) {
}
}

func generateTokenForGroups(groups []*rulespb.RuleGroupDesc, offset uint32) []uint32 {
var tokens []uint32

for _, g := range groups {
tokens = append(tokens, tokenForGroup(g)+offset)
}

return tokens
}

func callDeleteTenantAPI(t *testing.T, api *Ruler, userID string) {
ctx := user.InjectOrgID(context.Background(), userID)

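
Side note (not part of the commit): the ring setup in TestGetRules works because generateTokenForGroups registers, for each ruler, the token tokenForGroup(g)+1 for every group that ruler is expected to own; the ring assigns a group to the instance holding the first token clockwise from the group's hash, so ownership is deterministic. A hedged sketch of that idea, inside package ruler since tokenForGroup is unexported (pickOwner is hypothetical):

```go
package ruler

import "github.com/cortexproject/cortex/pkg/ruler/rulespb"

// pickOwner illustrates the ring trick used by TestGetRules: the ruler that
// registered the token tokenForGroup(g)+1 holds the first token found clockwise
// from the group's own hash, and therefore owns the group.
func pickOwner(g *rulespb.RuleGroupDesc, tokensByRuler map[string][]uint32) string {
	want := tokenForGroup(g) + 1
	for rulerID, tokens := range tokensByRuler {
		for _, token := range tokens {
			if token == want {
				return rulerID
			}
		}
	}
	return "" // no ruler registered the expected token
}
```
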