-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Consistent Hashing Load Balancing Strategy #592
Merged
mostafa
merged 6 commits into
gatewayd-io:main
from
sinadarbouy:feature/ConsistentHashLB
Aug 19, 2024
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
7d6d02d
Added ConsistentHash
sinadarbouy aeb13c1
changed net conn into Iconnwrapper
sinadarbouy 2dd919b
added test cases for onsistentHash
sinadarbouy 1a256ca
fixed lint issues
sinadarbouy 10e947f
replace RWMutex to Mutex and added test case for concurency access
sinadarbouy dabf0a3
added github.com/spaolacci/murmur3 into depguard
sinadarbouy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package network | ||
|
||
import ( | ||
"fmt" | ||
"net" | ||
"sync" | ||
|
||
gerr "github.com/gatewayd-io/gatewayd/errors" | ||
"github.com/spaolacci/murmur3" | ||
) | ||
|
||
// ConsistentHash implements a load balancing strategy based on consistent hashing. | ||
// It routes client connections to specific proxies by hashing the client's IP address or the full connection address. | ||
type ConsistentHash struct { | ||
originalStrategy LoadBalancerStrategy | ||
useSourceIP bool | ||
hashMap map[uint64]IProxy | ||
mu sync.Mutex | ||
} | ||
|
||
// NewConsistentHash creates a new ConsistentHash instance. It requires a server configuration and an original | ||
// load balancing strategy. The consistent hash can use either the source IP or the full connection address | ||
// as the key for hashing. | ||
func NewConsistentHash(server *Server, originalStrategy LoadBalancerStrategy) *ConsistentHash { | ||
return &ConsistentHash{ | ||
originalStrategy: originalStrategy, | ||
useSourceIP: server.LoadbalancerConsistentHash.UseSourceIP, | ||
hashMap: make(map[uint64]IProxy), | ||
} | ||
} | ||
|
||
// NextProxy selects the appropriate proxy for a given client connection. It first tries to find an existing | ||
// proxy in the hash map based on the hashed key (either the source IP or the full address). If no match is found, | ||
// it falls back to the original load balancing strategy, adds the selected proxy to the hash map, and returns it. | ||
func (ch *ConsistentHash) NextProxy(conn IConnWrapper) (IProxy, *gerr.GatewayDError) { | ||
ch.mu.Lock() | ||
defer ch.mu.Unlock() | ||
|
||
var key string | ||
|
||
if ch.useSourceIP { | ||
sourceIP, err := extractIPFromConn(conn) | ||
if err != nil { | ||
return nil, gerr.ErrNoProxiesAvailable.Wrap(err) | ||
} | ||
key = sourceIP | ||
} else { | ||
key = conn.LocalAddr().String() // Fallback to use full address as the key if `useSourceIp` is false | ||
} | ||
|
||
hash := hashKey(key) | ||
|
||
proxy, exists := ch.hashMap[hash] | ||
|
||
if exists { | ||
return proxy, nil | ||
} | ||
|
||
// If no hash exists, fallback to the original strategy | ||
proxy, err := ch.originalStrategy.NextProxy(conn) | ||
if err != nil { | ||
return nil, gerr.ErrNoProxiesAvailable.Wrap(err) | ||
} | ||
|
||
// Add the selected proxy to the hash map for future requests | ||
ch.hashMap[hash] = proxy | ||
|
||
return proxy, nil | ||
} | ||
|
||
// hashKey hashes a given key using the MurmurHash3 algorithm. It is used to generate consistent hash values | ||
// for IP addresses or connection strings. | ||
func hashKey(key string) uint64 { | ||
return murmur3.Sum64([]byte(key)) | ||
} | ||
|
||
// extractIPFromConn extracts the IP address from the connection's local address. It splits the address | ||
// into IP and port components and returns the IP part. This is useful for hashing based on the source IP. | ||
func extractIPFromConn(con IConnWrapper) (string, error) { | ||
addr := con.LocalAddr().String() | ||
// addr will be in the format "IP:port" | ||
ip, _, err := net.SplitHostPort(addr) | ||
if err != nil { | ||
return "", fmt.Errorf("failed to split host and port from address %s: %w", addr, err) | ||
} | ||
return ip, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
package network | ||
|
||
import ( | ||
"net" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/gatewayd-io/gatewayd/config" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
// TestNewConsistentHash verifies that a new ConsistentHash instance is properly created. | ||
// It checks that the original load balancing strategy is preserved, that the useSourceIp | ||
// setting is correctly applied, and that the hashMap is initialized. | ||
func TestNewConsistentHash(t *testing.T) { | ||
server := &Server{ | ||
LoadbalancerConsistentHash: &config.ConsistentHash{UseSourceIP: true}, | ||
} | ||
originalStrategy := NewRandom(server) | ||
consistentHash := NewConsistentHash(server, originalStrategy) | ||
|
||
assert.NotNil(t, consistentHash) | ||
assert.Equal(t, originalStrategy, consistentHash.originalStrategy) | ||
assert.True(t, consistentHash.useSourceIP) | ||
assert.NotNil(t, consistentHash.hashMap) | ||
} | ||
|
||
// TestConsistentHashNextProxyUseSourceIpExists ensures that when useSourceIp is enabled, | ||
// and the hashed IP exists in the hashMap, the correct proxy is returned. | ||
// It mocks a connection with a specific IP and verifies the proxy retrieval from the hashMap. | ||
func TestConsistentHashNextProxyUseSourceIpExists(t *testing.T) { | ||
proxies := []IProxy{ | ||
MockProxy{name: "proxy1"}, | ||
MockProxy{name: "proxy2"}, | ||
MockProxy{name: "proxy3"}, | ||
} | ||
server := &Server{ | ||
Proxies: proxies, | ||
LoadbalancerConsistentHash: &config.ConsistentHash{UseSourceIP: true}, | ||
} | ||
originalStrategy := NewRandom(server) | ||
consistentHash := NewConsistentHash(server, originalStrategy) | ||
mockConn := new(MockConnWrapper) | ||
|
||
// Mock LocalAddr to return a specific IP:port format | ||
mockAddr := &net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 1234} | ||
mockConn.On("LocalAddr").Return(mockAddr) | ||
|
||
key := "192.168.1.1" | ||
hash := hashKey(key) | ||
|
||
consistentHash.hashMap[hash] = proxies[2] | ||
|
||
proxy, err := consistentHash.NextProxy(mockConn) | ||
assert.Nil(t, err) | ||
assert.Equal(t, proxies[2], proxy) | ||
|
||
// Clean up | ||
mockConn.AssertExpectations(t) | ||
} | ||
|
||
// TestConsistentHashNextProxyUseFullAddress verifies the behavior when useSourceIp is disabled. | ||
// It ensures that the full connection address is used for hashing, and the correct proxy is returned | ||
// and cached in the hashMap. The test also checks that the hash value is computed based on the full address. | ||
func TestConsistentHashNextProxyUseFullAddress(t *testing.T) { | ||
mockConn := new(MockConnWrapper) | ||
proxies := []IProxy{ | ||
MockProxy{name: "proxy1"}, | ||
MockProxy{name: "proxy2"}, | ||
MockProxy{name: "proxy3"}, | ||
} | ||
server := &Server{ | ||
Proxies: proxies, | ||
LoadbalancerConsistentHash: &config.ConsistentHash{ | ||
UseSourceIP: false, | ||
}, | ||
} | ||
mockStrategy := NewRoundRobin(server) | ||
|
||
// Mock LocalAddr to return full address | ||
mockAddr := &net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 1234} | ||
mockConn.On("LocalAddr").Return(mockAddr) | ||
|
||
consistentHash := NewConsistentHash(server, mockStrategy) | ||
|
||
proxy, err := consistentHash.NextProxy(mockConn) | ||
assert.Nil(t, err) | ||
assert.NotNil(t, proxy) | ||
assert.Equal(t, proxies[1], proxy) | ||
|
||
// Hash should be calculated using the full address and cached in hashMap | ||
hash := hashKey("192.168.1.1:1234") | ||
cachedProxy, exists := consistentHash.hashMap[hash] | ||
|
||
assert.True(t, exists) | ||
assert.Equal(t, proxies[1], cachedProxy) | ||
|
||
// Clean up | ||
mockConn.AssertExpectations(t) | ||
} | ||
|
||
// TestConsistentHashNextProxyConcurrency tests the concurrency safety of the NextProxy method | ||
// in the ConsistentHash struct. It ensures that multiple goroutines can concurrently call | ||
// NextProxy without causing race conditions or inconsistent behavior. | ||
func TestConsistentHashNextProxyConcurrency(t *testing.T) { | ||
// Setup mocks | ||
conn1 := new(MockConnWrapper) | ||
conn2 := new(MockConnWrapper) | ||
proxies := []IProxy{ | ||
MockProxy{name: "proxy1"}, | ||
MockProxy{name: "proxy2"}, | ||
MockProxy{name: "proxy3"}, | ||
} | ||
server := &Server{ | ||
Proxies: proxies, | ||
LoadbalancerConsistentHash: &config.ConsistentHash{UseSourceIP: true}, | ||
} | ||
originalStrategy := NewRoundRobin(server) | ||
|
||
// Mock IP addresses | ||
mockAddr1 := &net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 1234} | ||
mockAddr2 := &net.TCPAddr{IP: net.ParseIP("192.168.1.2"), Port: 1234} | ||
conn1.On("LocalAddr").Return(mockAddr1) | ||
conn2.On("LocalAddr").Return(mockAddr2) | ||
|
||
// Initialize the ConsistentHash | ||
consistentHash := NewConsistentHash(server, originalStrategy) | ||
|
||
// Run the test concurrently | ||
var waitGroup sync.WaitGroup | ||
const numGoroutines = 100 | ||
|
||
for range numGoroutines { | ||
waitGroup.Add(1) | ||
go func() { | ||
defer waitGroup.Done() | ||
p, err := consistentHash.NextProxy(conn1) | ||
assert.Nil(t, err) | ||
assert.Equal(t, proxies[1], p) | ||
}() | ||
} | ||
|
||
waitGroup.Wait() | ||
|
||
// Ensure that the proxy is consistently the same | ||
proxy, err := consistentHash.NextProxy(conn1) | ||
assert.Nil(t, err) | ||
assert.Equal(t, proxies[1], proxy) | ||
|
||
// Ensure that connecting from a different address returns a different proxy | ||
proxy, err = consistentHash.NextProxy(conn2) | ||
assert.Nil(t, err) | ||
assert.Equal(t, proxies[2], proxy) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could use sync.Map, which handles locking behind the scenes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At first, I messed up by just locking the
load
andstore
process. Now, I’ve added async.Mutex
to lock the wholeNextProxy
connection. also added a test case to check for concurrency. (so I guess we can not use sync.Map here anymore)