diff --git a/CHANGELOG.md b/CHANGELOG.md index ee265c18d..81b123987 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,8 @@ * [CHANGE] Added CHANGELOG.md and Pull Request template to reference the changelog * [CHANGE] Remove `cortex_` prefix for metrics registered in the ring. #46 * [CHANGE] Rename `kv/kvtls` to `crypto/tls`. #39 +* [CHANGE] grpcclient: Make ping time and timeout into configuration parameters. #56 * [ENHANCEMENT] Add middleware package. #38 * [ENHANCEMENT] Add the ring package #45 * [ENHANCEMENT] Add limiter package. #41 -* [ENHANCEMENT] Add grpcclient, grpcencoding and grpcutil packages. #39 \ No newline at end of file +* [ENHANCEMENT] Add grpcclient, grpcencoding and grpcutil packages. #39 diff --git a/crypto/tls/tls.go b/crypto/tls/tls.go index 9886b208d..2e2312cca 100644 --- a/crypto/tls/tls.go +++ b/crypto/tls/tls.go @@ -72,10 +72,17 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { return config, nil } -// GetGRPCDialOptions creates GRPC DialOptions for TLS +// WithInsecure wraps grpc.WithInsecure. +// +// Stubbable for tests. +var WithInsecure = func() grpc.DialOption { + return grpc.WithInsecure() +} + +// GetGRPCDialOptions creates GRPC DialOptions for TLS. func (cfg *ClientConfig) GetGRPCDialOptions(enabled bool) ([]grpc.DialOption, error) { if !enabled { - return []grpc.DialOption{grpc.WithInsecure()}, nil + return []grpc.DialOption{WithInsecure()}, nil } tlsConfig, err := cfg.GetTLSConfig() diff --git a/go.mod b/go.mod index 4ad5dc184..a0954c0fe 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/gogo/status v1.1.0 github.com/golang/snappy v0.0.4 + github.com/google/go-cmp v0.5.6 github.com/gorilla/mux v1.8.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 github.com/hashicorp/consul/api v1.9.1 diff --git a/go.sum b/go.sum index 56c23c16a..da15d9111 100644 --- a/go.sum +++ b/go.sum @@ -143,8 +143,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/grpcclient/grpcclient.go b/grpcclient/grpcclient.go index 6114d15d8..275780c45 100644 --- a/grpcclient/grpcclient.go +++ b/grpcclient/grpcclient.go @@ -23,6 +23,15 @@ type Config struct { GRPCCompression string `yaml:"grpc_compression"` RateLimit float64 `yaml:"rate_limit"` RateLimitBurst int `yaml:"rate_limit_burst"` + // PingTime is the number of seconds after which the client will ping the server in case of inactivity. + // + // See `google.golang.org/grpc/keepalive.ClientParameters.Time` for reference. + PingTime int64 `yaml:"ping_time"` + // PingTimeOut is the number of seconds the client waits after pinging the server, and if no activity is seen + // after that, the connection is closed. + // + // See `google.golang.org/grpc/keepalive.ClientParameters.Timeout` for reference. + PingTimeout int64 `yaml:"ping_timeout"` BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` BackoffConfig backoff.Config `yaml:"backoff_config"` @@ -74,12 +83,10 @@ func (cfg *Config) CallOptions() []grpc.CallOption { // DialOption returns the config as a grpc.DialOptions. func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { - var opts []grpc.DialOption - tlsOpts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) + opts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) if err != nil { return nil, err } - opts = append(opts, tlsOpts...) if cfg.BackoffOnRatelimits { unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...) @@ -91,13 +98,29 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep return append( opts, - grpc.WithDefaultCallOptions(cfg.CallOptions()...), - grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)), - grpc.WithStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Second * 20, - Timeout: time.Second * 10, + withDefaultCallOptions(cfg.CallOptions()...), + withUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)), + withStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)), + withKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(cfg.PingTime) * time.Second, + Timeout: time.Duration(cfg.PingTimeout) * time.Second, PermitWithoutStream: true, }), ), nil } + +var withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption { + return grpc.WithDefaultCallOptions(cos...) +} + +var withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption { + return grpc.WithUnaryInterceptor(f) +} + +var withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption { + return grpc.WithStreamInterceptor(f) +} + +var withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption { + return grpc.WithKeepaliveParams(kp) +} diff --git a/grpcclient/grpcclient_test.go b/grpcclient/grpcclient_test.go new file mode 100644 index 000000000..177efd6b1 --- /dev/null +++ b/grpcclient/grpcclient_test.go @@ -0,0 +1,120 @@ +package grpcclient + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/grafana/dskit/crypto/tls" + middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +type fakeDialOption struct { + grpc.EmptyDialOption + + callOpts []grpc.CallOption + isInsecure bool + unaryClientInterceptor grpc.UnaryClientInterceptor + streamClientInterceptor grpc.StreamClientInterceptor + keepaliveParams keepalive.ClientParameters +} + +func (o fakeDialOption) Equal(other fakeDialOption) bool { + if len(o.callOpts) != len(other.callOpts) { + return false + } + + for i, arg := range o.callOpts { + if maxRecv, ok := arg.(grpc.MaxRecvMsgSizeCallOption); ok { + if maxRecv != other.callOpts[i].(grpc.MaxRecvMsgSizeCallOption) { + return false + } + continue + } + if maxSend, ok := arg.(grpc.MaxSendMsgSizeCallOption); ok { + if maxSend != other.callOpts[i].(grpc.MaxSendMsgSizeCallOption) { + return false + } + continue + } + } + + hasUnaryInterceptor := o.unaryClientInterceptor != nil + otherHasUnaryInterceptor := other.unaryClientInterceptor != nil + hasStreamInterceptor := o.streamClientInterceptor != nil + otherHasStreamInterceptor := other.streamClientInterceptor != nil + + return o.isInsecure == other.isInsecure && hasUnaryInterceptor == otherHasUnaryInterceptor && + hasStreamInterceptor == otherHasStreamInterceptor && o.keepaliveParams == other.keepaliveParams +} + +func TestConfig(t *testing.T) { + origWithDefaultCallOptions := withDefaultCallOptions + origWithUnaryInterceptor := withUnaryInterceptor + origWithStreamInterceptor := withStreamInterceptor + origWithKeepaliveParams := withKeepaliveParams + origWithInsecure := tls.WithInsecure + t.Cleanup(func() { + withDefaultCallOptions = origWithDefaultCallOptions + withUnaryInterceptor = origWithUnaryInterceptor + withStreamInterceptor = origWithStreamInterceptor + withKeepaliveParams = origWithKeepaliveParams + tls.WithInsecure = origWithInsecure + }) + + withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption { + t.Log("Received call options", "options", cos) + return fakeDialOption{callOpts: cos} + } + withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption { + t.Log("Received unary client interceptor", f) + return fakeDialOption{unaryClientInterceptor: f} + } + withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption { + t.Log("Received stream client interceptor", f) + return fakeDialOption{streamClientInterceptor: f} + } + withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption { + t.Log("Received keepalive params", kp) + return fakeDialOption{ + keepaliveParams: kp, + } + } + tls.WithInsecure = func() grpc.DialOption { + return fakeDialOption{isInsecure: true} + } + + cfg := Config{ + PingTime: 10, + PingTimeout: 20, + } + expOpts := []grpc.DialOption{ + fakeDialOption{isInsecure: true}, + fakeDialOption{callOpts: []grpc.CallOption{ + grpc.MaxCallRecvMsgSize(0), + grpc.MaxCallSendMsgSize(0), + }}, + fakeDialOption{ + unaryClientInterceptor: middleware.ChainUnaryClient(), + }, + fakeDialOption{ + streamClientInterceptor: middleware.ChainStreamClient(), + }, + fakeDialOption{ + keepaliveParams: keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 20 * time.Second, + PermitWithoutStream: true, + }, + }, + } + + opts, err := cfg.DialOption(nil, nil) + require.NoError(t, err) + + assert.Empty(t, cmp.Diff(expOpts, opts)) +}