Skip to content

Commit

Permalink
Split ReadRing.GetAll() into two functions (cortexproject#3460)
Browse files Browse the repository at this point in the history
* Split ReadRing.GetAll() into two functions

Signed-off-by: Marco Pracucci <[email protected]>

* Addressed review comments

Signed-off-by: Marco Pracucci <[email protected]>

* Added unit tests

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Nov 4, 2020
1 parent 55312fe commit e69d628
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 40 deletions.
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
for _, c := range compactors {
cortex_testutil.Poll(t, 10*time.Second, len(compactors), func() interface{} {
// it is safe to access c.ring here, since we know that all compactors are Running now
rs, err := c.ring.GetAll(ring.Compactor)
rs, err := c.ring.GetAllHealthy(ring.Compactor)
if err != nil {
return 0
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
req := &client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
// Not using d.ForReplicationSet(), so we can fail after first error.
replicationSet, err := d.ingestersRing.GetAll(ring.Read)
replicationSet, err := d.ingestersRing.GetAllHealthy(ring.Read)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod

if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetAll(ring.Read)
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
}
}

Expand All @@ -101,7 +101,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
}
}

return d.ingestersRing.GetAll(ring.Read)
return d.ingestersRing.GetReplicationSetForOperation(ring.Read)
}

// GetIngestersForMetadata returns a replication set including all ingesters that should be queried
Expand All @@ -119,11 +119,11 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod

if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetAll(ring.Read)
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
}
}

return d.ingestersRing.GetAll(ring.Read)
return d.ingestersRing.GetReplicationSetForOperation(ring.Read)
}

// queryIngesters queries the ingesters via the older, sample-based API.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_replicated_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {

// Wait until the ring client has initialised the state.
test.Poll(t, time.Second, true, func() interface{} {
all, err := r.GetAll(ring.Read)
all, err := r.GetAllHealthy(ring.Read)
return err == nil && len(all.Ingesters) > 0
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/client/ring_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func NewRingServiceDiscovery(r ring.ReadRing) PoolServiceDiscovery {
return func() ([]string, error) {
replicationSet, err := r.GetAll(ring.Read)
replicationSet, err := r.GetAllHealthy(ring.Reporting)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ring/client/ring_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ type mockReadRing struct {
mockedErr error
}

func (m *mockReadRing) GetAll(_ ring.Operation) (ring.ReplicationSet, error) {
func (m *mockReadRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) {
return m.mockedReplicationSet, m.mockedErr
}
45 changes: 41 additions & 4 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,18 @@ type ReadRing interface {
// buf is a slice to be overwritten for the return value
// to avoid memory allocation; can be nil.
Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
GetAll(op Operation) (ReplicationSet, error)

// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so doesn't fail if the number
// of unhealthy ingesters is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contains all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

ReplicationFactor() int
IngesterCount() int

Expand Down Expand Up @@ -89,6 +100,10 @@ var (
// ErrInstanceNotFound is the error returned when trying to get information for an instance
// not registered within the ring.
ErrInstanceNotFound = errors.New("instance not found in the ring")

// ErrTooManyFailedIngesters is the error returned when there are too many failed ingesters for a
// specific operation.
ErrTooManyFailedIngesters = errors.New("too many failed ingesters")
)

// Config for a Ring
Expand Down Expand Up @@ -317,8 +332,30 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
}, nil
}

// GetAll returns all available ingesters in the ring.
func (r *Ring) GetAll(op Operation) (ReplicationSet, error) {
// GetAllHealthy implements ReadRing.
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
for _, ingester := range r.ringDesc.Ingesters {
if r.IsHealthy(&ingester, op) {
ingesters = append(ingesters, ingester)
}
}

return ReplicationSet{
Ingesters: ingesters,
MaxErrors: 0,
}, nil
}

// GetReplicationSetForOperation implements ReadRing.
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

Expand All @@ -343,7 +380,7 @@ func (r *Ring) GetAll(op Operation) (ReplicationSet, error) {
}

if len(ingesters) < numRequired {
return ReplicationSet{}, fmt.Errorf("too many failed ingesters")
return ReplicationSet{}, ErrTooManyFailedIngesters
}

return ReplicationSet{
Expand Down
Loading

0 comments on commit e69d628

Please sign in to comment.