ratelimits/wfe: redis source should use go-redis timeouts
beautifulentropy committed Sep 18, 2023
1 parent 417dfb5 commit 79f905d
Showing 6 changed files with 33 additions and 35 deletions.
16 changes: 13 additions & 3 deletions cmd/boulder-wfe2/main.go
@@ -259,6 +259,18 @@ func setupWFE(c Config, scope prometheus.Registerer, clk clock.Clock, log blog.L
var limiterLookup *bredis.Lookup
if c.WFE.Limiter.Defaults != "" {
// Setup rate limiting.
+
+ // The default read and write timeouts for go-redis clients are 3s. We
+ // want to be more aggressive than that.
+ readTimeout := c.WFE.Limiter.Redis.ReadTimeout.Duration
+ if readTimeout > 250*time.Millisecond || readTimeout <= 0 {
+ cmd.Fail("limiter.redis.readTimeout must be <= 250ms and > 0 ns")
+ }
+ writeTimeout := c.WFE.Limiter.Redis.WriteTimeout.Duration
+ if writeTimeout > 250*time.Millisecond || writeTimeout <= 0 {
+ cmd.Fail("limiter.redis.writeTimeout must be <= 250ms and > 0 ns")
+ }
+
var ring *redis.Ring
if len(c.WFE.Limiter.Redis.Lookups) > 0 {
// Configure a Redis client with periodic SRV lookups.
@@ -269,9 +281,7 @@ func setupWFE(c Config, scope prometheus.Registerer, clk clock.Clock, log blog.L
ring, err = c.WFE.Limiter.Redis.NewRing(scope)
cmd.FailOnError(err, "Failed to create Redis client for rate limiting")
}
-
- timeout := c.WFE.Limiter.Redis.Timeout.Duration
- source := ratelimits.NewRedisSource(ring, timeout, clk, scope)
+ source := ratelimits.NewRedisSource(ring, clk, scope)
limiter, err = ratelimits.NewLimiter(clk, source, c.WFE.Limiter.Defaults, c.WFE.Limiter.Overrides, scope)
cmd.FailOnError(err, "Failed to create rate limiter")
}
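
With the per-request context.WithTimeout removed (see ratelimits/source_redis.go below), every Redis command is now bounded by the client's own read/write deadlines. A minimal sketch of how such deadlines are carried, assuming go-redis v9's RingOptions; the actual wiring in redis/config.go is outside the visible hunks:

package bredis

import (
	"time"

	"github.com/redis/go-redis/v9"
)

// newRing is an illustrative sketch, not Boulder's implementation:
// ReadTimeout and WriteTimeout bound each command at the client level,
// replacing the per-call context.WithTimeout the old code used.
func newRing(addrs map[string]string, readTimeout, writeTimeout time.Duration) *redis.Ring {
	return redis.NewRing(&redis.RingOptions{
		Addrs:        addrs,
		ReadTimeout:  readTimeout,
		WriteTimeout: writeTimeout,
	})
}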
37 changes: 15 additions & 22 deletions ratelimits/source_redis.go
@@ -3,6 +3,7 @@ package ratelimits
import (
"context"
"errors"
"net"
"time"

"github.com/jmhodges/clock"
@@ -16,7 +17,6 @@ var _ source = (*RedisSource)(nil)
// RedisSource is a ratelimits source backed by sharded Redis.
type RedisSource struct {
client *redis.Ring
- timeout time.Duration
clk clock.Clock
setLatency *prometheus.HistogramVec
getLatency *prometheus.HistogramVec
@@ -25,9 +25,9 @@ type RedisSource struct {

// NewRedisSource returns a new Redis backed source using the provided
// *redis.Ring client.
- func NewRedisSource(client *redis.Ring, timeout time.Duration, clk clock.Clock, stats prometheus.Registerer) *RedisSource {
- // Exponential buckets ranging from 0.0005s to 2s
- buckets := prometheus.ExponentialBucketsRange(0.0005, 2, 8)
+ func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Registerer) *RedisSource {
+ // Exponential buckets ranging from 0.0005s to 300ms.
+ buckets := prometheus.ExponentialBucketsRange(0.0005, 0.3, 8)

setLatency := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -61,7 +61,6 @@ func NewRedisSource(client *redis.Ring, timeout time.Duration, clk clock.Clock,

return &RedisSource{
client: client,
- timeout: timeout,
clk: clk,
setLatency: setLatency,
getLatency: getLatency,
@@ -74,8 +73,6 @@ func NewRedisSource(client *redis.Ring, timeout time.Duration, clk clock.Clock,
// exist, it will be created.
func (r *RedisSource) Set(ctx context.Context, bucketKey string, tat time.Time) error {
start := r.clk.Now()
- ctx, cancel := context.WithTimeout(ctx, r.timeout)
- defer cancel()

err := r.client.Set(ctx, bucketKey, tat.UnixNano(), 0).Err()
if err != nil {
@@ -85,11 +82,11 @@ func (r *RedisSource) Set(ctx context.Context, bucketKey string, tat time.Time)
} else if errors.Is(err, context.Canceled) {
state = "canceled"
}
- r.setLatency.With(prometheus.Labels{"result": state}).Observe(time.Since(start).Seconds())
+ r.setLatency.With(prometheus.Labels{"result": state}).Observe(r.clk.Since(start).Seconds())
return err
}

- r.setLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
+ r.setLatency.With(prometheus.Labels{"result": "success"}).Observe(r.clk.Since(start).Seconds())
return nil
}

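Alongside the timeout change, latency observations switch from time.Since to the injected clock's Since, which keeps measurements deterministic under a fake clock in tests. A small illustration, assuming the jmhodges/clock package already imported above:

package main

import (
	"fmt"
	"time"

	"github.com/jmhodges/clock"
)

func main() {
	fc := clock.NewFake()
	start := fc.Now()
	fc.Add(42 * time.Millisecond) // advance fake time explicitly
	// r.clk.Since would observe exactly 42ms here; time.Since would
	// measure real wall-clock time and make test assertions flaky.
	fmt.Println(fc.Since(start))
}
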
@@ -98,28 +95,29 @@ func (r *RedisSource) Set(ctx context.Context, bucketKey string, tat time.Time)
// If the bucketKey does not exist, it returns ErrBucketNotFound.
func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, error) {
start := r.clk.Now()
- ctx, cancel := context.WithTimeout(ctx, r.timeout)
- defer cancel()

tatNano, err := r.client.Get(ctx, bucketKey).Int64()
if err != nil {
if errors.Is(err, redis.Nil) {
// Bucket key does not exist.
- r.getLatency.With(prometheus.Labels{"result": "notFound"}).Observe(time.Since(start).Seconds())
+ r.getLatency.With(prometheus.Labels{"result": "notFound"}).Observe(r.clk.Since(start).Seconds())
return time.Time{}, ErrBucketNotFound
}

state := "failed"
- if errors.Is(err, context.DeadlineExceeded) {
+ var netErr net.Error
+ if errors.As(err, &netErr) && netErr.Timeout() {
+ state = "timeout"
+ } else if errors.Is(err, context.DeadlineExceeded) {
state = "deadlineExceeded"
} else if errors.Is(err, context.Canceled) {
state = "canceled"
}
- r.getLatency.With(prometheus.Labels{"result": state}).Observe(time.Since(start).Seconds())
+ r.getLatency.With(prometheus.Labels{"result": state}).Observe(r.clk.Since(start).Seconds())
return time.Time{}, err
}

- r.getLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
+ r.getLatency.With(prometheus.Labels{"result": "success"}).Observe(r.clk.Since(start).Seconds())
return time.Unix(0, tatNano).UTC(), nil
}

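Because the deadline is now enforced by go-redis rather than a wrapped context, a timed-out command surfaces as a net.Error whose Timeout() reports true, so the label logic checks for that before the context errors. A standalone sketch of the classification order (the classify helper is illustrative, not Boulder code):

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
)

// classify mirrors the metric-label logic above: go-redis read/write
// deadline errors are network timeouts, distinct from the caller's context
// being past its deadline or canceled.
func classify(err error) string {
	var netErr net.Error
	switch {
	case errors.As(err, &netErr) && netErr.Timeout():
		return "timeout"
	case errors.Is(err, context.DeadlineExceeded):
		return "deadlineExceeded"
	case errors.Is(err, context.Canceled):
		return "canceled"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(classify(context.Canceled)) // canceled
}
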
@@ -128,25 +126,20 @@ func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, err
// indicate that the bucketKey existed.
func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error {
start := r.clk.Now()
- ctx, cancel := context.WithTimeout(ctx, r.timeout)
- defer cancel()

err := r.client.Del(ctx, bucketKey).Err()
if err != nil {
- r.deleteLatency.With(prometheus.Labels{"result": "failed"}).Observe(time.Since(start).Seconds())
+ r.deleteLatency.With(prometheus.Labels{"result": "failed"}).Observe(r.clk.Since(start).Seconds())
return err
}

- r.deleteLatency.With(prometheus.Labels{"result": "success"}).Observe(time.Since(start).Seconds())
+ r.deleteLatency.With(prometheus.Labels{"result": "success"}).Observe(r.clk.Since(start).Seconds())
return nil
}

// Ping checks that each shard of the *redis.Ring is reachable using the PING
// command. It returns an error if any shard is unreachable and nil otherwise.
func (r *RedisSource) Ping(ctx context.Context) error {
- ctx, cancel := context.WithTimeout(ctx, r.timeout)
- defer cancel()

err := r.client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
return shard.Ping(ctx).Err()
})
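
Ping likewise stops wrapping the caller's context; each shard's PING is bounded by the client's dial/read/write timeouts, and a caller that wants an overall bound can still supply its own deadline. A hedged usage sketch (the healthCheck helper is hypothetical):

package health

import (
	"context"
	"time"

	"github.com/letsencrypt/boulder/ratelimits"
)

// healthCheck bounds the whole Ping with a caller-side deadline; each
// individual PING is additionally bounded by the client's own timeouts.
func healthCheck(src *ratelimits.RedisSource) error {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	return src.Ping(ctx)
}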
3 changes: 1 addition & 2 deletions ratelimits/source_redis_test.go
@@ -2,7 +2,6 @@ package ratelimits

import (
"testing"
"time"

"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/metrics"
@@ -33,7 +32,7 @@ func newTestRedisSource(clk clock.FakeClock, addrs map[string]string) *RedisSour
Password: "824968fa490f4ecec1e52d5e34916bdb60d45f8d",
TLSConfig: tlsConfig2,
})
- return NewRedisSource(client, 5*time.Second, clk, metrics.NoopRegisterer)
+ return NewRedisSource(client, clk, metrics.NoopRegisterer)
}

func newRedisTestLimiter(t *testing.T, clk clock.FakeClock) *Limiter {
5 changes: 1 addition & 4 deletions redis/config.go
@@ -46,9 +46,6 @@ type Config struct {
// the system DNS will be used for resolution.
LookupDNSAuthority string `validate:"excluded_without=Lookups,omitempty,ip|hostname|hostname_port"`

- // Timeout is a per-request timeout applied to all Redis requests.
- Timeout config.Duration `validate:"-"`
-
// Enables read-only commands on replicas.
ReadOnly bool
// Allows routing read-only commands to the closest primary or replica.
@@ -142,7 +139,7 @@ func (c *Config) NewRing(stats prometheus.Registerer) (*redis.Ring, error) {
PoolTimeout: c.PoolTimeout.Duration,
ConnMaxIdleTime: c.IdleTimeout.Duration,
})
- if len(c.ShardAddrs) != 0 {
+ if len(c.ShardAddrs) > 0 {
// Client was statically configured with a list of shards.
MustRegisterClientMetricsCollector(client, stats, c.ShardAddrs, c.Username)
}
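
The removed Timeout field is replaced by the readTimeout/writeTimeout keys shown in wfe2.json below; the corresponding Config fields and their hand-off to the ring options are outside the visible hunks. A sketch of the shape this implies (field names inferred from the JSON keys, not taken from the diff):

package bredis

import (
	"github.com/letsencrypt/boulder/config"
	"github.com/redis/go-redis/v9"
)

// timeouts sketches the inferred replacement fields for the removed
// Timeout; the actual struct change is not shown in this commit view.
type timeouts struct {
	ReadTimeout  config.Duration
	WriteTimeout config.Duration
}

func applyTimeouts(opts *redis.RingOptions, t timeouts) {
	// go-redis applies these per command; its default for both is 3s.
	opts.ReadTimeout = t.ReadTimeout.Duration
	opts.WriteTimeout = t.WriteTimeout.Duration
}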
3 changes: 2 additions & 1 deletion test/config-next/wfe2.json
@@ -116,7 +116,8 @@
}
],
"lookupDNSAuthority": "consul.service.consul",
"timeout": "5s",
"readTimeout": "250ms",
"writeTimeout": "250ms",
"poolSize": 100,
"routeRandomly": true,
"tls": {
4 changes: 1 addition & 3 deletions wfe2/wfe_test.go
@@ -37,7 +37,6 @@ import (

capb "github.com/letsencrypt/boulder/ca/proto"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/config"
"github.com/letsencrypt/boulder/core"
corepb "github.com/letsencrypt/boulder/core/proto"
berrors "github.com/letsencrypt/boulder/errors"
@@ -377,14 +376,13 @@ func setupWFE(t *testing.T) (WebFrontEndImpl, clock.FakeClock, requestSigner) {
},
},
LookupDNSAuthority: "consul.service.consul",
- Timeout: config.Duration{Duration: 1 * time.Second},
}
rc.PasswordConfig = cmd.PasswordConfig{
PasswordFile: "../test/secrets/ratelimits_redis_password",
}
ring, _, err := rc.NewRingWithPeriodicLookups(stats, log)
test.AssertNotError(t, err, "making redis ring and lookup")
- source := ratelimits.NewRedisSource(ring, rc.Timeout.Duration, fc, stats)
+ source := ratelimits.NewRedisSource(ring, fc, stats)
test.AssertNotNil(t, source, "source should not be nil")
limiter, err = ratelimits.NewLimiter(fc, source, "../test/config-next/wfe2-ratelimit-defaults.yml", "", stats)
test.AssertNotError(t, err, "making limiter")
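
The resulting wiring is correspondingly simpler: the source is built from just the ring, clock, and registerer, and any per-command deadline comes from the ring itself. A minimal sketch using the public constructors seen above (the newLimiter helper is illustrative):

package example

import (
	"github.com/jmhodges/clock"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/redis/go-redis/v9"

	"github.com/letsencrypt/boulder/ratelimits"
)

// newLimiter builds a limiter from an already-configured ring; defaults is
// a path to a rate-limit defaults YAML file, as in the test above, and
// overrides are omitted.
func newLimiter(ring *redis.Ring, clk clock.Clock, defaults string) (*ratelimits.Limiter, error) {
	stats := prometheus.NewRegistry()
	src := ratelimits.NewRedisSource(ring, clk, stats)
	return ratelimits.NewLimiter(clk, src, defaults, "", stats)
}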
