Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetWithRF() method to DynamicReplicationReadRing interface #304

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* [CHANGE] Cache: Remove the `context.Context` argument from the `Cache.Store` method and rename the method to `Cache.StoreAsync`. #273
* [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276
* [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279
* [ENHANCEMENT] Add `GetWithRF()` function to DynamicReplicationReadRing interface. #304
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feature or Enhancement?

* [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients.
* [ENHANCEMENT] Use `SecretReader` interface to fetch secrets when configuring TLS. #274
* [ENHANCEMENT] Add middleware package. #38
Expand Down
21 changes: 20 additions & 1 deletion ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ type ReadRing interface {
CleanupShuffleShardCache(identifier string)
}

// DynamicReplicationReadRing extends the ReadRing with an additional function to get a
// ReplicationSet based on a provided replication factor.
type DynamicReplicationReadRing interface {
ReadRing
GetWithRF(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string, rf int) (ReplicationSet, error)
}

var (
// Write operation that also extends replica set, if instance state is not ACTIVE.
Write = NewOp([]InstanceState{ACTIVE}, func(s InstanceState) bool {
Expand Down Expand Up @@ -347,14 +354,26 @@ func (r *Ring) updateRingState(ringDesc *Desc) {

// Get returns n (or more) instances which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
return r.get(key, op, bufDescs, bufHosts, bufZones, r.cfg.ReplicationFactor)
}

// GetWithRF returns n (or more) instances which form the replicas for the
// given key and given replication factor.
// If you have zone-aware replication enabled the current GetWithRF()
// implementation doesn't work with a RF > number of zones.
func (r *Ring) GetWithRF(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string, rf int) (ReplicationSet, error) {
return r.get(key, op, bufDescs, bufHosts, bufZones, rf)
}

// get returns n (or more) instances which form the replicas for the given key.
func (r *Ring) get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string, n int) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

var (
n = r.cfg.ReplicationFactor
instances = bufDescs[:0]
start = searchToken(r.ringTokens, key)
iterations = 0
Expand Down
30 changes: 30 additions & 0 deletions ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2561,6 +2561,36 @@ func BenchmarkRing_Get(b *testing.B) {
}
}

func TestRing_GetWithRF(t *testing.T) {
ringDesc := &Desc{Ingesters: generateRingInstances(16, 1, 128)}
ring := Ring{
cfg: Config{
HeartbeatTimeout: time.Hour,
ZoneAwarenessEnabled: false,
SubringCacheDisabled: true,
ReplicationFactor: 3,
},
ringDesc: ringDesc,
ringTokens: ringDesc.GetTokens(),
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
strategy: NewDefaultReplicationStrategy(),
lastTopologyChange: time.Now(),
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))

// test a dynamic RF >= cfg.RF and dynamic RF <= max instances
for i := 3; i <= 16; i++ {
buf, bufHosts, bufZones := MakeBuffersForGet()
rs, err := ring.GetWithRF(r.Uint32(), Write, buf, bufHosts, bufZones, i)
assert.Equal(t, i, len(rs.Instances))
assert.NoError(t, err)
}
}

func TestRing_Get_NoMemoryAllocations(t *testing.T) {
// Initialise the ring.
ringDesc := &Desc{Ingesters: generateRingInstances(3, 3, 128)}
Expand Down