Skip to content

Commit

Permalink
enable grpc keepalive.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Jul 19, 2024
1 parent 955108a commit a512ec8
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 14 deletions.
79 changes: 71 additions & 8 deletions pkg/cloudevents/generic/options/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"gopkg.in/yaml.v2"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
Expand All @@ -20,10 +21,19 @@ import (

// GRPCOptions holds the options that are used to build gRPC client.
type GRPCOptions struct {
URL string
CAFile string
ClientCertFile string
ClientKeyFile string
URL string
CAFile string
ClientCertFile string
ClientKeyFile string
KeepAliveOptions KeepAliveOptions
}

// KeepAliveOptions holds the keepalive options for the gRPC client.
type KeepAliveOptions struct {
Enable bool
Time time.Duration
Timeout time.Duration
PermitWithoutStream bool
}

// GRPCConfig holds the information needed to build connect to gRPC server as a given user.
Expand All @@ -36,6 +46,26 @@ type GRPCConfig struct {
ClientCertFile string `json:"clientCertFile,omitempty" yaml:"clientCertFile,omitempty"`
// ClientKeyFile is the file path to a client key file for TLS.
ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"`
// keepalive options
KeepAliveConfig KeepAliveConfig `json:"keepAliveConfig,omitempty" yaml:"keepAliveConfig,omitempty"`
}

// KeepAliveConfig holds the keepalive options for the gRPC client.
type KeepAliveConfig struct {
// Enable specifies whether the keepalive option is enabled.
// When disabled, other keepalive configurations are ignored. Default is false.
Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"`
// Time sets the duration after which the client pings the server if no activity is seen.
// A minimum value of 10s is enforced if set below that. Default is 30s.
Time *time.Duration `json:"time,omitempty" yaml:"time,omitempty"`

// Timeout sets the duration the client waits for a response after a keepalive ping.
// If no response is received, the connection is closed. Default is 10s.
Timeout *time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`

// PermitWithoutStream determines if keepalive pings are sent when there are no active RPCs.
// If false, pings are not sent and Time and Timeout are ignored. Default is false.
PermitWithoutStream bool `json:"permitWithoutStream,omitempty" yaml:"permitWithoutStream,omitempty"`
}

// BuildGRPCOptionsFromFlags builds configs from a config filepath.
Expand All @@ -62,19 +92,50 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) {
return nil, fmt.Errorf("setting clientCertFile and clientKeyFile requires caFile")
}

return &GRPCOptions{
options := &GRPCOptions{
URL: config.URL,
CAFile: config.CAFile,
ClientCertFile: config.ClientCertFile,
ClientKeyFile: config.ClientKeyFile,
}, nil
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
}

options.KeepAliveOptions.Enable = config.KeepAliveConfig.Enable

if config.KeepAliveConfig.Time != nil {
options.KeepAliveOptions.Time = *config.KeepAliveConfig.Time
}

if config.KeepAliveConfig.Timeout != nil {
options.KeepAliveOptions.Timeout = *config.KeepAliveConfig.Timeout
}

options.KeepAliveOptions.PermitWithoutStream = config.KeepAliveConfig.PermitWithoutStream

return options, nil
}

func NewGRPCOptions() *GRPCOptions {
return &GRPCOptions{}
}

func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
var kacp = keepalive.ClientParameters{
Time: o.KeepAliveOptions.Time,
Timeout: o.KeepAliveOptions.Timeout,
PermitWithoutStream: o.KeepAliveOptions.PermitWithoutStream,
}

dialOpts := []grpc.DialOption{}
if o.KeepAliveOptions.Enable {
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kacp))
}

if len(o.CAFile) != 0 {
certPool, err := x509.SystemCertPool()
if err != nil {
Expand Down Expand Up @@ -102,15 +163,17 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
MaxVersion: tls.VersionTLS13,
}

conn, err := grpc.Dial(o.URL, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
conn, err := grpc.Dial(o.URL, dialOpts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err)
}

return conn, nil
}

conn, err := grpc.Dial(o.URL, grpc.WithTransportCredentials(insecure.NewCredentials()))
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(o.URL, dialOpts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err)
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/cloudevents/generic/options/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"reflect"
"testing"
"time"
)

func TestBuildGRPCOptionsFromFlags(t *testing.T) {
Expand Down Expand Up @@ -40,13 +41,38 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) {
config: "{\"url\":\"test\"}",
expectedOptions: &GRPCOptions{
URL: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
},
},
{
name: "customized options with yaml format",
config: "url: test",
expectedOptions: &GRPCOptions{
URL: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
},
},
{
name: "customized options with keepalive",
config: "{\"url\":\"test\",\"keepAliveConfig\":{\"enable\":true,\"time\":10s,\"timeout\":5s,\"permitWithoutStream\":true}}",
expectedOptions: &GRPCOptions{
URL: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: true,
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
},
},
},
{
Expand All @@ -55,6 +81,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) {
expectedOptions: &GRPCOptions{
URL: "test",
CAFile: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
},
},
{
Expand All @@ -65,6 +97,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) {
CAFile: "test",
ClientCertFile: "test",
ClientKeyFile: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
},
},
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/cloudevents/generic/optionsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,15 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
},
},
{
name: "grpc config",
configType: "grpc",
configFile: configFile(t, "grpc-config-test-", []byte(grpcConfig)),
expectedOptions: &grpc.GRPCOptions{URL: "grpc"},
name: "grpc config",
configType: "grpc",
configFile: configFile(t, "grpc-config-test-", []byte(grpcConfig)),
expectedOptions: &grpc.GRPCOptions{URL: "grpc", KeepAliveOptions: grpc.KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
}},
},
}

Expand Down
12 changes: 11 additions & 1 deletion test/integration/cloudevents/broker/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"log"
"net"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/types/known/emptypb"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -100,7 +102,15 @@ func (bkr *GRPCBroker) Start(addr string) error {
log.Printf("failed to listen: %v", err)
return err
}
grpcBroker := grpc.NewServer()
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
kasp := keepalive.ServerParameters{
Time: 5 * time.Second,
Timeout: 1 * time.Second,
}
grpcBroker := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
pbv1.RegisterCloudEventServiceServer(grpcBroker, bkr)
return grpcBroker.Serve(lis)
}
Expand Down
8 changes: 8 additions & 0 deletions test/integration/cloudevents/cloudevetns_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloudevents
import (
"context"
"log"
"time"

"github.com/onsi/ginkgo"

Expand Down Expand Up @@ -41,5 +42,12 @@ func GetGRPCSourceOptions(ctx context.Context, sourceID string) (*options.CloudE

grpcOptions := grpc.NewGRPCOptions()
grpcOptions.URL = grpcServerHost
grpcOptions.KeepAliveOptions = grpc.KeepAliveOptions{
Enable: true,
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
}

return grpc.NewSourceOptions(grpcOptions, sourceID), constants.ConfigTypeGRPC
}
12 changes: 11 additions & 1 deletion test/integration/cloudevents/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"log"
"net"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/types/known/emptypb"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -91,7 +93,15 @@ func (svr *GRPCServer) Start(addr string) error {
log.Printf("failed to listen: %v", err)
return err
}
grpcServer := grpc.NewServer()
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
kasp := keepalive.ServerParameters{
Time: 5 * time.Second,
Timeout: 1 * time.Second,
}
grpcServer := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
pbv1.RegisterCloudEventServiceServer(grpcServer, svr)
return grpcServer.Serve(lis)
}
Expand Down
8 changes: 8 additions & 0 deletions test/integration/cloudevents/util/grpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package util

import (
"time"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
)

Expand All @@ -11,5 +13,11 @@ func NewGRPCAgentOptions(brokerURL string) *grpc.GRPCOptions {
func newGRPCOptions(brokerURL string) *grpc.GRPCOptions {
return &grpc.GRPCOptions{
URL: brokerURL,
KeepAliveOptions: grpc.KeepAliveOptions{
Enable: true,
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
},
}
}

0 comments on commit a512ec8

Please sign in to comment.