Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache: implement the circuit breaker pattern for asynchronous set operations in the cache client #7010

Merged
merged 6 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.
- [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions.
- [#6884](https://github.com/thanos-io/thanos/pull/6884) Tools: Add upload-block command to upload blocks to object storage.
- [#7010](https://github.com/thanos-io/thanos/pull/7010) Cache: Added `set_async_circuit_breaker_*` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations.

### Changed

Expand Down
12 changes: 12 additions & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ config:
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
set_async_circuit_breaker_enabled: false
set_async_circuit_breaker_half_open_max_requests: 0
set_async_circuit_breaker_open_duration: 0s
set_async_circuit_breaker_min_requests: 0
set_async_circuit_breaker_consecutive_failures: 0
set_async_circuit_breaker_failure_percent: 0
expiration: 0s
```

Expand Down Expand Up @@ -132,6 +138,12 @@ config:
master_name: ""
max_async_buffer_size: 10000
max_async_concurrency: 20
set_async_circuit_breaker_enabled: false
set_async_circuit_breaker_half_open_max_requests: 10
set_async_circuit_breaker_open_duration: 5s
set_async_circuit_breaker_min_requests: 50
set_async_circuit_breaker_consecutive_failures: 5
set_async_circuit_breaker_failure_percent: 0.05
expiration: 24h0m0s
```

Expand Down
18 changes: 18 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ config:
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
set_async_circuit_breaker_enabled: false
set_async_circuit_breaker_half_open_max_requests: 0
set_async_circuit_breaker_open_duration: 0s
set_async_circuit_breaker_min_requests: 0
set_async_circuit_breaker_consecutive_failures: 0
set_async_circuit_breaker_failure_percent: 0
enabled_items: []
ttl: 0s
```
Expand All @@ -340,6 +346,12 @@ While the remaining settings are **optional**:
- `max_async_concurrency`: maximum number of concurrent asynchronous operations can occur.
- `max_async_buffer_size`: maximum number of enqueued asynchronous operations allowed.
- `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited.
- `set_async_circuit_breaker_enabled`: `true` to enable circuite breaker for asynchronous operations. The circuit breaker consists of three states: closed, half-open, and open. It begins in the closed state. When the total requests exceed `set_async_circuit_breaker_min_requests`, and either consecutive failures occur or the failure percentage is excessively high according to the configured values, the circuit breaker transitions to the open state. This results in the rejection of all asynchronous operations. After `set_async_circuit_breaker_open_duration`, the circuit breaker transitions to the half-open state, where it allows `set_async_circuit_breaker_half_open_max_requests` asynchronous operations to be processed in order to test if the conditions have improved. If they have not, the state transitions back to open; if they have, it transitions to the closed state. Following each 10 seconds interval in the closed state, the circuit breaker resets its metrics and repeats this cycle.
- `set_async_circuit_breaker_half_open_max_requests`: maximum number of requests allowed to pass through when the circuit breaker is half-open. If set to 0, the circuit breaker allows only 1 request.
- `set_async_circuit_breaker_open_duration`: the period of the open state after which the state of the circuit breaker becomes half-open. If set to 0, the circuit breaker resets it to 60 seconds.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If set to 0, the circuit breaker resets it to 60 seconds.

What does it mean? That the default value of this is 60 seconds? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, github.com/sony/gobreaker uses this default value.

- `set_async_circuit_breaker_min_requests`: minimal requests to trigger the circuit breaker, 0 signifies no requirements.
- `set_async_circuit_breaker_consecutive_failures`: consecutive failures based on `set_async_circuit_breaker_min_requests` to determine if the circuit breaker should open.
- `set_async_circuit_breaker_failure_percent`: the failure percentage, which is based on `set_async_circuit_breaker_min_requests`, to determine if the circuit breaker should open.
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
Expand Down Expand Up @@ -376,6 +388,12 @@ config:
master_name: ""
max_async_buffer_size: 10000
max_async_concurrency: 20
set_async_circuit_breaker_enabled: false
set_async_circuit_breaker_half_open_max_requests: 10
set_async_circuit_breaker_open_duration: 5s
set_async_circuit_breaker_min_requests: 50
set_async_circuit_breaker_consecutive_failures: 5
set_async_circuit_breaker_failure_percent: 0.05
enabled_items: []
ttl: 0s
```
Expand Down
22 changes: 22 additions & 0 deletions pkg/cacheutil/cacheutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"golang.org/x/sync/errgroup"

"github.com/sony/gobreaker"

"github.com/thanos-io/thanos/pkg/gate"
)

Expand Down Expand Up @@ -40,3 +42,23 @@ func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate
}
return g.Wait()
}

// CircuitBreaker implements the circuit breaker pattern https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern.
type CircuitBreaker interface {
Execute(func() error) error
}

type noopCircuitBreaker struct{}

func (noopCircuitBreaker) Execute(f func() error) error { return f() }

type gobreakerCircuitBreaker struct {
*gobreaker.CircuitBreaker
}

func (cb gobreakerCircuitBreaker) Execute(f func() error) error {
_, err := cb.CircuitBreaker.Execute(func() (any, error) {
return nil, f()
})
return err
}
93 changes: 75 additions & 18 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sony/gobreaker"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/discovery/dns"
Expand All @@ -40,9 +41,11 @@ const (
)

var (
errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided")
errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive")
errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")
errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided")
errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive")
errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")
errCircuitBreakerConsecutiveFailuresNotPositive = errors.New("set async circuit breaker: consecutive failures must be greater than 0")
errCircuitBreakerFailurePercentInvalid = errors.New("set async circuit breaker: failure percent must be in range (0,1]")

defaultMemcachedClientConfig = MemcachedClientConfig{
Timeout: 500 * time.Millisecond,
Expand All @@ -54,6 +57,13 @@ var (
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,

SetAsyncCircuitBreakerEnabled: false,
SetAsyncCircuitBreakerHalfOpenMaxRequests: 10,
SetAsyncCircuitBreakerOpenDuration: 5 * time.Second,
SetAsyncCircuitBreakerMinRequests: 50,
SetAsyncCircuitBreakerConsecutiveFailures: 5,
SetAsyncCircuitBreakerFailurePercent: 0.05,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to keep an eye on this to make sure those values sane.
So far, it looks good to me. Would be better to have another pair of eyes on it

}
)

Expand Down Expand Up @@ -141,6 +151,22 @@ type MemcachedClientConfig struct {

// AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution
AutoDiscovery bool `yaml:"auto_discovery"`

// SetAsyncCircuitBreakerEnabled enables circuite breaker for SetAsync operations.
SetAsyncCircuitBreakerEnabled bool `yaml:"set_async_circuit_breaker_enabled"`
// SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through
// when the circuit breaker is half-open.
// If set to 0, the circuit breaker allows only 1 request.
SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"`
// SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open.
// If set to 0, the circuit breaker resets it to 60 seconds.
SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"`
// SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker.
SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"`
// SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open.
SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"`
// SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open.
SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"`
}

func (c *MemcachedClientConfig) validate() error {
Expand All @@ -158,6 +184,12 @@ func (c *MemcachedClientConfig) validate() error {
return errMemcachedMaxAsyncConcurrencyNotPositive
}

if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 {
return errCircuitBreakerConsecutiveFailuresNotPositive
}
if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 {
return errCircuitBreakerFailurePercentInvalid
}
damnever marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down Expand Up @@ -195,6 +227,8 @@ type memcachedClient struct {
dataSize *prometheus.HistogramVec

p *AsyncOperationProcessor

setAsyncCircuitBreaker CircuitBreaker
}

// AddressProvider performs node address resolution given a list of clusters.
Expand Down Expand Up @@ -277,7 +311,21 @@ func newMemcachedClient(
config.MaxGetMultiConcurrency,
gate.Gets,
),
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
setAsyncCircuitBreaker: noopCircuitBreaker{},
}
if config.SetAsyncCircuitBreakerEnabled {
c.setAsyncCircuitBreaker = gobreakerCircuitBreaker{gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "memcached-set-async",
MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests,
Interval: 10 * time.Second,
Timeout: config.SetAsyncCircuitBreakerOpenDuration,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests &&
(counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) ||
float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent)
},
})}
}

