From 4fe2d196cc125082b5ad735c0fa2ca453903e210 Mon Sep 17 00:00:00 2001 From: jules01 Date: Fri, 21 Oct 2022 20:19:48 +0300 Subject: [PATCH] - fix after review: code optimization --- heartbeat/monitor/errors.go | 2 - heartbeat/monitor/heartbeatMessage.go | 84 +++++++++++++++++---------- heartbeat/monitor/monitor.go | 51 +++------------- heartbeat/monitor/monitor_test.go | 28 ++++++++- 4 files changed, 89 insertions(+), 76 deletions(-) diff --git a/heartbeat/monitor/errors.go b/heartbeat/monitor/errors.go index f88f18f373e..30aef5e971b 100644 --- a/heartbeat/monitor/errors.go +++ b/heartbeat/monitor/errors.go @@ -4,6 +4,4 @@ import "errors" var ( errEmptyHeartbeatMessagesInstance = errors.New("programming error: empty heartbeatMessages instance") - errInconsistentActivePidsList = errors.New("programming error: inconsistent active pids list") - errInconsistentActiveMap = errors.New("programming error: inconsistent active map") ) diff --git a/heartbeat/monitor/heartbeatMessage.go b/heartbeat/monitor/heartbeatMessage.go index a7c05bd6af4..aa376ab3da2 100644 --- a/heartbeat/monitor/heartbeatMessage.go +++ b/heartbeat/monitor/heartbeatMessage.go @@ -1,62 +1,86 @@ package monitor import ( - "sort" - "github.com/ElrondNetwork/elrond-go-core/core" "github.com/ElrondNetwork/elrond-go/heartbeat/data" ) type heartbeatMessages struct { - activePids []core.PeerID - activeHeartbeats map[core.PeerID]*data.PubKeyHeartbeat - inactiveHeartbeats map[core.PeerID]*data.PubKeyHeartbeat + firstActivePid core.PeerID + firstActiveHeartbeat *data.PubKeyHeartbeat + numActivePids uint64 + latestInactivePid core.PeerID + latestInactiveHeartbeat *data.PubKeyHeartbeat + inactivePids []core.PeerID } func newHeartbeatMessages() *heartbeatMessages { - return &heartbeatMessages{ - activePids: make([]core.PeerID, 0), - activeHeartbeats: make(map[core.PeerID]*data.PubKeyHeartbeat), - inactiveHeartbeats: make(map[core.PeerID]*data.PubKeyHeartbeat), - } + return &heartbeatMessages{} } func (instance *heartbeatMessages) addMessage(pid core.PeerID, message *data.PubKeyHeartbeat) { if message.IsActive { - instance.activePids = append(instance.activePids, pid) - instance.activeHeartbeats[pid] = message + instance.handleActiveMessage(pid, message) + return + } + + instance.handleInactiveMessage(pid, message) +} + +func (instance *heartbeatMessages) handleActiveMessage(pid core.PeerID, message *data.PubKeyHeartbeat) { + instance.numActivePids++ + if len(instance.firstActivePid) == 0 { + instance.firstActivePid = pid + instance.firstActiveHeartbeat = message return } - instance.inactiveHeartbeats[pid] = message + if string(instance.firstActivePid) > string(pid) { + instance.firstActivePid = pid + instance.firstActiveHeartbeat = message + } +} + +func (instance *heartbeatMessages) handleInactiveMessage(pid core.PeerID, message *data.PubKeyHeartbeat) { + instance.inactivePids = append(instance.inactivePids, pid) + + if instance.latestInactiveHeartbeat == nil { + instance.latestInactivePid = pid + instance.latestInactiveHeartbeat = message + return + } + + if message.TimeStamp.Unix() > instance.latestInactiveHeartbeat.TimeStamp.Unix() { + instance.latestInactivePid = pid + instance.latestInactiveHeartbeat = message + } } func (instance *heartbeatMessages) getHeartbeat() (*data.PubKeyHeartbeat, error) { - if len(instance.activeHeartbeats) > 0 { - return instance.getFirstActive() + if instance.firstActiveHeartbeat != nil { + return instance.firstActiveHeartbeat, nil } - if len(instance.inactiveHeartbeats) > 0 { - for _, inactive := range instance.inactiveHeartbeats { - return inactive, nil - } + if instance.latestInactiveHeartbeat != nil { + return instance.latestInactiveHeartbeat, nil } return nil, errEmptyHeartbeatMessagesInstance } -func (instance *heartbeatMessages) getFirstActive() (*data.PubKeyHeartbeat, error) { - sort.Slice(instance.activePids, func(i, j int) bool { - return string(instance.activePids[i]) < string(instance.activePids[j]) - }) - - if len(instance.activePids) == 0 { - return nil, errInconsistentActivePidsList +func (instance *heartbeatMessages) getInactivePids() []core.PeerID { + if instance.firstActiveHeartbeat != nil { + // at least one active heartbeat, return all pids containing inactive messages + return instance.inactivePids } - message, found := instance.activeHeartbeats[instance.activePids[0]] - if !found { - return nil, errInconsistentActiveMap + result := make([]core.PeerID, 0, len(instance.inactivePids)) + for _, pid := range instance.inactivePids { + if pid == instance.latestInactivePid { + continue + } + + result = append(result, pid) } - return message, nil + return result } diff --git a/heartbeat/monitor/monitor.go b/heartbeat/monitor/monitor.go index 8bdee9230f8..bb6af9ba559 100644 --- a/heartbeat/monitor/monitor.go +++ b/heartbeat/monitor/monitor.go @@ -90,7 +90,6 @@ func checkArgs(args ArgHeartbeatV2Monitor) error { // GetHeartbeats returns the heartbeat status func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat { heartbeatMessagesMap := monitor.processRAWDataFromCache() - monitor.removeFromInactiveNodes(heartbeatMessagesMap) heartbeatsV2 := make([]data.PubKeyHeartbeat, 0) for _, heartbeatMessagesInstance := range heartbeatMessagesMap { @@ -100,8 +99,10 @@ func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat { continue } - message.NumInstances = uint64(len(heartbeatMessagesInstance.activeHeartbeats)) + message.NumInstances = heartbeatMessagesInstance.numActivePids heartbeatsV2 = append(heartbeatsV2, *message) + + monitor.removeInactive(heartbeatMessagesInstance.getInactivePids()) } sort.Slice(heartbeatsV2, func(i, j int) bool { @@ -111,6 +112,12 @@ func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat { return heartbeatsV2 } +func (monitor *heartbeatV2Monitor) removeInactive(pids []core.PeerID) { + for _, pid := range pids { + monitor.cache.Remove([]byte(pid)) + } +} + func (monitor *heartbeatV2Monitor) processRAWDataFromCache() map[string]*heartbeatMessages { pids := monitor.cache.Keys() @@ -142,46 +149,6 @@ func (monitor *heartbeatV2Monitor) processRAWDataFromCache() map[string]*heartbe return heartbeatsV2 } -func (monitor *heartbeatV2Monitor) removeFromInactiveNodes(heartbeatsV2 map[string]*heartbeatMessages) { - for _, heartbeatMessagesInstance := range heartbeatsV2 { - if len(heartbeatMessagesInstance.activeHeartbeats) > 0 { - // at least one active heartbeat message found, old inactive messages should be discarded - monitor.removeAllInactiveMessages(heartbeatMessagesInstance) - continue - } - - monitor.removeInactiveMessagesExceptLatest(heartbeatMessagesInstance) - } -} - -func (monitor *heartbeatV2Monitor) removeAllInactiveMessages(heartbeatMessagesInstance *heartbeatMessages) { - for pid := range heartbeatMessagesInstance.inactiveHeartbeats { - monitor.cache.Remove([]byte(pid)) - } - - heartbeatMessagesInstance.inactiveHeartbeats = make(map[core.PeerID]*data.PubKeyHeartbeat) -} - -func (monitor *heartbeatV2Monitor) removeInactiveMessagesExceptLatest(heartbeatMessagesInstance *heartbeatMessages) { - latestPid := core.PeerID("") - latestTimeStamp := int64(0) - for pid, heartbeatMessage := range heartbeatMessagesInstance.inactiveHeartbeats { - if heartbeatMessage.TimeStamp.Unix() > latestTimeStamp { - latestTimeStamp = heartbeatMessage.TimeStamp.Unix() - latestPid = pid - } - } - - for pid := range heartbeatMessagesInstance.inactiveHeartbeats { - if pid == latestPid { - continue - } - - monitor.cache.Remove([]byte(pid)) - delete(heartbeatMessagesInstance.inactiveHeartbeats, pid) - } -} - func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interface{}) (*data.PubKeyHeartbeat, error) { heartbeatV2, ok := message.(*heartbeat.HeartbeatV2) if !ok { diff --git a/heartbeat/monitor/monitor_test.go b/heartbeat/monitor/monitor_test.go index 9b8ecdd8e87..424919bfdc5 100644 --- a/heartbeat/monitor/monitor_test.go +++ b/heartbeat/monitor/monitor_test.go @@ -385,7 +385,7 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) { assert.Equal(t, 0, len(heartbeats)) assert.Equal(t, 0, args.Cache.Len()) }) - t.Run("should work", func(t *testing.T) { + t.Run("should work with 2 public keys on different pids and one public key", func(t *testing.T) { t.Parallel() args := createMockHeartbeatV2MonitorArgs() providedStatuses := []bool{true, true, true} @@ -418,7 +418,31 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) { } checkResults(t, providedMessages[i], heartbeats[i], providedStatuses[i], providedPids, numInstances) } - assert.Equal(t, 1, len(providedPids)) // 1 inactive was removed from the heartbeat list + assert.Equal(t, 1, len(providedPids)) // 1 active was removed from the heartbeat list + }) + t.Run("should choose the 'smaller' pid if multiple active messages are found (sort should work)", func(t *testing.T) { + t.Parallel() + args := createMockHeartbeatV2MonitorArgs() + providedStatuses := []bool{true, true, true} + numOfMessages := len(providedStatuses) + providedPids := make(map[string]struct{}, numOfMessages) + providedMessages := make([]*heartbeat.HeartbeatV2, numOfMessages) + for i := numOfMessages - 1; i >= 0; i-- { + pid := core.PeerID(fmt.Sprintf("%s%d", "pid", i)) + providedPids[pid.Pretty()] = struct{}{} + + pkBytes := []byte("same pk") + providedMessages[i] = createHeartbeatMessage(providedStatuses[i], pkBytes) + + args.Cache.Put(pid.Bytes(), providedMessages[i], providedMessages[i].Size()) + } + + monitor, _ := NewHeartbeatV2Monitor(args) + assert.False(t, check.IfNil(monitor)) + + heartbeats := monitor.GetHeartbeats() + assert.Equal(t, 1, len(heartbeats)) + checkResults(t, providedMessages[0], heartbeats[0], providedStatuses[0], providedPids, 3) }) }