From 075bb3695a966f0480c4edd16c47c2a473b28a80 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Thu, 14 Nov 2024 12:05:36 -0500 Subject: [PATCH] ring: add GetWithOptions method to adjust per call behavior This change adds a new method that accepts 0 or more `Option` instances that modify the behavior of the call. These options can (currently) be used to adjust the replication factor for a particular key or use buffers to avoid excessive allocation. Part of grafana/mimir#9944 --- ring/ring.go | 59 +++++++++++++++++++++++++++++++++++++++++++---- ring/util_test.go | 5 ++++ 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/ring/ring.go b/ring/ring.go index d47eb8fe2..866d7067f 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -34,21 +34,58 @@ const ( GetBufferSize = 5 ) +type Options struct { + ReplicationFactor int + BufDescs []InstanceDesc + BufHosts []string + BufZones []string +} + +type Option func(opts *Options) + +func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option { + return func(opts *Options) { + opts.BufDescs = bufDescs + opts.BufHosts = bufHosts + opts.BufZones = bufZones + } +} + +func WithReplicationFactor(replication int) Option { + return func(opts *Options) { + opts.ReplicationFactor = replication + } +} + +func collectOptions(opts ...Option) *Options { + final := &Options{} + for _, opt := range opts { + opt(final) + } + return final +} + // ReadRing represents the read interface to the ring. // Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet. type ReadRing interface { // Get returns n (or more) instances which form the replicas for the given key. + // // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet(). Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) + // GetWithOptions returns n (or more) instances which form the replicas for the given key + // with 0 or more options to change the behavior of the method call. This method is a superset + // of the functionality of the Get method. + GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) + // GetAllHealthy returns all healthy instances in the ring, for the given operation. // This function doesn't check if the quorum is honored, so doesn't fail if the number // of unhealthy instances is greater than the tolerated max unavailable. GetAllHealthy(op Operation) (ReplicationSet, error) // GetReplicationSetForOperation returns all instances where the input operation should be executed. - // The resulting ReplicationSet doesn't necessarily contains all healthy instances + // The resulting ReplicationSet doesn't necessarily contain all healthy instances // in the ring, but could contain the minimum set of instances required to execute // the input operation. GetReplicationSetForOperation(op Operation) (ReplicationSet, error) @@ -422,13 +459,21 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste // Get returns n (or more) instances which form the replicas for the given key. func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) { + return r.GetWithOptions(key, op, WithBuffers(bufDescs, bufHosts, bufZones)) +} + +// GetWithOptions returns n (or more) instances which form the replicas for the given key +// with 0 or more options to change the behavior of the method call. +func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) { + options := collectOptions(opts...) + r.mtx.RLock() defer r.mtx.RUnlock() if r.ringDesc == nil || len(r.ringTokens) == 0 { return ReplicationSet{}, ErrEmptyRing } - instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil) + instances, err := r.findInstancesForKey(key, op, options.BufDescs, options.BufHosts, options.BufZones, options.ReplicationFactor, nil) if err != nil { return ReplicationSet{}, err } @@ -447,9 +492,13 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, // Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy. // InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early. // This function needs to be called with read lock on the ring. -func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { +func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { + if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor { + replicationFactor = r.cfg.ReplicationFactor + } + var ( - n = r.cfg.ReplicationFactor + n = replicationFactor instances = bufDescs[:0] start = searchToken(r.ringTokens, key) iterations = 0 @@ -1349,7 +1398,7 @@ func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instance owned := 0 for _, tok := range keys { - i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) { + i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, 0, func(foundInstanceID string) (include, keepGoing bool) { if foundInstanceID == instanceID { // If we've found our instance, we can stop. return true, false diff --git a/ring/util_test.go b/ring/util_test.go index ec52c5077..9af58628b 100644 --- a/ring/util_test.go +++ b/ring/util_test.go @@ -28,6 +28,11 @@ func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHos return args.Get(0).(ReplicationSet), args.Error(1) } +func (r *RingMock) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) { + args := r.Called(key, op, opts) + return args.Get(0).(ReplicationSet), args.Error(1) +} + func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) { args := r.Called(op) return args.Get(0).(ReplicationSet), args.Error(1)