Skip to content

Commit

Permalink
Merge pull request #53 from caesarxuchao/exponential-backoff
Browse files Browse the repository at this point in the history
Do exponential backoff in clientset sync
  • Loading branch information
k8s-ci-robot authored Feb 10, 2020
2 parents 65cfded + 2a115c2 commit 08d981c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.proxyServerHost, "proxy-server-host", o.proxyServerHost, "The hostname to use to connect to the proxy-server.")
flags.IntVar(&o.proxyServerPort, "proxy-server-port", o.proxyServerPort, "The port the proxy server is listening on.")
flags.StringVar(&o.agentID, "agent-id", o.agentID, "The unique ID of this agent. Default to a generated uuid if not set.")
flags.DurationVar(&o.syncInterval, "sync-interval", o.syncInterval, "The interval by which the agent periodically checks that it has connections to all instances of the proxy server.")
flags.DurationVar(&o.syncInterval, "sync-interval", o.syncInterval, "The initial interval by which the agent periodically checks if it has connections to all instances of the proxy server.")
flags.DurationVar(&o.probeInterval, "probe-interval", o.probeInterval, "The interval by which the agent periodically checks if its connections to the proxy server are ready.")
flags.DurationVar(&o.reconnectInterval, "reconnect-interval", o.reconnectInterval, "The interval by which the agent tries to reconnect.")
flags.StringVar(&o.serviceAccountTokenPath, "service-account-token-path", o.serviceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.")
Expand Down
87 changes: 53 additions & 34 deletions pkg/agent/agentclient/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ package agentclient

import (
"fmt"
"math/rand"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
)

// ClientSet consists of clients connected to each instance of an HA proxy server.
type ClientSet struct {
mu sync.Mutex //protects the clients.
clients map[string]*AgentClient // map between serverID and the client
// connects to this proxy server.
// connects to this server.

agentID string // ID of this agent
address string // proxy server address. Assuming HA proxy server
Expand Down Expand Up @@ -101,24 +101,24 @@ func (cs *ClientSet) RemoveClient(serverID string) {
}

type ClientSetConfig struct {
Address string
AgentID string
SyncInterval time.Duration
ProbeInterval time.Duration
ReconnectInterval time.Duration
DialOption grpc.DialOption
Address string
AgentID string
SyncInterval time.Duration
ProbeInterval time.Duration
ReconnectInterval time.Duration
DialOption grpc.DialOption
ServiceAccountTokenPath string
}

func (cc *ClientSetConfig) NewAgentClientSet() *ClientSet {
return &ClientSet{
clients: make(map[string]*AgentClient),
agentID: cc.AgentID,
address: cc.Address,
syncInterval: cc.SyncInterval,
probeInterval: cc.ProbeInterval,
reconnectInterval: cc.ReconnectInterval,
dialOption: cc.DialOption,
clients: make(map[string]*AgentClient),
agentID: cc.AgentID,
address: cc.Address,
syncInterval: cc.SyncInterval,
probeInterval: cc.ProbeInterval,
reconnectInterval: cc.ReconnectInterval,
dialOption: cc.DialOption,
serviceAccountTokenPath: cc.ServiceAccountTokenPath,
}

Expand All @@ -128,30 +128,49 @@ func (cs *ClientSet) newAgentClient() (*AgentClient, error) {
return newAgentClient(cs.address, cs.agentID, cs, cs.dialOption)
}

func (cs *ClientSet) resetBackoff() *wait.Backoff {
return &wait.Backoff{
Steps: 3,
Jitter: 0.1,
Factor: 1.5,
Duration: cs.syncInterval,
Cap: 60 * time.Second,
}
}

// sync makes sure that #clients >= #proxy servers
func (cs *ClientSet) sync() {
jitter := float64(0.2)
backoff := cs.resetBackoff()
var duration time.Duration
for {
if cs.serverCount != 0 {
sleep := cs.syncInterval + time.Duration(rand.Float64()*jitter*float64(cs.syncInterval))
time.Sleep(sleep)
}
if cs.serverCount == 0 || cs.ClientsCount() < cs.serverCount {
c, err := cs.newAgentClient()
if err != nil {
klog.Error(err)
continue
}
cs.serverCount = c.stream.serverCount
if err := cs.AddClient(c.stream.serverID, c); err != nil {
klog.Infof("closing connection: %v", err)
c.Close()
continue
}
klog.Infof("sync added client connecting to proxy server %s", c.stream.serverID)
go c.Serve()
if err := cs.syncOnce(); err != nil {
klog.Error(err)
duration = backoff.Step()
} else {
backoff = cs.resetBackoff()
duration = wait.Jitter(backoff.Duration, backoff.Jitter)
}
time.Sleep(duration)
}
}

func (cs *ClientSet) syncOnce() error {
if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount {
return nil
}
c, err := cs.newAgentClient()
if err != nil {
return err
}
cs.serverCount = c.stream.serverCount
if err := cs.AddClient(c.stream.serverID, c); err != nil {
klog.Infof("closing connection: %v", err)
c.Close()
return nil
}
klog.Infof("sync added client connecting to proxy server %s", c.stream.serverID)
go c.Serve()
return nil
}

func (cs *ClientSet) Serve() {
Expand Down

0 comments on commit 08d981c

Please sign in to comment.