Skip to content

Commit

Permalink
fixes after review: method and variables renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianpascalau committed Dec 26, 2019
1 parent 4628de3 commit 7382b7a
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 91 deletions.
24 changes: 12 additions & 12 deletions cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,16 +482,16 @@ func createAntifloodComponent(mainConfig *config.Config, status core.AppStatusHa
return nil, err
}

maxMessagesPerPeer := mainConfig.Antiflood.PeerMaxMessagesPerSecond
maxTotalSizePerPeer := mainConfig.Antiflood.PeerMaxTotalSizePerSecond
maxMessages := mainConfig.Antiflood.MaxMessagesPerSecond
maxTotalSize := mainConfig.Antiflood.MaxTotalSizePerSecond
peerMaxMessagesPerSecond := mainConfig.Antiflood.PeerMaxMessagesPerSecond
peerMaxTotalSizePerSecond := mainConfig.Antiflood.PeerMaxTotalSizePerSecond
maxMessagesPerSecond := mainConfig.Antiflood.MaxMessagesPerSecond
maxTotalSizePerSecond := mainConfig.Antiflood.MaxTotalSizePerSecond

log.Debug("started antiflood component",
"maxMessagesPerPeer", maxMessagesPerPeer,
"maxTotalSizePerPeer", core.ConvertBytes(maxTotalSizePerPeer),
"maxMessages", maxMessages,
"maxTotalSize", core.ConvertBytes(maxTotalSize),
"peerMaxMessagesPerSecond", peerMaxMessagesPerSecond,
"peerMaxTotalSizePerSecond", core.ConvertBytes(peerMaxTotalSizePerSecond),
"maxMessagesPerSecond", maxMessagesPerSecond,
"maxTotalSizePerSecond", core.ConvertBytes(maxTotalSizePerSecond),
)

