Skip to content

Commit

Permalink
Merge pull request #566 from ElrondNetwork/EN-4565-Fix-problem-with-p…
Browse files Browse the repository at this point in the history
…ool-capacity

En 4565 fix problem with pool capacity
  • Loading branch information
iulianpascalau authored Oct 22, 2019
2 parents 1e43532 + 490b792 commit b933fc6
Show file tree
Hide file tree
Showing 28 changed files with 394 additions and 147 deletions.
5 changes: 5 additions & 0 deletions api/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type CacherStub struct {
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte))
}

Expand Down Expand Up @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int {
return cs.LenCalled()
}

func (cs *CacherStub) MaxSize() int {
return cs.MaxSizeCalled()
}

func (cs *CacherStub) RegisterHandler(handler func(key []byte)) {
cs.RegisterHandlerCalled(handler)
}
Expand Down
5 changes: 5 additions & 0 deletions data/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type CacherStub struct {
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte))
}

Expand Down Expand Up @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int {
return cs.LenCalled()
}

func (cs *CacherStub) MaxSize() int {
return cs.MaxSizeCalled()
}

func (cs *CacherStub) RegisterHandler(handler func(key []byte)) {
cs.RegisterHandlerCalled(handler)
}
Expand Down
4 changes: 4 additions & 0 deletions dataRetriever/mock/cacherMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (cm *CacherMock) Len() int {
return len(cm.dataMap)
}

func (cm *CacherMock) MaxSize() int {
return 10000
}

func (cm *CacherMock) RegisterHandler(func(key []byte)) {
panic("implement me")
}
Expand Down
5 changes: 5 additions & 0 deletions dataRetriever/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type CacherStub struct {
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte))
}

Expand Down Expand Up @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int {
return cs.LenCalled()
}

func (cs *CacherStub) MaxSize() int {
return cs.MaxSizeCalled()
}

func (cs *CacherStub) RegisterHandler(handler func(key []byte)) {
cs.RegisterHandlerCalled(handler)
}
Expand Down
5 changes: 5 additions & 0 deletions integrationTests/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type CacherStub struct {
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte))
}

Expand Down Expand Up @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int {
return cs.LenCalled()
}

func (cs *CacherStub) MaxSize() int {
return cs.MaxSizeCalled()
}

func (cs *CacherStub) RegisterHandler(handler func(key []byte)) {
cs.RegisterHandlerCalled(handler)
}
Expand Down
5 changes: 5 additions & 0 deletions node/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type CacherStub struct {
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte))
}

Expand Down Expand Up @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int {
return cs.LenCalled()
}

func (cs *CacherStub) MaxSize() int {
return cs.MaxSizeCalled()
}

func (cs *CacherStub) RegisterHandler(handler func(key []byte)) {
cs.RegisterHandlerCalled(handler)
}
Expand Down
130 changes: 112 additions & 18 deletions process/block/baseProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/sharding"
"github.com/ElrondNetwork/elrond-go/storage"
)

var log = logger.DefaultLogger()
Expand All @@ -38,11 +39,12 @@ type hdrInfo struct {
}

type hdrForBlock struct {
missingHdrs uint32
missingFinalityAttestingHdrs uint32
highestHdrNonce map[uint32]uint64
mutHdrsForBlock sync.RWMutex
hdrHashAndInfo map[string]*hdrInfo
missingHdrs uint32
missingFinalityAttestingHdrs uint32
requestedFinalityAttestingHdrs map[uint32][]uint64
highestHdrNonce map[uint32]uint64
mutHdrsForBlock sync.RWMutex
hdrHashAndInfo map[string]*hdrInfo
}

type mapShardHeaders map[uint32][]data.HeaderHandler
Expand Down Expand Up @@ -405,12 +407,21 @@ func (bp *baseProcessor) setLastNotarizedHeadersSlice(startHeaders map[uint32]da
return nil
}

