From 557574573d895f5055e0a52a3f5b4020d337745d Mon Sep 17 00:00:00 2001 From: jules01 Date: Fri, 21 Oct 2022 17:20:48 +0300 Subject: [PATCH 1/3] - made the heartbeat v2 monitor struct better filter-out inactive messages --- heartbeat/monitor/errors.go | 9 ++ heartbeat/monitor/heartbeatMessage.go | 62 +++++++++++++ heartbeat/monitor/monitor.go | 96 +++++++++++++++----- heartbeat/monitor/monitor_test.go | 122 ++++++++++++++++++++------ 4 files changed, 241 insertions(+), 48 deletions(-) create mode 100644 heartbeat/monitor/errors.go create mode 100644 heartbeat/monitor/heartbeatMessage.go diff --git a/heartbeat/monitor/errors.go b/heartbeat/monitor/errors.go new file mode 100644 index 00000000000..f88f18f373e --- /dev/null +++ b/heartbeat/monitor/errors.go @@ -0,0 +1,9 @@ +package monitor + +import "errors" + +var ( + errEmptyHeartbeatMessagesInstance = errors.New("programming error: empty heartbeatMessages instance") + errInconsistentActivePidsList = errors.New("programming error: inconsistent active pids list") + errInconsistentActiveMap = errors.New("programming error: inconsistent active map") +) diff --git a/heartbeat/monitor/heartbeatMessage.go b/heartbeat/monitor/heartbeatMessage.go new file mode 100644 index 00000000000..a7c05bd6af4 --- /dev/null +++ b/heartbeat/monitor/heartbeatMessage.go @@ -0,0 +1,62 @@ +package monitor + +import ( + "sort" + + "github.com/ElrondNetwork/elrond-go-core/core" + "github.com/ElrondNetwork/elrond-go/heartbeat/data" +) + +type heartbeatMessages struct { + activePids []core.PeerID + activeHeartbeats map[core.PeerID]*data.PubKeyHeartbeat + inactiveHeartbeats map[core.PeerID]*data.PubKeyHeartbeat +} + +func newHeartbeatMessages() *heartbeatMessages { + return &heartbeatMessages{ + activePids: make([]core.PeerID, 0), + activeHeartbeats: make(map[core.PeerID]*data.PubKeyHeartbeat), + inactiveHeartbeats: make(map[core.PeerID]*data.PubKeyHeartbeat), + } +} + +func (instance *heartbeatMessages) addMessage(pid core.PeerID, message *data.PubKeyHeartbeat) { + if message.IsActive { + instance.activePids = append(instance.activePids, pid) + instance.activeHeartbeats[pid] = message + + return + } + + instance.inactiveHeartbeats[pid] = message +} + +func (instance *heartbeatMessages) getHeartbeat() (*data.PubKeyHeartbeat, error) { + if len(instance.activeHeartbeats) > 0 { + return instance.getFirstActive() + } + if len(instance.inactiveHeartbeats) > 0 { + for _, inactive := range instance.inactiveHeartbeats { + return inactive, nil + } + } + return nil, errEmptyHeartbeatMessagesInstance +} + +func (instance *heartbeatMessages) getFirstActive() (*data.PubKeyHeartbeat, error) { + sort.Slice(instance.activePids, func(i, j int) bool { + return string(instance.activePids[i]) < string(instance.activePids[j]) + }) + + if len(instance.activePids) == 0 { + return nil, errInconsistentActivePidsList + } + + message, found := instance.activeHeartbeats[instance.activePids[0]] + if !found { + return nil, errInconsistentActiveMap + } + + return message, nil +} diff --git a/heartbeat/monitor/monitor.go b/heartbeat/monitor/monitor.go index 6db49ce35c2..8bdee9230f8 100644 --- a/heartbeat/monitor/monitor.go +++ b/heartbeat/monitor/monitor.go @@ -89,11 +89,32 @@ func checkArgs(args ArgHeartbeatV2Monitor) error { // GetHeartbeats returns the heartbeat status func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat { - numInstances := make(map[string]uint64) + heartbeatMessagesMap := monitor.processRAWDataFromCache() + monitor.removeFromInactiveNodes(heartbeatMessagesMap) + heartbeatsV2 := make([]data.PubKeyHeartbeat, 0) + for _, heartbeatMessagesInstance := range heartbeatMessagesMap { + message, err := heartbeatMessagesInstance.getHeartbeat() + if err != nil { + log.Warn("heartbeatV2Monitor.GetHeartbeats", "error", err) + continue + } + + message.NumInstances = uint64(len(heartbeatMessagesInstance.activeHeartbeats)) + heartbeatsV2 = append(heartbeatsV2, *message) + } + + sort.Slice(heartbeatsV2, func(i, j int) bool { + return strings.Compare(heartbeatsV2[i].PublicKey, heartbeatsV2[j].PublicKey) < 0 + }) + + return heartbeatsV2 +} + +func (monitor *heartbeatV2Monitor) processRAWDataFromCache() map[string]*heartbeatMessages { pids := monitor.cache.Keys() - heartbeatsV2 := make([]data.PubKeyHeartbeat, 0) + heartbeatsV2 := make(map[string]*heartbeatMessages) for idx := 0; idx < len(pids); idx++ { pid := pids[idx] hb, ok := monitor.cache.Get(pid) @@ -102,41 +123,75 @@ func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat { } peerId := core.PeerID(pid) - heartbeatData, err := monitor.parseMessage(peerId, hb, numInstances) + heartbeatData, err := monitor.parseMessage(peerId, hb) if err != nil { monitor.cache.Remove(pid) log.Trace("could not parse message for pid, removed message", "pid", peerId.Pretty(), "error", err.Error()) continue } - heartbeatsV2 = append(heartbeatsV2, heartbeatData) + heartbeatMessagesInstance, found := heartbeatsV2[heartbeatData.PublicKey] + if !found { + heartbeatMessagesInstance = newHeartbeatMessages() + heartbeatsV2[heartbeatData.PublicKey] = heartbeatMessagesInstance + } + + heartbeatMessagesInstance.addMessage(peerId, heartbeatData) } - for idx := range heartbeatsV2 { - hbData := &heartbeatsV2[idx] - pk := hbData.PublicKey - hbData.NumInstances = numInstances[pk] + return heartbeatsV2 +} + +func (monitor *heartbeatV2Monitor) removeFromInactiveNodes(heartbeatsV2 map[string]*heartbeatMessages) { + for _, heartbeatMessagesInstance := range heartbeatsV2 { + if len(heartbeatMessagesInstance.activeHeartbeats) > 0 { + // at least one active heartbeat message found, old inactive messages should be discarded + monitor.removeAllInactiveMessages(heartbeatMessagesInstance) + continue + } + + monitor.removeInactiveMessagesExceptLatest(heartbeatMessagesInstance) } +} - sort.Slice(heartbeatsV2, func(i, j int) bool { - return strings.Compare(heartbeatsV2[i].PublicKey, heartbeatsV2[j].PublicKey) < 0 - }) +func (monitor *heartbeatV2Monitor) removeAllInactiveMessages(heartbeatMessagesInstance *heartbeatMessages) { + for pid := range heartbeatMessagesInstance.inactiveHeartbeats { + monitor.cache.Remove([]byte(pid)) + } - return heartbeatsV2 + heartbeatMessagesInstance.inactiveHeartbeats = make(map[core.PeerID]*data.PubKeyHeartbeat) } -func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interface{}, numInstances map[string]uint64) (data.PubKeyHeartbeat, error) { - pubKeyHeartbeat := data.PubKeyHeartbeat{} +func (monitor *heartbeatV2Monitor) removeInactiveMessagesExceptLatest(heartbeatMessagesInstance *heartbeatMessages) { + latestPid := core.PeerID("") + latestTimeStamp := int64(0) + for pid, heartbeatMessage := range heartbeatMessagesInstance.inactiveHeartbeats { + if heartbeatMessage.TimeStamp.Unix() > latestTimeStamp { + latestTimeStamp = heartbeatMessage.TimeStamp.Unix() + latestPid = pid + } + } + + for pid := range heartbeatMessagesInstance.inactiveHeartbeats { + if pid == latestPid { + continue + } + monitor.cache.Remove([]byte(pid)) + delete(heartbeatMessagesInstance.inactiveHeartbeats, pid) + } +} + +func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interface{}) (*data.PubKeyHeartbeat, error) { heartbeatV2, ok := message.(*heartbeat.HeartbeatV2) if !ok { - return pubKeyHeartbeat, process.ErrWrongTypeAssertion + return nil, process.ErrWrongTypeAssertion } - payload := heartbeat.Payload{} - err := monitor.marshaller.Unmarshal(&payload, heartbeatV2.Payload) + payload := &heartbeat.Payload{} + err := monitor.marshaller.Unmarshal(payload, heartbeatV2.Payload) if err != nil { - return pubKeyHeartbeat, err + return nil, err } crtTime := time.Now() @@ -144,13 +199,12 @@ func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interfa messageAge := monitor.getMessageAge(crtTime, messageTime) computedShardID, stringType := monitor.computePeerTypeAndShardID(heartbeatV2) if monitor.shouldSkipMessage(messageAge) { - return pubKeyHeartbeat, fmt.Errorf("%w, messageAge %v", heartbeat.ErrShouldSkipValidator, messageAge) + return nil, fmt.Errorf("%w, messageAge %v", heartbeat.ErrShouldSkipValidator, messageAge) } pkHexString := monitor.pubKeyConverter.Encode(heartbeatV2.GetPubkey()) - numInstances[pkHexString]++ - pubKeyHeartbeat = data.PubKeyHeartbeat{ + pubKeyHeartbeat := &data.PubKeyHeartbeat{ PublicKey: pkHexString, TimeStamp: messageTime, IsActive: monitor.isActive(messageAge), diff --git a/heartbeat/monitor/monitor_test.go b/heartbeat/monitor/monitor_test.go index 0a601fc5f90..2a856ba83e3 100644 --- a/heartbeat/monitor/monitor_test.go +++ b/heartbeat/monitor/monitor_test.go @@ -144,7 +144,7 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) { monitor, _ := NewHeartbeatV2Monitor(args) assert.False(t, check.IfNil(monitor)) - _, err := monitor.parseMessage("pid", "dummy msg", nil) + _, err := monitor.parseMessage("pid", "dummy msg") assert.Equal(t, process.ErrWrongTypeAssertion, err) }) t.Run("unmarshal returns error", func(t *testing.T) { @@ -156,7 +156,7 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) { message := createHeartbeatMessage(true, []byte("public key")) message.Payload = []byte("dummy payload") - _, err := monitor.parseMessage("pid", message, nil) + _, err := monitor.parseMessage("pid", message) assert.NotNil(t, err) }) t.Run("skippable message should return error", func(t *testing.T) { @@ -167,7 +167,7 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) { assert.False(t, check.IfNil(monitor)) message := createHeartbeatMessage(false, []byte("public key")) - _, err := monitor.parseMessage("pid", message, nil) + _, err := monitor.parseMessage("pid", message) assert.True(t, errors.Is(err, heartbeat.ErrShouldSkipValidator)) }) t.Run("should work, peer type provider returns error", func(t *testing.T) { @@ -184,26 +184,17 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) { monitor, _ := NewHeartbeatV2Monitor(args) assert.False(t, check.IfNil(monitor)) - numInstances := make(map[string]uint64) message := createHeartbeatMessage(true, providedPkBytes) message.Pubkey = providedPkBytesFromMessage providedPid := core.PeerID("pid") providedMap := map[string]struct{}{ providedPid.Pretty(): {}, } - hb, err := monitor.parseMessage(providedPid, message, numInstances) + hb, err := monitor.parseMessage(providedPid, message) assert.Nil(t, err) - checkResults(t, *message, hb, true, providedMap, 0) + checkResults(t, message, *hb, true, providedMap, 0) assert.Equal(t, 0, len(providedMap)) - pkFromMsg := args.PubKeyConverter.Encode(providedPkBytesFromMessage) - entries, ok := numInstances[pkFromMsg] - assert.True(t, ok) - assert.Equal(t, uint64(1), entries) assert.Equal(t, string(common.ObserverList), hb.PeerType) - - pkFromPSM := args.PubKeyConverter.Encode(providedPkBytes) - _, ok = numInstances[pkFromPSM] - assert.False(t, ok) }) t.Run("should work", func(t *testing.T) { t.Parallel() @@ -219,20 +210,15 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) { monitor, _ := NewHeartbeatV2Monitor(args) assert.False(t, check.IfNil(monitor)) - numInstances := make(map[string]uint64) message := createHeartbeatMessage(true, providedPkBytes) providedPid := core.PeerID("pid") providedMap := map[string]struct{}{ providedPid.Pretty(): {}, } - hb, err := monitor.parseMessage(providedPid, message, numInstances) + hb, err := monitor.parseMessage(providedPid, message) assert.Nil(t, err) - checkResults(t, *message, hb, true, providedMap, 0) + checkResults(t, message, *hb, true, providedMap, 0) assert.Equal(t, 0, len(providedMap)) - pk := args.PubKeyConverter.Encode(providedPkBytes) - entries, ok := numInstances[pk] - assert.True(t, ok) - assert.Equal(t, uint64(1), entries) assert.Equal(t, string(expectedPeerType), hb.PeerType) }) } @@ -311,10 +297,92 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) { assert.Equal(t, len(providedStatuses)-1, len(heartbeats)) assert.Equal(t, len(providedStatuses)-1, args.Cache.Len()) // faulty message was removed from cache for i := 0; i < len(heartbeats); i++ { - checkResults(t, *providedMessages[i], heartbeats[i], providedStatuses[i], providedPids, 1) + checkResults(t, providedMessages[i], heartbeats[i], providedStatuses[i], providedPids, 1) } assert.Equal(t, 1, len(providedPids)) // one message is skipped }) + t.Run("should remove all inactive messages except from the latest message", func(t *testing.T) { + t.Parallel() + args := createMockHeartbeatV2MonitorArgs() + args.HideInactiveValidatorInterval = time.Minute + providedStatuses := []bool{false, false, false} + numOfMessages := len(providedStatuses) + providedPids := make(map[string]struct{}, numOfMessages) + providedMessages := make([]*heartbeat.HeartbeatV2, numOfMessages) + for i := 0; i < numOfMessages; i++ { + pid := core.PeerID(fmt.Sprintf("%s%d", "pid", i)) + providedPids[pid.Pretty()] = struct{}{} + pkBytes := []byte("same pk") + + providedMessages[i] = createHeartbeatMessage(providedStatuses[i], pkBytes) + payload := heartbeat.Payload{ + Timestamp: time.Now().Unix() - 30 + int64(i), // the last message will be the latest, so it will be returned + } + + marshaller := testscommon.MarshalizerMock{} + payloadBytes, _ := marshaller.Marshal(payload) + providedMessages[i].Payload = payloadBytes + + args.Cache.Put(pid.Bytes(), providedMessages[i], providedMessages[i].Size()) + } + + monitor, _ := NewHeartbeatV2Monitor(args) + assert.False(t, check.IfNil(monitor)) + + heartbeats := monitor.GetHeartbeats() + assert.Equal(t, 1, len(heartbeats)) + checkResults(t, providedMessages[2], heartbeats[0], providedStatuses[2], providedPids, 0) + assert.Equal(t, 2, len(providedPids)) // 1 inactive was removed from the heartbeat list + }) + t.Run("should remove all inactive messages if one is active", func(t *testing.T) { + t.Parallel() + args := createMockHeartbeatV2MonitorArgs() + args.HideInactiveValidatorInterval = time.Minute + providedStatuses := []bool{true, false, false} + numOfMessages := len(providedStatuses) + providedPids := make(map[string]struct{}, numOfMessages) + providedMessages := make([]*heartbeat.HeartbeatV2, numOfMessages) + for i := 0; i < numOfMessages; i++ { + pid := core.PeerID(fmt.Sprintf("%s%d", "pid", i)) + providedPids[pid.Pretty()] = struct{}{} + pkBytes := []byte("same pk") + + providedMessages[i] = createHeartbeatMessage(providedStatuses[i], pkBytes) + if i != 0 { + // 1, 2 ... are inactive messages + payload := heartbeat.Payload{ + Timestamp: time.Now().Unix() - 30 + int64(i), // the last message will be the latest, so it will be returned + } + + marshaller := testscommon.MarshalizerMock{} + payloadBytes, _ := marshaller.Marshal(payload) + providedMessages[i].Payload = payloadBytes + } + + args.Cache.Put(pid.Bytes(), providedMessages[i], providedMessages[i].Size()) + } + + monitor, _ := NewHeartbeatV2Monitor(args) + assert.False(t, check.IfNil(monitor)) + + heartbeats := monitor.GetHeartbeats() + assert.Equal(t, 1, len(heartbeats)) + checkResults(t, providedMessages[0], heartbeats[0], providedStatuses[0], providedPids, 1) + assert.Equal(t, 2, len(providedPids)) // 1 inactive was removed from the heartbeat list + }) + t.Run("nil message in cache should remove", func(t *testing.T) { + t.Parallel() + args := createMockHeartbeatV2MonitorArgs() + args.HideInactiveValidatorInterval = time.Minute + args.Cache.Put([]byte("pid"), nil, 0) + + monitor, _ := NewHeartbeatV2Monitor(args) + assert.False(t, check.IfNil(monitor)) + + heartbeats := monitor.GetHeartbeats() + assert.Equal(t, 0, len(heartbeats)) + assert.Equal(t, 0, args.Cache.Len()) + }) t.Run("should work", func(t *testing.T) { t.Parallel() args := createMockHeartbeatV2MonitorArgs() @@ -340,19 +408,19 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) { assert.False(t, check.IfNil(monitor)) heartbeats := monitor.GetHeartbeats() - assert.Equal(t, args.Cache.Len(), len(heartbeats)) - for i := 0; i < numOfMessages; i++ { + assert.Equal(t, 2, len(heartbeats)) // 2 have the same pk + for i := 0; i < 2; i++ { numInstances := uint64(1) if i > 0 { numInstances = 2 } - checkResults(t, *providedMessages[i], heartbeats[i], providedStatuses[i], providedPids, numInstances) + checkResults(t, providedMessages[i], heartbeats[i], providedStatuses[i], providedPids, numInstances) } - assert.Equal(t, 0, len(providedPids)) + assert.Equal(t, 1, len(providedPids)) // 1 inactive was removed from the heartbeat list }) } -func checkResults(t *testing.T, message heartbeat.HeartbeatV2, hb data.PubKeyHeartbeat, isActive bool, providedPids map[string]struct{}, numInstances uint64) { +func checkResults(t *testing.T, message *heartbeat.HeartbeatV2, hb data.PubKeyHeartbeat, isActive bool, providedPids map[string]struct{}, numInstances uint64) { assert.Equal(t, isActive, hb.IsActive) assert.Equal(t, message.VersionNumber, hb.VersionNumber) assert.Equal(t, message.NodeDisplayName, hb.NodeDisplayName) From d8a073084e523adf45d2c757aeea5c3609013110 Mon Sep 17 00:00:00 2001 From: jules01 Date: Fri, 21 Oct 2022 17:30:59 +0300 Subject: [PATCH 2/3] - added extra cache checks --- heartbeat/monitor/monitor_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/heartbeat/monitor/monitor_test.go b/heartbeat/monitor/monitor_test.go index 2a856ba83e3..9b8ecdd8e87 100644 --- a/heartbeat/monitor/monitor_test.go +++ b/heartbeat/monitor/monitor_test.go @@ -331,6 +331,7 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) { heartbeats := monitor.GetHeartbeats() assert.Equal(t, 1, len(heartbeats)) + assert.Equal(t, 1, args.Cache.Len()) checkResults(t, providedMessages[2], heartbeats[0], providedStatuses[2], providedPids, 0) assert.Equal(t, 2, len(providedPids)) // 1 inactive was removed from the heartbeat list }) @@ -367,6 +368,7 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) { heartbeats := monitor.GetHeartbeats() assert.Equal(t, 1, len(heartbeats)) + assert.Equal(t, 1, args.Cache.Len()) checkResults(t, providedMessages[0], heartbeats[0], providedStatuses[0], providedPids, 1) assert.Equal(t, 2, len(providedPids)) // 1 inactive was removed from the heartbeat list }) From 4fe2d196cc125082b5ad735c0fa2ca453903e210 Mon Sep 17 00:00:00 2001 From: jules01 Date: Fri, 21 Oct 2022 20:19:48 +0300 Subject: [PATCH 3/3] - fix after review: code optimization --- heartbeat/monitor/errors.go | 2 - heartbeat/monitor/heartbeatMessage.go | 84 +++++++++++++++++---------- heartbeat/monitor/monitor.go | 51 +++------------- heartbeat/monitor/monitor_test.go | 28 ++++++++- 4 files changed, 89 insertions(+), 76 deletions(-) diff --git a/heartbeat/monitor/errors.go b/heartbeat/monitor/errors.go index f88f18f373e..30aef5e971b 100644 --- a/heartbeat/monitor/errors.go +++ b/heartbeat/monitor/errors.go @@ -4,6 +4,4 @@ import "errors" var ( errEmptyHeartbeatMessagesInstance = errors.New("programming error: empty heartbeatMessages instance") - errInconsistentActivePidsList = errors.New("programming error: inconsistent active pids list") - errInconsistentActiveMap = errors.New("programming error: inconsistent active map") ) diff --git a/heartbeat/monitor/heartbeatMessage.go b/heartbeat/monitor/heartbeatMessage.go index a7c05bd6af4..aa376ab3da2 100644 --- a/heartbeat/monitor/heartbeatMessage.go +++ b/heartbeat/monitor/heartbeatMessage.go @@ -1,62 +1,86 @@ package monitor import ( - "sort" - "github.com/ElrondNetwork/elrond-go-core/core" "github.com/ElrondNetwork/elrond-go/heartbeat/data" ) type heartbeatMessages struct { - activePids []core.PeerID - activeHeartbeats map[core.PeerID]*data.PubKeyHeartbeat - inactiveHeartbeats map[core.PeerID]*data.PubKeyHeartbeat + firstActivePid core.PeerID + firstActiveHeartbeat *data.PubKeyHeartbeat + numActivePids uint64 + latestInactivePid core.PeerID + latestInactiveHeartbeat *data.PubKeyHeartbeat + inactivePids []core.PeerID } func newHeartbeatMessages() *heartbeatMessages { - return &heartbeatMessages{ - activePids: make([]core.PeerID, 0), - activeHeartbeats: make(map[core.PeerID]*data.PubKeyHeartbeat), - inactiveHeartbeats: make(map[core.PeerID]*data.PubKeyHeartbeat), - } + return &heartbeatMessages{} } func (instance *heartbeatMessages) addMessage(pid core.PeerID, message *data.PubKeyHeartbeat) { if message.IsActive { - instance.activePids = append(instance.activePids, pid) - instance.activeHeartbeats[pid] = message + instance.handleActiveMessage(pid, message) + return + } + + instance.handleInactiveMessage(pid, message) +} + +func (instance *heartbeatMessages) handleActiveMessage(pid core.PeerID, message *data.PubKeyHeartbeat) { + instance.numActivePids++ + if len(instance.firstActivePid) == 0 { + instance.firstActivePid = pid + instance.firstActiveHeartbeat = message return } - instance.inactiveHeartbeats[pid] = message + if string(instance.firstActivePid) > string(pid) { + instance.firstActivePid = pid + instance.firstActiveHeartbeat = message + } +} + +func (instance *heartbeatMessages) handleInactiveMessage(pid core.PeerID, message *data.PubKeyHeartbeat) { + instance.inactivePids = append(instance.inactivePids, pid) + + if instance.latestInactiveHeartbeat == nil { + instance.latestInactivePid = pid + instance.latestInactiveHeartbeat = message + return + } + + if message.TimeStamp.Unix() > instance.latestInactiveHeartbeat.TimeStamp.Unix() { + instance.latestInactivePid = pid + instance.latestInactiveHeartbeat = message + } } func (instance *heartbeatMessages) getHeartbeat() (*data.PubKeyHeartbeat, error) { - if len(instance.activeHeartbeats) > 0 { - return instance.getFirstActive() + if instance.firstActiveHeartbeat != nil { + return instance.firstActiveHeartbeat, nil } - if len(instance.inactiveHeartbeats) > 0 { - for _, inactive := range instance.inactiveHeartbeats { - return inactive, nil - } + if instance.latestInactiveHeartbeat != nil { + return instance.latestInactiveHeartbeat, nil } return nil, errEmptyHeartbeatMessagesInstance } -func (instance *heartbeatMessages) getFirstActive() (*data.PubKeyHeartbeat, error) { - sort.Slice(instance.activePids, func(i, j int) bool { - return string(instance.activePids[i]) < string(instance.activePids[j]) - }) - - if len(instance.activePids) == 0 { - return nil, errInconsistentActivePidsList +func (instance *heartbeatMessages) getInactivePids() []core.PeerID { + if instance.firstActiveHeartbeat != nil { + // at least one active heartbeat, return all pids containing inactive messages + return instance.inactivePids } - message, found := instance.activeHeartbeats[instance.activePids[0]] - if !found { - return nil, errInconsistentActiveMap + result := make([]core.PeerID, 0, len(instance.inactivePids)) + for _, pid := range instance.inactivePids { + if pid == instance.latestInactivePid { + continue + } + + result = append(result, pid) } - return message, nil + return result } diff --git a/heartbeat/monitor/monitor.go b/heartbeat/monitor/monitor.go index 8bdee9230f8..bb6af9ba559 100644 --- a/heartbeat/monitor/monitor.go +++ b/heartbeat/monitor/monitor.go @@ -90,7 +90,6 @@ func checkArgs(args ArgHeartbeatV2Monitor) error { // GetHeartbeats returns the heartbeat status func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat { heartbeatMessagesMap := monitor.processRAWDataFromCache() - monitor.removeFromInactiveNodes(heartbeatMessagesMap) heartbeatsV2 := make([]data.PubKeyHeartbeat, 0) for _, heartbeatMessagesInstance := range heartbeatMessagesMap { @@ -100,8 +99,10 @@ func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat { continue } - message.NumInstances = uint64(len(heartbeatMessagesInstance.activeHeartbeats)) + message.NumInstances = heartbeatMessagesInstance.numActivePids heartbeatsV2 = append(heartbeatsV2, *message) + + monitor.removeInactive(heartbeatMessagesInstance.getInactivePids()) } sort.Slice(heartbeatsV2, func(i, j int) bool { @@ -111,6 +112,12 @@ func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat { return heartbeatsV2 } +func (monitor *heartbeatV2Monitor) removeInactive(pids []core.PeerID) { + for _, pid := range pids { + monitor.cache.Remove([]byte(pid)) + } +} + func (monitor *heartbeatV2Monitor) processRAWDataFromCache() map[string]*heartbeatMessages { pids := monitor.cache.Keys() @@ -142,46 +149,6 @@ func (monitor *heartbeatV2Monitor) processRAWDataFromCache() map[string]*heartbe return heartbeatsV2 } -func (monitor *heartbeatV2Monitor) removeFromInactiveNodes(heartbeatsV2 map[string]*heartbeatMessages) { - for _, heartbeatMessagesInstance := range heartbeatsV2 { - if len(heartbeatMessagesInstance.activeHeartbeats) > 0 { - // at least one active heartbeat message found, old inactive messages should be discarded - monitor.removeAllInactiveMessages(heartbeatMessagesInstance) - continue - } - - monitor.removeInactiveMessagesExceptLatest(heartbeatMessagesInstance) - } -} - -func (monitor *heartbeatV2Monitor) removeAllInactiveMessages(heartbeatMessagesInstance *heartbeatMessages) { - for pid := range heartbeatMessagesInstance.inactiveHeartbeats { - monitor.cache.Remove([]byte(pid)) - } - - heartbeatMessagesInstance.inactiveHeartbeats = make(map[core.PeerID]*data.PubKeyHeartbeat) -} - -func (monitor *heartbeatV2Monitor) removeInactiveMessagesExceptLatest(heartbeatMessagesInstance *heartbeatMessages) { - latestPid := core.PeerID("") - latestTimeStamp := int64(0) - for pid, heartbeatMessage := range heartbeatMessagesInstance.inactiveHeartbeats { - if heartbeatMessage.TimeStamp.Unix() > latestTimeStamp { - latestTimeStamp = heartbeatMessage.TimeStamp.Unix() - latestPid = pid - } - } - - for pid := range heartbeatMessagesInstance.inactiveHeartbeats { - if pid == latestPid { - continue - } - - monitor.cache.Remove([]byte(pid)) - delete(heartbeatMessagesInstance.inactiveHeartbeats, pid) - } -} - func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interface{}) (*data.PubKeyHeartbeat, error) { heartbeatV2, ok := message.(*heartbeat.HeartbeatV2) if !ok { diff --git a/heartbeat/monitor/monitor_test.go b/heartbeat/monitor/monitor_test.go index 9b8ecdd8e87..424919bfdc5 100644 --- a/heartbeat/monitor/monitor_test.go +++ b/heartbeat/monitor/monitor_test.go @@ -385,7 +385,7 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) { assert.Equal(t, 0, len(heartbeats)) assert.Equal(t, 0, args.Cache.Len()) }) - t.Run("should work", func(t *testing.T) { + t.Run("should work with 2 public keys on different pids and one public key", func(t *testing.T) { t.Parallel() args := createMockHeartbeatV2MonitorArgs() providedStatuses := []bool{true, true, true} @@ -418,7 +418,31 @@ func TestHeartbeatV2Monitor_GetHeartbeats(t *testing.T) { } checkResults(t, providedMessages[i], heartbeats[i], providedStatuses[i], providedPids, numInstances) } - assert.Equal(t, 1, len(providedPids)) // 1 inactive was removed from the heartbeat list + assert.Equal(t, 1, len(providedPids)) // 1 active was removed from the heartbeat list + }) + t.Run("should choose the 'smaller' pid if multiple active messages are found (sort should work)", func(t *testing.T) { + t.Parallel() + args := createMockHeartbeatV2MonitorArgs() + providedStatuses := []bool{true, true, true} + numOfMessages := len(providedStatuses) + providedPids := make(map[string]struct{}, numOfMessages) + providedMessages := make([]*heartbeat.HeartbeatV2, numOfMessages) + for i := numOfMessages - 1; i >= 0; i-- { + pid := core.PeerID(fmt.Sprintf("%s%d", "pid", i)) + providedPids[pid.Pretty()] = struct{}{} + + pkBytes := []byte("same pk") + providedMessages[i] = createHeartbeatMessage(providedStatuses[i], pkBytes) + + args.Cache.Put(pid.Bytes(), providedMessages[i], providedMessages[i].Size()) + } + + monitor, _ := NewHeartbeatV2Monitor(args) + assert.False(t, check.IfNil(monitor)) + + heartbeats := monitor.GetHeartbeats() + assert.Equal(t, 1, len(heartbeats)) + checkResults(t, providedMessages[0], heartbeats[0], providedStatuses[0], providedPids, 3) }) }