c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Expand Down Expand Up @@ -375,22 +423,31 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration)
start := time.Now()
c.operations.WithLabelValues(opSet).Inc()

err := c.client.Set(&memcache.Item{
Key: key,
Value: value,
Expiration: int32(time.Now().Add(ttl).Unix()),
err := c.setAsyncCircuitBreaker.Execute(func() error {
return c.client.Set(&memcache.Item{
Key: key,
Value: value,
Expiration: int32(time.Now().Add(ttl).Unix()),
})
})
if err != nil {
// If the PickServer will fail for any reason the server address will be nil
// and so missing in the logs. We're OK with that (it's a best effort).
serverAddr, _ := c.selector.PickServer(key)
level.Debug(c.logger).Log(
"msg", "failed to store item to memcached",
"key", key,
"sizeBytes", len(value),
"server", serverAddr,
"err", err,
)
if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) {
level.Warn(c.logger).Log(
"msg", "circuit breaker disallows storing item in memcached",
"key", key,
"err", err)
} else {
// If the PickServer will fail for any reason the server address will be nil
// and so missing in the logs. We're OK with that (it's a best effort).
serverAddr, _ := c.selector.PickServer(key)
level.Debug(c.logger).Log(
"msg", "failed to store item to memcached",
"key", key,
"sizeBytes", len(value),
"server", serverAddr,
"err", err,
)
}
c.trackError(opSet, err)
return
}
Expand Down
Loading
Loading