Skip to content

Commit

Permalink
fixed network statistics computing on antiflood component
Browse files Browse the repository at this point in the history
cleaned code
  • Loading branch information
iulianpascalau committed Dec 18, 2019
1 parent d2f0e41 commit c284681
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 50 deletions.
4 changes: 4 additions & 0 deletions integrationTests/p2p/antiflood/nilQuotaStatusHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ func (nqsh *nilQuotaStatusHandler) ResetStatistics() {
func (nqsh *nilQuotaStatusHandler) AddQuota(_ string, _ uint32, _ uint64, _ uint32, _ uint64) {
}

// SetGlobalQuota is not implemented
func (nqsh *nilQuotaStatusHandler) SetGlobalQuota(_ uint32, _ uint64, _ uint32, _ uint64) {
}

// IsInterfaceNil returns true if there is no value under the interface
func (nqsh *nilQuotaStatusHandler) IsInterfaceNil() bool {
return nqsh == nil
Expand Down
2 changes: 1 addition & 1 deletion p2p/antiflood/p2pAntiflood.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (af *p2pAntiflood) CanProcessMessage(message p2p.MessageP2P, fromConnectedP
}

//protect from directly connected peer
ok := floodPreventer.Increment(fromConnectedPeer.Pretty(), uint64(len(message.Data())))
ok := floodPreventer.IncrementAddingToSum(fromConnectedPeer.Pretty(), uint64(len(message.Data())))
if !ok {
return fmt.Errorf("%w in p2pAntiflood for connected peer", p2p.ErrSystemBusy)
}
Expand Down
17 changes: 12 additions & 5 deletions p2p/antiflood/p2pAntiflood_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestP2pAntiflood_CanNotIncrementFromConnectedPeerShouldError(t *testing.T)
FromField: messageOriginator,
}
afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{
IncrementCalled: func(identifier string, size uint64) bool {
IncrementAddingToSumCalled: func(identifier string, size uint64) bool {
if identifier != fromConnectedPeer.Pretty() {
assert.Fail(t, "should have been the connected peer")
}
Expand All @@ -92,16 +92,20 @@ func TestP2pAntiflood_CanNotIncrementMessageOriginatorShouldError(t *testing.T)
PeerField: p2p.PeerID(messageOriginator),
}
afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{
IncrementCalled: func(identifier string, size uint64) bool {
IncrementAddingToSumCalled: func(identifier string, size uint64) bool {
if identifier == fromConnectedPeer.Pretty() {
return true
}
if identifier != message.PeerField.Pretty() {
assert.Fail(t, "should have been the originator")
}

return false
},
IncrementCalled: func(identifier string, size uint64) bool {
if identifier == message.PeerField.Pretty() {
return false
}

return true
},
})

err := afm.CanProcessMessage(message, fromConnectedPeer)
Expand All @@ -118,6 +122,9 @@ func TestP2pAntiflood_ShouldWork(t *testing.T) {
PeerField: p2p.PeerID(messageOriginator),
}
afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{
IncrementAddingToSumCalled: func(identifier string, size uint64) bool {
return true
},
IncrementCalled: func(identifier string, size uint64) bool {
return true
},
Expand Down
9 changes: 7 additions & 2 deletions p2p/mock/floodPreventerStub.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package mock

type FloodPreventerStub struct {
IncrementCalled func(identifier string, size uint64) bool
ResetCalled func()
IncrementAddingToSumCalled func(identifier string, size uint64) bool
IncrementCalled func(identifier string, size uint64) bool
ResetCalled func()
}

func (fps *FloodPreventerStub) IncrementAddingToSum(identifier string, size uint64) bool {
return fps.IncrementAddingToSumCalled(identifier, size)
}

func (fps *FloodPreventerStub) Increment(identifier string, size uint64) bool {
Expand Down
1 change: 1 addition & 0 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ type PeerDiscoveryFactory interface {
// FloodPreventer defines the behavior of a component that is able to signal that too many events occurred
// on a provided identifier between Reset calls
type FloodPreventer interface {
IncrementAddingToSum(identifier string, size uint64) bool
Increment(identifier string, size uint64) bool
Reset()
IsInterfaceNil() bool
Expand Down
1 change: 1 addition & 0 deletions process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ type InterceptedHeaderSigVerifier interface {
// FloodPreventer defines the behavior of a component that is able to signal that too many events occurred
// on a provided identifier between Reset calls
type FloodPreventer interface {
IncrementAddingToSum(identifier string, size uint64) bool
Increment(identifier string, size uint64) bool
Reset()
IsInterfaceNil() bool
Expand Down
10 changes: 10 additions & 0 deletions process/mock/quotaStatusHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ type QuotaStatusHandlerStub struct {
ResetStatisticsCalled func()
AddQuotaCalled func(identifier string, numReceivedMessages uint32, sizeReceivedMessages uint64,
numProcessedMessages uint32, sizeProcessedMessages uint64)
SetGlobalQuotaCalled func(numReceived uint32, sizeReceived uint64, numProcessed uint32, sizeProcessed uint64)
}

func (qshs *QuotaStatusHandlerStub) ResetStatistics() {
Expand All @@ -20,6 +21,15 @@ func (qshs *QuotaStatusHandlerStub) AddQuota(
qshs.AddQuotaCalled(identifier, numReceived, sizeReceived, numProcessed, sizeProcessed)
}

func (qshs *QuotaStatusHandlerStub) SetGlobalQuota(
numReceived uint32,
sizeReceived uint64,
numProcessed uint32,
sizeProcessed uint64,
) {
qshs.SetGlobalQuotaCalled(numReceived, sizeReceived, numProcessed, sizeProcessed)
}

func (qshs *QuotaStatusHandlerStub) IsInterfaceNil() bool {
return qshs == nil
}
4 changes: 2 additions & 2 deletions process/throttle/antiflood/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package antiflood
// by the system
type QuotaStatusHandler interface {
ResetStatistics()
AddQuota(identifier string, numReceived uint32, sizeReceived uint64,
numProcessed uint32, sizeProcessed uint64)
AddQuota(identifier string, numReceived uint32, sizeReceived uint64, numProcessed uint32, sizeProcessed uint64)
SetGlobalQuota(numReceived uint32, sizeReceived uint64, numProcessed uint32, sizeProcessed uint64)
IsInterfaceNil() bool
}
34 changes: 32 additions & 2 deletions process/throttle/antiflood/quotaFloodPreventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,40 @@ func NewQuotaFloodPreventer(
}, nil
}

// Increment tries to increment the counter values held at "identifier" position
// IncrementAddingToSum tries to increment the counter values held at "identifier" position
// It returns true if it had succeeded incrementing (existing counter value is lower or equal with provided maxOperations)
// We need the mutOperation here as the get and put should be done atomically.
// Otherwise we might yield a slightly higher number of false valid increments
func (qfp *quotaFloodPreventer) Increment(identifier string, size uint64) bool {
// This method also checks the global sum quota and increment its values
func (qfp *quotaFloodPreventer) IncrementAddingToSum(identifier string, size uint64) bool {
qfp.mutOperation.Lock()
defer qfp.mutOperation.Unlock()

qfp.globalQuota.numReceivedMessages++
qfp.globalQuota.sizeReceivedMessages += size

result := qfp.increment(identifier, size)
if result {
qfp.globalQuota.numProcessedMessages++
qfp.globalQuota.sizeProcessedMessages += size
}

return result
}

// Increment tries to increment the counter values held at "identifier" position
// It returns true if it had succeeded incrementing (existing counter value is lower or equal with provided maxOperations)
// We need the mutOperation here as the get and put should be done atomically.
// Otherwise we might yield a slightly higher number of false valid increments
// This method also checks the global sum quota but does not increment its values
func (qfp *quotaFloodPreventer) Increment(identifier string, size uint64) bool {
qfp.mutOperation.Lock()
defer qfp.mutOperation.Unlock()

return qfp.increment(identifier, size)
}

func (qfp *quotaFloodPreventer) increment(identifier string, size uint64) bool {
isGlobalQuotaReached := qfp.globalQuota.numReceivedMessages > qfp.maxMessages ||
qfp.globalQuota.sizeReceivedMessages > qfp.maxSize
if isGlobalQuotaReached {
Expand Down Expand Up @@ -182,6 +205,13 @@ func (qfp quotaFloodPreventer) createStatistics() {
q.sizeProcessedMessages,
)
}

qfp.statusHandler.SetGlobalQuota(
qfp.globalQuota.numReceivedMessages,
qfp.globalQuota.sizeReceivedMessages,
qfp.globalQuota.numProcessedMessages,
qfp.globalQuota.sizeProcessedMessages,
)
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
61 changes: 61 additions & 0 deletions process/throttle/antiflood/quotaFloodPreventer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func createMockQuotaStatusHandler() *mock.QuotaStatusHandlerStub {
return &mock.QuotaStatusHandlerStub{
ResetStatisticsCalled: func() {},
AddQuotaCalled: func(_ string, _ uint32, _ uint64, _ uint32, _ uint64) {},
SetGlobalQuotaCalled: func(_ uint32, _ uint64, _ uint32, _ uint64) {},
}
}

Expand Down Expand Up @@ -246,6 +247,65 @@ func TestNewQuotaFloodPreventer_IncrementUnderMaxValuesShouldIncrementAndReturnT
assert.True(t, putWasCalled)
}

func TestNewQuotaFloodPreventer_IncrementAddingSumWithResetShouldWork(t *testing.T) {
t.Parallel()

putWasCalled := 0
addedGlobalQuotaCalled := false
existingSize := uint64(0)
existingMessages := uint32(0)
existingQuota := &quota{
numReceivedMessages: existingMessages,
sizeReceivedMessages: existingSize,
}
identifier := "identifier"
size := uint64(minTotalSize * 2)
qfp, _ := NewQuotaFloodPreventer(
&mock.CacherStub{
GetCalled: func(key []byte) (value interface{}, ok bool) {
return existingQuota, true
},
PutCalled: func(key []byte, value interface{}) (evicted bool) {
if string(key) == identifier {
putWasCalled++
}

return
},
KeysCalled: func() [][]byte {
return make([][]byte, 0)
},
ClearCalled: func() {},
},
&mock.QuotaStatusHandlerStub{
AddQuotaCalled: func(_ string, _ uint32, _ uint64, _ uint32, _ uint64) {},
SetGlobalQuotaCalled: func(numReceived uint32, sizeReceived uint64, numProcessed uint32, sizeProcessed uint64) {
addedGlobalQuotaCalled = true
assert.Equal(t, uint32(2), numReceived)
assert.Equal(t, size+size+1, sizeReceived)
assert.Equal(t, uint32(2), numProcessed)
assert.Equal(t, size+size+1, sizeProcessed)
},
ResetStatisticsCalled: func() {},
},
minMessages*4,
minTotalSize*10,
minMessages*4,
minTotalSize*10,
)

ok := qfp.IncrementAddingToSum(identifier, size)
assert.True(t, ok)

ok = qfp.IncrementAddingToSum(identifier, size+1)
assert.True(t, ok)

qfp.Reset()

assert.Equal(t, 2, putWasCalled)
assert.True(t, addedGlobalQuotaCalled)
}

//------- Increment per peer

func TestNewQuotaFloodPreventer_IncrementOverMaxPeerNumMessagesShouldNotPutAndReturnFalse(t *testing.T) {
Expand Down Expand Up @@ -477,6 +537,7 @@ func TestCountersMap_ResetShouldCallQuotaStatus(t *testing.T) {

assert.Equal(t, quotaToCompare, quotaProvided)
},
SetGlobalQuotaCalled: func(_ uint32, _ uint64, _ uint32, _ uint64) {},
},
minTotalSize,
minMessages,
Expand Down
56 changes: 39 additions & 17 deletions statusHandler/p2pQuota/p2pQuotaProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type p2pQuotaProcessor struct {
mutStatistics sync.Mutex
statistics map[string]*quota
peakNetworkQuota *quota
networkQuota *quota
peakPeerQuota *quota
peakNumReceivers uint64
handler core.AppStatusHandler
Expand All @@ -35,47 +36,51 @@ func NewP2pQuotaProcessor(handler core.AppStatusHandler) (*p2pQuotaProcessor, er
return &p2pQuotaProcessor{
statistics: make(map[string]*quota),
peakNetworkQuota: &quota{},
networkQuota: &quota{},
peakPeerQuota: &quota{},
handler: handler,
}, nil
}

// ResetStatistics output gathered statistics, process and prints them. After that it empties the statistics map
func (pqp *p2pQuotaProcessor) ResetStatistics() {
networkQuota := &quota{}
peakPeerQuota := &quota{}

pqp.mutStatistics.Lock()
defer pqp.mutStatistics.Unlock()

for _, q := range pqp.statistics {
networkQuota.numReceivedMessages += q.numReceivedMessages
networkQuota.sizeReceivedMessages += q.sizeReceivedMessages
networkQuota.numProcessedMessages += q.numProcessedMessages
networkQuota.sizeProcessedMessages += q.sizeProcessedMessages
peakPeerQuota := pqp.computePeerStatistics()
numPeers := uint64(len(pqp.statistics))
pqp.setPeakStatistics(peakPeerQuota, numPeers)

pqp.moveStatisticsInAppStatusHandler(peakPeerQuota, pqp.networkQuota, numPeers, pqp.peakNumReceivers)

pqp.statistics = make(map[string]*quota)
}

func (pqp *p2pQuotaProcessor) computePeerStatistics() *quota {
peakPeerQuota := &quota{}

for _, q := range pqp.statistics {
peakPeerQuota.numReceivedMessages = core.MaxUint32(peakPeerQuota.numReceivedMessages, q.numReceivedMessages)
peakPeerQuota.sizeReceivedMessages = core.MaxUint64(peakPeerQuota.sizeReceivedMessages, q.sizeReceivedMessages)
peakPeerQuota.numProcessedMessages = core.MaxUint32(peakPeerQuota.numProcessedMessages, q.numProcessedMessages)
peakPeerQuota.sizeProcessedMessages = core.MaxUint64(peakPeerQuota.sizeProcessedMessages, q.sizeProcessedMessages)
}

return peakPeerQuota
}

func (pqp *p2pQuotaProcessor) setPeakStatistics(peakPeerQuota *quota, numPeers uint64) {
pqp.peakPeerQuota.numReceivedMessages = core.MaxUint32(peakPeerQuota.numReceivedMessages, pqp.peakPeerQuota.numReceivedMessages)
pqp.peakPeerQuota.sizeReceivedMessages = core.MaxUint64(peakPeerQuota.sizeReceivedMessages, pqp.peakPeerQuota.sizeReceivedMessages)
pqp.peakPeerQuota.numProcessedMessages = core.MaxUint32(peakPeerQuota.numProcessedMessages, pqp.peakPeerQuota.numProcessedMessages)
pqp.peakPeerQuota.sizeProcessedMessages = core.MaxUint64(peakPeerQuota.sizeProcessedMessages, pqp.peakPeerQuota.sizeProcessedMessages)

pqp.peakNetworkQuota.numReceivedMessages = core.MaxUint32(networkQuota.numReceivedMessages, pqp.peakNetworkQuota.numReceivedMessages)
pqp.peakNetworkQuota.sizeReceivedMessages = core.MaxUint64(networkQuota.sizeReceivedMessages, pqp.peakNetworkQuota.sizeReceivedMessages)
pqp.peakNetworkQuota.numProcessedMessages = core.MaxUint32(networkQuota.numProcessedMessages, pqp.peakNetworkQuota.numProcessedMessages)
pqp.peakNetworkQuota.sizeProcessedMessages = core.MaxUint64(networkQuota.sizeProcessedMessages, pqp.peakNetworkQuota.sizeProcessedMessages)
pqp.peakNetworkQuota.numReceivedMessages = core.MaxUint32(pqp.networkQuota.numReceivedMessages, pqp.peakNetworkQuota.numReceivedMessages)
pqp.peakNetworkQuota.sizeReceivedMessages = core.MaxUint64(pqp.networkQuota.sizeReceivedMessages, pqp.peakNetworkQuota.sizeReceivedMessages)
pqp.peakNetworkQuota.numProcessedMessages = core.MaxUint32(pqp.networkQuota.numProcessedMessages, pqp.peakNetworkQuota.numProcessedMessages)
pqp.peakNetworkQuota.sizeProcessedMessages = core.MaxUint64(pqp.networkQuota.sizeProcessedMessages, pqp.peakNetworkQuota.sizeProcessedMessages)

numPeers := uint64(len(pqp.statistics))
pqp.peakNumReceivers = core.MaxUint64(numPeers, pqp.peakNumReceivers)

pqp.moveStatisticsInAppStatusHandler(peakPeerQuota, networkQuota, numPeers, pqp.peakNumReceivers)

pqp.statistics = make(map[string]*quota)
}

func (pqp *p2pQuotaProcessor) moveStatisticsInAppStatusHandler(
Expand Down Expand Up @@ -129,6 +134,23 @@ func (pqp *p2pQuotaProcessor) AddQuota(
pqp.mutStatistics.Unlock()
}

// SetGlobalQuota sets the global quota statistics
func (pqp *p2pQuotaProcessor) SetGlobalQuota(
numReceived uint32,
sizeReceived uint64,
numProcessed uint32,
sizeProcessed uint64,
) {
pqp.mutStatistics.Lock()
pqp.networkQuota = &quota{
numReceivedMessages: numReceived,
sizeReceivedMessages: sizeReceived,
numProcessedMessages: numProcessed,
sizeProcessedMessages: sizeProcessed,
}
pqp.mutStatistics.Unlock()
}

// IsInterfaceNil returns true if there is no value under the interface
func (pqp *p2pQuotaProcessor) IsInterfaceNil() bool {
return pqp == nil
Expand Down
Loading

0 comments on commit c284681

Please sign in to comment.