Skip to content

Commit

Permalink
feat: Add weighted round-robin state to FSM snapshots
Browse files Browse the repository at this point in the history
Add support for persisting and restoring weighted round-robin load balancer
states in the Raft FSM snapshots. This ensures the weighted round-robin
configuration survives cluster restarts and leader changes.

Changes:
- Add weightedRRStates to FSMSnapshot struct
- Update Snapshot() to copy weighted round-robin states
- Extend Restore() and Persist() to handle weighted round-robin data
  • Loading branch information
sinadarbouy committed Dec 19, 2024
1 parent 5860abb commit 9a14cef
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
f.mu.RLock()
defer f.mu.RUnlock()

// Create copies of both maps
// Create copies of all maps
hashMapCopy := make(map[string]string)
for k, v := range f.lbHashToBlockName {
hashMapCopy[k] = v
Expand All @@ -463,17 +463,28 @@ func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
roundRobinCopy[k] = v.Load()
}

// Copy weightedRRStates
weightedRRCopy := make(map[string]map[string]WeightedProxy)
for groupName, group := range f.weightedRRStates {
weightedRRCopy[groupName] = make(map[string]WeightedProxy)
for proxyName, weight := range group {
weightedRRCopy[groupName][proxyName] = weight
}
}

return &FSMSnapshot{
lbHashToBlockName: hashMapCopy,
roundRobinIndex: roundRobinCopy,
weightedRRStates: weightedRRCopy,
}, nil
}

// Restore restores the FSM from a snapshot.
func (f *FSM) Restore(readCloser io.ReadCloser) error {
var data struct {
HashToBlock map[string]string `json:"hashToBlock"`
RoundRobin map[string]uint32 `json:"roundRobin"`
HashToBlock map[string]string `json:"hashToBlock"`
RoundRobin map[string]uint32 `json:"roundRobin"`
WeightedRRState map[string]map[string]WeightedProxy `json:"weightedRrState"`
}

if err := json.NewDecoder(readCloser).Decode(&data); err != nil {
Expand All @@ -490,6 +501,7 @@ func (f *FSM) Restore(readCloser io.ReadCloser) error {
atomicVal.Store(v)
f.roundRobinIndex[k] = atomicVal
}
f.weightedRRStates = data.WeightedRRState

return nil
}
Expand All @@ -498,16 +510,19 @@ func (f *FSM) Restore(readCloser io.ReadCloser) error {
type FSMSnapshot struct {
lbHashToBlockName map[string]string
roundRobinIndex map[string]uint32
weightedRRStates map[string]map[string]WeightedProxy
}

// Persist writes the FSMSnapshot data to the given SnapshotSink.
func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) error {
data := struct {
HashToBlock map[string]string `json:"hashToBlock"`
RoundRobin map[string]uint32 `json:"roundRobin"`
HashToBlock map[string]string `json:"hashToBlock"`
RoundRobin map[string]uint32 `json:"roundRobin"`
WeightedRRState map[string]map[string]WeightedProxy `json:"weightedRrState"`
}{
HashToBlock: f.lbHashToBlockName,
RoundRobin: f.roundRobinIndex,
HashToBlock: f.lbHashToBlockName,
RoundRobin: f.roundRobinIndex,
WeightedRRState: f.weightedRRStates,
}

if err := json.NewEncoder(sink).Encode(data); err != nil {
Expand Down

0 comments on commit 9a14cef

Please sign in to comment.