Skip to content

Commit

Permalink
- fix after review: code optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianpascalau committed Oct 21, 2022
1 parent d8a0730 commit 4fe2d19
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 76 deletions.
2 changes: 0 additions & 2 deletions heartbeat/monitor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@ import "errors"

var (
errEmptyHeartbeatMessagesInstance = errors.New("programming error: empty heartbeatMessages instance")
errInconsistentActivePidsList = errors.New("programming error: inconsistent active pids list")
errInconsistentActiveMap = errors.New("programming error: inconsistent active map")
)
84 changes: 54 additions & 30 deletions heartbeat/monitor/heartbeatMessage.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,86 @@
package monitor

import (
"sort"

"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go/heartbeat/data"
)

type heartbeatMessages struct {
activePids []core.PeerID
activeHeartbeats map[core.PeerID]*data.PubKeyHeartbeat
inactiveHeartbeats map[core.PeerID]*data.PubKeyHeartbeat
firstActivePid core.PeerID
firstActiveHeartbeat *data.PubKeyHeartbeat
numActivePids uint64
latestInactivePid core.PeerID
latestInactiveHeartbeat *data.PubKeyHeartbeat
inactivePids []core.PeerID
}

func newHeartbeatMessages() *heartbeatMessages {
return &heartbeatMessages{
activePids: make([]core.PeerID, 0),
activeHeartbeats: make(map[core.PeerID]*data.PubKeyHeartbeat),
inactiveHeartbeats: make(map[core.PeerID]*data.PubKeyHeartbeat),
}
return &heartbeatMessages{}
}

func (instance *heartbeatMessages) addMessage(pid core.PeerID, message *data.PubKeyHeartbeat) {
if message.IsActive {
instance.activePids = append(instance.activePids, pid)
instance.activeHeartbeats[pid] = message
instance.handleActiveMessage(pid, message)
return
}

instance.handleInactiveMessage(pid, message)
}

func (instance *heartbeatMessages) handleActiveMessage(pid core.PeerID, message *data.PubKeyHeartbeat) {
instance.numActivePids++

if len(instance.firstActivePid) == 0 {
instance.firstActivePid = pid
instance.firstActiveHeartbeat = message
return
}

instance.inactiveHeartbeats[pid] = message
if string(instance.firstActivePid) > string(pid) {
instance.firstActivePid = pid
instance.firstActiveHeartbeat = message
}
}

func (instance *heartbeatMessages) handleInactiveMessage(pid core.PeerID, message *data.PubKeyHeartbeat) {
instance.inactivePids = append(instance.inactivePids, pid)

if instance.latestInactiveHeartbeat == nil {
instance.latestInactivePid = pid
instance.latestInactiveHeartbeat = message
return
}

if message.TimeStamp.Unix() > instance.latestInactiveHeartbeat.TimeStamp.Unix() {
instance.latestInactivePid = pid
instance.latestInactiveHeartbeat = message
}
}

