Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
nstogner committed Dec 10, 2024
1 parent 214816f commit ccbbe53
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 31 deletions.
42 changes: 27 additions & 15 deletions internal/loadbalancer/balance_chwbl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,54 @@ import (
"github.com/cespare/xxhash"
)

func (g *group) chwblGetAddr(key string, loadFactor float64) (endpoint, bool) {
func (g *group) chwblGetAddr(key string, loadFactor float64, adapter string) (endpoint, bool) {
if len(g.chwblHashes) == 0 {
return endpoint{}, false
}

h := chwblHash(key)
_, idx := g.chwblSearch(h)
_, i0 := g.chwblSearch(h)

i := idx
var defaultEndpoint *endpoint

i := i0
// Avoid an infinite loop by checking if we've checked all the endpoints.
for n := 0; n < len(g.chwblSortedHashes); n++ {
name := g.chwblHashes[g.chwblSortedHashes[i]]
ep, ok := g.endpoints[name]
if !ok {
continue
panic(fmt.Sprintf("endpoints corrupted, %q should be in map", name))
}

var adapterMatches bool
if adapter == "" {
adapterMatches = true
} else {
_, adapterMatches = ep.adapters[adapter]
}
if chwblLoadOK(ep.inFlight.Load(), g.totalInFlight.Load(), len(g.endpoints), loadFactor) {
return ep, true

if adapterMatches {
if defaultEndpoint == nil {
// Save the first endpoint that has the adapter in case no
// endpoint is found with acceptable load.
defaultEndpoint = &ep
}
if chwblLoadOK(ep.inFlight.Load(), g.totalInFlight.Load(), len(g.endpoints), loadFactor) {
return ep, true
}
}

i++
if i >= len(g.chwblSortedHashes) {
// wrap around
i = 0
}
}

// If we reach this point, we have not found a suitable endpoint.
// Default to the first endpoint.
// This could happen if all endpoints have equal load and the factor
// is set to 1.
name := g.chwblHashes[g.chwblSortedHashes[idx]]
preferredEp, ok := g.endpoints[name]
if !ok {
return endpoint{}, false
if defaultEndpoint != nil {
return *defaultEndpoint, true
}
return preferredEp, false
return endpoint{}, false
}

func (g *group) chwblAddEndpoint(name string) {
Expand Down
2 changes: 1 addition & 1 deletion internal/loadbalancer/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (g *group) getBestAddr(ctx context.Context, req *apiutils.Request, awaitCha
var found bool
switch req.LoadBalancing.Strategy {
case v1.PrefixHashStrategy:
ep, found = g.chwblGetAddr(req.Adapter+req.Prefix, float64(req.LoadBalancing.PrefixHash.MeanLoadPercentage)/100)
ep, found = g.chwblGetAddr(req.Adapter+req.Prefix, float64(req.LoadBalancing.PrefixHash.MeanLoadPercentage)/100, req.Adapter)
case v1.LeastLoadStrategy:
ep, found = g.getAddrLeastLoad(req.Adapter)
default:
Expand Down
55 changes: 40 additions & 15 deletions internal/loadbalancer/load_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@ func TestAwaitBestHostBehavior(t *testing.T) {
)

testCases := map[string]struct {
model string
adapter string
endpoints map[string]endpoint
expAddr string
expErr error
model string
adapter string
endpoints map[string]endpoint
strategies []v1.LoadBalancingStrategy
expAddr string
expErr error
}{
"model without adapter": {
model: myModel,
"model only": {
model: myModel,
strategies: []v1.LoadBalancingStrategy{
v1.LeastLoadStrategy,
v1.PrefixHashStrategy,
},
expAddr: myAddrWithoutAdapter,
endpoints: map[string]endpoint{
myPodWithoutAdapter: {address: myAddrWithoutAdapter},
},
},
"model with adapter": {
"model and adapter": {
model: myModel,
adapter: myAdapter,
endpoints: map[string]endpoint{
Expand All @@ -48,27 +53,43 @@ func TestAwaitBestHostBehavior(t *testing.T) {
myAdapter: {},
}},
},
strategies: []v1.LoadBalancingStrategy{
v1.LeastLoadStrategy,
v1.PrefixHashStrategy,
},
expAddr: myAddrWithAdapter,
},
"unknown model blocks until timeout": {
"no matching model blocks until timeout": {
model: "unknown-model",
endpoints: map[string]endpoint{
myPodWithoutAdapter: {address: myAddrWithoutAdapter},
},
strategies: []v1.LoadBalancingStrategy{
v1.LeastLoadStrategy,
v1.PrefixHashStrategy,
},
expErr: context.DeadlineExceeded,
},
"no matching adapter blocks until timeout": {
model: myModel,
adapter: "unknown-adapter",
endpoints: map[string]endpoint{
myPodWithoutAdapter: {address: myAddrWithoutAdapter},
},
strategies: []v1.LoadBalancingStrategy{
v1.LeastLoadStrategy,
v1.PrefixHashStrategy,
},
expErr: context.DeadlineExceeded,
},
// not covered: unknown port with multiple ports on entrypoint
}

for name, spec := range testCases {
// Behavior in these tests should be the same for both strategies.
for _, strategy := range []v1.LoadBalancingStrategy{
v1.LeastLoadStrategy,
v1.PrefixHashStrategy,
} {
for _, strategy := range spec.strategies {
t.Run(name+" with "+string(strategy)+" strategy", func(t *testing.T) {
manager := &LoadBalancer{
groups: make(map[string]*group, 1),
groups: map[string]*group{},
}

manager.getEndpoints(myModel).reconcileEndpoints(spec.endpoints)
Expand All @@ -81,6 +102,10 @@ func TestAwaitBestHostBehavior(t *testing.T) {
Adapter: spec.adapter,
LoadBalancing: v1.LoadBalancing{
Strategy: strategy,
PrefixHash: v1.PrefixHash{
MeanLoadPercentage: 125,
Replication: 1,
},
},
})
if spec.expErr != nil {
Expand Down

0 comments on commit ccbbe53

Please sign in to comment.