From a512ec8da872bac72e73462b07b6a9c408f62a50 Mon Sep 17 00:00:00 2001 From: morvencao Date: Thu, 18 Jul 2024 08:09:10 +0000 Subject: [PATCH] enable grpc keepalive. Signed-off-by: morvencao --- .../generic/options/grpc/options.go | 79 +++++++++++++++++-- .../generic/options/grpc/options_test.go | 38 +++++++++ .../generic/optionsbuilder_test.go | 13 ++- test/integration/cloudevents/broker/grpc.go | 12 ++- .../cloudevents/cloudevetns_grpc_test.go | 8 ++ test/integration/cloudevents/server/grpc.go | 12 ++- test/integration/cloudevents/util/grpc.go | 8 ++ 7 files changed, 156 insertions(+), 14 deletions(-) diff --git a/pkg/cloudevents/generic/options/grpc/options.go b/pkg/cloudevents/generic/options/grpc/options.go index 151618f9..35c241e4 100644 --- a/pkg/cloudevents/generic/options/grpc/options.go +++ b/pkg/cloudevents/generic/options/grpc/options.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" "gopkg.in/yaml.v2" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" @@ -20,10 +21,19 @@ import ( // GRPCOptions holds the options that are used to build gRPC client. type GRPCOptions struct { - URL string - CAFile string - ClientCertFile string - ClientKeyFile string + URL string + CAFile string + ClientCertFile string + ClientKeyFile string + KeepAliveOptions KeepAliveOptions +} + +// KeepAliveOptions holds the keepalive options for the gRPC client. +type KeepAliveOptions struct { + Enable bool + Time time.Duration + Timeout time.Duration + PermitWithoutStream bool } // GRPCConfig holds the information needed to build connect to gRPC server as a given user. @@ -36,6 +46,26 @@ type GRPCConfig struct { ClientCertFile string `json:"clientCertFile,omitempty" yaml:"clientCertFile,omitempty"` // ClientKeyFile is the file path to a client key file for TLS. ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"` + // keepalive options + KeepAliveConfig KeepAliveConfig `json:"keepAliveConfig,omitempty" yaml:"keepAliveConfig,omitempty"` +} + +// KeepAliveConfig holds the keepalive options for the gRPC client. +type KeepAliveConfig struct { + // Enable specifies whether the keepalive option is enabled. + // When disabled, other keepalive configurations are ignored. Default is false. + Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"` + // Time sets the duration after which the client pings the server if no activity is seen. + // A minimum value of 10s is enforced if set below that. Default is 30s. + Time *time.Duration `json:"time,omitempty" yaml:"time,omitempty"` + + // Timeout sets the duration the client waits for a response after a keepalive ping. + // If no response is received, the connection is closed. Default is 10s. + Timeout *time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + + // PermitWithoutStream determines if keepalive pings are sent when there are no active RPCs. + // If false, pings are not sent and Time and Timeout are ignored. Default is false. + PermitWithoutStream bool `json:"permitWithoutStream,omitempty" yaml:"permitWithoutStream,omitempty"` } // BuildGRPCOptionsFromFlags builds configs from a config filepath. @@ -62,12 +92,32 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) { return nil, fmt.Errorf("setting clientCertFile and clientKeyFile requires caFile") } - return &GRPCOptions{ + options := &GRPCOptions{ URL: config.URL, CAFile: config.CAFile, ClientCertFile: config.ClientCertFile, ClientKeyFile: config.ClientKeyFile, - }, nil + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, + } + + options.KeepAliveOptions.Enable = config.KeepAliveConfig.Enable + + if config.KeepAliveConfig.Time != nil { + options.KeepAliveOptions.Time = *config.KeepAliveConfig.Time + } + + if config.KeepAliveConfig.Timeout != nil { + options.KeepAliveOptions.Timeout = *config.KeepAliveConfig.Timeout + } + + options.KeepAliveOptions.PermitWithoutStream = config.KeepAliveConfig.PermitWithoutStream + + return options, nil } func NewGRPCOptions() *GRPCOptions { @@ -75,6 +125,17 @@ func NewGRPCOptions() *GRPCOptions { } func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { + var kacp = keepalive.ClientParameters{ + Time: o.KeepAliveOptions.Time, + Timeout: o.KeepAliveOptions.Timeout, + PermitWithoutStream: o.KeepAliveOptions.PermitWithoutStream, + } + + dialOpts := []grpc.DialOption{} + if o.KeepAliveOptions.Enable { + dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kacp)) + } + if len(o.CAFile) != 0 { certPool, err := x509.SystemCertPool() if err != nil { @@ -102,7 +163,8 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { MaxVersion: tls.VersionTLS13, } - conn, err := grpc.Dial(o.URL, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + conn, err := grpc.Dial(o.URL, dialOpts...) if err != nil { return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err) } @@ -110,7 +172,8 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { return conn, nil } - conn, err := grpc.Dial(o.URL, grpc.WithTransportCredentials(insecure.NewCredentials())) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(o.URL, dialOpts...) if err != nil { return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err) } diff --git a/pkg/cloudevents/generic/options/grpc/options_test.go b/pkg/cloudevents/generic/options/grpc/options_test.go index ff6a3ff6..1a7a9dc0 100644 --- a/pkg/cloudevents/generic/options/grpc/options_test.go +++ b/pkg/cloudevents/generic/options/grpc/options_test.go @@ -5,6 +5,7 @@ import ( "os" "reflect" "testing" + "time" ) func TestBuildGRPCOptionsFromFlags(t *testing.T) { @@ -40,6 +41,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) { config: "{\"url\":\"test\"}", expectedOptions: &GRPCOptions{ URL: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, }, }, { @@ -47,6 +54,25 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) { config: "url: test", expectedOptions: &GRPCOptions{ URL: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, + }, + }, + { + name: "customized options with keepalive", + config: "{\"url\":\"test\",\"keepAliveConfig\":{\"enable\":true,\"time\":10s,\"timeout\":5s,\"permitWithoutStream\":true}}", + expectedOptions: &GRPCOptions{ + URL: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: true, + Time: 10 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + }, }, }, { @@ -55,6 +81,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) { expectedOptions: &GRPCOptions{ URL: "test", CAFile: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, }, }, { @@ -65,6 +97,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) { CAFile: "test", ClientCertFile: "test", ClientKeyFile: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, }, }, } diff --git a/pkg/cloudevents/generic/optionsbuilder_test.go b/pkg/cloudevents/generic/optionsbuilder_test.go index 5ac3e960..769ba6eb 100644 --- a/pkg/cloudevents/generic/optionsbuilder_test.go +++ b/pkg/cloudevents/generic/optionsbuilder_test.go @@ -56,10 +56,15 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) { }, }, { - name: "grpc config", - configType: "grpc", - configFile: configFile(t, "grpc-config-test-", []byte(grpcConfig)), - expectedOptions: &grpc.GRPCOptions{URL: "grpc"}, + name: "grpc config", + configType: "grpc", + configFile: configFile(t, "grpc-config-test-", []byte(grpcConfig)), + expectedOptions: &grpc.GRPCOptions{URL: "grpc", KeepAliveOptions: grpc.KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }}, }, } diff --git a/test/integration/cloudevents/broker/grpc.go b/test/integration/cloudevents/broker/grpc.go index 1c8f3f9d..1acd29bc 100644 --- a/test/integration/cloudevents/broker/grpc.go +++ b/test/integration/cloudevents/broker/grpc.go @@ -5,12 +5,14 @@ import ( "fmt" "log" "net" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" "google.golang.org/protobuf/types/known/emptypb" "k8s.io/klog/v2" @@ -100,7 +102,15 @@ func (bkr *GRPCBroker) Start(addr string) error { log.Printf("failed to listen: %v", err) return err } - grpcBroker := grpc.NewServer() + kaep := keepalive.EnforcementPolicy{ + MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection + PermitWithoutStream: true, // Allow pings even when there are no active streams + } + kasp := keepalive.ServerParameters{ + Time: 5 * time.Second, + Timeout: 1 * time.Second, + } + grpcBroker := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) pbv1.RegisterCloudEventServiceServer(grpcBroker, bkr) return grpcBroker.Serve(lis) } diff --git a/test/integration/cloudevents/cloudevetns_grpc_test.go b/test/integration/cloudevents/cloudevetns_grpc_test.go index 54929730..ca4ace84 100644 --- a/test/integration/cloudevents/cloudevetns_grpc_test.go +++ b/test/integration/cloudevents/cloudevetns_grpc_test.go @@ -3,6 +3,7 @@ package cloudevents import ( "context" "log" + "time" "github.com/onsi/ginkgo" @@ -41,5 +42,12 @@ func GetGRPCSourceOptions(ctx context.Context, sourceID string) (*options.CloudE grpcOptions := grpc.NewGRPCOptions() grpcOptions.URL = grpcServerHost + grpcOptions.KeepAliveOptions = grpc.KeepAliveOptions{ + Enable: true, + Time: 10 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + } + return grpc.NewSourceOptions(grpcOptions, sourceID), constants.ConfigTypeGRPC } diff --git a/test/integration/cloudevents/server/grpc.go b/test/integration/cloudevents/server/grpc.go index 6a113bd6..db7cca86 100644 --- a/test/integration/cloudevents/server/grpc.go +++ b/test/integration/cloudevents/server/grpc.go @@ -5,12 +5,14 @@ import ( "fmt" "log" "net" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" "google.golang.org/protobuf/types/known/emptypb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -91,7 +93,15 @@ func (svr *GRPCServer) Start(addr string) error { log.Printf("failed to listen: %v", err) return err } - grpcServer := grpc.NewServer() + kaep := keepalive.EnforcementPolicy{ + MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection + PermitWithoutStream: true, // Allow pings even when there are no active streams + } + kasp := keepalive.ServerParameters{ + Time: 5 * time.Second, + Timeout: 1 * time.Second, + } + grpcServer := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) pbv1.RegisterCloudEventServiceServer(grpcServer, svr) return grpcServer.Serve(lis) } diff --git a/test/integration/cloudevents/util/grpc.go b/test/integration/cloudevents/util/grpc.go index 1c6c6058..aac5d91e 100644 --- a/test/integration/cloudevents/util/grpc.go +++ b/test/integration/cloudevents/util/grpc.go @@ -1,6 +1,8 @@ package util import ( + "time" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" ) @@ -11,5 +13,11 @@ func NewGRPCAgentOptions(brokerURL string) *grpc.GRPCOptions { func newGRPCOptions(brokerURL string) *grpc.GRPCOptions { return &grpc.GRPCOptions{ URL: brokerURL, + KeepAliveOptions: grpc.KeepAliveOptions{ + Enable: true, + Time: 10 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + }, } }