func (instance *heartbeatMessages) getHeartbeat() (*data.PubKeyHeartbeat, error) {
if len(instance.activeHeartbeats) > 0 {
return instance.getFirstActive()
if instance.firstActiveHeartbeat != nil {
return instance.firstActiveHeartbeat, nil
}
if len(instance.inactiveHeartbeats) > 0 {
for _, inactive := range instance.inactiveHeartbeats {
return inactive, nil
}
if instance.latestInactiveHeartbeat != nil {
return instance.latestInactiveHeartbeat, nil
}
return nil, errEmptyHeartbeatMessagesInstance
}

func (instance *heartbeatMessages) getFirstActive() (*data.PubKeyHeartbeat, error) {
sort.Slice(instance.activePids, func(i, j int) bool {
return string(instance.activePids[i]) < string(instance.activePids[j])
})

if len(instance.activePids) == 0 {
return nil, errInconsistentActivePidsList
func (instance *heartbeatMessages) getInactivePids() []core.PeerID {
if instance.firstActiveHeartbeat != nil {
// at least one active heartbeat, return all pids containing inactive messages
return instance.inactivePids
}

message, found := instance.activeHeartbeats[instance.activePids[0]]
if !found {
return nil, errInconsistentActiveMap
result := make([]core.PeerID, 0, len(instance.inactivePids))
for _, pid := range instance.inactivePids {
if pid == instance.latestInactivePid {
continue
}

result = append(result, pid)
}

return message, nil
return result
}
51 changes: 9 additions & 42 deletions heartbeat/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func checkArgs(args ArgHeartbeatV2Monitor) error {
// GetHeartbeats returns the heartbeat status
func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat {
heartbeatMessagesMap := monitor.processRAWDataFromCache()
monitor.removeFromInactiveNodes(heartbeatMessagesMap)

heartbeatsV2 := make([]data.PubKeyHeartbeat, 0)
for _, heartbeatMessagesInstance := range heartbeatMessagesMap {
Expand All @@ -100,8 +99,10 @@ func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat {
continue
}

message.NumInstances = uint64(len(heartbeatMessagesInstance.activeHeartbeats))
message.NumInstances = heartbeatMessagesInstance.numActivePids
heartbeatsV2 = append(heartbeatsV2, *message)

monitor.removeInactive(heartbeatMessagesInstance.getInactivePids())
}

sort.Slice(heartbeatsV2, func(i, j int) bool {
Expand All @@ -111,6 +112,12 @@ func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat {
return heartbeatsV2
}

func (monitor *heartbeatV2Monitor) removeInactive(pids []core.PeerID) {
for _, pid := range pids {
monitor.cache.Remove([]byte(pid))
}
}

func (monitor *heartbeatV2Monitor) processRAWDataFromCache() map[string]*heartbeatMessages {
pids := monitor.cache.Keys()

Expand Down Expand Up @@ -142,46 +149,6 @@ func (monitor *heartbeatV2Monitor) processRAWDataFromCache() map[string]*heartbe
return heartbeatsV2
}

func (monitor *heartbeatV2Monitor) removeFromInactiveNodes(heartbeatsV2 map[string]*heartbeatMessages) {
for _, heartbeatMessagesInstance := range heartbeatsV2 {
if len(heartbeatMessagesInstance.activeHeartbeats) > 0 {
// at least one active heartbeat message found, old inactive messages should be discarded
monitor.removeAllInactiveMessages(heartbeatMessagesInstance)
continue
}

monitor.removeInactiveMessagesExceptLatest(heartbeatMessagesInstance)
}
}

func (monitor *heartbeatV2Monitor) removeAllInactiveMessages(heartbeatMessagesInstance *heartbeatMessages) {
for pid := range heartbeatMessagesInstance.inactiveHeartbeats {
monitor.cache.Remove([]byte(pid))
}

heartbeatMessagesInstance.inactiveHeartbeats = make(map[core.PeerID]*data.PubKeyHeartbeat)
}

func (monitor *heartbeatV2Monitor) removeInactiveMessagesExceptLatest(heartbeatMessagesInstance *heartbeatMessages) {
latestPid := core.PeerID("")
latestTimeStamp := int64(0)
for pid, heartbeatMessage := range heartbeatMessagesInstance.inactiveHeartbeats {
if heartbeatMessage.TimeStamp.Unix() > latestTimeStamp {
latestTimeStamp = heartbeatMessage.TimeStamp.Unix()
latestPid = pid
}
}

for pid := range heartbeatMessagesInstance.inactiveHeartbeats {
if pid == latestPid {
continue
}

monitor.cache.Remove([]byte(pid))
delete(heartbeatMessagesInstance.inactiveHeartbeats, pid)
}
}

func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interface{}) (*data.PubKeyHeartbeat, error) {
heartbeatV2, ok := message.(*heartbeat.HeartbeatV2)
if !ok {
Expand Down
28 changes: 26 additions & 2 deletions heartbeat/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) {
assert.Equal(t, 0, len(heartbeats))
assert.Equal(t, 0, args.Cache.Len())
})
t.Run("should work", func(t *testing.T) {
t.Run("should work with 2 public keys on different pids and one public key", func(t *testing.T) {
t.Parallel()
args := createMockHeartbeatV2MonitorArgs()
providedStatuses := []bool{true, true, true}
Expand Down Expand Up @@ -418,7 +418,31 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) {
}
checkResults(t, providedMessages[i], heartbeats[i], providedStatuses[i], providedPids, numInstances)
}
assert.Equal(t, 1, len(providedPids)) // 1 inactive was removed from the heartbeat list
assert.Equal(t, 1, len(providedPids)) // 1 active was removed from the heartbeat list
})
t.Run("should choose the 'smaller' pid if multiple active messages are found (sort should work)", func(t *testing.T) {
t.Parallel()
args := createMockHeartbeatV2MonitorArgs()
providedStatuses := []bool{true, true, true}
numOfMessages := len(providedStatuses)
providedPids := make(map[string]struct{}, numOfMessages)
providedMessages := make([]*heartbeat.HeartbeatV2, numOfMessages)
for i := numOfMessages - 1; i >= 0; i-- {
pid := core.PeerID(fmt.Sprintf("%s%d", "pid", i))
providedPids[pid.Pretty()] = struct{}{}

pkBytes := []byte("same pk")
providedMessages[i] = createHeartbeatMessage(providedStatuses[i], pkBytes)

args.Cache.Put(pid.Bytes(), providedMessages[i], providedMessages[i].Size())
}

monitor, _ := NewHeartbeatV2Monitor(args)
assert.False(t, check.IfNil(monitor))

heartbeats := monitor.GetHeartbeats()
assert.Equal(t, 1, len(heartbeats))
checkResults(t, providedMessages[0], heartbeats[0], providedStatuses[0], providedPids, 3)
})
}

Expand Down

0 comments on commit 4fe2d19

Please sign in to comment.