From 9a14cefb000b0dc3d5a0059a3c36dfe6ee147a72 Mon Sep 17 00:00:00 2001 From: Sina Darbouy Date: Thu, 19 Dec 2024 15:48:49 +0100 Subject: [PATCH] feat: Add weighted round-robin state to FSM snapshots Add support for persisting and restoring weighted round-robin load balancer states in the Raft FSM snapshots. This ensures the weighted round-robin configuration survives cluster restarts and leader changes. Changes: - Add weightedRRStates to FSMSnapshot struct - Update Snapshot() to copy weighted round-robin states - Extend Restore() and Persist() to handle weighted round-robin data --- raft/raft.go | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 69bc13ae..270492b7 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -452,7 +452,7 @@ func (f *FSM) Snapshot() (raft.FSMSnapshot, error) { f.mu.RLock() defer f.mu.RUnlock() - // Create copies of both maps + // Create copies of all maps hashMapCopy := make(map[string]string) for k, v := range f.lbHashToBlockName { hashMapCopy[k] = v @@ -463,17 +463,28 @@ func (f *FSM) Snapshot() (raft.FSMSnapshot, error) { roundRobinCopy[k] = v.Load() } + // Copy weightedRRStates + weightedRRCopy := make(map[string]map[string]WeightedProxy) + for groupName, group := range f.weightedRRStates { + weightedRRCopy[groupName] = make(map[string]WeightedProxy) + for proxyName, weight := range group { + weightedRRCopy[groupName][proxyName] = weight + } + } + return &FSMSnapshot{ lbHashToBlockName: hashMapCopy, roundRobinIndex: roundRobinCopy, + weightedRRStates: weightedRRCopy, }, nil } // Restore restores the FSM from a snapshot. func (f *FSM) Restore(readCloser io.ReadCloser) error { var data struct { - HashToBlock map[string]string `json:"hashToBlock"` - RoundRobin map[string]uint32 `json:"roundRobin"` + HashToBlock map[string]string `json:"hashToBlock"` + RoundRobin map[string]uint32 `json:"roundRobin"` + WeightedRRState map[string]map[string]WeightedProxy `json:"weightedRrState"` } if err := json.NewDecoder(readCloser).Decode(&data); err != nil { @@ -490,6 +501,7 @@ func (f *FSM) Restore(readCloser io.ReadCloser) error { atomicVal.Store(v) f.roundRobinIndex[k] = atomicVal } + f.weightedRRStates = data.WeightedRRState return nil } @@ -498,16 +510,19 @@ func (f *FSM) Restore(readCloser io.ReadCloser) error { type FSMSnapshot struct { lbHashToBlockName map[string]string roundRobinIndex map[string]uint32 + weightedRRStates map[string]map[string]WeightedProxy } // Persist writes the FSMSnapshot data to the given SnapshotSink. func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) error { data := struct { - HashToBlock map[string]string `json:"hashToBlock"` - RoundRobin map[string]uint32 `json:"roundRobin"` + HashToBlock map[string]string `json:"hashToBlock"` + RoundRobin map[string]uint32 `json:"roundRobin"` + WeightedRRState map[string]map[string]WeightedProxy `json:"weightedRrState"` }{ - HashToBlock: f.lbHashToBlockName, - RoundRobin: f.roundRobinIndex, + HashToBlock: f.lbHashToBlockName, + RoundRobin: f.roundRobinIndex, + WeightedRRState: f.weightedRRStates, } if err := json.NewEncoder(sink).Encode(data); err != nil {