From 37faf51c8553440e0c5011931ea50ad172e276dd Mon Sep 17 00:00:00 2001 From: morvencao Date: Thu, 18 Jul 2024 08:09:10 +0000 Subject: [PATCH] enable grpc keepalive. Signed-off-by: morvencao --- .../generic/options/grpc/options.go | 89 ++++++++++++++++--- .../generic/options/grpc/options_test.go | 38 ++++++++ .../generic/optionsbuilder_test.go | 13 ++- test/integration/cloudevents/broker/grpc.go | 12 ++- .../cloudevents/cloudevetns_grpc_test.go | 7 ++ test/integration/cloudevents/server/grpc.go | 12 ++- test/integration/cloudevents/util/grpc.go | 8 ++ 7 files changed, 159 insertions(+), 20 deletions(-) diff --git a/pkg/cloudevents/generic/options/grpc/options.go b/pkg/cloudevents/generic/options/grpc/options.go index 564ccf33..755734c5 100644 --- a/pkg/cloudevents/generic/options/grpc/options.go +++ b/pkg/cloudevents/generic/options/grpc/options.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/oauth" + "google.golang.org/grpc/keepalive" "gopkg.in/yaml.v2" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" @@ -22,11 +23,20 @@ import ( // GRPCOptions holds the options that are used to build gRPC client. type GRPCOptions struct { - URL string - CAFile string - ClientCertFile string - ClientKeyFile string - TokenFile string + URL string + CAFile string + ClientCertFile string + ClientKeyFile string + TokenFile string + KeepAliveOptions KeepAliveOptions +} + +// KeepAliveOptions holds the keepalive options for the gRPC client. +type KeepAliveOptions struct { + Enable bool + Time time.Duration + Timeout time.Duration + PermitWithoutStream bool } // GRPCConfig holds the information needed to build connect to gRPC server as a given user. @@ -41,6 +51,26 @@ type GRPCConfig struct { ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"` // TokenFile is the file path to a token file for authentication. TokenFile string `json:"tokenFile,omitempty" yaml:"tokenFile,omitempty"` + // keepalive options + KeepAliveConfig KeepAliveConfig `json:"keepAliveConfig,omitempty" yaml:"keepAliveConfig,omitempty"` +} + +// KeepAliveConfig holds the keepalive options for the gRPC client. +type KeepAliveConfig struct { + // Enable specifies whether the keepalive option is enabled. + // When disabled, other keepalive configurations are ignored. Default is false. + Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"` + // Time sets the duration after which the client pings the server if no activity is seen. + // A minimum value of 10s is enforced if set below that. Default is 30s. + Time *time.Duration `json:"time,omitempty" yaml:"time,omitempty"` + + // Timeout sets the duration the client waits for a response after a keepalive ping. + // If no response is received, the connection is closed. Default is 10s. + Timeout *time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + + // PermitWithoutStream determines if keepalive pings are sent when there are no active RPCs. + // If false, pings are not sent and Time and Timeout are ignored. Default is false. + PermitWithoutStream bool `json:"permitWithoutStream,omitempty" yaml:"permitWithoutStream,omitempty"` } // BuildGRPCOptionsFromFlags builds configs from a config filepath. @@ -70,13 +100,33 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) { return nil, fmt.Errorf("setting tokenFile requires caFile") } - return &GRPCOptions{ + options := &GRPCOptions{ URL: config.URL, CAFile: config.CAFile, ClientCertFile: config.ClientCertFile, ClientKeyFile: config.ClientKeyFile, TokenFile: config.TokenFile, - }, nil + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, + } + + options.KeepAliveOptions.Enable = config.KeepAliveConfig.Enable + + if config.KeepAliveConfig.Time != nil { + options.KeepAliveOptions.Time = *config.KeepAliveConfig.Time + } + + if config.KeepAliveConfig.Timeout != nil { + options.KeepAliveOptions.Timeout = *config.KeepAliveConfig.Timeout + } + + options.KeepAliveOptions.PermitWithoutStream = config.KeepAliveConfig.PermitWithoutStream + + return options, nil } func NewGRPCOptions() *GRPCOptions { @@ -84,6 +134,18 @@ func NewGRPCOptions() *GRPCOptions { } func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { + var kacp = keepalive.ClientParameters{ + Time: o.KeepAliveOptions.Time, + Timeout: o.KeepAliveOptions.Timeout, + PermitWithoutStream: o.KeepAliveOptions.PermitWithoutStream, + } + + // Prepare gRPC dial options. + dialOpts := []grpc.DialOption{} + if o.KeepAliveOptions.Enable { + dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kacp)) + } + if len(o.CAFile) != 0 { certPool, err := x509.SystemCertPool() if err != nil { @@ -99,8 +161,6 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { return nil, fmt.Errorf("invalid CA %s", o.CAFile) } - // Prepare gRPC dial options. - diaOpts := []grpc.DialOption{} // Create a TLS configuration with CA pool and TLS 1.3. tlsConfig := &tls.Config{ RootCAs: certPool, @@ -117,10 +177,10 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { } // Add client certificates to the TLS configuration. tlsConfig.Certificates = []tls.Certificate{clientCerts} - diaOpts = append(diaOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) } else { // token based authentication requires the configuration of transport credentials. - diaOpts = append(diaOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) if len(o.TokenFile) != 0 { // Use token-based authentication if token file is provided. token, err := os.ReadFile(o.TokenFile) @@ -132,12 +192,12 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { AccessToken: string(token), })} // Add per-RPC credentials to the dial options. - diaOpts = append(diaOpts, grpc.WithPerRPCCredentials(perRPCCred)) + dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(perRPCCred)) } } // Establish a connection to the gRPC server. - conn, err := grpc.Dial(o.URL, diaOpts...) + conn, err := grpc.Dial(o.URL, dialOpts...) if err != nil { return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err) } @@ -146,7 +206,8 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { } // Insecure connection option; should not be used in production. - conn, err := grpc.Dial(o.URL, grpc.WithTransportCredentials(insecure.NewCredentials())) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(o.URL, dialOpts...) if err != nil { return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err) } diff --git a/pkg/cloudevents/generic/options/grpc/options_test.go b/pkg/cloudevents/generic/options/grpc/options_test.go index f376b0aa..f6e132cb 100644 --- a/pkg/cloudevents/generic/options/grpc/options_test.go +++ b/pkg/cloudevents/generic/options/grpc/options_test.go @@ -4,6 +4,7 @@ import ( "os" "reflect" "testing" + "time" clienttesting "open-cluster-management.io/sdk-go/pkg/testing" ) @@ -40,6 +41,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) { config: "{\"url\":\"test\"}", expectedOptions: &GRPCOptions{ URL: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, }, }, { @@ -47,6 +54,25 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) { config: "url: test", expectedOptions: &GRPCOptions{ URL: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, + }, + }, + { + name: "customized options with keepalive", + config: "{\"url\":\"test\",\"keepAliveConfig\":{\"enable\":true,\"time\":10s,\"timeout\":5s,\"permitWithoutStream\":true}}", + expectedOptions: &GRPCOptions{ + URL: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: true, + Time: 10 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + }, }, }, { @@ -55,6 +81,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) { expectedOptions: &GRPCOptions{ URL: "test", CAFile: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, }, }, { @@ -65,6 +97,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) { CAFile: "test", ClientCertFile: "test", ClientKeyFile: "test", + KeepAliveOptions: KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }, }, }, { diff --git a/pkg/cloudevents/generic/optionsbuilder_test.go b/pkg/cloudevents/generic/optionsbuilder_test.go index aacf578f..d2425122 100644 --- a/pkg/cloudevents/generic/optionsbuilder_test.go +++ b/pkg/cloudevents/generic/optionsbuilder_test.go @@ -57,10 +57,15 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) { }, }, { - name: "grpc config", - configType: "grpc", - configFile: configFile(t, "grpc-config-test-", []byte(grpcConfig)), - expectedOptions: &grpc.GRPCOptions{URL: "grpc"}, + name: "grpc config", + configType: "grpc", + configFile: configFile(t, "grpc-config-test-", []byte(grpcConfig)), + expectedOptions: &grpc.GRPCOptions{URL: "grpc", KeepAliveOptions: grpc.KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + }}, }, } diff --git a/test/integration/cloudevents/broker/grpc.go b/test/integration/cloudevents/broker/grpc.go index 1c8f3f9d..1acd29bc 100644 --- a/test/integration/cloudevents/broker/grpc.go +++ b/test/integration/cloudevents/broker/grpc.go @@ -5,12 +5,14 @@ import ( "fmt" "log" "net" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" "google.golang.org/protobuf/types/known/emptypb" "k8s.io/klog/v2" @@ -100,7 +102,15 @@ func (bkr *GRPCBroker) Start(addr string) error { log.Printf("failed to listen: %v", err) return err } - grpcBroker := grpc.NewServer() + kaep := keepalive.EnforcementPolicy{ + MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection + PermitWithoutStream: true, // Allow pings even when there are no active streams + } + kasp := keepalive.ServerParameters{ + Time: 5 * time.Second, + Timeout: 1 * time.Second, + } + grpcBroker := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) pbv1.RegisterCloudEventServiceServer(grpcBroker, bkr) return grpcBroker.Serve(lis) } diff --git a/test/integration/cloudevents/cloudevetns_grpc_test.go b/test/integration/cloudevents/cloudevetns_grpc_test.go index a8f8f4e9..d2ba180c 100644 --- a/test/integration/cloudevents/cloudevetns_grpc_test.go +++ b/test/integration/cloudevents/cloudevetns_grpc_test.go @@ -4,6 +4,7 @@ import ( "context" "log" "strings" + "time" "github.com/onsi/ginkgo" @@ -49,6 +50,12 @@ func GetGRPCSourceOptions(ctx context.Context, sourceID string) (*options.CloudE grpcOptions.URL = grpcServerHost grpcOptions.CAFile = serverCAFile grpcOptions.TokenFile = tokenFile + grpcOptions.KeepAliveOptions = grpcoptions.KeepAliveOptions{ + Enable: true, + Time: 10 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + } return grpcoptions.NewSourceOptions(grpcOptions, sourceID), constants.ConfigTypeGRPC } diff --git a/test/integration/cloudevents/server/grpc.go b/test/integration/cloudevents/server/grpc.go index dd374bfa..5a6acb57 100644 --- a/test/integration/cloudevents/server/grpc.go +++ b/test/integration/cloudevents/server/grpc.go @@ -5,12 +5,14 @@ import ( "fmt" "log" "net" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" "google.golang.org/protobuf/types/known/emptypb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -91,7 +93,15 @@ func (svr *GRPCServer) Start(addr string, serverOpts []grpc.ServerOption) error log.Printf("failed to listen: %v", err) return err } - grpcServer := grpc.NewServer(serverOpts...) + kaep := keepalive.EnforcementPolicy{ + MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection + PermitWithoutStream: true, // Allow pings even when there are no active streams + } + kasp := keepalive.ServerParameters{ + Time: 5 * time.Second, + Timeout: 1 * time.Second, + } + grpcServer := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) pbv1.RegisterCloudEventServiceServer(grpcServer, svr) return grpcServer.Serve(lis) } diff --git a/test/integration/cloudevents/util/grpc.go b/test/integration/cloudevents/util/grpc.go index 1c6c6058..aac5d91e 100644 --- a/test/integration/cloudevents/util/grpc.go +++ b/test/integration/cloudevents/util/grpc.go @@ -1,6 +1,8 @@ package util import ( + "time" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" ) @@ -11,5 +13,11 @@ func NewGRPCAgentOptions(brokerURL string) *grpc.GRPCOptions { func newGRPCOptions(brokerURL string) *grpc.GRPCOptions { return &grpc.GRPCOptions{ URL: brokerURL, + KeepAliveOptions: grpc.KeepAliveOptions{ + Enable: true, + Time: 10 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + }, } }