Skip to content

Commit

Permalink
Implement retry logic on connection to the database server
Browse files Browse the repository at this point in the history
  • Loading branch information
mostafa committed Nov 19, 2023
1 parent 199c72c commit 96c2e7d
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 15 deletions.
11 changes: 10 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,16 @@ var runCmd = &cobra.Command{
// Add clients to the pool.
for i := 0; i < cfg.GetSize(); i++ {
clientConfig := clients[name]
client := network.NewClient(runCtx, clientConfig, logger)
client := network.NewClient(
runCtx, clientConfig, logger,
network.NewRetry(
clientConfig.Retries,
clientConfig.GetBackoff(),
clientConfig.BackoffMultiplier,
clientConfig.DisableBackoffCaps,
loggers[name],
),
)

if client != nil {
eventOptions := trace.WithAttributes(
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (c *Config) LoadDefaults(ctx context.Context) {
ReceiveTimeout: DefaultReceiveTimeout,
SendDeadline: DefaultSendDeadline,
DialTimeout: DefaultDialTimeout,
Retries: DefaultRetries,
Backoff: DefaultBackoff,
BackoffMultiplier: DefaultBackoffMultiplier,
DisableBackoffCaps: DefaultDisableBackoffCaps,
}

defaultPool := Pool{
Expand Down
4 changes: 4 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ const (
DefaultTCPKeepAlive = false
DefaultReceiveTimeout = 0
DefaultDialTimeout = 60 * time.Second
DefaultRetries = 3
DefaultBackoff = 1 * time.Second
DefaultBackoffMultiplier = 2
DefaultDisableBackoffCaps = false

// Pool constants.
EmptyPoolCapacity = 0
Expand Down
8 changes: 8 additions & 0 deletions config/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ func (c Client) GetDialTimeout() time.Duration {
return c.DialTimeout
}

// GetBackoff returns the backoff from config file or default value.
func (c Client) GetBackoff() time.Duration {
if c.Backoff < 0 {
return DefaultBackoff
}
return c.Backoff
}

// GetHealthCheckPeriod returns the health check period from config file or default value.
func (pr Proxy) GetHealthCheckPeriod() time.Duration {
if pr.HealthCheckPeriod <= 0 {
Expand Down
4 changes: 4 additions & 0 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type Client struct {
ReceiveTimeout time.Duration `json:"receiveTimeout" jsonschema:"oneof_type=string;integer"`
SendDeadline time.Duration `json:"sendDeadline" jsonschema:"oneof_type=string;integer"`
DialTimeout time.Duration `json:"dialTimeout" jsonschema:"oneof_type=string;integer"`
Retries int `json:"retries"`
Backoff time.Duration `json:"backoff" jsonschema:"oneof_type=string;integer"`
BackoffMultiplier float64 `json:"backoffMultiplier"`
DisableBackoffCaps bool `json:"disableBackoffCaps"`
}

type Logger struct {
Expand Down
5 changes: 5 additions & 0 deletions gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ clients:
receiveTimeout: 0s # duration, 0ms/0s means no timeout
sendDeadline: 0s # duration, 0ms/0s means no deadline
dialTimeout: 60s # duration
# Retry configuration
retries: 3 # 0 means no retry
backoff: 1s # duration
backoffMultiplier: 2 # 0 means no backoff
disableBackoffCaps: false

pools:
default:
Expand Down
21 changes: 9 additions & 12 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Client struct {
ctx context.Context //nolint:containedctx
connected atomic.Bool
mu sync.Mutex
retry IRetry

TCPKeepAlive bool
TCPKeepAlivePeriod time.Duration
Expand All @@ -48,7 +49,9 @@ type Client struct {
var _ IClient = (*Client)(nil)

// NewClient creates a new client.
func NewClient(ctx context.Context, clientConfig *config.Client, logger zerolog.Logger) *Client {
func NewClient(
ctx context.Context, clientConfig *config.Client, logger zerolog.Logger, retry *Retry,
) *Client {
clientCtx, span := otel.Tracer(config.TracerName).Start(ctx, "NewClient")
defer span.End()

Expand All @@ -72,6 +75,7 @@ func NewClient(ctx context.Context, clientConfig *config.Client, logger zerolog.
client = Client{
ctx: clientCtx,
mu: sync.Mutex{},
retry: retry,
Network: clientConfig.Network,
Address: addr,
DialTimeout: clientConfig.DialTimeout,
Expand All @@ -85,16 +89,12 @@ func NewClient(ctx context.Context, clientConfig *config.Client, logger zerolog.
}
}

// Create a new connection.
var (
conn net.Conn
origErr error
)
if client.DialTimeout == 0 {
conn, origErr = net.Dial(client.Network, client.Address)
} else {
conn, origErr = net.DialTimeout(client.Network, client.Address, client.DialTimeout)
}
// Create a new connection and retry a few times if needed.
conn, origErr = client.retry.DialTimeout(client.Network, client.Address, client.DialTimeout)
if origErr != nil {
err := gerr.ErrClientConnectionFailed.Wrap(origErr)
logger.Error().Err(err).Msg("Failed to create a new connection")
Expand Down Expand Up @@ -264,11 +264,8 @@ func (c *Client) Reconnect() error {
conn net.Conn
err error
)
if c.DialTimeout == 0 {
conn, err = net.Dial(c.Network, c.Address)
} else {
conn, err = net.DialTimeout(c.Network, c.Address, c.DialTimeout)
}
// Create a new connection and retry a few times if needed.
conn, err = c.retry.DialTimeout(c.Network, c.Address, c.DialTimeout)
if err != nil {
c.logger.Error().Err(err).Msg("Failed to reconnect")
span.RecordError(err)
Expand Down
22 changes: 20 additions & 2 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,16 @@ func NewProxy(
proxy.availableConnections.Remove(client.ID)
client.Close()
// Create a new client.
client = NewClient(proxyCtx, proxy.ClientConfig, proxy.logger)
client = NewClient(
proxyCtx, proxy.ClientConfig, proxy.logger,
NewRetry(
proxy.ClientConfig.Retries,
proxy.ClientConfig.GetBackoff(),
proxy.ClientConfig.BackoffMultiplier,
proxy.ClientConfig.DisableBackoffCaps,
proxy.logger,
),
)
if client != nil && client.ID != "" {
if err := proxy.availableConnections.Put(client.ID, client); err != nil {
proxy.logger.Err(err).Msg("Failed to update the client connection")
Expand Down Expand Up @@ -146,7 +155,16 @@ func (pr *Proxy) Connect(conn *ConnWrapper) *gerr.GatewayDError {
// Pool is exhausted or is elastic.
if pr.Elastic {
// Create a new client.
client = NewClient(pr.ctx, pr.ClientConfig, pr.logger)
client = NewClient(
pr.ctx, pr.ClientConfig, pr.logger,
NewRetry(
pr.ClientConfig.Retries,
pr.ClientConfig.GetBackoff(),
pr.ClientConfig.BackoffMultiplier,
pr.ClientConfig.DisableBackoffCaps,
pr.logger,
),
)
span.AddEvent("Created a new client connection")
pr.logger.Debug().Str("id", client.ID[:7]).Msg("Reused the client connection")
} else {
Expand Down
9 changes: 9 additions & 0 deletions network/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ func (r *Retry) DialTimeout(network, address string, timeout time.Duration) (net
retry int
)

if r == nil {
// Just dial the connection once.
if timeout == 0 {
return net.Dial(network, address)
} else {
return net.DialTimeout(network, address, timeout)
}
}

for ; retry < r.Retries; retry++ {
// Wait for the backoff duration before retrying. The backoff duration is
// calculated by multiplying the backoff duration by the backoff multiplier
Expand Down

0 comments on commit 96c2e7d

Please sign in to comment.