Skip to content

Commit

Permalink
Implement the circuit breaker pattern for asynchronous set operations…
Browse files Browse the repository at this point in the history
… in the cache client

Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
  • Loading branch information
damnever committed Dec 26, 2023
1 parent a59a3ef commit ec38a52
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 20 deletions.
84 changes: 67 additions & 17 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sony/gobreaker"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/discovery/dns"
Expand All @@ -40,9 +41,11 @@ const (
)

var (
errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided")
errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive")
errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")
errMemcachedConfigNoAddrs = errors.New("no memcached addresses provided")
errMemcachedDNSUpdateIntervalNotPositive = errors.New("DNS provider update interval must be positive")
errMemcachedMaxAsyncConcurrencyNotPositive = errors.New("max async concurrency must be positive")
errCircuitBreakerConsecutiveFailuresNotPositive = errors.New("set async circuit breaker: consecutive failures must be greater than 0")
errCircuitBreakerFailurePercentInvalid = errors.New("set async circuit breaker: failure percent must be in range (0,1]")

defaultMemcachedClientConfig = MemcachedClientConfig{
Timeout: 500 * time.Millisecond,
Expand All @@ -54,6 +57,11 @@ var (
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,
SetAsyncCircuitBreakerHalfOpenMaxRequests: 10,
SetAsyncCircuitBreakerOpenDuration: 5 * time.Second,
SetAsyncCircuitBreakerMinRequests: 50,
SetAsyncCircuitBreakerConsecutiveFailures: 5,
SetAsyncCircuitBreakerFailurePercent: 0.05,
}
)

Expand Down Expand Up @@ -141,6 +149,20 @@ type MemcachedClientConfig struct {

// AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution
AutoDiscovery bool `yaml:"auto_discovery"`

// SetAsyncCircuitBreakerHalfOpenMaxRequests is the maximum number of requests allowed to pass through
// when the circuit breaker is half-open.
// If set to 0, the circuit breaker allows only 1 request.
SetAsyncCircuitBreakerHalfOpenMaxRequests uint32 `yaml:"set_async_circuit_breaker_half_open_max_requests"`
// SetAsyncCircuitBreakerOpenDuration is the period of the open state after which the state of the circuit breaker becomes half-open.
// If set to 0, the circuit breaker resets it to 60 seconds.
SetAsyncCircuitBreakerOpenDuration time.Duration `yaml:"set_async_circuit_breaker_open_duration"`
// SetAsyncCircuitBreakerMinRequests is minimal requests to trigger the circuit breaker.
SetAsyncCircuitBreakerMinRequests uint32 `yaml:"set_async_circuit_breaker_min_requests"`
// SetAsyncCircuitBreakerConsecutiveFailures represents consecutive failures based on CircuitBreakerMinRequests to determine if the circuit breaker should open.
SetAsyncCircuitBreakerConsecutiveFailures uint32 `yaml:"set_async_circuit_breaker_consecutive_failures"`
// SetAsyncCircuitBreakerFailurePercent represents the failure percentage, which is based on CircuitBreakerMinRequests, to determine if the circuit breaker should open.
SetAsyncCircuitBreakerFailurePercent float64 `yaml:"set_async_circuit_breaker_failure_percent"`
}

func (c *MemcachedClientConfig) validate() error {
Expand All @@ -158,6 +180,12 @@ func (c *MemcachedClientConfig) validate() error {
return errMemcachedMaxAsyncConcurrencyNotPositive
}

if c.SetAsyncCircuitBreakerConsecutiveFailures == 0 {
return errCircuitBreakerConsecutiveFailuresNotPositive
}
if c.SetAsyncCircuitBreakerFailurePercent <= 0 || c.SetAsyncCircuitBreakerFailurePercent > 1 {
return errCircuitBreakerFailurePercentInvalid
}
return nil
}

Expand Down Expand Up @@ -195,6 +223,8 @@ type memcachedClient struct {
dataSize *prometheus.HistogramVec

p *AsyncOperationProcessor

setAsyncCircuitBreaker *gobreaker.CircuitBreaker
}

// AddressProvider performs node address resolution given a list of clusters.
Expand Down Expand Up @@ -278,6 +308,17 @@ func newMemcachedClient(
gate.Gets,
),
p: NewAsyncOperationProcessor(config.MaxAsyncBufferSize, config.MaxAsyncConcurrency),
setAsyncCircuitBreaker: gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "memcached-set-async",
MaxRequests: config.SetAsyncCircuitBreakerHalfOpenMaxRequests,
Interval: 10 * time.Second,
Timeout: config.SetAsyncCircuitBreakerOpenDuration,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.Requests >= config.SetAsyncCircuitBreakerMinRequests &&
(counts.ConsecutiveFailures >= uint32(config.SetAsyncCircuitBreakerConsecutiveFailures) ||
float64(counts.TotalFailures)/float64(counts.Requests) >= config.SetAsyncCircuitBreakerFailurePercent)
},
}),
}