func (bp *baseProcessor) requestHeadersIfMissing(sortedHdrs []data.HeaderHandler, shardId uint32, maxRound uint64) error {
func (bp *baseProcessor) requestHeadersIfMissing(
sortedHdrs []data.HeaderHandler,
shardId uint32,
maxRound uint64,
cacher storage.Cacher,
) error {

allowedSize := uint64(float64(cacher.MaxSize()) * process.MaxOccupancyPercentageAllowed)

prevHdr, err := bp.getLastNotarizedHdr(shardId)
if err != nil {
return err
}

lastNotarizedHdrNonce := prevHdr.GetNonce()
highestHdr := prevHdr

missingNonces := make([]uint64, 0)
Expand Down Expand Up @@ -439,11 +450,15 @@ func (bp *baseProcessor) requestHeadersIfMissing(sortedHdrs []data.HeaderHandler
}

// ask for headers, if there most probably should be
if maxRound > highestHdr.GetRound() {
nbHeaderRequests := maxRound - highestHdr.GetRound()
startNonce := highestHdr.GetNonce() + 1
for nonce := startNonce; nonce < startNonce+nbHeaderRequests; nonce++ {
missingNonces = append(missingNonces, nonce)
roundDiff := int64(maxRound) - int64(highestHdr.GetRound())
if roundDiff > 0 {
nonceDiff := int64(lastNotarizedHdrNonce) + process.MaxHeadersToRequestInAdvance - int64(highestHdr.GetNonce())
if nonceDiff > 0 {
nbHeadersToRequestInAdvance := core.MinUint64(uint64(roundDiff), uint64(nonceDiff))
startNonce := highestHdr.GetNonce() + 1
for nonce := startNonce; nonce < startNonce+nbHeadersToRequestInAdvance; nonce++ {
missingNonces = append(missingNonces, nonce)
}
}
}

Expand All @@ -454,6 +469,11 @@ func (bp *baseProcessor) requestHeadersIfMissing(sortedHdrs []data.HeaderHandler
return process.ErrNilRequestHeaderHandlerByNonce
}

isHeaderOutOfRange := nonce > lastNotarizedHdrNonce+allowedSize
if isHeaderOutOfRange {
break
}

if requested >= process.MaxHeaderRequestsAllowed {
break
}
Expand Down Expand Up @@ -553,6 +573,7 @@ func (bp *baseProcessor) createBlockStarted() {
bp.hdrsForCurrBlock.mutHdrsForBlock.Lock()
bp.hdrsForCurrBlock.hdrHashAndInfo = make(map[string]*hdrInfo)
bp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64)
bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64)
bp.hdrsForCurrBlock.mutHdrsForBlock.Unlock()
}

Expand All @@ -568,12 +589,12 @@ func (bp *baseProcessor) sortHeadersForCurrentBlockByNonce(usedInBlock bool) map
hdrsForCurrentBlock := make(map[uint32][]data.HeaderHandler)

bp.hdrsForCurrBlock.mutHdrsForBlock.RLock()
for _, hdrInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo {
if hdrInfo.usedInBlock != usedInBlock {
for _, headerInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo {
if headerInfo.usedInBlock != usedInBlock {
continue
}

hdrsForCurrentBlock[hdrInfo.hdr.GetShardID()] = append(hdrsForCurrentBlock[hdrInfo.hdr.GetShardID()], hdrInfo.hdr)
hdrsForCurrentBlock[headerInfo.hdr.GetShardID()] = append(hdrsForCurrentBlock[headerInfo.hdr.GetShardID()], headerInfo.hdr)
}
bp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock()

Expand All @@ -590,13 +611,13 @@ func (bp *baseProcessor) sortHeaderHashesForCurrentBlockByNonce(usedInBlock bool
hdrsForCurrentBlockInfo := make(map[uint32][]*nonceAndHashInfo)

bp.hdrsForCurrBlock.mutHdrsForBlock.RLock()
for metaBlockHash, hdrInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo {
if hdrInfo.usedInBlock != usedInBlock {
for metaBlockHash, headerInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo {
if headerInfo.usedInBlock != usedInBlock {
continue
}

hdrsForCurrentBlockInfo[hdrInfo.hdr.GetShardID()] = append(hdrsForCurrentBlockInfo[hdrInfo.hdr.GetShardID()],
&nonceAndHashInfo{nonce: hdrInfo.hdr.GetNonce(), hash: []byte(metaBlockHash)})
hdrsForCurrentBlockInfo[headerInfo.hdr.GetShardID()] = append(hdrsForCurrentBlockInfo[headerInfo.hdr.GetShardID()],
&nonceAndHashInfo{nonce: headerInfo.hdr.GetNonce(), hash: []byte(metaBlockHash)})
}
bp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock()

Expand All @@ -617,3 +638,76 @@ func (bp *baseProcessor) sortHeaderHashesForCurrentBlockByNonce(usedInBlock bool

return hdrsHashesForCurrentBlock
}

func (bp *baseProcessor) isHeaderOutOfRange(header data.HeaderHandler, cacher storage.Cacher) bool {
lastNotarizedHdr, err := bp.getLastNotarizedHdr(header.GetShardID())
if err != nil {
return false
}

allowedSize := uint64(float64(cacher.MaxSize()) * process.MaxOccupancyPercentageAllowed)
isHeaderOutOfRange := header.GetNonce() > lastNotarizedHdr.GetNonce()+allowedSize

return isHeaderOutOfRange
}

func (bp *baseProcessor) wasHeaderRequested(shardId uint32, nonce uint64) bool {
requestedNonces, ok := bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId]
if !ok {
return false
}

for _, requestedNonce := range requestedNonces {
if requestedNonce == nonce {
return true
}
}

return false
}

// requestMissingFinalityAttestingHeaders requests the headers needed to accept the current selected headers for
// processing the current block. It requests the finality headers greater than the highest header, for given shard,
// related to the block which should be processed
func (bp *baseProcessor) requestMissingFinalityAttestingHeaders(
shardId uint32,
finality uint32,
getHeaderFromPoolWithNonce func(uint64, uint32) (data.HeaderHandler, []byte, error),
) uint32 {
requestedHeaders := uint32(0)
missingFinalityAttestingHeaders := uint32(0)

highestHdrNonce := bp.hdrsForCurrBlock.highestHdrNonce[shardId]
if highestHdrNonce == uint64(0) {
return missingFinalityAttestingHeaders
}

lastFinalityAttestingHeader := bp.hdrsForCurrBlock.highestHdrNonce[shardId] + uint64(finality)
for i := highestHdrNonce + 1; i <= lastFinalityAttestingHeader; i++ {
header, headerHash, err := getHeaderFromPoolWithNonce(
i,
shardId)

if err != nil {
missingFinalityAttestingHeaders++
wasHeaderRequested := bp.wasHeaderRequested(shardId, i)
if !wasHeaderRequested {
requestedHeaders++
bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId] = append(bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId], i)
go bp.onRequestHeaderHandlerByNonce(shardId, i)
}

continue
}

bp.hdrsForCurrBlock.hdrHashAndInfo[string(headerHash)] = &hdrInfo{hdr: header, usedInBlock: false}
}

if requestedHeaders > 0 {
log.Info(fmt.Sprintf("requested %d missing finality attesting headers for shard %d\n",
requestedHeaders,
shardId))
}

return missingFinalityAttestingHeaders
}
27 changes: 27 additions & 0 deletions process/block/baseProcess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func createShardedDataChacherNotifier(
LenCalled: func() int {
return 0
},
MaxSizeCalled: func() int {
return 1000
},
}
},
RemoveSetOfDataFromPoolCalled: func(keys [][]byte, id string) {},
Expand Down Expand Up @@ -124,6 +127,9 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub {
LenCalled: func() int {
return 0
},
MaxSizeCalled: func() int {
return 1000
},
PeekCalled: func(key []byte) (value interface{}, ok bool) {
if reflect.DeepEqual(key, []byte("tx1_hash")) {
return &transaction.Transaction{Nonce: 10}, true
Expand Down Expand Up @@ -154,6 +160,12 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub {
}
cs.RegisterHandlerCalled = func(i func(key []byte)) {}
cs.RemoveCalled = func(key []byte) {}
cs.LenCalled = func() int {
return 0
}
cs.MaxSizeCalled = func() int {
return 300
}
return cs
},
HeadersCalled: func() storage.Cacher {
Expand All @@ -162,6 +174,12 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub {
}
cs.RemoveCalled = func(key []byte) {
}
cs.LenCalled = func() int {
return 0
}
cs.MaxSizeCalled = func() int {
return 1000
}
return cs
},
}
Expand All @@ -184,6 +202,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub {
LenCalled: func() int {
return 0
},
MaxSizeCalled: func() int {
return 1000
},
PeekCalled: func(key []byte) (value interface{}, ok bool) {
if reflect.DeepEqual(key, []byte("tx1_hash")) {
return &transaction.Transaction{Nonce: 10}, true
Expand All @@ -207,6 +228,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub {
cs.LenCalled = func() int {
return 0
}
cs.MaxSizeCalled = func() int {
return 300
}
cs.RemoveCalled = func(key []byte) {}
cs.KeysCalled = func() [][]byte {
return nil
Expand All @@ -226,6 +250,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub {
cs.LenCalled = func() int {
return 0
}
cs.MaxSizeCalled = func() int {
return 1000
}
cs.RemoveCalled = func(key []byte) {}
cs.KeysCalled = func() [][]byte {
return nil
Expand Down
Loading

0 comments on commit b933fc6

Please sign in to comment.