Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Retry control connection setup if getting of peers fails
Browse files Browse the repository at this point in the history
This could help with long startup times in case of issues
when fetching peer host info.
  • Loading branch information
martin-sucha committed Nov 3, 2023
1 parent d7209ab commit e6a3cbf
Showing 1 changed file with 35 additions and 6 deletions.
41 changes: 35 additions & 6 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ func (c *controlConn) discoverProtocol(hosts []*HostInfo) (int, error) {
return 0, err
}

func (c *controlConn) connect(hosts []*HostInfo) error {
func (c *controlConn) connect(hosts []*HostInfo) (newHosts []*HostInfo, partitionerName string, errOut error) {
if len(hosts) == 0 {
return errors.New("control: no endpoints specified")
return hosts, "", errors.New("control: no endpoints specified")
}

// shuffle endpoints so not all drivers will connect to the same initial
Expand All @@ -264,31 +264,59 @@ func (c *controlConn) connect(hosts []*HostInfo) error {
c.session.logger.Printf("gocql: unable to dial control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err)
continue
}
err = c.setupConn(conn)

newHosts, partitionerName, err = c.connectSetupConn(conn, hosts)
if err == nil {
break
}
c.session.logger.Printf("gocql: unable setup control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err)
err = fmt.Errorf("%v:%v: %v", host.ConnectAddress(), host.Port(), err)
c.session.logger.Printf("gocql: unable setup control conn %v\n", err)
conn.Close()
conn = nil
}
if conn == nil {
return fmt.Errorf("unable to connect to initial hosts: %v", err)
return hosts, "", fmt.Errorf("unable to connect to initial hosts: %v", err)
}

// we could fetch the initial ring here and update initial host data. So that
// when we return from here we have a ring topology ready to go.

go c.heartBeat()

return nil
return newHosts, partitionerName, nil
}

// connHost pairs a connection (conn) with the host description (host)
// stored alongside it.
// NOTE(review): presumably conn is the connection established to host —
// the uses of this type are not visible in this hunk; confirm against callers.
type connHost struct {
conn *Conn
host *HostInfo
}

// connectSetupConn runs setupConn on conn and then, unless the session is
// configured with DisableInitialHostLookup, fetches the host list through the
// session's host source.
//
// On success with the lookup enabled it passes the fetched partitioner name
// to the session policy and returns the fetched hosts minus every host for
// which cfg.filterHost reports true, together with that partitioner name.
//
// When the lookup is disabled, or when setupConn or GetHosts fails, the
// hosts argument is returned unchanged with an empty partitioner name (and
// the error, if there was one).
func (c *controlConn) connectSetupConn(conn *Conn, hosts []*HostInfo,
) (newHosts []*HostInfo, partitionerName string, errOut error) {
	if err := c.setupConn(conn); err != nil {
		return hosts, "", err
	}

	// Initial host lookup is switched off: keep the caller-supplied hosts.
	if c.session.cfg.DisableInitialHostLookup {
		return hosts, "", nil
	}

	ring, partitioner, err := c.session.hostSource.GetHosts()
	if err != nil {
		return hosts, "", err
	}
	c.session.policy.SetPartitioner(partitioner)

	// Keep only the hosts that the configured filter does not reject.
	kept := make([]*HostInfo, 0, len(ring))
	for _, h := range ring {
		if c.session.cfg.filterHost(h) {
			continue
		}
		kept = append(kept, h)
	}

	return kept, partitioner, nil
}

func (c *controlConn) setupConn(conn *Conn) error {
// we need up-to-date host info for the filterHost call below
iter := conn.querySystemLocal(context.TODO())
Expand Down Expand Up @@ -322,6 +350,7 @@ func (c *controlConn) setupConn(conn *Conn) error {
// TODO(martin-sucha): Trigger pool refill for all hosts, like in reconnectDownedHosts?
go c.session.startPoolFill(host)
}

return nil
}

Expand Down

0 comments on commit e6a3cbf

Please sign in to comment.