c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Expand Down Expand Up @@ -375,22 +416,31 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration)
start := time.Now()
c.operations.WithLabelValues(opSet).Inc()

err := c.client.Set(&memcache.Item{
Key: key,
Value: value,
Expiration: int32(time.Now().Add(ttl).Unix()),
_, err := c.setAsyncCircuitBreaker.Execute(func() (any, error) {
return nil, c.client.Set(&memcache.Item{
Key: key,
Value: value,
Expiration: int32(time.Now().Add(ttl).Unix()),
})
})
if err != nil {
// If the PickServer will fail for any reason the server address will be nil
// and so missing in the logs. We're OK with that (it's a best effort).
serverAddr, _ := c.selector.PickServer(key)
level.Debug(c.logger).Log(
"msg", "failed to store item to memcached",
"key", key,
"sizeBytes", len(value),
"server", serverAddr,
"err", err,
)
if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) {
level.Warn(c.logger).Log(
"msg", "circuit breaker disallows storing item in memcached",
"key", key,
"err", err)
} else {
// If the PickServer will fail for any reason the server address will be nil
// and so missing in the logs. We're OK with that (it's a best effort).
serverAddr, _ := c.selector.PickServer(key)
level.Debug(c.logger).Log(
"msg", "failed to store item to memcached",
"key", key,
"sizeBytes", len(value),
"server", serverAddr,
"err", err,
)
}
c.trackError(opSet, err)
return
}
Expand Down
130 changes: 128 additions & 2 deletions pkg/cacheutil/memcached_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync"
"testing"
"time"
Expand All @@ -16,9 +17,11 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sony/gobreaker"
"go.uber.org/atomic"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
)
Expand All @@ -33,6 +36,8 @@ func TestMemcachedClientConfig_validate(t *testing.T) {
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerConsecutiveFailures: 1,
SetAsyncCircuitBreakerFailurePercent: 0.5,
},
expected: nil,
},
Expand All @@ -41,6 +46,8 @@ func TestMemcachedClientConfig_validate(t *testing.T) {
Addresses: []string{},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerConsecutiveFailures: 1,
SetAsyncCircuitBreakerFailurePercent: 0.5,
},
expected: errMemcachedConfigNoAddrs,
},
Expand All @@ -49,16 +56,49 @@ func TestMemcachedClientConfig_validate(t *testing.T) {
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 0,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerConsecutiveFailures: 1,
SetAsyncCircuitBreakerFailurePercent: 0.5,
},
expected: errMemcachedMaxAsyncConcurrencyNotPositive,
},
"should fail on dns_provider_update_interval <= 0": {
config: MemcachedClientConfig{
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
SetAsyncCircuitBreakerConsecutiveFailures: 1,
SetAsyncCircuitBreakerFailurePercent: 0.5,
},
expected: errMemcachedDNSUpdateIntervalNotPositive,
},
"should fail on circuit_breaker_consecutive_failures = 0": {
config: MemcachedClientConfig{
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerConsecutiveFailures: 0,
},
expected: errCircuitBreakerConsecutiveFailuresNotPositive,
},
"should fail on circuit_breaker_failure_percent <= 0": {
config: MemcachedClientConfig{
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerConsecutiveFailures: 1,
SetAsyncCircuitBreakerFailurePercent: 0,
},
expected: errCircuitBreakerFailurePercentInvalid,
},
"should fail on circuit_breaker_failure_percent >= 1": {
config: MemcachedClientConfig{
Addresses: []string{"127.0.0.1:11211"},
MaxAsyncConcurrency: 1,
DNSProviderUpdateInterval: time.Second,
SetAsyncCircuitBreakerConsecutiveFailures: 1,
SetAsyncCircuitBreakerFailurePercent: 1.1,
},
expected: errCircuitBreakerFailurePercentInvalid,
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -491,6 +531,9 @@ type memcachedClientBackendMock struct {
items map[string]*memcache.Item
getMultiCount int
getMultiErrors int

setCount int
setErrors int
}

func newMemcachedClientBackendMock() *memcachedClientBackendMock {
Expand Down Expand Up @@ -522,25 +565,38 @@ func (c *memcachedClientBackendMock) Set(item *memcache.Item) error {
c.lock.Lock()
defer c.lock.Unlock()

c.setCount++
if c.setCount <= c.setErrors {
return errors.New("mocked Set error")
}

c.items[item.Key] = item

return nil
}

func (c *memcachedClientBackendMock) waitSetCount(expected int) error {
return c.waitFor(expected, "the number of set operations", func() int { return c.setCount })
}

func (c *memcachedClientBackendMock) waitItems(expected int) error {
return c.waitFor(expected, "items", func() int { return len(c.items) })
}

func (c *memcachedClientBackendMock) waitFor(expected int, name string, valueFunc func() int) error {
deadline := time.Now().Add(1 * time.Second)

for time.Now().Before(deadline) {
c.lock.Lock()
count := len(c.items)
count := valueFunc()
c.lock.Unlock()

if count >= expected {
return nil
}
}

return errors.New("timeout expired while waiting for items in the memcached mock")
return fmt.Errorf("timeout expired while waiting for %s in the memcached mock", name)
}

// countingGate implements gate.Gate and counts the number of times Start is called.
Expand Down Expand Up @@ -630,3 +686,73 @@ func (c *memcachedClientBlockingMock) GetMulti([]string) (map[string]*memcache.I
func (c *memcachedClientBlockingMock) Set(*memcache.Item) error {
return nil
}

func TestMemcachedClient_SetAsync_CircuitBreaker(t *testing.T) {
for _, testdata := range []struct {
name string
setErrors int
minRequests uint32
consecutiveFailures uint32
failurePercent float64
expectCircuitBreakerOpen bool
}{
{
name: "remains closed due to min requests not satisfied",
setErrors: 10,
minRequests: 100,
consecutiveFailures: 1,
failurePercent: 0.00001,
expectCircuitBreakerOpen: false,
},
{
name: "opened because too many consecutive failures",
setErrors: 10,
minRequests: 10,
consecutiveFailures: 10,
failurePercent: 1,
expectCircuitBreakerOpen: true,
},
{
name: "opened because failure percent too high",
setErrors: 10,
minRequests: 10,
consecutiveFailures: 100,
failurePercent: 0.1,
expectCircuitBreakerOpen: true,
},
} {
t.Run(testdata.name, func(t *testing.T) {
config := defaultMemcachedClientConfig
config.Addresses = []string{"127.0.0.1:11211"}
config.SetAsyncCircuitBreakerOpenDuration = 1 * time.Millisecond
config.SetAsyncCircuitBreakerHalfOpenMaxRequests = 100
config.SetAsyncCircuitBreakerMinRequests = testdata.minRequests
config.SetAsyncCircuitBreakerConsecutiveFailures = testdata.consecutiveFailures
config.SetAsyncCircuitBreakerFailurePercent = testdata.failurePercent

backendMock := newMemcachedClientBackendMock()
backendMock.setErrors = testdata.setErrors

client, err := prepare(config, backendMock)
testutil.Ok(t, err)
defer client.Stop()

// Populate memcached with the initial items.
for i := 0; i < testdata.setErrors; i++ {
testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second))
}

testutil.Ok(t, backendMock.waitSetCount(testdata.setErrors))
if testdata.expectCircuitBreakerOpen {
testutil.Equals(t, gobreaker.StateOpen, client.setAsyncCircuitBreaker.State())
time.Sleep(config.SetAsyncCircuitBreakerOpenDuration)
for i := testdata.setErrors; i < testdata.setErrors+10; i++ {
testutil.Ok(t, client.SetAsync(strconv.Itoa(i), []byte("value"), time.Second))
}
testutil.Ok(t, backendMock.waitItems(10))
} else {
testutil.Equals(t, gobreaker.StateClosed, client.setAsyncCircuitBreaker.State())
}
})
}
}
Loading

0 comments on commit ec38a52

Please sign in to comment.