From c9b1f7cc9ace47801d4f7639ffeb407fbfa8bade Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Thu, 25 Jan 2024 15:33:04 +0800 Subject: [PATCH] Add feature flag for circuitbreaker Signed-off-by: Xiaochao Dong (@damnever) --- CHANGELOG.md | 1 + pkg/cacheutil/cacheutil.go | 22 +++++++++++++++++++ pkg/cacheutil/memcached_client.go | 29 ++++++++++++++++++++------ pkg/cacheutil/memcached_client_test.go | 6 ++++-- pkg/cacheutil/redis_client.go | 26 ++++++++++++++++++----- 5 files changed, 71 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 832465e3e0..d898fe976d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label. - [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions. - [#6884](https://github.com/thanos-io/thanos/pull/6884) Tools: Add upload-block command to upload blocks to object storage. +- [#7010](https://github.com/thanos-io/thanos/pull/7010) Cache: Added `set_async_circuit_breaker_*` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations. ### Changed diff --git a/pkg/cacheutil/cacheutil.go b/pkg/cacheutil/cacheutil.go index 5d91bb9a3f..73d183d340 100644 --- a/pkg/cacheutil/cacheutil.go +++ b/pkg/cacheutil/cacheutil.go @@ -8,6 +8,8 @@ import ( "golang.org/x/sync/errgroup" + "github.com/sony/gobreaker" + "github.com/thanos-io/thanos/pkg/gate" ) @@ -40,3 +42,23 @@ func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate } return g.Wait() } + +// CircuitBreaker implements the circuit breaker pattern https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern. +type CircuitBreaker interface { + Execute(func() error) error +} + +type noopCircuitBreaker struct{} + +func (noopCircuitBreaker) Execute(f func() error) error { return f() } + +type gobreakerCircuitBreaker struct { + *gobreaker.CircuitBreaker +} + +func (cb gobreakerCircuitBreaker) Execute(f func() error) error { + _, err := cb.CircuitBreaker.Execute(func() (any, error) { + return nil, f() + }) + return err +} diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 6589ea89d3..a7d08274b3 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -57,6 +57,8 @@ var ( MaxGetMultiBatchSize: 0, DNSProviderUpdateInterval: 10 * time.Second, AutoDiscovery: false, + + SetAsyncCircuitBreakerEnabled: false, SetAsyncCircuitBreakerHalfOpenMaxRequests: 10, SetAsyncCircuitBreakerOpenDuration: 5 * time.Second, SetAsyncCircuitBreakerMinRequests: 50, @@ -150,6 +152,18 @@ type MemcachedClientConfig struct { // AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution AutoDiscovery bool `yaml:"auto_discovery"` + // SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations. + // + // The circuit breaker consists of three states: closed, half-open, and open. + // It begins in the closed state. When the total requests exceed SetAsyncCircuitBreakerMinRequests, + // and either consecutive failures occur or the failure percentage is excessively high according + // to the configured values, the circuit breaker transitions to the open state. + // This results in the rejection of all SetAsync requests. After SetAsyncCircuitBreakerOpenDuration, + // the circuit breaker transitions to the half-open state, where it allows SetAsyncCircuitBreakerHalfOpenMaxRequests + // SetAsync requests to be processed in order to test if the conditions have improved. If they have not, + // the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds + // interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. + SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"` // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through // when the circuit breaker is half-open. // If set to 0, the circuit breaker allows only 1 request. @@ -224,7 +238,7 @@ type memcachedClient struct { p *AsyncOperationProcessor - setAsyncCircuitBreaker *gobreaker.CircuitBreaker + setAsyncCircuitBreaker CircuitBreaker } // AddressProvider performs node address resolution given a list of clusters. @@ -307,8 +321,11 @@ func newMemcachedClient( config.MaxGetMultiConcurrency, gate.Gets, ), - p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), - setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{ + p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency), + setAsyncCircuitBreaker: noopCircuitBreaker{}, + } + if config.SetAsyncCircuitBreakerEnabled { + c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "memcached-set-async", MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, Interval: 10 * time.Second, @@ -318,7 +335,7 @@ func newMemcachedClient( (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) }, - }), + })} } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ @@ -416,8 +433,8 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) start := time.Now() c.operations.WithLabelValues(opSet).Inc() - _, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) { - return nil, c.client.Set(&memcache.Item{ + err := c.setAsyncCircuitBreaker.Execute(func() error { + return c.client.Set(&memcache.Item{ Key: key, Value: value, Expiration: int32(time.Now().Add(ttl).Unix()), diff --git a/pkg/cacheutil/memcached_client_test.go b/pkg/cacheutil/memcached_client_test.go index 7b0571c32e..297ff7d281 100644 --- a/pkg/cacheutil/memcached_client_test.go +++ b/pkg/cacheutil/memcached_client_test.go @@ -724,6 +724,7 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { t.Run(testdata.name, func(t *testing.T) { config := defaultMemcachedClientConfig config.Addresses = []string{"127.0.0.1:11211"} + config.SetAsyncCircuitBreakerEnabled = true config.SetAsyncCircuitBreakerOpenDuration = 1 * time.Millisecond config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100 config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests @@ -743,15 +744,16 @@ func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) { } testutil.Ok(t, backendMock.waitSetCount(testdata.setErrors)) + cbimpl := client.setAsyncCircuitBreaker.(gobreakerCircuitBreaker).CircuitBreaker if testdata.expectCircuitBreakerOpen { - testutil.Equals(t, gobreaker.StateOpen, client.setAsyncCircuitBreaker.State()) + testutil.Equals(t, gobreaker.StateOpen, cbimpl.State()) time.Sleep(config.SetAsyncCircuitBreakerOpenDuration) for i := testdata.setErrors; i < testdata.setErrors+10; i++ { testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second)) } testutil.Ok(t, backendMock.waitItems(10)) } else { - testutil.Equals(t, gobreaker.StateClosed, client.setAsyncCircuitBreaker.State()) + testutil.Equals(t, gobreaker.StateClosed, cbimpl.State()) } }) } diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index c3bf986d91..b2aef1197d 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -125,6 +125,18 @@ type RedisClientConfig struct { // MaxAsyncConcurrency specifies the maximum number of SetAsync goroutines. MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + // SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations. + // + // The circuit breaker consists of three states: closed, half-open, and open. + // It begins in the closed state. When the total requests exceed SetAsyncCircuitBreakerMinRequests, + // and either consecutive failures occur or the failure percentage is excessively high according + // to the configured values, the circuit breaker transitions to the open state. + // This results in the rejection of all SetAsync requests. After SetAsyncCircuitBreakerOpenDuration, + // the circuit breaker transitions to the half-open state, where it allows SetAsyncCircuitBreakerHalfOpenMaxRequests + // SetAsync requests to be processed in order to test if the conditions have improved. If they have not, + // the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds + // interval in the closed state, the circuit breaker resets its metrics and repeats this cycle. + SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"` // SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through // when the circuit breaker is half-open. // If set to 0, the circuit breaker allows only 1 request. @@ -177,7 +189,7 @@ type RedisClient struct { p *AsyncOperationProcessor - setAsyncCircuitBreaker *gobreaker.CircuitBreaker + setAsyncCircuitBreaker CircuitBreaker } // NewRedisClient makes a new RedisClient. @@ -260,7 +272,10 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient config.MaxSetMultiConcurrency, gate.Sets, ), - setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{ + setAsyncCircuitBreaker: noopCircuitBreaker{}, + } + if config.SetAsyncCircuitBreakerEnabled { + c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "redis-set-async", MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests, Interval: 10 * time.Second, @@ -270,8 +285,9 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient (counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) || float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent) }, - }), + })} } + duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "thanos_redis_operation_duration_seconds", Help: "Duration of operations against redis.", @@ -288,8 +304,8 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error { return c.p.EnqueueAsync(func() { start := time.Now() - _, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) { - return nil, c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error() + err := c.setAsyncCircuitBreaker.Execute(func() error { + return c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error() }) if err != nil { level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))