quotaProcessor, err := p2pQuota.NewP2pQuotaProcessor(status)
Expand All @@ -502,10 +502,10 @@ func createAntifloodComponent(mainConfig *config.Config, status core.AppStatusHa
floodPreventer, err := antifloodThrottle.NewQuotaFloodPreventer(
antifloodCache,
quotaProcessor,
maxMessagesPerPeer,
maxTotalSizePerPeer,
maxMessages,
maxTotalSize,
peerMaxMessagesPerSecond,
peerMaxTotalSizePerSecond,
maxMessagesPerSecond,
maxTotalSizePerSecond,
)
if err != nil {
return nil, err
Expand Down
26 changes: 13 additions & 13 deletions integrationTests/p2p/antiflood/antiflooding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ func TestAntifloodWithNumMessagesFromTheSamePeer(t *testing.T) {

topic := "test_topic"
broadcastMessageDuration := time.Second * 2
peerMaxMumProcessMessages := uint32(5)
maxMumProcessMessages := uint32(math.MaxUint32)
peerMaxNumProcessMessages := uint32(5)
maxNumProcessMessages := uint32(math.MaxUint32)
maxMessageSize := uint64(1 << 20) //1MB
interceptors, err := createTopicsAndMockInterceptors(
peers,
topic,
peerMaxMumProcessMessages,
peerMaxNumProcessMessages,
maxMessageSize,
maxMumProcessMessages,
maxNumProcessMessages,
maxMessageSize,
)
assert.Nil(t, err)
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestAntifloodWithNumMessagesFromTheSamePeer(t *testing.T) {

isFlooding.Store(false)

checkMessagesOnPeers(t, peers, interceptors, peerMaxMumProcessMessages, floodedIdxes, protectedIdexes)
checkMessagesOnPeers(t, peers, interceptors, peerMaxNumProcessMessages, floodedIdxes, protectedIdexes)
}

// TestAntifloodWithMessagesFromOtherPeers tests what happens if a peer decide to send a number of messages
Expand All @@ -99,15 +99,15 @@ func TestAntifloodWithNumMessagesFromOtherPeers(t *testing.T) {
// (check integrationTests.CreateFixedNetworkOf14Peers function)
topic := "test_topic"
broadcastMessageDuration := time.Second * 2
peerMaxMumProcessMessages := uint32(5)
maxMumProcessMessages := uint32(math.MaxUint32)
peerMaxNumProcessMessages := uint32(5)
maxNumProcessMessages := uint32(math.MaxUint32)
maxMessageSize := uint64(1 << 20) //1MB
interceptors, err := createTopicsAndMockInterceptors(
peers,
topic,
peerMaxMumProcessMessages,
peerMaxNumProcessMessages,
maxMessageSize,
maxMumProcessMessages,
maxNumProcessMessages,
maxMessageSize,
)
assert.Nil(t, err)
Expand All @@ -131,7 +131,7 @@ func TestAntifloodWithNumMessagesFromOtherPeers(t *testing.T) {
}
time.Sleep(broadcastMessageDuration)

checkMessagesOnPeers(t, peers, interceptors, peerMaxMumProcessMessages, floodedIdxes, protectedIdexes)
checkMessagesOnPeers(t, peers, interceptors, peerMaxNumProcessMessages, floodedIdxes, protectedIdexes)
}

// TestAntifloodWithMessagesFromTheSamePeer tests what happens if a peer decide to send large messages
Expand All @@ -154,15 +154,15 @@ func TestAntifloodWithLargeSizeMessagesFromTheSamePeer(t *testing.T) {

topic := "test_topic"
broadcastMessageDuration := time.Second * 2
maxMumProcessMessages := uint32(math.MaxUint32)
maxNumProcessMessages := uint32(math.MaxUint32)
maxMessageSize := uint64(math.MaxUint64)
peerMaxMessageSize := uint64(1 << 10) //1KB
interceptors, err := createTopicsAndMockInterceptors(
peers,
topic,
maxMumProcessMessages,
maxNumProcessMessages,
peerMaxMessageSize,
maxMumProcessMessages,
maxNumProcessMessages,
maxMessageSize,
)
assert.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions p2p/antiflood/p2pAntiflood.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ func (af *p2pAntiflood) CanProcessMessage(message p2p.MessageP2P, fromConnectedP
}

//protect from directly connected peer
ok := floodPreventer.IncrementAddingToSum(fromConnectedPeer.Pretty(), uint64(len(message.Data())))
ok := floodPreventer.AccumulateGlobal(fromConnectedPeer.Pretty(), uint64(len(message.Data())))
if !ok {
return fmt.Errorf("%w in p2pAntiflood for connected peer", p2p.ErrSystemBusy)
}

if fromConnectedPeer != message.Peer() {
//protect from the flooding messages that originate from the same source but come from different peers
ok = floodPreventer.Increment(message.Peer().Pretty(), uint64(len(message.Data())))
ok = floodPreventer.Accumulate(message.Peer().Pretty(), uint64(len(message.Data())))
if !ok {
return fmt.Errorf("%w in p2pAntiflood for originator", p2p.ErrSystemBusy)
}
Expand Down
10 changes: 5 additions & 5 deletions p2p/antiflood/p2pAntiflood_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestP2pAntiflood_CanNotIncrementFromConnectedPeerShouldError(t *testing.T)
FromField: messageOriginator,
}
afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{
IncrementAddingToSumCalled: func(identifier string, size uint64) bool {
AccumulateGlobalCalled: func(identifier string, size uint64) bool {
if identifier != fromConnectedPeer.Pretty() {
assert.Fail(t, "should have been the connected peer")
}
Expand All @@ -92,10 +92,10 @@ func TestP2pAntiflood_CanNotIncrementMessageOriginatorShouldError(t *testing.T)
PeerField: p2p.PeerID(messageOriginator),
}
afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{
IncrementAddingToSumCalled: func(identifier string, size uint64) bool {
AccumulateGlobalCalled: func(identifier string, size uint64) bool {
return identifier == fromConnectedPeer.Pretty()
},
IncrementCalled: func(identifier string, size uint64) bool {
AccumulateCalled: func(identifier string, size uint64) bool {
return identifier != message.PeerField.Pretty()
},
})
Expand All @@ -114,10 +114,10 @@ func TestP2pAntiflood_ShouldWork(t *testing.T) {
PeerField: p2p.PeerID(messageOriginator),
}
afm, _ := antiflood.NewP2pAntiflood(&mock.FloodPreventerStub{
IncrementAddingToSumCalled: func(identifier string, size uint64) bool {
AccumulateGlobalCalled: func(identifier string, size uint64) bool {
return true
},
IncrementCalled: func(identifier string, size uint64) bool {
AccumulateCalled: func(identifier string, size uint64) bool {
return true
},
})
Expand Down
14 changes: 7 additions & 7 deletions p2p/mock/floodPreventerStub.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package mock

type FloodPreventerStub struct {
IncrementAddingToSumCalled func(identifier string, size uint64) bool
IncrementCalled func(identifier string, size uint64) bool
ResetCalled func()
AccumulateGlobalCalled func(identifier string, size uint64) bool
AccumulateCalled func(identifier string, size uint64) bool
ResetCalled func()
}

func (fps *FloodPreventerStub) IncrementAddingToSum(identifier string, size uint64) bool {
return fps.IncrementAddingToSumCalled(identifier, size)
func (fps *FloodPreventerStub) AccumulateGlobal(identifier string, size uint64) bool {
return fps.AccumulateGlobalCalled(identifier, size)
}

func (fps *FloodPreventerStub) Increment(identifier string, size uint64) bool {
return fps.IncrementCalled(identifier, size)
func (fps *FloodPreventerStub) Accumulate(identifier string, size uint64) bool {
return fps.AccumulateCalled(identifier, size)
}

func (fps *FloodPreventerStub) Reset() {
Expand Down
4 changes: 2 additions & 2 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ type PeerDiscoveryFactory interface {
// FloodPreventer defines the behavior of a component that is able to signal that too many events occurred
// on a provided identifier between Reset calls
type FloodPreventer interface {
IncrementAddingToSum(identifier string, size uint64) bool
Increment(identifier string, size uint64) bool
AccumulateGlobal(identifier string, size uint64) bool
Accumulate(identifier string, size uint64) bool
Reset()
IsInterfaceNil() bool
}
4 changes: 2 additions & 2 deletions process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,8 @@ type InterceptedHeaderSigVerifier interface {
// FloodPreventer defines the behavior of a component that is able to signal that too many events occurred
// on a provided identifier between Reset calls
type FloodPreventer interface {
IncrementAddingToSum(identifier string, size uint64) bool
Increment(identifier string, size uint64) bool
AccumulateGlobal(identifier string, size uint64) bool
Accumulate(identifier string, size uint64) bool
Reset()
IsInterfaceNil() bool
}
Expand Down
23 changes: 11 additions & 12 deletions process/throttle/antiflood/quotaFloodPreventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,40 +89,40 @@ func NewQuotaFloodPreventer(
}, nil
}

// IncrementAddingToSum tries to increment the counter values held at "identifier" position
// AccumulateGlobal tries to increment the counter values held at "identifier" position
// It returns true if it had succeeded incrementing (existing counter value is lower or equal with provided maxOperations)
// We need the mutOperation here as the get and put should be done atomically.
// Otherwise we might yield a slightly higher number of false valid increments
// This method also checks the global sum quota and increment its values
func (qfp *quotaFloodPreventer) IncrementAddingToSum(identifier string, size uint64) bool {
func (qfp *quotaFloodPreventer) AccumulateGlobal(identifier string, size uint64) bool {
qfp.mutOperation.Lock()
defer qfp.mutOperation.Unlock()

qfp.globalQuota.numReceivedMessages++
qfp.globalQuota.sizeReceivedMessages += size

result := qfp.increment(identifier, size)
if result {
isQuotaNotReached := qfp.accumulate(identifier, size)
if isQuotaNotReached {
qfp.globalQuota.numProcessedMessages++
qfp.globalQuota.sizeProcessedMessages += size
}
qfp.mutOperation.Unlock()

return result
return isQuotaNotReached
}

// Increment tries to increment the counter values held at "identifier" position
// Accumulate tries to increment the counter values held at "identifier" position
// It returns true if it had succeeded incrementing (existing counter value is lower or equal with provided maxOperations)
// We need the mutOperation here as the get and put should be done atomically.
// Otherwise we might yield a slightly higher number of false valid increments
// This method also checks the global sum quota but does not increment its values
func (qfp *quotaFloodPreventer) Increment(identifier string, size uint64) bool {
func (qfp *quotaFloodPreventer) Accumulate(identifier string, size uint64) bool {
qfp.mutOperation.Lock()
defer qfp.mutOperation.Unlock()

return qfp.increment(identifier, size)
return qfp.accumulate(identifier, size)
}

func (qfp *quotaFloodPreventer) increment(identifier string, size uint64) bool {
func (qfp *quotaFloodPreventer) accumulate(identifier string, size uint64) bool {
isGlobalQuotaReached := qfp.globalQuota.numReceivedMessages > qfp.maxMessages ||
qfp.globalQuota.sizeReceivedMessages > qfp.maxSize
if isGlobalQuotaReached {
Expand Down Expand Up @@ -174,6 +174,7 @@ func (qfp *quotaFloodPreventer) Reset() {
qfp.mutOperation.Lock()
defer qfp.mutOperation.Unlock()

qfp.statusHandler.ResetStatistics()
qfp.createStatistics()

//TODO change this if cacher.Clear() is time consuming
Expand All @@ -183,8 +184,6 @@ func (qfp *quotaFloodPreventer) Reset() {

// createStatistics is useful to benchmark the system when running
func (qfp quotaFloodPreventer) createStatistics() {
qfp.statusHandler.ResetStatistics()

keys := qfp.cacher.Keys()
for _, k := range keys {
val, ok := qfp.cacher.Get(k)
Expand Down
Loading

0 comments on commit 7382b7a

Please sign in to comment.