Skip to content

Commit

Permalink
enable grpc keepalive.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Aug 12, 2024
1 parent a7e2542 commit 37faf51
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 20 deletions.
89 changes: 75 additions & 14 deletions pkg/cloudevents/generic/options/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/keepalive"
"gopkg.in/yaml.v2"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
Expand All @@ -22,11 +23,20 @@ import (

// GRPCOptions holds the options that are used to build gRPC client.
type GRPCOptions struct {
URL string
CAFile string
ClientCertFile string
ClientKeyFile string
TokenFile string
URL string
CAFile string
ClientCertFile string
ClientKeyFile string
TokenFile string
KeepAliveOptions KeepAliveOptions
}

// KeepAliveOptions holds the keepalive options for the gRPC client.
type KeepAliveOptions struct {
Enable bool
Time time.Duration
Timeout time.Duration
PermitWithoutStream bool
}

// GRPCConfig holds the information needed to build connect to gRPC server as a given user.
Expand All @@ -41,6 +51,26 @@ type GRPCConfig struct {
ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"`
// TokenFile is the file path to a token file for authentication.
TokenFile string `json:"tokenFile,omitempty" yaml:"tokenFile,omitempty"`
// keepalive options
KeepAliveConfig KeepAliveConfig `json:"keepAliveConfig,omitempty" yaml:"keepAliveConfig,omitempty"`
}

// KeepAliveConfig holds the keepalive options for the gRPC client.
type KeepAliveConfig struct {
// Enable specifies whether the keepalive option is enabled.
// When disabled, other keepalive configurations are ignored. Default is false.
Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"`
// Time sets the duration after which the client pings the server if no activity is seen.
// A minimum value of 10s is enforced if set below that. Default is 30s.
Time *time.Duration `json:"time,omitempty" yaml:"time,omitempty"`

// Timeout sets the duration the client waits for a response after a keepalive ping.
// If no response is received, the connection is closed. Default is 10s.
Timeout *time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`

// PermitWithoutStream determines if keepalive pings are sent when there are no active RPCs.
// If false, pings are not sent and Time and Timeout are ignored. Default is false.
PermitWithoutStream bool `json:"permitWithoutStream,omitempty" yaml:"permitWithoutStream,omitempty"`
}

// BuildGRPCOptionsFromFlags builds configs from a config filepath.
Expand Down Expand Up @@ -70,20 +100,52 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) {
return nil, fmt.Errorf("setting tokenFile requires caFile")
}

return &GRPCOptions{
options := &GRPCOptions{
URL: config.URL,
CAFile: config.CAFile,
ClientCertFile: config.ClientCertFile,
ClientKeyFile: config.ClientKeyFile,
TokenFile: config.TokenFile,
}, nil
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
}

options.KeepAliveOptions.Enable = config.KeepAliveConfig.Enable

if config.KeepAliveConfig.Time != nil {
options.KeepAliveOptions.Time = *config.KeepAliveConfig.Time
}

if config.KeepAliveConfig.Timeout != nil {
options.KeepAliveOptions.Timeout = *config.KeepAliveConfig.Timeout
}

options.KeepAliveOptions.PermitWithoutStream = config.KeepAliveConfig.PermitWithoutStream

return options, nil
}

func NewGRPCOptions() *GRPCOptions {
return &GRPCOptions{}
}

func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
var kacp = keepalive.ClientParameters{
Time: o.KeepAliveOptions.Time,
Timeout: o.KeepAliveOptions.Timeout,
PermitWithoutStream: o.KeepAliveOptions.PermitWithoutStream,
}

// Prepare gRPC dial options.
dialOpts := []grpc.DialOption{}
if o.KeepAliveOptions.Enable {
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kacp))
}

if len(o.CAFile) != 0 {
certPool, err := x509.SystemCertPool()
if err != nil {
Expand All @@ -99,8 +161,6 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
return nil, fmt.Errorf("invalid CA %s", o.CAFile)
}

// Prepare gRPC dial options.
diaOpts := []grpc.DialOption{}
// Create a TLS configuration with CA pool and TLS 1.3.
tlsConfig := &tls.Config{
RootCAs: certPool,
Expand All @@ -117,10 +177,10 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
}
// Add client certificates to the TLS configuration.
tlsConfig.Certificates = []tls.Certificate{clientCerts}
diaOpts = append(diaOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
// token based authentication requires the configuration of transport credentials.
diaOpts = append(diaOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if len(o.TokenFile) != 0 {
// Use token-based authentication if token file is provided.
token, err := os.ReadFile(o.TokenFile)
Expand All @@ -132,12 +192,12 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
AccessToken: string(token),
})}
// Add per-RPC credentials to the dial options.
diaOpts = append(diaOpts, grpc.WithPerRPCCredentials(perRPCCred))
dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(perRPCCred))
}
}

// Establish a connection to the gRPC server.
conn, err := grpc.Dial(o.URL, diaOpts...)
conn, err := grpc.Dial(o.URL, dialOpts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err)
}
Expand All @@ -146,7 +206,8 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
}

// Insecure connection option; should not be used in production.
conn, err := grpc.Dial(o.URL, grpc.WithTransportCredentials(insecure.NewCredentials()))
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(o.URL, dialOpts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err)
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/cloudevents/generic/options/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"reflect"
"testing"
"time"

clienttesting "open-cluster-management.io/sdk-go/pkg/testing"
)
Expand Down Expand Up @@ -40,13 +41,38 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) {
config: "{\"url\":\"test\"}",
expectedOptions: &GRPCOptions{
URL: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
},
},
{
name: "customized options with yaml format",
config: "url: test",
expectedOptions: &GRPCOptions{
URL: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
},
},
{
name: "customized options with keepalive",
config: "{\"url\":\"test\",\"keepAliveConfig\":{\"enable\":true,\"time\":10s,\"timeout\":5s,\"permitWithoutStream\":true}}",
expectedOptions: &GRPCOptions{
URL: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: true,
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
},
},
},
{
Expand All @@ -55,6 +81,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) {
expectedOptions: &GRPCOptions{
URL: "test",
CAFile: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
},
},
{
Expand All @@ -65,6 +97,12 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) {
CAFile: "test",
ClientCertFile: "test",
ClientKeyFile: "test",
KeepAliveOptions: KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
},
},
},
{
Expand Down
13 changes: 9 additions & 4 deletions pkg/cloudevents/generic/optionsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
},
},
{
name: "grpc config",
configType: "grpc",
configFile: configFile(t, "grpc-config-test-", []byte(grpcConfig)),
expectedOptions: &grpc.GRPCOptions{URL: "grpc"},
name: "grpc config",
configType: "grpc",
configFile: configFile(t, "grpc-config-test-", []byte(grpcConfig)),
expectedOptions: &grpc.GRPCOptions{URL: "grpc", KeepAliveOptions: grpc.KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
}},
},
}

Expand Down
12 changes: 11 additions & 1 deletion test/integration/cloudevents/broker/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"log"
"net"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/types/known/emptypb"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -100,7 +102,15 @@ func (bkr *GRPCBroker) Start(addr string) error {
log.Printf("failed to listen: %v", err)
return err
}
grpcBroker := grpc.NewServer()
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
kasp := keepalive.ServerParameters{
Time: 5 * time.Second,
Timeout: 1 * time.Second,
}
grpcBroker := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
pbv1.RegisterCloudEventServiceServer(grpcBroker, bkr)
return grpcBroker.Serve(lis)
}
Expand Down
7 changes: 7 additions & 0 deletions test/integration/cloudevents/cloudevetns_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log"
"strings"
"time"

"github.com/onsi/ginkgo"

Expand Down Expand Up @@ -49,6 +50,12 @@ func GetGRPCSourceOptions(ctx context.Context, sourceID string) (*options.CloudE
grpcOptions.URL = grpcServerHost
grpcOptions.CAFile = serverCAFile
grpcOptions.TokenFile = tokenFile
grpcOptions.KeepAliveOptions = grpcoptions.KeepAliveOptions{
Enable: true,
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
}

return grpcoptions.NewSourceOptions(grpcOptions, sourceID), constants.ConfigTypeGRPC
}
Expand Down
12 changes: 11 additions & 1 deletion test/integration/cloudevents/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"log"
"net"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/types/known/emptypb"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -91,7 +93,15 @@ func (svr *GRPCServer) Start(addr string, serverOpts []grpc.ServerOption) error
log.Printf("failed to listen: %v", err)
return err
}
grpcServer := grpc.NewServer(serverOpts...)
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
kasp := keepalive.ServerParameters{
Time: 5 * time.Second,
Timeout: 1 * time.Second,
}
grpcServer := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
pbv1.RegisterCloudEventServiceServer(grpcServer, svr)
return grpcServer.Serve(lis)
}
Expand Down
8 changes: 8 additions & 0 deletions test/integration/cloudevents/util/grpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package util

import (
"time"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
)

Expand All @@ -11,5 +13,11 @@ func NewGRPCAgentOptions(brokerURL string) *grpc.GRPCOptions {
func newGRPCOptions(brokerURL string) *grpc.GRPCOptions {
return &grpc.GRPCOptions{
URL: brokerURL,
KeepAliveOptions: grpc.KeepAliveOptions{
Enable: true,
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
},
}
}

0 comments on commit 37faf51

Please sign in to comment.