diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 29caaed02b1..6589ea89d31 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/sony/gobreaker" "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/discovery/dns" @@ -40,9 +41,11 @@ const ( ) var ( - errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") - errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive") - errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive") + errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided") + errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive") + errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive") + errCircuitBreakerConsecutiveFailuresNotPositive = errors.New("set async circuit breaker: consecutive failures must be greater than 0") + errCircuitBreakerFailurePercentInvalid = errors.New("set async circuit breaker: failure percent must be in range (0,1]") defaultMemcachedClientConfig = MemcachedClientConfig{ Timeout: 500 * time.Millisecond, @@ -54,6 +57,11 @@ var ( MaxGetMultiBatchSize: 0, DNSProviderUpdateInterval: 10 * time.Second, AutoDiscovery: false, + SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, + SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, + SetAsyncCircuitBreakerMinRequests: 50, + SetAsyncCircuitBreakerConsecutiveFailures: 5, + SetAsyncCircuitBreakerFailurePercent: 0.05, } ) @@ -141,6 +149,20 @@ type MemcachedClientConfig struct { // AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution AutoDiscovery bool `yaml:"auto_discovery"` + + // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through + // when the circuit breaker is half-open. + // If set to 0, the circuit breaker allows only 1 request. + SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"` + // SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open. + // If set to 0, the circuit breaker resets it to 60 seconds. + SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"` + // SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker. + SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"` + // SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open. + SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"` + // SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open. + SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"` } func (c *MemcachedClientConfig) validate() error { @@ -158,6 +180,12 @@ func (c *MemcachedClientConfig) validate() error { return errMemcachedMaxAsyncConcurrencyNotPositive } + if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } return nil } @@ -195,6 +223,8 @@ type memcachedClient struct { dataSize *prometheus.HistogramVec p *AsyncOperationProcessor + + setAsyncCircuitBreaker *gobreaker.CircuitBreaker } // AddressProvider performs node address resolution given a list of clusters. @@ -278,6 +308,17 @@ func newMemcachedClient( gate.Gets, ), p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), + setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "memcached-set-async", + MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, + Interval: 10 * time.Second, + Timeout: config.SetAsyncCircuitBreakerOpenDuration, + ReadyToTrip: func(counts gobreaker.Counts) bool { + return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests && + (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || + float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) + }, + }), } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ @@ -375,22 +416,31 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) start := time.Now() c.operations.WithLabelValues(opSet).Inc() - err := c.client.Set(&memcache.Item{ - Key: key, - Value: value, - Expiration: int32(time.Now().Add(ttl).Unix()), + _, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) { + return nil, c.client.Set(&memcache.Item{ + Key: key, + Value: value, + Expiration: int32(time.Now().Add(ttl).Unix()), + }) }) if err != nil { - // If the PickServer will fail for any reason the server address will be nil - // and so missing in the logs. We're OK with that (it's a best effort). - serverAddr, _ := c.selector.PickServer(key) - level.Debug(c.logger).Log( - "msg", "failed to store item to memcached", - "key", key, - "sizeBytes", len(value), - "server", serverAddr, - "err", err, - ) + if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) { + level.Warn(c.logger).Log( + "msg", "circuit breaker disallows storing item in memcached", + "key", key, + "err", err) + } else { + // If the PickServer will fail for any reason the server address will be nil + // and so missing in the logs. We're OK with that (it's a best effort). + serverAddr, _ := c.selector.PickServer(key) + level.Debug(c.logger).Log( + "msg", "failed to store item to memcached", + "key", key, + "sizeBytes", len(value), + "server", serverAddr, + "err", err, + ) + } c.trackError(opSet, err) return } diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 4aa355deaa2..7b0571c32e3 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "net" + "strconv" "sync" "testing" "time" @@ -16,9 +17,11 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/sony/gobreaker" "go.uber.org/atomic" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" ) @@ -33,6 +36,8 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 1, DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: nil, }, @@ -41,6 +46,8 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{}, MaxAsyncConcurrency: 1, DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedConfigNoAddrs, }, @@ -49,6 +56,8 @@ func TestMemcachedClientConfig_validate(t *testing.T) { Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 0, DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedMaxAsyncConcurrencyNotPositive, }, @@ -56,9 +65,40 @@ func TestMemcachedClientConfig_validate(t *testing.T) { config: MemcachedClientConfig{ Addresses: []string{"127.0.0.1:11211"}, MaxAsyncConcurrency: 1, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0.5, }, expected: errMemcachedDNSUpdateIntervalNotPositive, }, + "should fail on circuit_breaker_consecutive_failures = 0": { + config: MemcachedClientConfig{ + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 0, + }, + expected: errCircuitBreakerConsecutiveFailuresNotPositive, + }, + "should fail on circuit_breaker_failure_percent <= 0": { + config: MemcachedClientConfig{ + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 0, + }, + expected: errCircuitBreakerFailurePercentInvalid, + }, + "should fail on circuit_breaker_failure_percent >= 1": { + config: MemcachedClientConfig{ + Addresses: []string{"127.0.0.1:11211"}, + MaxAsyncConcurrency: 1, + DNSProviderUpdateInterval: time.Second, + SetAsyncCircuitBreakerConsecutiveFailures: 1, + SetAsyncCircuitBreakerFailurePercent: 1.1, + }, + expected: errCircuitBreakerFailurePercentInvalid, + }, } for testName, testData := range tests { @@ -491,6 +531,9 @@ type memcachedClientBackendMock struct { items map[string]*memcache.Item getMultiCount int getMultiErrors int + + setCount int + setErrors int } func newMemcachedClientBackendMock() *memcachedClientBackendMock { @@ -522,17 +565,30 @@ func (c *memcachedClientBackendMock) Set(item *memcache.Item) error { c.lock.Lock() defer c.lock.Unlock() + c.setCount++ + if c.setCount <= c.setErrors { + return errors.New("mocked Set error") + } + c.items[item.Key] = item return nil } +func (c *memcachedClientBackendMock) waitSetCount(expected int) error { + return c.waitFor(expected, "the number of set operations", func() int { return c.setCount }) +} + func (c *memcachedClientBackendMock) waitItems(expected int) error { + return c.waitFor(expected, "items", func() int { return len(c.items) }) +} + +func (c *memcachedClientBackendMock) waitFor(expected int, name string, valueFunc func() int) error { deadline := time.Now().Add(1 * time.Second) for time.Now().Before(deadline) { c.lock.Lock() - count := len(c.items) + count := valueFunc() c.lock.Unlock() if count >= expected { @@ -540,7 +596,7 @@ func (c *memcachedClientBackendMock) waitItems(expected int) error { } } - return errors.New("timeout expired while waiting for items in the memcached mock") + return fmt.Errorf("timeout expired while waiting for %s in the memcached mock", name) } // countingGate implements gate.Gate and counts the number of times Start is called. @@ -630,3 +686,73 @@ func (c *memcachedClientBlockingMock) GetMulti([]string) (map[string]*memcache.I func (c *memcachedClientBlockingMock) Set(*memcache.Item) error { return nil } + +func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { + for _, testdata := range []struct { + name string + setErrors int + minRequests uint32 + consecutiveFailures uint32 + failurePercent float64 + expectCircuitBreakerOpen bool + }{ + { + name: "remains closed due to min requests not satisfied", + setErrors: 10, + minRequests: 100, + consecutiveFailures: 1, + failurePercent: 0.00001, + expectCircuitBreakerOpen: false, + }, + { + name: "opened because too many consecutive failures", + setErrors: 10, + minRequests: 10, + consecutiveFailures: 10, + failurePercent: 1, + expectCircuitBreakerOpen: true, + }, + { + name: "opened because failure percent too high", + setErrors: 10, + minRequests: 10, + consecutiveFailures: 100, + failurePercent: 0.1, + expectCircuitBreakerOpen: true, + }, + } { + t.Run(testdata.name, func(t *testing.T) { + config := defaultMemcachedClientConfig + config.Addresses = []string{"127.0.0.1:11211"} + config.SetAsyncCircuitBreakerOpenDuration = 1 * time.Millisecond + config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100 + config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests + config.SetAsyncCircuitBreakerConsecutiveFailures = testdata.consecutiveFailures + config.SetAsyncCircuitBreakerFailurePercent = testdata.failurePercent + + backendMock := newMemcachedClientBackendMock() + backendMock.setErrors = testdata.setErrors + + client, err := prepare(config, backendMock) + testutil.Ok(t, err) + defer client.Stop() + + // Populate memcached with the initial items. + for i := 0; i < testdata.setErrors; i++ { + testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) + } + + testutil.Ok(t, backendMock.waitSetCount(testdata.setErrors)) + if testdata.expectCircuitBreakerOpen { + testutil.Equals(t, gobreaker.StateOpen, client.setAsyncCircuitBreaker.State()) + time.Sleep(config.SetAsyncCircuitBreakerOpenDuration) + for i := testdata.setErrors; i < testdata.setErrors+10; i++ { + testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) + } + testutil.Ok(t, backendMock.waitItems(10)) + } else { + testutil.Equals(t, gobreaker.StateClosed, client.setAsyncCircuitBreaker.State()) + } + }) + } +} diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index 032ed2942db..c3bf986d914 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/redis/rueidis" + "github.com/sony/gobreaker" "gopkg.in/yaml.v3" "github.com/thanos-io/thanos/pkg/extprom" @@ -39,6 +40,11 @@ var ( TLSConfig: TLSConfig{}, MaxAsyncConcurrency: 20, MaxAsyncBufferSize: 10000, + SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, + SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, + SetAsyncCircuitBreakerMinRequests: 50, + SetAsyncCircuitBreakerConsecutiveFailures: 5, + SetAsyncCircuitBreakerFailurePercent: 0.05, } ) @@ -118,6 +124,20 @@ type RedisClientConfig struct { // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + + // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through + // when the circuit breaker is half-open. + // If set to 0, the circuit breaker allows only 1 request. + SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"` + // SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open. + // If set to 0, the circuit breaker resets it to 60 seconds. + SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"` + // SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker. + SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"` + // SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open. + SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"` + // SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open. + SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"` } func (c *RedisClientConfig) validate() error { @@ -131,6 +151,12 @@ func (c *RedisClientConfig) validate() error { } } + if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 { + return errCircuitBreakerConsecutiveFailuresNotPositive + } + if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 { + return errCircuitBreakerFailurePercentInvalid + } return nil } @@ -150,6 +176,8 @@ type RedisClient struct { durationGetMulti prometheus.Observer p *AsyncOperationProcessor + + setAsyncCircuitBreaker *gobreaker.CircuitBreaker } // NewRedisClient makes a new RedisClient. @@ -232,6 +260,17 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient config.MaxSetMultiConcurrency, gate.Sets, ), + setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "redis-set-async", + MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, + Interval: 10 * time.Second, + Timeout: config.SetAsyncCircuitBreakerOpenDuration, + ReadyToTrip: func(counts gobreaker.Counts) bool { + return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests && + (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || + float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) + }, + }), } duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_redis_operation_duration_seconds", @@ -249,7 +288,10 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error { return c.p.EnqueueAsync(func() { start := time.Now() - if err := c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil { + _, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) { + return nil, c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error() + }) + if err != nil { level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value)) return } diff --git a/pkg/cacheutil/redis_client_test.go b/pkg/cacheutil/redis_client_test.go index dcdb714012b..6e755235fd7 100644 --- a/pkg/cacheutil/redis_client_test.go +++ b/pkg/cacheutil/redis_client_test.go @@ -197,6 +197,24 @@ func TestValidateRedisConfig(t *testing.T) { }, expect_err: true, }, + { + name: "invalidCircuitBreakerFailurePercent", + config: func() RedisClientConfig { + cfg := DefaultRedisClientConfig + cfg.SetAsyncCircuitBreakerConsecutiveFailures = 0 + return cfg + }, + expect_err: true, + }, + { + name: "invalidCircuitBreakerFailurePercent", + config: func() RedisClientConfig { + cfg := DefaultRedisClientConfig + cfg.SetAsyncCircuitBreakerFailurePercent = 0 + return cfg + }, + expect_err: true, + }, } for _, tt := range tests {