Skip to content

Commit

Permalink
finished implementation and integration
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianpascalau committed Jan 2, 2020
1 parent 067cccd commit aadfc74
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 20 deletions.
9 changes: 7 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,18 @@
Type = "LRU"

[Antiflood]
PeerMaxMessagesPerSecond = 68
PeerMaxMessagesPerSecond = 75
PeerMaxTotalSizePerSecond = 2097152
MaxMessagesPerSecond = 400
MaxTotalSizePerSecond = 9437184
MaxTotalSizePerSecond = 4194304
[Antiflood.Cache]
Size = 5000
Type = "LRU"
[Antiflood.BlackList]
ThresholdNumMessagesPerSecond = 150
ThresholdSizePerSecond = 4194304
NumFloodingRounds = 10
PeerBanDurationInSeconds = 300

[Logger]
Path = "logs"
Expand Down
86 changes: 69 additions & 17 deletions cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ import (
"github.com/ElrondNetwork/elrond-go/process/smartContract"
"github.com/ElrondNetwork/elrond-go/process/smartContract/hooks"
processSync "github.com/ElrondNetwork/elrond-go/process/sync"
antifloodThrottle "github.com/ElrondNetwork/elrond-go/process/throttle/antiflood"
processAntiflood "github.com/ElrondNetwork/elrond-go/process/throttle/antiflood"
"github.com/ElrondNetwork/elrond-go/process/transaction"
"github.com/ElrondNetwork/elrond-go/sharding"
"github.com/ElrondNetwork/elrond-go/statusHandler"
Expand All @@ -84,7 +84,7 @@ import (
"github.com/ElrondNetwork/elrond-go/storage/memorydb"
"github.com/ElrondNetwork/elrond-go/storage/storageUnit"
"github.com/ElrondNetwork/elrond-go/storage/timecache"
"github.com/ElrondNetwork/elrond-vm-common"
vmcommon "github.com/ElrondNetwork/elrond-vm-common"
"github.com/btcsuite/btcd/btcec"
libp2pCrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/urfave/cli"
Expand Down Expand Up @@ -490,7 +490,14 @@ func NetworkComponentsFactory(p2pConfig *config.P2PConfig, mainConfig *config.Co
return nil, err
}

antifloodHandler, err := createAntifloodComponent(mainConfig, core.StatusHandler)
antifloodHandler, p2pPeerBlackList, err := createAntifloodAndBlackListComponents(mainConfig, core.StatusHandler)
if err != nil {
return nil, err
}

err = netMessenger.ApplyOptions(
libp2p.WithPeerBlackList(p2pPeerBlackList),
)
if err != nil {
return nil, err
}
Expand All @@ -501,45 +508,81 @@ func NetworkComponentsFactory(p2pConfig *config.P2PConfig, mainConfig *config.Co
}, nil
}

func createAntifloodComponent(mainConfig *config.Config, status core.AppStatusHandler) (consensus.P2PAntifloodHandler, error) {
func createAntifloodAndBlackListComponents(
mainConfig *config.Config,
status core.AppStatusHandler,
) (consensus.P2PAntifloodHandler, p2p.BlacklistHandler, error) {

cacheConfig := storageFactory.GetCacherFromConfig(mainConfig.Antiflood.Cache)
antifloodCache, err := storageUnit.NewCache(cacheConfig.Type, cacheConfig.Size, cacheConfig.Shards)
if err != nil {
return nil, err
return nil, nil, err
}

blackListCache, err := storageUnit.NewCache(cacheConfig.Type, cacheConfig.Size, cacheConfig.Shards)
if err != nil {
return nil, nil, err
}

peerMaxMessagesPerSecond := mainConfig.Antiflood.PeerMaxMessagesPerSecond
peerMaxTotalSizePerSecond := mainConfig.Antiflood.PeerMaxTotalSizePerSecond
maxMessagesPerSecond := mainConfig.Antiflood.MaxMessagesPerSecond
maxTotalSizePerSecond := mainConfig.Antiflood.MaxTotalSizePerSecond

log.Debug("started antiflood component",
"peerMaxMessagesPerSecond", peerMaxMessagesPerSecond,
"peerMaxTotalSizePerSecond", core.ConvertBytes(peerMaxTotalSizePerSecond),
"maxMessagesPerSecond", maxMessagesPerSecond,
"maxTotalSizePerSecond", core.ConvertBytes(maxTotalSizePerSecond),
)

quotaProcessor, err := p2pQuota.NewP2pQuotaProcessor(status)
if err != nil {
return nil, err
return nil, nil, err
}

floodPreventer, err := antifloodThrottle.NewQuotaFloodPreventer(
peerBanInSeconds := mainConfig.Antiflood.BlackList.PeerBanDurationInSeconds
if peerBanInSeconds == 0 {
return nil, nil, fmt.Errorf("Antiflood.BlackList.PeerBanDurationInSeconds should be greater than 0")
}

p2pPeerBlackList := timecache.NewTimeCache(time.Second * time.Duration(peerBanInSeconds))
blackListProcessor, err := processAntiflood.NewP2pBlackListProcessor(
blackListCache,
p2pPeerBlackList,
mainConfig.Antiflood.BlackList.ThresholdNumMessagesPerSecond,
mainConfig.Antiflood.BlackList.ThresholdSizePerSecond,
mainConfig.Antiflood.BlackList.NumFloodingRounds,
)
if err != nil {
return nil, nil, err
}

floodPreventer, err := processAntiflood.NewQuotaFloodPreventer(
antifloodCache,
[]antifloodThrottle.QuotaStatusHandler{quotaProcessor},
[]processAntiflood.QuotaStatusHandler{quotaProcessor, blackListProcessor},
peerMaxMessagesPerSecond,
peerMaxTotalSizePerSecond,
maxMessagesPerSecond,
maxTotalSizePerSecond,
)
if err != nil {
return nil, err
return nil, nil, err
}

log.Debug("started antiflood & blacklist components",
"peerMaxMessagesPerSecond", peerMaxMessagesPerSecond,
"peerMaxTotalSizePerSecond", core.ConvertBytes(peerMaxTotalSizePerSecond),
"maxMessagesPerSecond", maxMessagesPerSecond,
"maxTotalSizePerSecond", core.ConvertBytes(maxTotalSizePerSecond),
"peerBanDurationInSeconds", peerBanInSeconds,
"thresholdNumMessagesPerSecond", mainConfig.Antiflood.BlackList.ThresholdNumMessagesPerSecond,
"thresholdSizePerSecond", mainConfig.Antiflood.BlackList.ThresholdSizePerSecond,
"numFloodingRounds", mainConfig.Antiflood.BlackList.NumFloodingRounds,
)

startResetingFloodPreventer(floodPreventer)
startSweepingP2pPeerBlackList(p2pPeerBlackList)

p2pAntiflood, err := antiflood.NewP2pAntiflood(floodPreventer)
if err != nil {
return nil, nil, err
}

return antiflood.NewP2pAntiflood(floodPreventer)
return p2pAntiflood, p2pPeerBlackList, nil
}

func startResetingFloodPreventer(floodPreventer p2p.FloodPreventer) {
Expand All @@ -551,6 +594,15 @@ func startResetingFloodPreventer(floodPreventer p2p.FloodPreventer) {
}()
}

func startSweepingP2pPeerBlackList(p2pPeerBlackList process.BlackListHandler) {
go func() {
for {
time.Sleep(time.Second * 5)
p2pPeerBlackList.Sweep()
}
}()
}

type processComponentsFactoryArgs struct {
coreComponents *coreComponentsFactoryArgs
genesisConfig *sharding.Genesis
Expand Down
11 changes: 10 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type Config struct {
ShardHeadersDataPool CacheConfig
MetaHeaderNoncesDataPool CacheConfig

Antiflood AntifloodConfig
Antiflood AntifloodConfig
EpochStartConfig EpochStartConfig
Logger LoggerConfig
Address AddressConfig
Expand Down Expand Up @@ -194,9 +194,18 @@ type FacadeConfig struct {
PprofEnabled bool
}

// BlackListConfig will hold the p2p peer black list threshold values
type BlackListConfig struct {
ThresholdNumMessagesPerSecond uint32
ThresholdSizePerSecond uint64
NumFloodingRounds uint32
PeerBanDurationInSeconds uint32
}

// AntifloodConfig will hold all p2p antiflood parameters
type AntifloodConfig struct {
Cache CacheConfig
BlackList BlackListConfig
PeerMaxMessagesPerSecond uint32
PeerMaxTotalSizePerSecond uint64
MaxMessagesPerSecond uint32
Expand Down

0 comments on commit aadfc74

Please sign in to comment.