diff --git a/CHANGELOG.md b/CHANGELOG.md index 20da746a0..f48161516 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19) +- Remove HostPoolHostPolicy from gocql package (CASSGO-21) + ### Fixed ## [1.7.0] - 2024-09-23 diff --git a/hostpool/hostpool.go b/hostpool/hostpool.go new file mode 100644 index 000000000..e4a648598 --- /dev/null +++ b/hostpool/hostpool.go @@ -0,0 +1,150 @@ +package hostpool + +import ( + "sync" + + "github.com/hailocab/go-hostpool" + + "github.com/gocql/gocql" +) + +// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library +// to distribute queries between hosts and prevent sending queries to +// unresponsive hosts. When creating the host pool that is passed to the policy +// use an empty slice of hosts as the hostpool will be populated later by gocql. +// See below for examples of usage: +// +// // Create host selection policy using a simple host pool +// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil)) +// +// // Create host selection policy using an epsilon greedy pool +// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy( +// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}), +// ) +func HostPoolHostPolicy(hp hostpool.HostPool) *hostPoolHostPolicy { + return &hostPoolHostPolicy{hostMap: map[string]*gocql.HostInfo{}, hp: hp} +} + +type hostPoolHostPolicy struct { + hp hostpool.HostPool + mu sync.RWMutex + hostMap map[string]*gocql.HostInfo +} + +func (r *hostPoolHostPolicy) Init(*gocql.Session) {} +func (r *hostPoolHostPolicy) KeyspaceChanged(gocql.KeyspaceUpdateEvent) {} +func (r *hostPoolHostPolicy) SetPartitioner(string) {} +func (r *hostPoolHostPolicy) IsLocal(*gocql.HostInfo) bool { return true } + +func (r *hostPoolHostPolicy) SetHosts(hosts []*gocql.HostInfo) { + peers := make([]string, len(hosts)) + hostMap := make(map[string]*gocql.HostInfo, len(hosts)) + + for i, host := range hosts { + ip := host.ConnectAddress().String() + peers[i] = ip + hostMap[ip] = host + } + + r.mu.Lock() + r.hp.SetHosts(peers) + r.hostMap = hostMap + r.mu.Unlock() +} + +func (r *hostPoolHostPolicy) AddHost(host *gocql.HostInfo) { + ip := host.ConnectAddress().String() + + r.mu.Lock() + defer r.mu.Unlock() + + // If the host addr is present and isn't nil return + if h, ok := r.hostMap[ip]; ok && h != nil { + return + } + // otherwise, add the host to the map + r.hostMap[ip] = host + // and construct a new peer list to give to the HostPool + hosts := make([]string, 0, len(r.hostMap)) + for addr := range r.hostMap { + hosts = append(hosts, addr) + } + + r.hp.SetHosts(hosts) +} + +func (r *hostPoolHostPolicy) RemoveHost(host *gocql.HostInfo) { + ip := host.ConnectAddress().String() + + r.mu.Lock() + defer r.mu.Unlock() + + if _, ok := r.hostMap[ip]; !ok { + return + } + + delete(r.hostMap, ip) + hosts := make([]string, 0, len(r.hostMap)) + for _, host := range r.hostMap { + hosts = append(hosts, host.ConnectAddress().String()) + } + + r.hp.SetHosts(hosts) +} + +func (r *hostPoolHostPolicy) HostUp(host *gocql.HostInfo) { + r.AddHost(host) +} + +func (r *hostPoolHostPolicy) HostDown(host *gocql.HostInfo) { + r.RemoveHost(host) +} + +func (r *hostPoolHostPolicy) Pick(qry gocql.ExecutableQuery) gocql.NextHost { + return func() gocql.SelectedHost { + r.mu.RLock() + defer r.mu.RUnlock() + + if len(r.hostMap) == 0 { + return nil + } + + hostR := r.hp.Get() + host, ok := r.hostMap[hostR.Host()] + if !ok { + return nil + } + + return selectedHostPoolHost{ + policy: r, + info: host, + hostR: hostR, + } + } +} + +// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and +// implements the SelectedHost interface +type selectedHostPoolHost struct { + policy *hostPoolHostPolicy + info *gocql.HostInfo + hostR hostpool.HostPoolResponse +} + +func (host selectedHostPoolHost) Info() *gocql.HostInfo { + return host.info +} + +func (host selectedHostPoolHost) Mark(err error) { + ip := host.info.ConnectAddress().String() + + host.policy.mu.RLock() + defer host.policy.mu.RUnlock() + + if _, ok := host.policy.hostMap[ip]; !ok { + // host was removed between pick and mark + return + } + + host.hostR.Mark(err) +} diff --git a/hostpool/hostpool_test.go b/hostpool/hostpool_test.go new file mode 100644 index 000000000..19b2fb724 --- /dev/null +++ b/hostpool/hostpool_test.go @@ -0,0 +1,57 @@ +package hostpool + +import ( + "fmt" + "net" + "testing" + + "github.com/hailocab/go-hostpool" + + "github.com/gocql/gocql" +) + +func TestHostPolicy_HostPool(t *testing.T) { + policy := HostPoolHostPolicy(hostpool.New(nil)) + + //hosts := []*gocql.HostInfo{ + // {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)}, + // {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)}, + //} + firstHost := &gocql.HostInfo{} + firstHost.SetHostID("0") + firstHost.SetConnectAddress(net.IPv4(10, 0, 0, 0)) + secHost := &gocql.HostInfo{} + secHost.SetHostID("1") + secHost.SetConnectAddress(net.IPv4(10, 0, 0, 1)) + hosts := []*gocql.HostInfo{firstHost, secHost} + // Using set host to control the ordering of the hosts as calling "AddHost" iterates the map + // which will result in an unpredictable ordering + policy.SetHosts(hosts) + + // the first host selected is actually at [1], but this is ok for RR + // interleaved iteration should always increment the host + iter := policy.Pick(nil) + actualA := iter() + if actualA.Info().HostID() != "0" { + t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID()) + } + actualA.Mark(nil) + + actualB := iter() + if actualB.Info().HostID() != "1" { + t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID()) + } + actualB.Mark(fmt.Errorf("error")) + + actualC := iter() + if actualC.Info().HostID() != "0" { + t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID()) + } + actualC.Mark(nil) + + actualD := iter() + if actualD.Info().HostID() != "0" { + t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID()) + } + actualD.Mark(nil) +} diff --git a/policies.go b/policies.go index 1157da87b..7505289d2 100644 --- a/policies.go +++ b/policies.go @@ -36,8 +36,6 @@ import ( "sync" "sync/atomic" "time" - - "github.com/hailocab/go-hostpool" ) // cowHostList implements a copy on write host list, its equivalent type is []*HostInfo @@ -690,147 +688,6 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { } } -// HostPoolHostPolicy is a host policy which uses the bitly/go-hostpool library -// to distribute queries between hosts and prevent sending queries to -// unresponsive hosts. When creating the host pool that is passed to the policy -// use an empty slice of hosts as the hostpool will be populated later by gocql. -// See below for examples of usage: -// -// // Create host selection policy using a simple host pool -// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil)) -// -// // Create host selection policy using an epsilon greedy pool -// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy( -// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}), -// ) -func HostPoolHostPolicy(hp hostpool.HostPool) HostSelectionPolicy { - return &hostPoolHostPolicy{hostMap: map[string]*HostInfo{}, hp: hp} -} - -type hostPoolHostPolicy struct { - hp hostpool.HostPool - mu sync.RWMutex - hostMap map[string]*HostInfo -} - -func (r *hostPoolHostPolicy) Init(*Session) {} -func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {} -func (r *hostPoolHostPolicy) SetPartitioner(string) {} -func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true } - -func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) { - peers := make([]string, len(hosts)) - hostMap := make(map[string]*HostInfo, len(hosts)) - - for i, host := range hosts { - ip := host.ConnectAddress().String() - peers[i] = ip - hostMap[ip] = host - } - - r.mu.Lock() - r.hp.SetHosts(peers) - r.hostMap = hostMap - r.mu.Unlock() -} - -func (r *hostPoolHostPolicy) AddHost(host *HostInfo) { - ip := host.ConnectAddress().String() - - r.mu.Lock() - defer r.mu.Unlock() - - // If the host addr is present and isn't nil return - if h, ok := r.hostMap[ip]; ok && h != nil { - return - } - // otherwise, add the host to the map - r.hostMap[ip] = host - // and construct a new peer list to give to the HostPool - hosts := make([]string, 0, len(r.hostMap)) - for addr := range r.hostMap { - hosts = append(hosts, addr) - } - - r.hp.SetHosts(hosts) -} - -func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) { - ip := host.ConnectAddress().String() - - r.mu.Lock() - defer r.mu.Unlock() - - if _, ok := r.hostMap[ip]; !ok { - return - } - - delete(r.hostMap, ip) - hosts := make([]string, 0, len(r.hostMap)) - for _, host := range r.hostMap { - hosts = append(hosts, host.ConnectAddress().String()) - } - - r.hp.SetHosts(hosts) -} - -func (r *hostPoolHostPolicy) HostUp(host *HostInfo) { - r.AddHost(host) -} - -func (r *hostPoolHostPolicy) HostDown(host *HostInfo) { - r.RemoveHost(host) -} - -func (r *hostPoolHostPolicy) Pick(qry ExecutableQuery) NextHost { - return func() SelectedHost { - r.mu.RLock() - defer r.mu.RUnlock() - - if len(r.hostMap) == 0 { - return nil - } - - hostR := r.hp.Get() - host, ok := r.hostMap[hostR.Host()] - if !ok { - return nil - } - - return selectedHostPoolHost{ - policy: r, - info: host, - hostR: hostR, - } - } -} - -// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and -// implements the SelectedHost interface -type selectedHostPoolHost struct { - policy *hostPoolHostPolicy - info *HostInfo - hostR hostpool.HostPoolResponse -} - -func (host selectedHostPoolHost) Info() *HostInfo { - return host.info -} - -func (host selectedHostPoolHost) Mark(err error) { - ip := host.info.ConnectAddress().String() - - host.policy.mu.RLock() - defer host.policy.mu.RUnlock() - - if _, ok := host.policy.hostMap[ip]; !ok { - // host was removed between pick and mark - return - } - - host.hostR.Mark(err) -} - type dcAwareRR struct { local string localHosts cowHostList diff --git a/policies_test.go b/policies_test.go index 231c2a7e2..e8bda8908 100644 --- a/policies_test.go +++ b/policies_test.go @@ -36,8 +36,6 @@ import ( "strings" "testing" "time" - - "github.com/hailocab/go-hostpool" ) // Tests of the round-robin host selection policy implementation @@ -140,47 +138,6 @@ func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) { expectNoMoreHosts(t, iter) } -// Tests of the host pool host selection policy implementation -func TestHostPolicy_HostPool(t *testing.T) { - policy := HostPoolHostPolicy(hostpool.New(nil)) - - hosts := []*HostInfo{ - {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)}, - {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)}, - } - - // Using set host to control the ordering of the hosts as calling "AddHost" iterates the map - // which will result in an unpredictable ordering - policy.(*hostPoolHostPolicy).SetHosts(hosts) - - // the first host selected is actually at [1], but this is ok for RR - // interleaved iteration should always increment the host - iter := policy.Pick(nil) - actualA := iter() - if actualA.Info().HostID() != "0" { - t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID()) - } - actualA.Mark(nil) - - actualB := iter() - if actualB.Info().HostID() != "1" { - t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID()) - } - actualB.Mark(fmt.Errorf("error")) - - actualC := iter() - if actualC.Info().HostID() != "0" { - t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID()) - } - actualC.Mark(nil) - - actualD := iter() - if actualD.Info().HostID() != "0" { - t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID()) - } - actualD.Mark(nil) -} - func TestHostPolicy_RoundRobin_NilHostInfo(t *testing.T) { policy := RoundRobinHostPolicy()