From b9c0737a3eb7162d5cb8e1b5c58bed7b4de981bd Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 31 May 2023 08:21:45 +0200 Subject: [PATCH 1/2] Add GetWithRF() method to DynamicReplicationReadRing interface The newly created DynamicReplicationReadRing interfaces extends the ReadRing interface with the new function that allows for getting instances of the ring based on the provided replication factor, where in contrast the existing Get() function used the globally configured replication factor. The new function allows for example for per-tenant replication factors. Signed-off-by: Christian Haudum --- ring/ring.go | 21 ++++++++++++++++++++- ring/ring_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/ring/ring.go b/ring/ring.go index 9cc4e757c..afeb3488b 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -78,6 +78,13 @@ type ReadRing interface { CleanupShuffleShardCache(identifier string) } +// DynamicReplicationReadRing extends the ReadRing with an additional function to get a +// ReplicationSet based on a provided replication factor. +type DynamicReplicationReadRing interface { + ReadRing + GetWithRF(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string, rf int) (ReplicationSet, error) +} + var ( // Write operation that also extends replica set, if instance state is not ACTIVE. Write = NewOp([]InstanceState{ACTIVE}, func(s InstanceState) bool { @@ -347,6 +354,19 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // Get returns n (or more) instances which form the replicas for the given key. func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) { + return r.get(key, op, bufDescs, bufHosts, bufZones, r.cfg.ReplicationFactor) +} + +// GetWithRF returns n (or more) instances which form the replicas for the +// given key and given replication factor. +// If you have zone-aware replication enabled the current GetWithRF() +// implementation doesn't work with a RF > number of zones. +func (r *Ring) GetWithRF(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string, rf int) (ReplicationSet, error) { + return r.get(key, op, bufDescs, bufHosts, bufZones, rf) +} + +// get returns n (or more) instances which form the replicas for the given key. +func (r *Ring) get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string, n int) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() if r.ringDesc == nil || len(r.ringTokens) == 0 { @@ -354,7 +374,6 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, } var ( - n = r.cfg.ReplicationFactor instances = bufDescs[:0] start = searchToken(r.ringTokens, key) iterations = 0 diff --git a/ring/ring_test.go b/ring/ring_test.go index d686fd7c2..af5cfc4af 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -2561,6 +2561,36 @@ func BenchmarkRing_Get(b *testing.B) { } } +func TestRing_GetWithRF(t *testing.T) { + ringDesc := &Desc{Ingesters: generateRingInstances(16, 1, 128)} + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: false, + SubringCacheDisabled: true, + ReplicationFactor: 3, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + // test a dynamic RF >= cfg.RF and dynamic RF <= max instances + for i := 3; i <= 16; i++ { + buf, bufHosts, bufZones := MakeBuffersForGet() + rs, err := ring.GetWithRF(r.Uint32(), Write, buf, bufHosts, bufZones, i) + assert.Equal(t, i, len(rs.Instances)) + assert.NoError(t, err) + } +} + func TestRing_Get_NoMemoryAllocations(t *testing.T) { // Initialise the ring. ringDesc := &Desc{Ingesters: generateRingInstances(3, 3, 128)} From 9bd0e035aecdce4a1f03d39b7858da5d55b9a293 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 31 May 2023 08:35:21 +0200 Subject: [PATCH 2/2] Add changelog entry Signed-off-by: Christian Haudum --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 683d4c851..f15af0218 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ * [CHANGE] Cache: Remove the `context.Context` argument from the `Cache.Store` method and rename the method to `Cache.StoreAsync`. #273 * [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276 * [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279 +* [ENHANCEMENT] Add `GetWithRF()` function to DynamicReplicationReadRing interface. #304 * [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients. * [ENHANCEMENT] Use `SecretReader` interface to fetch secrets when configuring TLS. #274 * [ENHANCEMENT] Add middleware package. #38