Skip to content

Commit

Permalink
refactor: replace mutex using NodeManager and atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
anniemon committed Aug 26, 2024
1 parent 249f2a4 commit b5ead45
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 154 deletions.
Binary file modified goLoadBalancer
Binary file not shown.
170 changes: 85 additions & 85 deletions loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,123 +3,123 @@ package main
import (
"log"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"
)

type LoadBalancer struct {
Nodes []*Node
Next int
Mutex sync.Mutex
Clock clockwork.Clock
// TODO: refactor to separate NodeManager file
type NodeManager interface {
GetNextNode() *Node
isRateLimitExceeded(node *Node, bodyLen int) bool
ResetLimits()
CheckHealth()
}

func NewLoadBalancer(clock clockwork.Clock) *LoadBalancer {
return &LoadBalancer{
Nodes: []*Node{},
Next: 0,
Mutex: sync.Mutex{},
Clock: clock,
}
type SafeNodeManager struct {
nodes []*Node
next int32
clock clockwork.Clock
}

func (lb *LoadBalancer) AddNode(node *Node) {
lb.Nodes = append(lb.Nodes, node)
type LoadBalancer struct {
manager NodeManager
}

func (lb *LoadBalancer) GetNextNode() *Node {
lb.Mutex.Lock()
defer lb.Mutex.Unlock()
func NewSafeNodeManager(nodes []*Node, clock clockwork.Clock) *SafeNodeManager {
return &SafeNodeManager{
nodes: nodes,
next: 0,
clock: clock,
}
}

node := lb.Nodes[lb.Next]
lb.Next = (lb.Next + 1) % len(lb.Nodes)
func (m *SafeNodeManager) GetNextNode() *Node {
idx := atomic.AddInt32(&m.next, 1) % int32(len(m.nodes))
node := m.nodes[idx]

if node.Healthy {
return node
if node.Healthy == 1 {
return node
}

log.Printf("Skipping unhealthy node %d (%s)",
node.ID, node.URL)
log.Printf("Skipping unhealthy node %d (%s)", node.ID, node.URL)
return nil
}

func isRateLimitExceeded(node *Node, bodyLen int) (bool) {
if node.ReqCount >= node.ReqLimit || node.BodyCount+bodyLen > node.BodyLimit {
return true
}
return false
}

func (lb *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
bodyLen := int(r.ContentLength)

for i := 0; i < len(lb.Nodes); i++ {
node := lb.GetNextNode()

if node == nil {
continue
}

node.Mutex.Lock()
defer node.Mutex.Unlock()

if isRateLimitExceeded(node, bodyLen) {
func (m *SafeNodeManager) isRateLimitExceeded(node *Node, bodyLen int) bool {
if atomic.LoadUint32(&node.ReqCount) >= node.ReqLimit ||
atomic.LoadUint64(&node.BodyCount)+uint64(bodyLen) > node.BodyLimit {
log.Printf("Rate limit hit for node %d (%s) - RPM: %d/%d, BodyLimit: %d, RequestBody: %d\n",
node.ID, node.URL, node.ReqCount, node.ReqLimit, node.BodyLimit, bodyLen)

continue
}

node.ReqCount++
node.BodyCount += bodyLen

log.Printf("Forwarding request to node %d (%s) - RPM: %d/%d, BPM: %d/%d\n",
node.ID, node.URL, node.ReqCount, node.ReqLimit, node.BodyCount, node.BodyLimit)

node.ReverseProxy.ServeHTTP(w, r)
return
return true
}

log.Println("No available node")
http.Error(w, "No available node", http.StatusServiceUnavailable)
atomic.AddUint32(&node.ReqCount, 1)
atomic.AddUint64(&node.BodyCount, uint64(bodyLen))

return false
}

func (lb *LoadBalancer) StartPeriodicTasks() {
lb.Clock.AfterFunc(1*time.Minute, lb.resetLimits)
lb.Clock.AfterFunc(30*time.Second, lb.checkHealth)
func (m *SafeNodeManager) ResetLimits() {
for _, node := range m.nodes {
atomic.StoreUint32(&node.ReqCount, 0)
atomic.StoreUint64(&node.BodyCount, 0)
}
}

func (lb *LoadBalancer) resetLimits() {
for _, node := range lb.Nodes {
node.Mutex.Lock()
node.ReqCount = 0
node.BodyCount = 0
node.Mutex.Unlock()
func (m *SafeNodeManager) CheckHealth() {
for _, node := range m.nodes {
go func(n *Node) {
resp, err := http.Get(n.URL + "/health")

if err != nil || resp.StatusCode != http.StatusOK {
atomic.StoreUint32(&n.Healthy, 0)
log.Printf("Node %d (%s) is unhealthy\n", n.ID, n.URL)
} else {
atomic.StoreUint32(&n.Healthy, 1)
log.Printf("Node %d (%s) is healthy\n", n.ID, n.URL)
}

if resp != nil {
resp.Body.Close()
}
}(node)
}
lb.Clock.AfterFunc(1*time.Minute, lb.resetLimits)
}

func (lb *LoadBalancer) checkHealth() {
for _, node := range lb.Nodes {
go func(n *Node) {
resp, err := http.Get(n.URL + "/health")
n.Mutex.Lock()
func NewLoadBalancer(manager NodeManager) *LoadBalancer {
return &LoadBalancer{
manager: manager,
}
}

if err != nil || resp.StatusCode != http.StatusOK {
n.Healthy = false
log.Printf("Node %d (%s) is unhealthy\n", node.ID, node.URL)
func (lb *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
bodyLen := int(r.ContentLength)
nodeLen := len(lb.manager.(*SafeNodeManager).nodes)

} else {
n.Healthy = true
log.Printf("Node %d (%s) is healthy\n", node.ID, node.URL)
var node *Node
for i := 0; i < nodeLen; i++ {
node = lb.manager.GetNextNode()
if node == nil {
continue
}
n.Mutex.Unlock()
if resp != nil {
resp.Body.Close()

if lb.manager.isRateLimitExceeded(node, bodyLen) {
continue
}
}(node)

log.Printf("Forwarding request to node %d (%s)\n - RPM: %d/%d, BPM: %d/%d\n",
node.ID, node.URL, node.ReqCount, node.ReqLimit, node.BodyCount, node.BodyLimit)

node.ReverseProxy.ServeHTTP(w, r)
return;
}
lb.Clock.AfterFunc(30*time.Second, lb.checkHealth)

http.Error(w, "No available node", http.StatusServiceUnavailable)
}

func (m *SafeNodeManager) StartPeriodicTasks() {
m.clock.AfterFunc(1*time.Minute, m.ResetLimits)
m.clock.AfterFunc(30*time.Second, m.CheckHealth)
}
22 changes: 11 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ import (
)

func main() {
clock := clockwork.NewRealClock()
lb := NewLoadBalancer(clock)

nodes := []NodeParams{
{ID: 1, URL: "http://localhost:8081", ReqLimit: 2, BodyLimit: 123},
{ID: 2, URL: "http://localhost:8082", ReqLimit: 5, BodyLimit: 2 * 1024 * 1024},
{ID: 3, URL: "http://localhost:8083", ReqLimit: 7, BodyLimit: 1 * 1024 * 1024},
nodeParams := []NodeParams{
{ID: 1, URL: "http://localhost:8081", ReqLimit: 2, BodyLimit: 123},
{ID: 2, URL: "http://localhost:8082", ReqLimit: 5, BodyLimit: 2 * 1024 * 1024},
{ID: 3, URL: "http://localhost:8083", ReqLimit: 7, BodyLimit: 1 * 1024 * 1024},
}

for _, nodeParams := range nodes {
go startBackendServer(nodeParams.ID, nodeParams.URL)
lb.AddNode(NewNode(nodeParams))
nodes := make([]*Node, len(nodeParams))
for i, params := range nodeParams {
go startBackendServer(params.ID, params.URL)
nodes[i] = NewNode(params)
}

lb.StartPeriodicTasks()
nodeManager := NewSafeNodeManager(nodes, clockwork.NewRealClock())
nodeManager.StartPeriodicTasks()
lb := NewLoadBalancer(nodeManager)

time.Sleep(1 * time.Second)

Expand Down
Loading

0 comments on commit b5ead45

Please sign in to comment.