diff --git a/integrationTests/p2p/antiflood/nilQuotaStatusHandler.go b/integrationTests/p2p/antiflood/nilQuotaStatusHandler.go index 9e68cd794b7..af66bf30f54 100644 --- a/integrationTests/p2p/antiflood/nilQuotaStatusHandler.go +++ b/integrationTests/p2p/antiflood/nilQuotaStatusHandler.go @@ -11,6 +11,10 @@ func (nqsh *nilQuotaStatusHandler) ResetStatistics() { func (nqsh *nilQuotaStatusHandler) AddQuota(_ string, _ uint32, _ uint64, _ uint32, _ uint64) { } +// SetGlobalQuota is not implemented +func (nqsh *nilQuotaStatusHandler) SetGlobalQuota(_ uint32, _ uint64, _ uint32, _ uint64) { +} + // IsInterfaceNil returns true if there is no value under the interface func (nqsh *nilQuotaStatusHandler) IsInterfaceNil() bool { return nqsh == nil diff --git a/p2p/antiflood/p2pAntiflood.go b/p2p/antiflood/p2pAntiflood.go index dc61c8ca315..dd8bc75412e 100644 --- a/p2p/antiflood/p2pAntiflood.go +++ b/p2p/antiflood/p2pAntiflood.go @@ -34,7 +34,7 @@ func (af *p2pAntiflood) CanProcessMessage(message p2p.MessageP2P, fromConnectedP } //protect from directly connected peer - ok := floodPreventer.Increment(fromConnectedPeer.Pretty(), uint64(len(message.Data()))) + ok := floodPreventer.IncrementAddingToSum(fromConnectedPeer.Pretty(), uint64(len(message.Data()))) if !ok { return fmt.Errorf("%w in p2pAntiflood for connected peer", p2p.ErrSystemBusy) } diff --git a/p2p/antiflood/p2pAntiflood_test.go b/p2p/antiflood/p2pAntiflood_test.go index a59def23e02..6d1de450cf2 100644 --- a/p2p/antiflood/p2pAntiflood_test.go +++ b/p2p/antiflood/p2pAntiflood_test.go @@ -68,7 +68,7 @@ func TestP2pAntiflood_CanNotIncrementFromConnectedPeerShouldError(t *testing.T) FromField: messageOriginator, } afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{ - IncrementCalled: func(identifier string, size uint64) bool { + IncrementAddingToSumCalled: func(identifier string, size uint64) bool { if identifier != fromConnectedPeer.Pretty() { assert.Fail(t, "should have been the connected peer") } @@ -92,16 +92,20 @@ func TestP2pAntiflood_CanNotIncrementMessageOriginatorShouldError(t *testing.T) PeerField: p2p.PeerID(messageOriginator), } afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{ - IncrementCalled: func(identifier string, size uint64) bool { + IncrementAddingToSumCalled: func(identifier string, size uint64) bool { if identifier == fromConnectedPeer.Pretty() { return true } - if identifier != message.PeerField.Pretty() { - assert.Fail(t, "should have been the originator") - } return false }, + IncrementCalled: func(identifier string, size uint64) bool { + if identifier == message.PeerField.Pretty() { + return false + } + + return true + }, }) err := afm.CanProcessMessage(message, fromConnectedPeer) @@ -118,6 +122,9 @@ func TestP2pAntiflood_ShouldWork(t *testing.T) { PeerField: p2p.PeerID(messageOriginator), } afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{ + IncrementAddingToSumCalled: func(identifier string, size uint64) bool { + return true + }, IncrementCalled: func(identifier string, size uint64) bool { return true }, diff --git a/p2p/mock/floodPreventerStub.go b/p2p/mock/floodPreventerStub.go index 6783c4afec7..27278e1aa96 100644 --- a/p2p/mock/floodPreventerStub.go +++ b/p2p/mock/floodPreventerStub.go @@ -1,8 +1,13 @@ package mock type FloodPreventerStub struct { - IncrementCalled func(identifier string, size uint64) bool - ResetCalled func() + IncrementAddingToSumCalled func(identifier string, size uint64) bool + IncrementCalled func(identifier string, size uint64) bool + ResetCalled func() +} + +func (fps *FloodPreventerStub) IncrementAddingToSum(identifier string, size uint64) bool { + return fps.IncrementAddingToSumCalled(identifier, size) } func (fps *FloodPreventerStub) Increment(identifier string, size uint64) bool { diff --git a/p2p/p2p.go b/p2p/p2p.go index 22766175f15..70ee40d007b 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -195,6 +195,7 @@ type PeerDiscoveryFactory interface { // FloodPreventer defines the behavior of a component that is able to signal that too many events occurred // on a provided identifier between Reset calls type FloodPreventer interface { + IncrementAddingToSum(identifier string, size uint64) bool Increment(identifier string, size uint64) bool Reset() IsInterfaceNil() bool diff --git a/process/interface.go b/process/interface.go index bf141d880f8..120267bc0a5 100644 --- a/process/interface.go +++ b/process/interface.go @@ -543,6 +543,7 @@ type InterceptedHeaderSigVerifier interface { // FloodPreventer defines the behavior of a component that is able to signal that too many events occurred // on a provided identifier between Reset calls type FloodPreventer interface { + IncrementAddingToSum(identifier string, size uint64) bool Increment(identifier string, size uint64) bool Reset() IsInterfaceNil() bool diff --git a/process/mock/quotaStatusHandlerStub.go b/process/mock/quotaStatusHandlerStub.go index 55c4bdbae75..ecdc10ba553 100644 --- a/process/mock/quotaStatusHandlerStub.go +++ b/process/mock/quotaStatusHandlerStub.go @@ -4,6 +4,7 @@ type QuotaStatusHandlerStub struct { ResetStatisticsCalled func() AddQuotaCalled func(identifier string, numReceivedMessages uint32, sizeReceivedMessages uint64, numProcessedMessages uint32, sizeProcessedMessages uint64) + SetGlobalQuotaCalled func(numReceived uint32, sizeReceived uint64, numProcessed uint32, sizeProcessed uint64) } func (qshs *QuotaStatusHandlerStub) ResetStatistics() { @@ -20,6 +21,15 @@ func (qshs *QuotaStatusHandlerStub) AddQuota( qshs.AddQuotaCalled(identifier, numReceived, sizeReceived, numProcessed, sizeProcessed) } +func (qshs *QuotaStatusHandlerStub) SetGlobalQuota( + numReceived uint32, + sizeReceived uint64, + numProcessed uint32, + sizeProcessed uint64, +) { + qshs.SetGlobalQuotaCalled(numReceived, sizeReceived, numProcessed, sizeProcessed) +} + func (qshs *QuotaStatusHandlerStub) IsInterfaceNil() bool { return qshs == nil } diff --git a/process/throttle/antiflood/interface.go b/process/throttle/antiflood/interface.go index 9e7b14355d9..18c3a441166 100644 --- a/process/throttle/antiflood/interface.go +++ b/process/throttle/antiflood/interface.go @@ -4,7 +4,7 @@ package antiflood // by the system type QuotaStatusHandler interface { ResetStatistics() - AddQuota(identifier string, numReceived uint32, sizeReceived uint64, - numProcessed uint32, sizeProcessed uint64) + AddQuota(identifier string, numReceived uint32, sizeReceived uint64, numProcessed uint32, sizeProcessed uint64) + SetGlobalQuota(numReceived uint32, sizeReceived uint64, numProcessed uint32, sizeProcessed uint64) IsInterfaceNil() bool } diff --git a/process/throttle/antiflood/quotaFloodPreventer.go b/process/throttle/antiflood/quotaFloodPreventer.go index 03c90ff1e6b..15aee8e7e4a 100644 --- a/process/throttle/antiflood/quotaFloodPreventer.go +++ b/process/throttle/antiflood/quotaFloodPreventer.go @@ -89,17 +89,40 @@ func NewQuotaFloodPreventer( }, nil } -// Increment tries to increment the counter values held at "identifier" position +// IncrementAddingToSum tries to increment the counter values held at "identifier" position // It returns true if it had succeeded incrementing (existing counter value is lower or equal with provided maxOperations) // We need the mutOperation here as the get and put should be done atomically. // Otherwise we might yield a slightly higher number of false valid increments -func (qfp *quotaFloodPreventer) Increment(identifier string, size uint64) bool { +// This method also checks the global sum quota and increment its values +func (qfp *quotaFloodPreventer) IncrementAddingToSum(identifier string, size uint64) bool { qfp.mutOperation.Lock() defer qfp.mutOperation.Unlock() qfp.globalQuota.numReceivedMessages++ qfp.globalQuota.sizeReceivedMessages += size + result := qfp.increment(identifier, size) + if result { + qfp.globalQuota.numProcessedMessages++ + qfp.globalQuota.sizeProcessedMessages += size + } + + return result +} + +// Increment tries to increment the counter values held at "identifier" position +// It returns true if it had succeeded incrementing (existing counter value is lower or equal with provided maxOperations) +// We need the mutOperation here as the get and put should be done atomically. +// Otherwise we might yield a slightly higher number of false valid increments +// This method also checks the global sum quota but does not increment its values +func (qfp *quotaFloodPreventer) Increment(identifier string, size uint64) bool { + qfp.mutOperation.Lock() + defer qfp.mutOperation.Unlock() + + return qfp.increment(identifier, size) +} + +func (qfp *quotaFloodPreventer) increment(identifier string, size uint64) bool { isGlobalQuotaReached := qfp.globalQuota.numReceivedMessages > qfp.maxMessages || qfp.globalQuota.sizeReceivedMessages > qfp.maxSize if isGlobalQuotaReached { @@ -182,6 +205,13 @@ func (qfp quotaFloodPreventer) createStatistics() { q.sizeProcessedMessages, ) } + + qfp.statusHandler.SetGlobalQuota( + qfp.globalQuota.numReceivedMessages, + qfp.globalQuota.sizeReceivedMessages, + qfp.globalQuota.numProcessedMessages, + qfp.globalQuota.sizeProcessedMessages, + ) } // IsInterfaceNil returns true if there is no value under the interface diff --git a/process/throttle/antiflood/quotaFloodPreventer_test.go b/process/throttle/antiflood/quotaFloodPreventer_test.go index d2a66cce3dd..b7f5d31d652 100644 --- a/process/throttle/antiflood/quotaFloodPreventer_test.go +++ b/process/throttle/antiflood/quotaFloodPreventer_test.go @@ -16,6 +16,7 @@ func createMockQuotaStatusHandler() *mock.QuotaStatusHandlerStub { return &mock.QuotaStatusHandlerStub{ ResetStatisticsCalled: func() {}, AddQuotaCalled: func(_ string, _ uint32, _ uint64, _ uint32, _ uint64) {}, + SetGlobalQuotaCalled: func(_ uint32, _ uint64, _ uint32, _ uint64) {}, } } @@ -246,6 +247,65 @@ func TestNewQuotaFloodPreventer_IncrementUnderMaxValuesShouldIncrementAndReturnT assert.True(t, putWasCalled) } +func TestNewQuotaFloodPreventer_IncrementAddingSumWithResetShouldWork(t *testing.T) { + t.Parallel() + + putWasCalled := 0 + addedGlobalQuotaCalled := false + existingSize := uint64(0) + existingMessages := uint32(0) + existingQuota := "a{ + numReceivedMessages: existingMessages, + sizeReceivedMessages: existingSize, + } + identifier := "identifier" + size := uint64(minTotalSize * 2) + qfp, _ := NewQuotaFloodPreventer( + &mock.CacherStub{ + GetCalled: func(key []byte) (value interface{}, ok bool) { + return existingQuota, true + }, + PutCalled: func(key []byte, value interface{}) (evicted bool) { + if string(key) == identifier { + putWasCalled++ + } + + return + }, + KeysCalled: func() [][]byte { + return make([][]byte, 0) + }, + ClearCalled: func() {}, + }, + &mock.QuotaStatusHandlerStub{ + AddQuotaCalled: func(_ string, _ uint32, _ uint64, _ uint32, _ uint64) {}, + SetGlobalQuotaCalled: func(numReceived uint32, sizeReceived uint64, numProcessed uint32, sizeProcessed uint64) { + addedGlobalQuotaCalled = true + assert.Equal(t, uint32(2), numReceived) + assert.Equal(t, size+size+1, sizeReceived) + assert.Equal(t, uint32(2), numProcessed) + assert.Equal(t, size+size+1, sizeProcessed) + }, + ResetStatisticsCalled: func() {}, + }, + minMessages*4, + minTotalSize*10, + minMessages*4, + minTotalSize*10, + ) + + ok := qfp.IncrementAddingToSum(identifier, size) + assert.True(t, ok) + + ok = qfp.IncrementAddingToSum(identifier, size+1) + assert.True(t, ok) + + qfp.Reset() + + assert.Equal(t, 2, putWasCalled) + assert.True(t, addedGlobalQuotaCalled) +} + //------- Increment per peer func TestNewQuotaFloodPreventer_IncrementOverMaxPeerNumMessagesShouldNotPutAndReturnFalse(t *testing.T) { @@ -477,6 +537,7 @@ func TestCountersMap_ResetShouldCallQuotaStatus(t *testing.T) { assert.Equal(t, quotaToCompare, quotaProvided) }, + SetGlobalQuotaCalled: func(_ uint32, _ uint64, _ uint32, _ uint64) {}, }, minTotalSize, minMessages, diff --git a/statusHandler/p2pQuota/p2pQuotaProcessor.go b/statusHandler/p2pQuota/p2pQuotaProcessor.go index 0ed39b48814..a05a1e13e4e 100644 --- a/statusHandler/p2pQuota/p2pQuotaProcessor.go +++ b/statusHandler/p2pQuota/p2pQuotaProcessor.go @@ -21,6 +21,7 @@ type p2pQuotaProcessor struct { mutStatistics sync.Mutex statistics map[string]*quota peakNetworkQuota *quota + networkQuota *quota peakPeerQuota *quota peakNumReceivers uint64 handler core.AppStatusHandler @@ -35,6 +36,7 @@ func NewP2pQuotaProcessor(handler core.AppStatusHandler) (*p2pQuotaProcessor, er return &p2pQuotaProcessor{ statistics: make(map[string]*quota), peakNetworkQuota: "a{}, + networkQuota: "a{}, peakPeerQuota: "a{}, handler: handler, }, nil @@ -42,40 +44,43 @@ func NewP2pQuotaProcessor(handler core.AppStatusHandler) (*p2pQuotaProcessor, er // ResetStatistics output gathered statistics, process and prints them. After that it empties the statistics map func (pqp *p2pQuotaProcessor) ResetStatistics() { - networkQuota := "a{} - peakPeerQuota := "a{} - pqp.mutStatistics.Lock() defer pqp.mutStatistics.Unlock() - for _, q := range pqp.statistics { - networkQuota.numReceivedMessages += q.numReceivedMessages - networkQuota.sizeReceivedMessages += q.sizeReceivedMessages - networkQuota.numProcessedMessages += q.numProcessedMessages - networkQuota.sizeProcessedMessages += q.sizeProcessedMessages + peakPeerQuota := pqp.computePeerStatistics() + numPeers := uint64(len(pqp.statistics)) + pqp.setPeakStatistics(peakPeerQuota, numPeers) + + pqp.moveStatisticsInAppStatusHandler(peakPeerQuota, pqp.networkQuota, numPeers, pqp.peakNumReceivers) + + pqp.statistics = make(map[string]*quota) +} +func (pqp *p2pQuotaProcessor) computePeerStatistics() *quota { + peakPeerQuota := "a{} + + for _, q := range pqp.statistics { peakPeerQuota.numReceivedMessages = core.MaxUint32(peakPeerQuota.numReceivedMessages, q.numReceivedMessages) peakPeerQuota.sizeReceivedMessages = core.MaxUint64(peakPeerQuota.sizeReceivedMessages, q.sizeReceivedMessages) peakPeerQuota.numProcessedMessages = core.MaxUint32(peakPeerQuota.numProcessedMessages, q.numProcessedMessages) peakPeerQuota.sizeProcessedMessages = core.MaxUint64(peakPeerQuota.sizeProcessedMessages, q.sizeProcessedMessages) } + return peakPeerQuota +} + +func (pqp *p2pQuotaProcessor) setPeakStatistics(peakPeerQuota *quota, numPeers uint64) { pqp.peakPeerQuota.numReceivedMessages = core.MaxUint32(peakPeerQuota.numReceivedMessages, pqp.peakPeerQuota.numReceivedMessages) pqp.peakPeerQuota.sizeReceivedMessages = core.MaxUint64(peakPeerQuota.sizeReceivedMessages, pqp.peakPeerQuota.sizeReceivedMessages) pqp.peakPeerQuota.numProcessedMessages = core.MaxUint32(peakPeerQuota.numProcessedMessages, pqp.peakPeerQuota.numProcessedMessages) pqp.peakPeerQuota.sizeProcessedMessages = core.MaxUint64(peakPeerQuota.sizeProcessedMessages, pqp.peakPeerQuota.sizeProcessedMessages) - pqp.peakNetworkQuota.numReceivedMessages = core.MaxUint32(networkQuota.numReceivedMessages, pqp.peakNetworkQuota.numReceivedMessages) - pqp.peakNetworkQuota.sizeReceivedMessages = core.MaxUint64(networkQuota.sizeReceivedMessages, pqp.peakNetworkQuota.sizeReceivedMessages) - pqp.peakNetworkQuota.numProcessedMessages = core.MaxUint32(networkQuota.numProcessedMessages, pqp.peakNetworkQuota.numProcessedMessages) - pqp.peakNetworkQuota.sizeProcessedMessages = core.MaxUint64(networkQuota.sizeProcessedMessages, pqp.peakNetworkQuota.sizeProcessedMessages) + pqp.peakNetworkQuota.numReceivedMessages = core.MaxUint32(pqp.networkQuota.numReceivedMessages, pqp.peakNetworkQuota.numReceivedMessages) + pqp.peakNetworkQuota.sizeReceivedMessages = core.MaxUint64(pqp.networkQuota.sizeReceivedMessages, pqp.peakNetworkQuota.sizeReceivedMessages) + pqp.peakNetworkQuota.numProcessedMessages = core.MaxUint32(pqp.networkQuota.numProcessedMessages, pqp.peakNetworkQuota.numProcessedMessages) + pqp.peakNetworkQuota.sizeProcessedMessages = core.MaxUint64(pqp.networkQuota.sizeProcessedMessages, pqp.peakNetworkQuota.sizeProcessedMessages) - numPeers := uint64(len(pqp.statistics)) pqp.peakNumReceivers = core.MaxUint64(numPeers, pqp.peakNumReceivers) - - pqp.moveStatisticsInAppStatusHandler(peakPeerQuota, networkQuota, numPeers, pqp.peakNumReceivers) - - pqp.statistics = make(map[string]*quota) } func (pqp *p2pQuotaProcessor) moveStatisticsInAppStatusHandler( @@ -129,6 +134,23 @@ func (pqp *p2pQuotaProcessor) AddQuota( pqp.mutStatistics.Unlock() } +// SetGlobalQuota sets the global quota statistics +func (pqp *p2pQuotaProcessor) SetGlobalQuota( + numReceived uint32, + sizeReceived uint64, + numProcessed uint32, + sizeProcessed uint64, +) { + pqp.mutStatistics.Lock() + pqp.networkQuota = "a{ + numReceivedMessages: numReceived, + sizeReceivedMessages: sizeReceived, + numProcessedMessages: numProcessed, + sizeProcessedMessages: sizeProcessed, + } + pqp.mutStatistics.Unlock() +} + // IsInterfaceNil returns true if there is no value under the interface func (pqp *p2pQuotaProcessor) IsInterfaceNil() bool { return pqp == nil diff --git a/statusHandler/p2pQuota/p2pQuotaProcessor_test.go b/statusHandler/p2pQuota/p2pQuotaProcessor_test.go index f9229c79515..ce7f2551867 100644 --- a/statusHandler/p2pQuota/p2pQuotaProcessor_test.go +++ b/statusHandler/p2pQuota/p2pQuotaProcessor_test.go @@ -63,36 +63,42 @@ func TestP2pQuotaProcessor_ResetStatisticsShouldEmptyStatsAndCallSetOnAllMetrics numProcessed1 := uint64(3) sizeProcessed1 := uint64(4) + numReceivedNetwork := uint64(5) + sizeReceivedNetwork := uint64(6) + numProcessedNetwork := uint64(7) + sizeProcessedNetwork := uint64(8) + status := mock.NewAppStatusHandlerMock() pqp, _ := p2pQuota.NewP2pQuotaProcessor(status) pqp.AddQuota(identifier1, uint32(numReceived1), sizeReceived1, uint32(numProcessed1), sizeProcessed1) + pqp.SetGlobalQuota(uint32(numReceivedNetwork), sizeReceivedNetwork, uint32(numProcessedNetwork), sizeProcessedNetwork) pqp.ResetStatistics() assert.Nil(t, pqp.GetQuota(identifier1)) numReceivers := uint64(1) - checkSumMetrics(t, status, numReceived1, sizeReceived1, numProcessed1, sizeProcessed1) - checkTopSumMetrics(t, status, numReceived1, sizeReceived1, numProcessed1, sizeProcessed1) - checkMaxMetrics(t, status, numReceived1, sizeReceived1, numProcessed1, sizeProcessed1) - checkTopMaxMetrics(t, status, numReceived1, sizeReceived1, numProcessed1, sizeProcessed1) + checkNetworkMetrics(t, status, numReceivedNetwork, sizeReceivedNetwork, numProcessedNetwork, sizeProcessedNetwork) + checkPeakNetworkMetrics(t, status, numReceivedNetwork, sizeReceivedNetwork, numProcessedNetwork, sizeProcessedNetwork) + checkPeerMetrics(t, status, numReceived1, sizeReceived1, numProcessed1, sizeProcessed1) + checkPeakPeerMetrics(t, status, numReceived1, sizeReceived1, numProcessed1, sizeProcessed1) checkNumReceivers(t, status, numReceivers, numReceivers) } -func TestP2pQuotaProcessor_ResetStatisticsShouldSetTops(t *testing.T) { +func TestP2pQuotaProcessor_ResetStatisticsShouldSetPeerStatisticsTops(t *testing.T) { t.Parallel() identifier1 := "identifier" - numReceived1 := uint64(1) - sizeReceived1 := uint64(2) - numProcessed1 := uint64(3) - sizeProcessed1 := uint64(4) + numReceived1 := uint64(10) + sizeReceived1 := uint64(20) + numProcessed1 := uint64(30) + sizeProcessed1 := uint64(40) identifier2 := "identifier" - numReceived2 := uint64(10) - sizeReceived2 := uint64(20) - numProcessed2 := uint64(30) - sizeProcessed2 := uint64(40) + numReceived2 := uint64(1) + sizeReceived2 := uint64(2) + numProcessed2 := uint64(3) + sizeProcessed2 := uint64(4) status := mock.NewAppStatusHandlerMock() pqp, _ := p2pQuota.NewP2pQuotaProcessor(status) @@ -103,14 +109,37 @@ func TestP2pQuotaProcessor_ResetStatisticsShouldSetTops(t *testing.T) { pqp.ResetStatistics() numReceivers := uint64(1) - checkSumMetrics(t, status, numReceived2, sizeReceived2, numProcessed2, sizeProcessed2) - checkTopSumMetrics(t, status, numReceived2, sizeReceived2, numProcessed2, sizeProcessed2) - checkMaxMetrics(t, status, numReceived2, sizeReceived2, numProcessed2, sizeProcessed2) - checkTopMaxMetrics(t, status, numReceived2, sizeReceived2, numProcessed2, sizeProcessed2) + checkPeerMetrics(t, status, numReceived2, sizeReceived2, numProcessed2, sizeProcessed2) + checkPeakPeerMetrics(t, status, numReceived1, sizeReceived1, numProcessed1, sizeProcessed1) checkNumReceivers(t, status, numReceivers, numReceivers) } -func checkSumMetrics( +func TestP2pQuotaProcessor_ResetStatisticsShouldSetNetworkStatisticsTops(t *testing.T) { + t.Parallel() + + numReceivedNetwork1 := uint64(10) + sizeReceivedNetwork1 := uint64(20) + numProcessedNetwork1 := uint64(30) + sizeProcessedNetwork1 := uint64(40) + + numReceivedNetwork2 := uint64(1) + sizeReceivedNetwork2 := uint64(2) + numProcessedNetwork2 := uint64(3) + sizeProcessedNetwork2 := uint64(4) + + status := mock.NewAppStatusHandlerMock() + pqp, _ := p2pQuota.NewP2pQuotaProcessor(status) + pqp.SetGlobalQuota(uint32(numReceivedNetwork1), sizeReceivedNetwork1, uint32(numProcessedNetwork1), sizeProcessedNetwork1) + pqp.ResetStatistics() + pqp.SetGlobalQuota(uint32(numReceivedNetwork2), sizeReceivedNetwork2, uint32(numProcessedNetwork2), sizeProcessedNetwork2) + + pqp.ResetStatistics() + + checkNetworkMetrics(t, status, numReceivedNetwork2, sizeReceivedNetwork2, numProcessedNetwork2, sizeProcessedNetwork2) + checkPeakNetworkMetrics(t, status, numReceivedNetwork1, sizeReceivedNetwork1, numProcessedNetwork1, sizeProcessedNetwork1) +} + +func checkNetworkMetrics( t *testing.T, status *mock.AppStatusHandlerMock, numReceived uint64, @@ -132,7 +161,7 @@ func checkSumMetrics( assert.Equal(t, value, sizeProcessed) } -func checkTopSumMetrics( +func checkPeakNetworkMetrics( t *testing.T, status *mock.AppStatusHandlerMock, numReceived uint64, @@ -154,7 +183,7 @@ func checkTopSumMetrics( assert.Equal(t, value, sizeProcessed) } -func checkMaxMetrics( +func checkPeerMetrics( t *testing.T, status *mock.AppStatusHandlerMock, numReceived uint64, @@ -176,7 +205,7 @@ func checkMaxMetrics( assert.Equal(t, value, sizeProcessed) } -func checkTopMaxMetrics( +func checkPeakPeerMetrics( t *testing.T, status *mock.AppStatusHandlerMock, numReceived uint64,