From 96c2e7d7013c99119f2d38eb4913bb67217b2ac5 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 19 Nov 2023 01:17:16 +0100 Subject: [PATCH] Implement retry logic on connection to the database server --- cmd/run.go | 11 ++++++++++- config/config.go | 4 ++++ config/constants.go | 4 ++++ config/getters.go | 8 ++++++++ config/types.go | 4 ++++ gatewayd.yaml | 5 +++++ network/client.go | 21 +++++++++------------ network/proxy.go | 22 ++++++++++++++++++++-- network/retry.go | 9 +++++++++ 9 files changed, 73 insertions(+), 15 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 0f539200..74073a12 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -527,7 +527,16 @@ var runCmd = &cobra.Command{ // Add clients to the pool. for i := 0; i < cfg.GetSize(); i++ { clientConfig := clients[name] - client := network.NewClient(runCtx, clientConfig, logger) + client := network.NewClient( + runCtx, clientConfig, logger, + network.NewRetry( + clientConfig.Retries, + clientConfig.GetBackoff(), + clientConfig.BackoffMultiplier, + clientConfig.DisableBackoffCaps, + loggers[name], + ), + ) if client != nil { eventOptions := trace.WithAttributes( diff --git a/config/config.go b/config/config.go index 240f9a9e..e98b9109 100644 --- a/config/config.go +++ b/config/config.go @@ -117,6 +117,10 @@ func (c *Config) LoadDefaults(ctx context.Context) { ReceiveTimeout: DefaultReceiveTimeout, SendDeadline: DefaultSendDeadline, DialTimeout: DefaultDialTimeout, + Retries: DefaultRetries, + Backoff: DefaultBackoff, + BackoffMultiplier: DefaultBackoffMultiplier, + DisableBackoffCaps: DefaultDisableBackoffCaps, } defaultPool := Pool{ diff --git a/config/constants.go b/config/constants.go index 6165e816..64919f30 100644 --- a/config/constants.go +++ b/config/constants.go @@ -104,6 +104,10 @@ const ( DefaultTCPKeepAlive = false DefaultReceiveTimeout = 0 DefaultDialTimeout = 60 * time.Second + DefaultRetries = 3 + DefaultBackoff = 1 * time.Second + DefaultBackoffMultiplier = 2 + DefaultDisableBackoffCaps = false // Pool constants. EmptyPoolCapacity = 0 diff --git a/config/getters.go b/config/getters.go index d2c84668..29dabd65 100644 --- a/config/getters.go +++ b/config/getters.go @@ -152,6 +152,14 @@ func (c Client) GetDialTimeout() time.Duration { return c.DialTimeout } +// GetBackoff returns the backoff from config file or default value. +func (c Client) GetBackoff() time.Duration { + if c.Backoff < 0 { + return DefaultBackoff + } + return c.Backoff +} + // GetHealthCheckPeriod returns the health check period from config file or default value. func (pr Proxy) GetHealthCheckPeriod() time.Duration { if pr.HealthCheckPeriod <= 0 { diff --git a/config/types.go b/config/types.go index 47525846..5e7e5780 100644 --- a/config/types.go +++ b/config/types.go @@ -38,6 +38,10 @@ type Client struct { ReceiveTimeout time.Duration `json:"receiveTimeout" jsonschema:"oneof_type=string;integer"` SendDeadline time.Duration `json:"sendDeadline" jsonschema:"oneof_type=string;integer"` DialTimeout time.Duration `json:"dialTimeout" jsonschema:"oneof_type=string;integer"` + Retries int `json:"retries"` + Backoff time.Duration `json:"backoff" jsonschema:"oneof_type=string;integer"` + BackoffMultiplier float64 `json:"backoffMultiplier"` + DisableBackoffCaps bool `json:"disableBackoffCaps"` } type Logger struct { diff --git a/gatewayd.yaml b/gatewayd.yaml index 6bf9bb61..258de481 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -41,6 +41,11 @@ clients: receiveTimeout: 0s # duration, 0ms/0s means no timeout sendDeadline: 0s # duration, 0ms/0s means no deadline dialTimeout: 60s # duration + # Retry configuration + retries: 3 # 0 means no retry + backoff: 1s # duration + backoffMultiplier: 2 # 0 means no backoff + disableBackoffCaps: false pools: default: diff --git a/network/client.go b/network/client.go index 8e95e81a..4d1b91ab 100644 --- a/network/client.go +++ b/network/client.go @@ -32,6 +32,7 @@ type Client struct { ctx context.Context //nolint:containedctx connected atomic.Bool mu sync.Mutex + retry IRetry TCPKeepAlive bool TCPKeepAlivePeriod time.Duration @@ -48,7 +49,9 @@ type Client struct { var _ IClient = (*Client)(nil) // NewClient creates a new client. -func NewClient(ctx context.Context, clientConfig *config.Client, logger zerolog.Logger) *Client { +func NewClient( + ctx context.Context, clientConfig *config.Client, logger zerolog.Logger, retry *Retry, +) *Client { clientCtx, span := otel.Tracer(config.TracerName).Start(ctx, "NewClient") defer span.End() @@ -72,6 +75,7 @@ func NewClient(ctx context.Context, clientConfig *config.Client, logger zerolog. client = Client{ ctx: clientCtx, mu: sync.Mutex{}, + retry: retry, Network: clientConfig.Network, Address: addr, DialTimeout: clientConfig.DialTimeout, @@ -85,16 +89,12 @@ func NewClient(ctx context.Context, clientConfig *config.Client, logger zerolog. } } - // Create a new connection. var ( conn net.Conn origErr error ) - if client.DialTimeout == 0 { - conn, origErr = net.Dial(client.Network, client.Address) - } else { - conn, origErr = net.DialTimeout(client.Network, client.Address, client.DialTimeout) - } + // Create a new connection and retry a few times if needed. + conn, origErr = client.retry.DialTimeout(client.Network, client.Address, client.DialTimeout) if origErr != nil { err := gerr.ErrClientConnectionFailed.Wrap(origErr) logger.Error().Err(err).Msg("Failed to create a new connection") @@ -264,11 +264,8 @@ func (c *Client) Reconnect() error { conn net.Conn err error ) - if c.DialTimeout == 0 { - conn, err = net.Dial(c.Network, c.Address) - } else { - conn, err = net.DialTimeout(c.Network, c.Address, c.DialTimeout) - } + // Create a new connection and retry a few times if needed. + conn, err = c.retry.DialTimeout(c.Network, c.Address, c.DialTimeout) if err != nil { c.logger.Error().Err(err).Msg("Failed to reconnect") span.RecordError(err) diff --git a/network/proxy.go b/network/proxy.go index 8760eeb9..bec9ac92 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -89,7 +89,16 @@ func NewProxy( proxy.availableConnections.Remove(client.ID) client.Close() // Create a new client. - client = NewClient(proxyCtx, proxy.ClientConfig, proxy.logger) + client = NewClient( + proxyCtx, proxy.ClientConfig, proxy.logger, + NewRetry( + proxy.ClientConfig.Retries, + proxy.ClientConfig.GetBackoff(), + proxy.ClientConfig.BackoffMultiplier, + proxy.ClientConfig.DisableBackoffCaps, + proxy.logger, + ), + ) if client != nil && client.ID != "" { if err := proxy.availableConnections.Put(client.ID, client); err != nil { proxy.logger.Err(err).Msg("Failed to update the client connection") @@ -146,7 +155,16 @@ func (pr *Proxy) Connect(conn *ConnWrapper) *gerr.GatewayDError { // Pool is exhausted or is elastic. if pr.Elastic { // Create a new client. - client = NewClient(pr.ctx, pr.ClientConfig, pr.logger) + client = NewClient( + pr.ctx, pr.ClientConfig, pr.logger, + NewRetry( + pr.ClientConfig.Retries, + pr.ClientConfig.GetBackoff(), + pr.ClientConfig.BackoffMultiplier, + pr.ClientConfig.DisableBackoffCaps, + pr.logger, + ), + ) span.AddEvent("Created a new client connection") pr.logger.Debug().Str("id", client.ID[:7]).Msg("Reused the client connection") } else { diff --git a/network/retry.go b/network/retry.go index 5f0f72ae..0bfc83ef 100644 --- a/network/retry.go +++ b/network/retry.go @@ -37,6 +37,15 @@ func (r *Retry) DialTimeout(network, address string, timeout time.Duration) (net retry int ) + if r == nil { + // Just dial the connection once. + if timeout == 0 { + return net.Dial(network, address) + } else { + return net.DialTimeout(network, address, timeout) + } + } + for ; retry < r.Retries; retry++ { // Wait for the backoff duration before retrying. The backoff duration is // calculated by multiplying the backoff duration by the backoff multiplier