Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] add retries and backoffs for propeller sending events to admin #5166

Merged
merged 4 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,42 @@ Use the same gRPC credentials option as the flyteadmin client
"false"


max-retries (int)
------------------------------------------------------------------------------------------------------------------------

The max number of retries for event recording.

**Default Value**:

.. code-block:: yaml

"5"


base-scalar (int)
------------------------------------------------------------------------------------------------------------------------

The base/scalar backoff duration in milliseconds for event recording retries.

**Default Value**:

.. code-block:: yaml

"100"


backoff-jitter (string)
------------------------------------------------------------------------------------------------------------------------

A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.

**Default Value**:

.. code-block:: yaml

"0.1"


default-service-config (string)
------------------------------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -671,6 +707,42 @@ The max bucket size for event recording tokens.
"1000"


max-retries (int)
------------------------------------------------------------------------------------------------------------------------

The max number of retries for event recording.

**Default Value**:

.. code-block:: yaml

"5"


base-scalar (int)
------------------------------------------------------------------------------------------------------------------------

The base/scalar backoff duration in milliseconds for event recording retries.

**Default Value**:

.. code-block:: yaml

"100"


backoff-jitter (string)
------------------------------------------------------------------------------------------------------------------------

A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.

**Default Value**:

.. code-block:: yaml

"0.1"


Section: logger
========================================================================================================================

Expand Down
16 changes: 13 additions & 3 deletions flytepropeller/events/admin_eventsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package events
import (
"context"
"fmt"
"time"

"github.com/golang/protobuf/proto"
grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -128,15 +130,23 @@ func IDFromMessage(message proto.Message) ([]byte, error) {
return []byte(id), nil
}

func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) {
func initializeAdminClientFromConfig(ctx context.Context, config *Config) (client service.AdminServiceClient, err error) {
cfg := admin2.GetConfig(ctx)
tracerProvider := otelutils.GetTracerProvider(otelutils.AdminClientTracer)
opt := grpc.WithUnaryInterceptor(

grpcOptions := []grpcRetry.CallOption{
grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(config.BackoffScalar)*time.Millisecond, config.GetBackoffJitter(ctx))),
grpcRetry.WithMax(uint(config.MaxRetries)),
}

opt := grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(
otelgrpc.WithTracerProvider(tracerProvider),
otelgrpc.WithPropagators(propagation.TraceContext{}),
),
grpcRetry.UnaryClientInterceptor(grpcOptions...),
)

clients, err := admin2.NewClientsetBuilder().WithDialOptions(opt).WithConfig(cfg).Build(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err)
Expand All @@ -152,7 +162,7 @@ func ConstructEventSink(ctx context.Context, config *Config, scope promutils.Sco
case EventSinkFile:
return NewFileSink(config.FilePath)
case EventSinkAdmin:
adminClient, err := initializeAdminClientFromConfig(ctx)
adminClient, err := initializeAdminClientFromConfig(ctx, config)
if err != nil {
return nil, err
}
Expand Down
41 changes: 33 additions & 8 deletions flytepropeller/events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package events

import (
"context"
"strconv"

"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand All @@ -21,22 +22,46 @@ const (
)

type Config struct {
Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."`
FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."`
Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."`
Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."`
Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."`
FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."`
Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."`
Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."`
MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."`
BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."`
BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."`
}

var (
defaultConfig = Config{
Rate: int64(500),
Capacity: 1000,
Type: EventSinkAdmin,
Rate: int64(500),
Capacity: 1000,
Type: EventSinkAdmin,
MaxRetries: 5,
BackoffScalar: 100,
BackoffJitter: "0.1",
}

configSection = config.MustRegisterSection(configSectionKey, &defaultConfig)
configSection = config.MustRegisterSectionWithUpdates(configSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) {
if newValue.(*Config).MaxRetries < 0 {
logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.")
}

if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 {
logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter)
}
})
)

func (c Config) GetBackoffJitter(ctx context.Context) float64 {
jitter, err := strconv.ParseFloat(c.BackoffJitter, 64)
if err != nil {
logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err)
return 0.1
}

return jitter
}

// GetConfig Retrieves current global config for storage.
func GetConfig(ctx context.Context) *Config {
if c, ok := configSection.GetConfig().(*Config); ok {
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/events/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions flytepropeller/events/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 35 additions & 9 deletions flytepropeller/pkg/controller/nodes/catalog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,37 @@ package catalog
import (
"context"
"fmt"
"strconv"

"google.golang.org/grpc"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/catalog/datacatalog"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

//go:generate pflags Config --default-var defaultConfig

const ConfigSectionKey = "catalog-cache"

var (
defaultConfig = &Config{
Type: NoOpDiscoveryType,
defaultConfig = Config{
Type: NoOpDiscoveryType,
MaxRetries: 5,
BackoffScalar: 100,
BackoffJitter: "0.1",
}

configSection = config.MustRegisterSection(ConfigSectionKey, defaultConfig)
configSection = config.MustRegisterSectionWithUpdates(ConfigSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) {
if newValue.(*Config).MaxRetries < 0 {
logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.")
}

if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 {
logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter)
}
})
)

type DiscoveryType = string
Expand All @@ -31,11 +44,14 @@ const (
)

type Config struct {
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."`
BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."`
BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."`

// Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md
// eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]}
Expand All @@ -44,6 +60,16 @@ type Config struct {
DefaultServiceConfig string `json:"default-service-config" pflag:"\"\", Set the default service config for the catalog gRPC client"`
}

func (c Config) GetBackoffJitter(ctx context.Context) float64 {
jitter, err := strconv.ParseFloat(c.BackoffJitter, 64)
if err != nil {
logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err)
return 0.1
}

return jitter
}

// GetConfig gets loaded config for Discovery
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
Expand All @@ -56,7 +82,7 @@ func NewCatalogClient(ctx context.Context, authOpt ...grpc.DialOption) (catalog.
case DataCatalogType:
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig,
authOpt...)
uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...)
case NoOpDiscoveryType, "":
return NOOPCatalog{}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/pkg/controller/nodes/catalog/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading