Skip to content

Commit

Permalink
Merge pull request #4626 from ElrondNetwork/fix-hb-v2-monitor-after-n…
Browse files Browse the repository at this point in the history
…odes-restart

Heartbeat v2 monitor better filtering
  • Loading branch information
iulianpascalau authored Oct 24, 2022
2 parents 821d981 + 8ff569c commit 949a212
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 51 deletions.
7 changes: 7 additions & 0 deletions heartbeat/monitor/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package monitor

import "errors"

var (
errEmptyHeartbeatMessagesInstance = errors.New("programming error: empty heartbeatMessages instance")
)
86 changes: 86 additions & 0 deletions heartbeat/monitor/heartbeatMessage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package monitor

import (
"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go/heartbeat/data"
)

type heartbeatMessages struct {
firstActivePid core.PeerID
firstActiveHeartbeat *data.PubKeyHeartbeat
numActivePids uint64
latestInactivePid core.PeerID
latestInactiveHeartbeat *data.PubKeyHeartbeat
inactivePids []core.PeerID
}

func newHeartbeatMessages() *heartbeatMessages {
return &heartbeatMessages{}
}

func (instance *heartbeatMessages) addMessage(pid core.PeerID, message *data.PubKeyHeartbeat) {
if message.IsActive {
instance.handleActiveMessage(pid, message)
return
}

instance.handleInactiveMessage(pid, message)
}

func (instance *heartbeatMessages) handleActiveMessage(pid core.PeerID, message *data.PubKeyHeartbeat) {
instance.numActivePids++

if len(instance.firstActivePid) == 0 {
instance.firstActivePid = pid
instance.firstActiveHeartbeat = message
return
}

if string(instance.firstActivePid) > string(pid) {
instance.firstActivePid = pid
instance.firstActiveHeartbeat = message
}
}

func (instance *heartbeatMessages) handleInactiveMessage(pid core.PeerID, message *data.PubKeyHeartbeat) {
instance.inactivePids = append(instance.inactivePids, pid)

if instance.latestInactiveHeartbeat == nil {
instance.latestInactivePid = pid
instance.latestInactiveHeartbeat = message
return
}

if message.TimeStamp.Unix() > instance.latestInactiveHeartbeat.TimeStamp.Unix() {
instance.latestInactivePid = pid
instance.latestInactiveHeartbeat = message
}
}

func (instance *heartbeatMessages) getHeartbeat() (*data.PubKeyHeartbeat, error) {
if instance.firstActiveHeartbeat != nil {
return instance.firstActiveHeartbeat, nil
}
if instance.latestInactiveHeartbeat != nil {
return instance.latestInactiveHeartbeat, nil
}
return nil, errEmptyHeartbeatMessagesInstance
}

func (instance *heartbeatMessages) getInactivePids() []core.PeerID {
if instance.firstActiveHeartbeat != nil {
// at least one active heartbeat, return all pids containing inactive messages
return instance.inactivePids
}

result := make([]core.PeerID, 0, len(instance.inactivePids))
for _, pid := range instance.inactivePids {
if pid == instance.latestInactivePid {
continue
}

result = append(result, pid)
}

return result
}
67 changes: 44 additions & 23 deletions heartbeat/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,39 @@ func checkArgs(args ArgHeartbeatV2Monitor) error {

// GetHeartbeats returns the heartbeat status
func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat {
numInstances := make(map[string]uint64)
heartbeatMessagesMap := monitor.processRAWDataFromCache()

heartbeatsV2 := make([]data.PubKeyHeartbeat, 0)
for _, heartbeatMessagesInstance := range heartbeatMessagesMap {
message, err := heartbeatMessagesInstance.getHeartbeat()
if err != nil {
log.Warn("heartbeatV2Monitor.GetHeartbeats", "error", err)
continue
}

message.NumInstances = heartbeatMessagesInstance.numActivePids
heartbeatsV2 = append(heartbeatsV2, *message)

monitor.removeInactive(heartbeatMessagesInstance.getInactivePids())
}

sort.Slice(heartbeatsV2, func(i, j int) bool {
return strings.Compare(heartbeatsV2[i].PublicKey, heartbeatsV2[j].PublicKey) < 0
})

return heartbeatsV2
}

func (monitor *heartbeatV2Monitor) removeInactive(pids []core.PeerID) {
for _, pid := range pids {
monitor.cache.Remove([]byte(pid))
}
}

func (monitor *heartbeatV2Monitor) processRAWDataFromCache() map[string]*heartbeatMessages {
pids := monitor.cache.Keys()

heartbeatsV2 := make([]data.PubKeyHeartbeat, 0)
heartbeatsV2 := make(map[string]*heartbeatMessages)
for idx := 0; idx < len(pids); idx++ {
pid := pids[idx]
hb, ok := monitor.cache.Get(pid)
Expand All @@ -102,55 +130,48 @@ func (monitor *heartbeatV2Monitor) GetHeartbeats() []data.PubKeyHeartbeat {
}

peerId := core.PeerID(pid)
heartbeatData, err := monitor.parseMessage(peerId, hb, numInstances)
heartbeatData, err := monitor.parseMessage(peerId, hb)
if err != nil {
monitor.cache.Remove(pid)
log.Trace("could not parse message for pid, removed message", "pid", peerId.Pretty(), "error", err.Error())
continue
}

heartbeatsV2 = append(heartbeatsV2, heartbeatData)
}
heartbeatMessagesInstance, found := heartbeatsV2[heartbeatData.PublicKey]
if !found {
heartbeatMessagesInstance = newHeartbeatMessages()
heartbeatsV2[heartbeatData.PublicKey] = heartbeatMessagesInstance
}

for idx := range heartbeatsV2 {
hbData := &heartbeatsV2[idx]
pk := hbData.PublicKey
hbData.NumInstances = numInstances[pk]
heartbeatMessagesInstance.addMessage(peerId, heartbeatData)
}

sort.Slice(heartbeatsV2, func(i, j int) bool {
return strings.Compare(heartbeatsV2[i].PublicKey, heartbeatsV2[j].PublicKey) < 0
})

return heartbeatsV2
}

func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interface{}, numInstances map[string]uint64) (data.PubKeyHeartbeat, error) {
pubKeyHeartbeat := data.PubKeyHeartbeat{}

func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interface{}) (*data.PubKeyHeartbeat, error) {
heartbeatV2, ok := message.(*heartbeat.HeartbeatV2)
if !ok {
return pubKeyHeartbeat, process.ErrWrongTypeAssertion
return nil, process.ErrWrongTypeAssertion
}

payload := heartbeat.Payload{}
err := monitor.marshaller.Unmarshal(&payload, heartbeatV2.Payload)
payload := &heartbeat.Payload{}
err := monitor.marshaller.Unmarshal(payload, heartbeatV2.Payload)
if err != nil {
return pubKeyHeartbeat, err
return nil, err
}

crtTime := time.Now()
messageTime := time.Unix(payload.Timestamp, 0)
messageAge := monitor.getMessageAge(crtTime, messageTime)
computedShardID, stringType := monitor.computePeerTypeAndShardID(heartbeatV2)
if monitor.shouldSkipMessage(messageAge) {
return pubKeyHeartbeat, fmt.Errorf("%w, messageAge %v", heartbeat.ErrShouldSkipValidator, messageAge)
return nil, fmt.Errorf("%w, messageAge %v", heartbeat.ErrShouldSkipValidator, messageAge)
}

pkHexString := monitor.pubKeyConverter.Encode(heartbeatV2.GetPubkey())
numInstances[pkHexString]++

pubKeyHeartbeat = data.PubKeyHeartbeat{
pubKeyHeartbeat := &data.PubKeyHeartbeat{
PublicKey: pkHexString,
TimeStamp: messageTime,
IsActive: monitor.isActive(messageAge),
Expand Down
Loading

0 comments on commit 949a212

Please sign in to comment.