diff --git a/api/mock/cacherStub.go b/api/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/api/mock/cacherStub.go +++ b/api/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/data/mock/cacherStub.go b/data/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/data/mock/cacherStub.go +++ b/data/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/dataRetriever/mock/cacherMock.go b/dataRetriever/mock/cacherMock.go index 554b2a24446..c6aaf7ba7ef 100644 --- a/dataRetriever/mock/cacherMock.go +++ b/dataRetriever/mock/cacherMock.go @@ -90,6 +90,10 @@ func (cm *CacherMock) Len() int { return len(cm.dataMap) } +func (cm *CacherMock) MaxSize() int { + return 10000 +} + func (cm *CacherMock) RegisterHandler(func(key []byte)) { panic("implement me") } diff --git a/dataRetriever/mock/cacherStub.go b/dataRetriever/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/dataRetriever/mock/cacherStub.go +++ b/dataRetriever/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/integrationTests/mock/cacherStub.go b/integrationTests/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/integrationTests/mock/cacherStub.go +++ b/integrationTests/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/node/mock/cacherStub.go b/node/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/node/mock/cacherStub.go +++ b/node/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index 35332f2f535..60dfa838e6b 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -18,6 +18,7 @@ import ( "github.com/ElrondNetwork/elrond-go/marshal" "github.com/ElrondNetwork/elrond-go/process" "github.com/ElrondNetwork/elrond-go/sharding" + "github.com/ElrondNetwork/elrond-go/storage" ) var log = logger.DefaultLogger() @@ -38,11 +39,12 @@ type hdrInfo struct { } type hdrForBlock struct { - missingHdrs uint32 - missingFinalityAttestingHdrs uint32 - highestHdrNonce map[uint32]uint64 - mutHdrsForBlock sync.RWMutex - hdrHashAndInfo map[string]*hdrInfo + missingHdrs uint32 + missingFinalityAttestingHdrs uint32 + requestedFinalityAttestingHdrs map[uint32][]uint64 + highestHdrNonce map[uint32]uint64 + mutHdrsForBlock sync.RWMutex + hdrHashAndInfo map[string]*hdrInfo } type mapShardHeaders map[uint32][]data.HeaderHandler @@ -405,12 +407,21 @@ func (bp *baseProcessor) setLastNotarizedHeadersSlice(startHeaders map[uint32]da return nil } -func (bp *baseProcessor) requestHeadersIfMissing(sortedHdrs []data.HeaderHandler, shardId uint32, maxRound uint64) error { +func (bp *baseProcessor) requestHeadersIfMissing( + sortedHdrs []data.HeaderHandler, + shardId uint32, + maxRound uint64, + cacher storage.Cacher, +) error { + + allowedSize := uint64(float64(cacher.MaxSize()) * process.MaxOccupancyPercentageAllowed) + prevHdr, err := bp.getLastNotarizedHdr(shardId) if err != nil { return err } + lastNotarizedHdrNonce := prevHdr.GetNonce() highestHdr := prevHdr missingNonces := make([]uint64, 0) @@ -439,11 +450,15 @@ func (bp *baseProcessor) requestHeadersIfMissing(sortedHdrs []data.HeaderHandler } // ask for headers, if there most probably should be - if maxRound > highestHdr.GetRound() { - nbHeaderRequests := maxRound - highestHdr.GetRound() - startNonce := highestHdr.GetNonce() + 1 - for nonce := startNonce; nonce < startNonce+nbHeaderRequests; nonce++ { - missingNonces = append(missingNonces, nonce) + roundDiff := int64(maxRound) - int64(highestHdr.GetRound()) + if roundDiff > 0 { + nonceDiff := int64(lastNotarizedHdrNonce) + process.MaxHeadersToRequestInAdvance - int64(highestHdr.GetNonce()) + if nonceDiff > 0 { + nbHeadersToRequestInAdvance := core.MinUint64(uint64(roundDiff), uint64(nonceDiff)) + startNonce := highestHdr.GetNonce() + 1 + for nonce := startNonce; nonce < startNonce+nbHeadersToRequestInAdvance; nonce++ { + missingNonces = append(missingNonces, nonce) + } } } @@ -454,6 +469,11 @@ func (bp *baseProcessor) requestHeadersIfMissing(sortedHdrs []data.HeaderHandler return process.ErrNilRequestHeaderHandlerByNonce } + isHeaderOutOfRange := nonce > lastNotarizedHdrNonce+allowedSize + if isHeaderOutOfRange { + break + } + if requested >= process.MaxHeaderRequestsAllowed { break } @@ -553,6 +573,7 @@ func (bp *baseProcessor) createBlockStarted() { bp.hdrsForCurrBlock.mutHdrsForBlock.Lock() bp.hdrsForCurrBlock.hdrHashAndInfo = make(map[string]*hdrInfo) bp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64) + bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64) bp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() } @@ -568,12 +589,12 @@ func (bp *baseProcessor) sortHeadersForCurrentBlockByNonce(usedInBlock bool) map hdrsForCurrentBlock := make(map[uint32][]data.HeaderHandler) bp.hdrsForCurrBlock.mutHdrsForBlock.RLock() - for _, hdrInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo { - if hdrInfo.usedInBlock != usedInBlock { + for _, headerInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo { + if headerInfo.usedInBlock != usedInBlock { continue } - hdrsForCurrentBlock[hdrInfo.hdr.GetShardID()] = append(hdrsForCurrentBlock[hdrInfo.hdr.GetShardID()], hdrInfo.hdr) + hdrsForCurrentBlock[headerInfo.hdr.GetShardID()] = append(hdrsForCurrentBlock[headerInfo.hdr.GetShardID()], headerInfo.hdr) } bp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() @@ -590,13 +611,13 @@ func (bp *baseProcessor) sortHeaderHashesForCurrentBlockByNonce(usedInBlock bool hdrsForCurrentBlockInfo := make(map[uint32][]*nonceAndHashInfo) bp.hdrsForCurrBlock.mutHdrsForBlock.RLock() - for metaBlockHash, hdrInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo { - if hdrInfo.usedInBlock != usedInBlock { + for metaBlockHash, headerInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo { + if headerInfo.usedInBlock != usedInBlock { continue } - hdrsForCurrentBlockInfo[hdrInfo.hdr.GetShardID()] = append(hdrsForCurrentBlockInfo[hdrInfo.hdr.GetShardID()], - &nonceAndHashInfo{nonce: hdrInfo.hdr.GetNonce(), hash: []byte(metaBlockHash)}) + hdrsForCurrentBlockInfo[headerInfo.hdr.GetShardID()] = append(hdrsForCurrentBlockInfo[headerInfo.hdr.GetShardID()], + &nonceAndHashInfo{nonce: headerInfo.hdr.GetNonce(), hash: []byte(metaBlockHash)}) } bp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() @@ -617,3 +638,76 @@ func (bp *baseProcessor) sortHeaderHashesForCurrentBlockByNonce(usedInBlock bool return hdrsHashesForCurrentBlock } + +func (bp *baseProcessor) isHeaderOutOfRange(header data.HeaderHandler, cacher storage.Cacher) bool { + lastNotarizedHdr, err := bp.getLastNotarizedHdr(header.GetShardID()) + if err != nil { + return false + } + + allowedSize := uint64(float64(cacher.MaxSize()) * process.MaxOccupancyPercentageAllowed) + isHeaderOutOfRange := header.GetNonce() > lastNotarizedHdr.GetNonce()+allowedSize + + return isHeaderOutOfRange +} + +func (bp *baseProcessor) wasHeaderRequested(shardId uint32, nonce uint64) bool { + requestedNonces, ok := bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId] + if !ok { + return false + } + + for _, requestedNonce := range requestedNonces { + if requestedNonce == nonce { + return true + } + } + + return false +} + +// requestMissingFinalityAttestingHeaders requests the headers needed to accept the current selected headers for +// processing the current block. It requests the finality headers greater than the highest header, for given shard, +// related to the block which should be processed +func (bp *baseProcessor) requestMissingFinalityAttestingHeaders( + shardId uint32, + finality uint32, + getHeaderFromPoolWithNonce func(uint64, uint32) (data.HeaderHandler, []byte, error), +) uint32 { + requestedHeaders := uint32(0) + missingFinalityAttestingHeaders := uint32(0) + + highestHdrNonce := bp.hdrsForCurrBlock.highestHdrNonce[shardId] + if highestHdrNonce == uint64(0) { + return missingFinalityAttestingHeaders + } + + lastFinalityAttestingHeader := bp.hdrsForCurrBlock.highestHdrNonce[shardId] + uint64(finality) + for i := highestHdrNonce + 1; i <= lastFinalityAttestingHeader; i++ { + header, headerHash, err := getHeaderFromPoolWithNonce( + i, + shardId) + + if err != nil { + missingFinalityAttestingHeaders++ + wasHeaderRequested := bp.wasHeaderRequested(shardId, i) + if !wasHeaderRequested { + requestedHeaders++ + bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId] = append(bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId], i) + go bp.onRequestHeaderHandlerByNonce(shardId, i) + } + + continue + } + + bp.hdrsForCurrBlock.hdrHashAndInfo[string(headerHash)] = &hdrInfo{hdr: header, usedInBlock: false} + } + + if requestedHeaders > 0 { + log.Info(fmt.Sprintf("requested %d missing finality attesting headers for shard %d\n", + requestedHeaders, + shardId)) + } + + return missingFinalityAttestingHeaders +} diff --git a/process/block/baseProcess_test.go b/process/block/baseProcess_test.go index 600ff02a799..86a3d60dbba 100644 --- a/process/block/baseProcess_test.go +++ b/process/block/baseProcess_test.go @@ -70,6 +70,9 @@ func createShardedDataChacherNotifier( LenCalled: func() int { return 0 }, + MaxSizeCalled: func() int { + return 1000 + }, } }, RemoveSetOfDataFromPoolCalled: func(keys [][]byte, id string) {}, @@ -124,6 +127,9 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { LenCalled: func() int { return 0 }, + MaxSizeCalled: func() int { + return 1000 + }, PeekCalled: func(key []byte) (value interface{}, ok bool) { if reflect.DeepEqual(key, []byte("tx1_hash")) { return &transaction.Transaction{Nonce: 10}, true @@ -154,6 +160,12 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { } cs.RegisterHandlerCalled = func(i func(key []byte)) {} cs.RemoveCalled = func(key []byte) {} + cs.LenCalled = func() int { + return 0 + } + cs.MaxSizeCalled = func() int { + return 300 + } return cs }, HeadersCalled: func() storage.Cacher { @@ -162,6 +174,12 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { } cs.RemoveCalled = func(key []byte) { } + cs.LenCalled = func() int { + return 0 + } + cs.MaxSizeCalled = func() int { + return 1000 + } return cs }, } @@ -184,6 +202,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub { LenCalled: func() int { return 0 }, + MaxSizeCalled: func() int { + return 1000 + }, PeekCalled: func(key []byte) (value interface{}, ok bool) { if reflect.DeepEqual(key, []byte("tx1_hash")) { return &transaction.Transaction{Nonce: 10}, true @@ -207,6 +228,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub { cs.LenCalled = func() int { return 0 } + cs.MaxSizeCalled = func() int { + return 300 + } cs.RemoveCalled = func(key []byte) {} cs.KeysCalled = func() [][]byte { return nil @@ -226,6 +250,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub { cs.LenCalled = func() int { return 0 } + cs.MaxSizeCalled = func() int { + return 1000 + } cs.RemoveCalled = func(key []byte) {} cs.KeysCalled = func() [][]byte { return nil diff --git a/process/block/export_test.go b/process/block/export_test.go index 6da9619fab4..a40673da7ea 100644 --- a/process/block/export_test.go +++ b/process/block/export_test.go @@ -107,6 +107,10 @@ func (mp *metaProcessor) AddHdrHashToRequestedList(hdr *block.Header, hdrHash [] mp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64, mp.shardCoordinator.NumberOfShards()) } + if mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs == nil { + mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64, mp.shardCoordinator.NumberOfShards()) + } + mp.hdrsForCurrBlock.hdrHashAndInfo[string(hdrHash)] = &hdrInfo{hdr: hdr, usedInBlock: true} mp.hdrsForCurrBlock.missingHdrs++ } @@ -131,11 +135,11 @@ func (mp *metaProcessor) ProcessBlockHeaders(header *block.MetaBlock, round uint return mp.processBlockHeaders(header, round, haveTime) } -func (mp *metaProcessor) RequestMissingFinalityAttestingHeaders() uint32 { +func (mp *metaProcessor) RequestMissingFinalityAttestingShardHeaders() uint32 { mp.hdrsForCurrBlock.mutHdrsForBlock.Lock() defer mp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() - return mp.requestMissingFinalityAttestingHeaders() + return mp.requestMissingFinalityAttestingShardHeaders() } func (bp *baseProcessor) NotarizedHdrs() map[uint32][]data.HeaderHandler { @@ -228,7 +232,10 @@ func (sp *shardProcessor) RequestMissingFinalityAttestingHeaders() uint32 { sp.hdrsForCurrBlock.mutHdrsForBlock.Lock() defer sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() - return sp.requestMissingFinalityAttestingHeaders() + return sp.requestMissingFinalityAttestingHeaders( + sharding.MetachainShardId, + sp.metaBlockFinality, + sp.getMetaHeaderFromPoolWithNonce) } func (sp *shardProcessor) CheckMetaHeadersValidityAndFinality() error { diff --git a/process/block/metablock.go b/process/block/metablock.go index 8f7f34edb67..a8418ee0dec 100644 --- a/process/block/metablock.go +++ b/process/block/metablock.go @@ -21,8 +21,8 @@ import ( // metaProcessor implements metaProcessor interface and actually it tries to execute block type metaProcessor struct { *baseProcessor - core serviceContainer.Core - dataPool dataRetriever.MetaPoolsHolder + core serviceContainer.Core + dataPool dataRetriever.MetaPoolsHolder //TODO: add txCoordinator process.TransactionCoordinator shardsHeadersNonce *sync.Map @@ -80,6 +80,7 @@ func NewMetaProcessor(arguments ArgMetaProcessor) (*metaProcessor, error) { mp.hdrsForCurrBlock.hdrHashAndInfo = make(map[string]*hdrInfo) mp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64) + mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64) headerPool := mp.dataPool.ShardHeaders() headerPool.RegisterHandler(mp.receivedShardHeader) @@ -222,7 +223,7 @@ func (mp *metaProcessor) checkAndRequestIfShardHeadersMissing(round uint64) { sortedHdrs[j] = sortedHdrPerShard[i][j] } - err := mp.requestHeadersIfMissing(sortedHdrs, i, round) + err := mp.requestHeadersIfMissing(sortedHdrs, i, round, mp.dataPool.ShardHeaders()) if err != nil { log.Debug(err.Error()) continue @@ -275,13 +276,13 @@ func (mp *metaProcessor) removeBlockInfoFromPool(header *block.MetaBlock) error mp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for i := 0; i < len(header.ShardInfo); i++ { shardHeaderHash := header.ShardInfo[i].HeaderHash - hdrInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] + headerInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrMissingHeader } - shardBlock, ok := hdrInfo.hdr.(*block.Header) + shardBlock, ok := headerInfo.hdr.(*block.Header) if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrWrongTypeAssertion @@ -469,13 +470,13 @@ func (mp *metaProcessor) CommitBlock( mp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for i := 0; i < len(header.ShardInfo); i++ { shardHeaderHash := header.ShardInfo[i].HeaderHash - hdrInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] + headerInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrMissingHeader } - shardBlock, ok := hdrInfo.hdr.(*block.Header) + shardBlock, ok := headerInfo.hdr.(*block.Header) if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrWrongTypeAssertion @@ -562,6 +563,13 @@ func (mp *metaProcessor) CommitBlock( mp.blockSizeThrottler.Succeed(header.Round) + log.Info(fmt.Sprintf("pools info: MetaBlocks = %d from %d, ShardHeaders = %d from %d\n", + mp.dataPool.MetaBlocks().Len(), + mp.dataPool.MetaBlocks().MaxSize(), + mp.dataPool.ShardHeaders().Len(), + mp.dataPool.ShardHeaders().MaxSize(), + )) + return nil } @@ -618,13 +626,13 @@ func (mp *metaProcessor) saveLastNotarizedHeader(header *block.MetaBlock) error mp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for i := 0; i < len(header.ShardInfo); i++ { shardHeaderHash := header.ShardInfo[i].HeaderHash - hdrInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] + headerInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrMissingHeader } - shardHdr, ok := hdrInfo.hdr.(*block.Header) + shardHdr, ok := headerInfo.hdr.(*block.Header) if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrWrongTypeAssertion @@ -806,12 +814,9 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { } if mp.hdrsForCurrBlock.missingHdrs == 0 { - missingFinalityAttestingShardHdrs := mp.hdrsForCurrBlock.missingFinalityAttestingHdrs - mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingHeaders() + mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingShardHeaders() if mp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { - log.Info(fmt.Sprintf("received %d missing finality attesting shard headers\n", missingFinalityAttestingShardHdrs)) - } else { - log.Info(fmt.Sprintf("requested %d missing finality attesting shard headers\n", mp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) + log.Info(fmt.Sprintf("received all missing finality attesting shard headers\n")) } } @@ -827,6 +832,11 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { mp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() } + if mp.isHeaderOutOfRange(shardHeader, shardHeaderPool) { + shardHeaderPool.Remove(shardHeaderHash) + return + } + // request miniblocks for which metachain is destination for _, mb := range shardHeader.MiniBlockHeaders { if mb.ReceiverShardID == mp.shardCoordinator.SelfId() { @@ -835,36 +845,22 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { } } -// requestMissingFinalityAttestingHeaders requests the headers needed to accept the current selected headers for processing the -// current block. It requests the shardBlockFinality headers greater than the highest shard header, for each shard, related -// to the block which should be processed -func (mp *metaProcessor) requestMissingFinalityAttestingHeaders() uint32 { - requestedBlockHeaders := uint32(0) - for shardId := uint32(0); shardId < mp.shardCoordinator.NumberOfShards(); shardId++ { - highestHdrNonce := mp.hdrsForCurrBlock.highestHdrNonce[shardId] - if highestHdrNonce == uint64(0) { - continue - } +// requestMissingFinalityAttestingShardHeaders requests the headers needed to accept the current selected headers for +// processing the current block. It requests the shardBlockFinality headers greater than the highest shard header, +// for each shard, related to the block which should be processed +func (mp *metaProcessor) requestMissingFinalityAttestingShardHeaders() uint32 { + missingFinalityAttestingShardHeaders := uint32(0) - lastFinalityAttestingHeader := mp.hdrsForCurrBlock.highestHdrNonce[shardId] + uint64(mp.shardBlockFinality) - for i := highestHdrNonce + 1; i <= lastFinalityAttestingHeader; i++ { - shardHeader, shardHeaderHash, err := process.GetShardHeaderFromPoolWithNonce( - i, - shardId, - mp.dataPool.ShardHeaders(), - mp.dataPool.HeadersNonces()) - - if err != nil { - requestedBlockHeaders++ - go mp.onRequestHeaderHandlerByNonce(shardId, i) - continue - } + for shardId := uint32(0); shardId < mp.shardCoordinator.NumberOfShards(); shardId++ { + missingFinalityAttestingHeaders := mp.requestMissingFinalityAttestingHeaders( + shardId, + mp.shardBlockFinality, + mp.getShardHeaderFromPoolWithNonce) - mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] = &hdrInfo{hdr: shardHeader, usedInBlock: false} - } + missingFinalityAttestingShardHeaders += missingFinalityAttestingHeaders } - return requestedBlockHeaders + return missingFinalityAttestingShardHeaders } func (mp *metaProcessor) requestShardHeaders(metaBlock *block.MetaBlock) (uint32, uint32) { @@ -885,7 +881,7 @@ func (mp *metaProcessor) requestShardHeaders(metaBlock *block.MetaBlock) (uint32 } if mp.hdrsForCurrBlock.missingHdrs == 0 { - mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingHeaders() + mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingShardHeaders() } requestedHdrs := mp.hdrsForCurrBlock.missingHdrs @@ -1285,3 +1281,17 @@ func (mp *metaProcessor) IsInterfaceNil() bool { } return false } + +func (mp *metaProcessor) getShardHeaderFromPoolWithNonce( + nonce uint64, + shardId uint32, +) (data.HeaderHandler, []byte, error) { + + shardHeader, shardHeaderHash, err := process.GetShardHeaderFromPoolWithNonce( + nonce, + shardId, + mp.dataPool.ShardHeaders(), + mp.dataPool.HeadersNonces()) + + return shardHeader, shardHeaderHash, err +} diff --git a/process/block/metablock_test.go b/process/block/metablock_test.go index 4b1eb1a5e74..a410517ecb7 100644 --- a/process/block/metablock_test.go +++ b/process/block/metablock_test.go @@ -488,7 +488,7 @@ func TestMetaProcessor_RequestFinalMissingHeaderShouldPass(t *testing.T) { mp.SetHighestHdrNonceForCurrentBlock(0, 1) mp.SetHighestHdrNonceForCurrentBlock(1, 2) mp.SetHighestHdrNonceForCurrentBlock(2, 3) - res := mp.RequestMissingFinalityAttestingHeaders() + res := mp.RequestMissingFinalityAttestingShardHeaders() assert.Equal(t, res, uint32(3)) } @@ -707,6 +707,9 @@ func TestMetaProcessor_CommitBlockOkValsShouldWork(t *testing.T) { cs.LenCalled = func() int { return 0 } + cs.MaxSizeCalled = func() int { + return 1000 + } return cs } diff --git a/process/block/preprocess/rewardTxPreProcessor.go b/process/block/preprocess/rewardTxPreProcessor.go index a9a7b07e421..8dbe8f5a062 100644 --- a/process/block/preprocess/rewardTxPreProcessor.go +++ b/process/block/preprocess/rewardTxPreProcessor.go @@ -106,12 +106,13 @@ func (rtp *rewardTxPreprocessor) waitForRewardTxHashes(waitTime time.Duration) e // IsDataPrepared returns non error if all the requested reward transactions arrived and were saved into the pool func (rtp *rewardTxPreprocessor) IsDataPrepared(requestedRewardTxs int, haveTime func() time.Duration) error { if requestedRewardTxs > 0 { - log.Info(fmt.Sprintf("requested %d missing reward Txs\n", requestedRewardTxs)) + log.Info(fmt.Sprintf("requested %d missing reward txs\n", requestedRewardTxs)) err := rtp.waitForRewardTxHashes(haveTime()) rtp.rewardTxsForBlock.mutTxsForBlock.RLock() missingRewardTxs := rtp.rewardTxsForBlock.missingTxs + rtp.rewardTxsForBlock.missingTxs = 0 rtp.rewardTxsForBlock.mutTxsForBlock.RUnlock() - log.Info(fmt.Sprintf("received %d missing reward Txs\n", requestedRewardTxs-missingRewardTxs)) + log.Info(fmt.Sprintf("received %d missing reward txs\n", requestedRewardTxs-missingRewardTxs)) if err != nil { return err } @@ -286,7 +287,10 @@ func (rtp *rewardTxPreprocessor) receivedRewardTransaction(txHash []byte) { // CreateBlockStarted cleans the local cache map for processed/created reward transactions at this round func (rtp *rewardTxPreprocessor) CreateBlockStarted() { + _ = process.EmptyChannel(rtp.chReceivedAllRewardTxs) + rtp.rewardTxsForBlock.mutTxsForBlock.Lock() + rtp.rewardTxsForBlock.missingTxs = 0 rtp.rewardTxsForBlock.txHashAndInfo = make(map[string]*txInfo) rtp.rewardTxsForBlock.mutTxsForBlock.Unlock() } @@ -370,24 +374,30 @@ func (rtp *rewardTxPreprocessor) processRewardTransaction( } // RequestTransactionsForMiniBlock requests missing reward transactions for a certain miniblock -func (rtp *rewardTxPreprocessor) RequestTransactionsForMiniBlock(mb block.MiniBlock) int { - missingRewardTxsForMiniBlock := rtp.computeMissingRewardTxsForMiniBlock(mb) - rtp.onRequestRewardTx(mb.SenderShardID, missingRewardTxsForMiniBlock) +func (rtp *rewardTxPreprocessor) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + if miniBlock == nil { + return 0 + } + + missingRewardTxsForMiniBlock := rtp.computeMissingRewardTxsForMiniBlock(miniBlock) + if len(missingRewardTxsForMiniBlock) > 0 { + rtp.onRequestRewardTx(miniBlock.SenderShardID, missingRewardTxsForMiniBlock) + } return len(missingRewardTxsForMiniBlock) } // computeMissingRewardTxsForMiniBlock computes missing reward transactions for a certain miniblock -func (rtp *rewardTxPreprocessor) computeMissingRewardTxsForMiniBlock(mb block.MiniBlock) [][]byte { +func (rtp *rewardTxPreprocessor) computeMissingRewardTxsForMiniBlock(miniBlock *block.MiniBlock) [][]byte { missingRewardTxs := make([][]byte, 0) - if mb.Type != block.RewardsBlock { + if miniBlock.Type != block.RewardsBlock { return missingRewardTxs } - for _, txHash := range mb.TxHashes { + for _, txHash := range miniBlock.TxHashes { tx, _ := process.GetTransactionHandlerFromPool( - mb.SenderShardID, - mb.ReceiverShardID, + miniBlock.SenderShardID, + miniBlock.ReceiverShardID, txHash, rtp.rewardTxPool, ) diff --git a/process/block/preprocess/rewardTxPreProcessor_test.go b/process/block/preprocess/rewardTxPreProcessor_test.go index 226b432cc05..70bb0d0f61d 100644 --- a/process/block/preprocess/rewardTxPreProcessor_test.go +++ b/process/block/preprocess/rewardTxPreProcessor_test.go @@ -454,7 +454,7 @@ func TestRewardTxPreprocessor_RequestTransactionsForMiniBlockShouldWork(t *testi ) txHashes := [][]byte{[]byte(txHash)} - mb1 := block.MiniBlock{ + mb1 := &block.MiniBlock{ TxHashes: txHashes, ReceiverShardID: 1, SenderShardID: 0, diff --git a/process/block/preprocess/smartContractResults.go b/process/block/preprocess/smartContractResults.go index 469f04fe575..65f29ff3418 100644 --- a/process/block/preprocess/smartContractResults.go +++ b/process/block/preprocess/smartContractResults.go @@ -106,6 +106,7 @@ func (scr *smartContractResults) IsDataPrepared(requestedScrs int, haveTime func err := scr.waitForScrHashes(haveTime()) scr.scrForBlock.mutTxsForBlock.RLock() missingScrs := scr.scrForBlock.missingTxs + scr.scrForBlock.missingTxs = 0 scr.scrForBlock.mutTxsForBlock.RUnlock() log.Info(fmt.Sprintf("received %d missing scr\n", requestedScrs-missingScrs)) if err != nil { @@ -305,7 +306,7 @@ func (scr *smartContractResults) setMissingSCResultsForShard(senderShardID uint3 // computeMissingAndExistingSCResultsForShards calculates what smartContractResults are available and what are missing from block.Body func (scr *smartContractResults) computeMissingAndExistingSCResultsForShards(body block.Body) map[uint32][]*txsHashesInfo { - onlyScrFromOthersBody := block.Body{} + scrTxs := block.Body{} for _, mb := range body { if mb.Type != block.SmartContractResultBlock { continue @@ -314,10 +315,15 @@ func (scr *smartContractResults) computeMissingAndExistingSCResultsForShards(bod continue } - onlyScrFromOthersBody = append(onlyScrFromOthersBody, mb) + scrTxs = append(scrTxs, mb) } - missingTxsForShard := scr.computeExistingAndMissing(onlyScrFromOthersBody, &scr.scrForBlock, scr.chRcvAllScrs, block.SmartContractResultBlock, scr.scrPool) + missingTxsForShard := scr.computeExistingAndMissing( + scrTxs, + &scr.scrForBlock, + scr.chRcvAllScrs, + block.SmartContractResultBlock, + scr.scrPool) return missingTxsForShard } @@ -345,24 +351,30 @@ func (scr *smartContractResults) processSmartContractResult( } // RequestTransactionsForMiniBlock requests missing smartContractResults for a certain miniblock -func (scr *smartContractResults) RequestTransactionsForMiniBlock(mb block.MiniBlock) int { - missingScrsForMiniBlock := scr.computeMissingScrsForMiniBlock(mb) - scr.onRequestSmartContractResult(mb.SenderShardID, missingScrsForMiniBlock) +func (scr *smartContractResults) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + if miniBlock == nil { + return 0 + } + + missingScrsForMiniBlock := scr.computeMissingScrsForMiniBlock(miniBlock) + if len(missingScrsForMiniBlock) > 0 { + scr.onRequestSmartContractResult(miniBlock.SenderShardID, missingScrsForMiniBlock) + } return len(missingScrsForMiniBlock) } // computeMissingScrsForMiniBlock computes missing smartContractResults for a certain miniblock -func (scr *smartContractResults) computeMissingScrsForMiniBlock(mb block.MiniBlock) [][]byte { +func (scr *smartContractResults) computeMissingScrsForMiniBlock(miniBlock *block.MiniBlock) [][]byte { missingSmartContractResults := make([][]byte, 0) - if mb.Type != block.SmartContractResultBlock { + if miniBlock.Type != block.SmartContractResultBlock { return missingSmartContractResults } - for _, txHash := range mb.TxHashes { + for _, txHash := range miniBlock.TxHashes { tx, _ := process.GetTransactionHandlerFromPool( - mb.SenderShardID, - mb.ReceiverShardID, + miniBlock.SenderShardID, + miniBlock.ReceiverShardID, txHash, scr.scrPool) diff --git a/process/block/preprocess/smartContractResults_test.go b/process/block/preprocess/smartContractResults_test.go index 22851d4d989..f85c5aa1a33 100644 --- a/process/block/preprocess/smartContractResults_test.go +++ b/process/block/preprocess/smartContractResults_test.go @@ -290,7 +290,7 @@ func TestScrsPreprocessor_RequestBlockTransactionFromMiniBlockFromNetwork(t *tes txHashes := make([][]byte, 0) txHashes = append(txHashes, txHash1) txHashes = append(txHashes, txHash2) - mb := block.MiniBlock{ReceiverShardID: shardId, TxHashes: txHashes, Type: block.SmartContractResultBlock} + mb := &block.MiniBlock{ReceiverShardID: shardId, TxHashes: txHashes, Type: block.SmartContractResultBlock} txsRequested := txs.RequestTransactionsForMiniBlock(mb) diff --git a/process/block/preprocess/transactions.go b/process/block/preprocess/transactions.go index dcfd418598e..2462b5b7786 100644 --- a/process/block/preprocess/transactions.go +++ b/process/block/preprocess/transactions.go @@ -313,7 +313,12 @@ func (txs *transactions) setMissingTxsForShard(senderShardID uint32, mbTxHashes // computeMissingAndExistingTxsForShards calculates what transactions are available and what are missing from block.Body func (txs *transactions) computeMissingAndExistingTxsForShards(body block.Body) map[uint32][]*txsHashesInfo { - missingTxsForShard := txs.computeExistingAndMissing(body, &txs.txsForCurrBlock, txs.chRcvAllTxs, block.TxBlock, txs.txPool) + missingTxsForShard := txs.computeExistingAndMissing( + body, + &txs.txsForCurrBlock, + txs.chRcvAllTxs, + block.TxBlock, + txs.txPool) return missingTxsForShard } @@ -347,24 +352,30 @@ func (txs *transactions) processAndRemoveBadTransaction( } // RequestTransactionsForMiniBlock requests missing transactions for a certain miniblock -func (txs *transactions) RequestTransactionsForMiniBlock(mb block.MiniBlock) int { - missingTxsForMiniBlock := txs.computeMissingTxsForMiniBlock(mb) - txs.onRequestTransaction(mb.SenderShardID, missingTxsForMiniBlock) +func (txs *transactions) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + if miniBlock == nil { + return 0 + } + + missingTxsForMiniBlock := txs.computeMissingTxsForMiniBlock(miniBlock) + if len(missingTxsForMiniBlock) > 0 { + txs.onRequestTransaction(miniBlock.SenderShardID, missingTxsForMiniBlock) + } return len(missingTxsForMiniBlock) } // computeMissingTxsForMiniBlock computes missing transactions for a certain miniblock -func (txs *transactions) computeMissingTxsForMiniBlock(mb block.MiniBlock) [][]byte { - if mb.Type != block.TxBlock { +func (txs *transactions) computeMissingTxsForMiniBlock(miniBlock *block.MiniBlock) [][]byte { + if miniBlock.Type != block.TxBlock { return nil } missingTransactions := make([][]byte, 0) - for _, txHash := range mb.TxHashes { + for _, txHash := range miniBlock.TxHashes { tx, _ := process.GetTransactionHandlerFromPool( - mb.SenderShardID, - mb.ReceiverShardID, + miniBlock.SenderShardID, + miniBlock.ReceiverShardID, txHash, txs.txPool) diff --git a/process/block/preprocess/transactions_test.go b/process/block/preprocess/transactions_test.go index 8f17d899762..74ab91b93c7 100644 --- a/process/block/preprocess/transactions_test.go +++ b/process/block/preprocess/transactions_test.go @@ -420,7 +420,7 @@ func TestTransactionPreprocessor_RequestBlockTransactionFromMiniBlockFromNetwork txHashes := make([][]byte, 0) txHashes = append(txHashes, txHash1) txHashes = append(txHashes, txHash2) - mb := block.MiniBlock{ReceiverShardID: shardId, TxHashes: txHashes} + mb := &block.MiniBlock{ReceiverShardID: shardId, TxHashes: txHashes} txsRequested := txs.RequestTransactionsForMiniBlock(mb) assert.Equal(t, 2, txsRequested) } diff --git a/process/block/shardblock.go b/process/block/shardblock.go index 957848475e1..8a8231b60a8 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -93,6 +93,7 @@ func NewShardProcessor(arguments ArgShardProcessor) (*shardProcessor, error) { sp.hdrsForCurrBlock.hdrHashAndInfo = make(map[string]*hdrInfo) sp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64) + sp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64) sp.processedMiniBlocks = make(map[string]map[string]struct{}) metaBlockPool := sp.dataPool.MetaBlocks() @@ -405,11 +406,25 @@ func (sp *shardProcessor) checkAndRequestIfMetaHeadersMissing(round uint64) { sortedHdrs = append(sortedHdrs, hdr) } - err = sp.requestHeadersIfMissing(sortedHdrs, sharding.MetachainShardId, round) + err = sp.requestHeadersIfMissing(sortedHdrs, sharding.MetachainShardId, round, sp.dataPool.MetaBlocks()) if err != nil { log.Info(err.Error()) } + lastNotarizedHdr, err := sp.getLastNotarizedHdr(sharding.MetachainShardId) + if err != nil { + log.Info(err.Error()) + } + + for i := 0; i < len(sortedHdrs); i++ { + isMetaBlockOutOfRange := sortedHdrs[i].GetNonce() > lastNotarizedHdr.GetNonce()+process.MaxHeadersToRequestInAdvance + if isMetaBlockOutOfRange { + break + } + + sp.txCoordinator.RequestMiniBlocks(sortedHdrs[i]) + } + return } @@ -728,6 +743,15 @@ func (sp *shardProcessor) CommitBlock( sp.blockSizeThrottler.Succeed(header.Round) + log.Info(fmt.Sprintf("pools info: Headers = %d from %d, MetaBlocks = %d from %d, MiniBlocks = %d from %d\n", + sp.dataPool.Headers().Len(), + sp.dataPool.Headers().MaxSize(), + sp.dataPool.MetaBlocks().Len(), + sp.dataPool.MetaBlocks().MaxSize(), + sp.dataPool.MiniBlocks().Len(), + sp.dataPool.MiniBlocks().MaxSize(), + )) + return nil } @@ -840,13 +864,13 @@ func (sp *shardProcessor) addProcessedCrossMiniBlocksFromHeader(header *block.He sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for _, metaBlockHash := range header.MetaBlockHashes { - hdrInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] + headerInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] if !ok { sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrMissingHeader } - metaBlock, ok := hdrInfo.hdr.(*block.MetaBlock) + metaBlock, ok := headerInfo.hdr.(*block.MetaBlock) if !ok { sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrWrongTypeAssertion @@ -903,12 +927,12 @@ func (sp *shardProcessor) getOrderedProcessedMetaBlocksFromMiniBlockHashes( processedCrossMiniBlocksHashes := make(map[string]bool) sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() - for metaBlockHash, hdrInfo := range sp.hdrsForCurrBlock.hdrHashAndInfo { - if !hdrInfo.usedInBlock { + for metaBlockHash, headerInfo := range sp.hdrsForCurrBlock.hdrHashAndInfo { + if !headerInfo.usedInBlock { continue } - metaBlock, ok := hdrInfo.hdr.(*block.MetaBlock) + metaBlock, ok := headerInfo.hdr.(*block.MetaBlock) if !ok { sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return nil, process.ErrWrongTypeAssertion @@ -1049,12 +1073,12 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { // attesting something if sp.hdrsForCurrBlock.missingHdrs == 0 { - missingFinalityAttestingMetaHdrs := sp.hdrsForCurrBlock.missingFinalityAttestingHdrs - sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders() + sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders( + sharding.MetachainShardId, + sp.metaBlockFinality, + sp.getMetaHeaderFromPoolWithNonce) if sp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { - log.Info(fmt.Sprintf("received %d missing finality attesting meta headers\n", missingFinalityAttestingMetaHdrs)) - } else { - log.Info(fmt.Sprintf("requested %d missing finality attesting meta headers\n", sp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) + log.Info(fmt.Sprintf("received all missing finality attesting meta headers\n")) } } @@ -1070,6 +1094,11 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() } + if sp.isHeaderOutOfRange(metaBlock, metaBlockPool) { + metaBlockPool.Remove(metaBlockHash) + return + } + lastNotarizedHdr, err := sp.getLastNotarizedHdr(sharding.MetachainShardId) if err != nil { return @@ -1089,35 +1118,6 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { sp.txCoordinator.RequestMiniBlocks(metaBlock) } -// requestMissingFinalityAttestingHeaders requests the headers needed to accept the current selected headers for processing the -// current block. It requests the metaBlockFinality headers greater than the highest meta header related to the block -// which should be processed -func (sp *shardProcessor) requestMissingFinalityAttestingHeaders() uint32 { - requestedBlockHeaders := uint32(0) - highestHdrNonce := sp.hdrsForCurrBlock.highestHdrNonce[sharding.MetachainShardId] - if highestHdrNonce == uint64(0) { - return requestedBlockHeaders - } - - lastFinalityAttestingHeader := sp.hdrsForCurrBlock.highestHdrNonce[sharding.MetachainShardId] + uint64(sp.metaBlockFinality) - for i := highestHdrNonce + 1; i <= lastFinalityAttestingHeader; i++ { - metaBlock, metaBlockHash, err := process.GetMetaHeaderFromPoolWithNonce( - i, - sp.dataPool.MetaBlocks(), - sp.dataPool.HeadersNonces()) - - if err != nil { - requestedBlockHeaders++ - go sp.onRequestHeaderHandlerByNonce(sharding.MetachainShardId, i) - continue - } - - sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] = &hdrInfo{hdr: metaBlock, usedInBlock: false} - } - - return requestedBlockHeaders -} - func (sp *shardProcessor) requestMetaHeaders(shardHeader *block.Header) (uint32, uint32) { _ = process.EmptyChannel(sp.chRcvAllMetaHdrs) @@ -1134,7 +1134,10 @@ func (sp *shardProcessor) requestMetaHeaders(shardHeader *block.Header) (uint32, } if sp.hdrsForCurrBlock.missingHdrs == 0 { - sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders() + sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders( + sharding.MetachainShardId, + sp.metaBlockFinality, + sp.getMetaHeaderFromPoolWithNonce) } requestedHdrs := sp.hdrsForCurrBlock.missingHdrs @@ -1196,11 +1199,11 @@ func (sp *shardProcessor) getAllMiniBlockDstMeFromMeta(header *block.Header) (ma sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for _, metaBlockHash := range header.MetaBlockHashes { - hdrInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] + headerInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] if !ok { continue } - metaBlock, ok := hdrInfo.hdr.(*block.MetaBlock) + metaBlock, ok := headerInfo.hdr.(*block.MetaBlock) if !ok { continue } @@ -1682,3 +1685,16 @@ func (sp *shardProcessor) getMaxMiniBlocksSpaceRemained( return maxMbSpaceRemained } + +func (sp *shardProcessor) getMetaHeaderFromPoolWithNonce( + nonce uint64, + shardId uint32, +) (data.HeaderHandler, []byte, error) { + + metaHeader, metaHeaderHash, err := process.GetMetaHeaderFromPoolWithNonce( + nonce, + sp.dataPool.MetaBlocks(), + sp.dataPool.HeadersNonces()) + + return metaHeader, metaHeaderHash, err +} diff --git a/process/constants.go b/process/constants.go index 789eef4beaf..2b8c5912b10 100644 --- a/process/constants.go +++ b/process/constants.go @@ -46,3 +46,7 @@ const MaxHeadersToRequestInAdvance = 10 // RoundModulusTrigger defines a round modulus on which a trigger for an action will be released const RoundModulusTrigger = 5 + +// MaxOccupancyPercentageAllowed defines the maximum occupancy percentage allowed to be used, +// from the full pool capacity, for the received data which are not needed in the near future +const MaxOccupancyPercentageAllowed = float64(0.9) diff --git a/process/coordinator/process.go b/process/coordinator/process.go index 9002696b0d2..2162a7d07a1 100644 --- a/process/coordinator/process.go +++ b/process/coordinator/process.go @@ -417,7 +417,7 @@ func (tc *transactionCoordinator) CreateMbsAndProcessCrossShardTransactionsDstMe return miniBlocks, nrTxAdded, false } - requestedTxs := preproc.RequestTransactionsForMiniBlock(*miniBlock) + requestedTxs := preproc.RequestTransactionsForMiniBlock(miniBlock) if requestedTxs > 0 { continue } @@ -667,7 +667,7 @@ func (tc *transactionCoordinator) receivedMiniBlock(miniBlockHash []byte) { return } - miniBlock, ok := val.(block.MiniBlock) + miniBlock, ok := val.(*block.MiniBlock) if !ok { return } diff --git a/process/coordinator/process_test.go b/process/coordinator/process_test.go index 8ab45760274..96d940d7237 100644 --- a/process/coordinator/process_test.go +++ b/process/coordinator/process_test.go @@ -1104,7 +1104,7 @@ func TestTransactionCoordinator_receivedMiniBlockRequestTxs(t *testing.T) { senderShardId := uint32(1) receiverShardId := uint32(2) - miniBlock := block.MiniBlock{ + miniBlock := &block.MiniBlock{ SenderShardID: senderShardId, ReceiverShardID: receiverShardId, TxHashes: [][]byte{txHash1, txHash2, txHash3}, diff --git a/process/interface.go b/process/interface.go index 8d3b7eb45b8..92ab0d4d819 100644 --- a/process/interface.go +++ b/process/interface.go @@ -154,7 +154,7 @@ type PreProcessor interface { CreateMarshalizedData(txHashes [][]byte) ([][]byte, error) - RequestTransactionsForMiniBlock(mb block.MiniBlock) int + RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int ProcessMiniBlock(miniBlock *block.MiniBlock, haveTime func() bool, round uint64) error CreateAndProcessMiniBlock(sndShardId, dstShardId uint32, spaceRemained int, haveTime func() bool, round uint64) (*block.MiniBlock, error) CreateAndProcessMiniBlocks(maxTxSpaceRemained uint32, maxMbSpaceRemained uint32, round uint64, haveTime func() bool) (block.MiniBlockSlice, error) diff --git a/process/mock/cacherMock.go b/process/mock/cacherMock.go index 554b2a24446..c6aaf7ba7ef 100644 --- a/process/mock/cacherMock.go +++ b/process/mock/cacherMock.go @@ -90,6 +90,10 @@ func (cm *CacherMock) Len() int { return len(cm.dataMap) } +func (cm *CacherMock) MaxSize() int { + return 10000 +} + func (cm *CacherMock) RegisterHandler(func(key []byte)) { panic("implement me") } diff --git a/process/mock/cacherStub.go b/process/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/process/mock/cacherStub.go +++ b/process/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/process/mock/preprocessorMock.go b/process/mock/preprocessorMock.go index ab03b54b001..dc31884fe20 100644 --- a/process/mock/preprocessorMock.go +++ b/process/mock/preprocessorMock.go @@ -17,7 +17,7 @@ type PreProcessorMock struct { ProcessBlockTransactionsCalled func(body block.Body, round uint64, haveTime func() bool) error RequestBlockTransactionsCalled func(body block.Body) int CreateMarshalizedDataCalled func(txHashes [][]byte) ([][]byte, error) - RequestTransactionsForMiniBlockCalled func(mb block.MiniBlock) int + RequestTransactionsForMiniBlockCalled func(miniBlock *block.MiniBlock) int ProcessMiniBlockCalled func(miniBlock *block.MiniBlock, haveTime func() bool, round uint64) error CreateAndProcessMiniBlocksCalled func(maxTxSpaceRemained uint32, maxMbSpaceRemained uint32, round uint64, haveTime func() bool) (block.MiniBlockSlice, error) CreateAndProcessMiniBlockCalled func(sndShardId, dstShardId uint32, spaceRemained int, haveTime func() bool, round uint64) (*block.MiniBlock, error) @@ -80,11 +80,11 @@ func (ppm *PreProcessorMock) CreateMarshalizedData(txHashes [][]byte) ([][]byte, return ppm.CreateMarshalizedDataCalled(txHashes) } -func (ppm *PreProcessorMock) RequestTransactionsForMiniBlock(mb block.MiniBlock) int { +func (ppm *PreProcessorMock) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { if ppm.RequestTransactionsForMiniBlockCalled == nil { return 0 } - return ppm.RequestTransactionsForMiniBlockCalled(mb) + return ppm.RequestTransactionsForMiniBlockCalled(miniBlock) } func (ppm *PreProcessorMock) ProcessMiniBlock(miniBlock *block.MiniBlock, haveTime func() bool, round uint64) error { diff --git a/storage/fifocache/fifocacheSharded.go b/storage/fifocache/fifocacheSharded.go index fa55be4ba35..a06e8f926b6 100644 --- a/storage/fifocache/fifocacheSharded.go +++ b/storage/fifocache/fifocacheSharded.go @@ -11,7 +11,9 @@ var log = logger.DefaultLogger() // FIFOShardedCache implements a First In First Out eviction cache type FIFOShardedCache struct { - cache *cmap.ConcurrentMap + cache *cmap.ConcurrentMap + maxsize int + mutAddedDataHandlers sync.RWMutex addedDataHandlers []func(key []byte) } @@ -21,6 +23,7 @@ func NewShardedCache(size int, shards int) (*FIFOShardedCache, error) { cache := cmap.New(size, shards) fifoShardedCache := &FIFOShardedCache{ cache: cache, + maxsize: size, mutAddedDataHandlers: sync.RWMutex{}, addedDataHandlers: make([]func(key []byte), 0), } @@ -122,6 +125,11 @@ func (c *FIFOShardedCache) Len() int { return c.cache.Count() } +// MaxSize returns the maximum number of items which can be stored in cache. +func (c *FIFOShardedCache) MaxSize() int { + return c.maxsize +} + // IsInterfaceNil returns true if there is no value under the interface func (c *FIFOShardedCache) IsInterfaceNil() bool { if c == nil { diff --git a/storage/interface.go b/storage/interface.go index 4fb8069d1b2..9ea9b7d48de 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -58,6 +58,8 @@ type Cacher interface { Keys() [][]byte // Len returns the number of items in the cache. Len() int + // MaxSize returns the maximum number of items which can be stored in the cache. + MaxSize() int // RegisterHandler registers a new handler to be called when a new data is added RegisterHandler(func(key []byte)) // IsInterfaceNil returns true if there is no value under the interface diff --git a/storage/lrucache/lrucache.go b/storage/lrucache/lrucache.go index c18a1e6ca81..9b4c394ee08 100644 --- a/storage/lrucache/lrucache.go +++ b/storage/lrucache/lrucache.go @@ -137,6 +137,11 @@ func (c *LRUCache) Len() int { return c.cache.Len() } +// MaxSize returns the maximum number of items which can be stored in cache. +func (c *LRUCache) MaxSize() int { + return c.maxsize +} + // IsInterfaceNil returns true if there is no value under the interface func (c *LRUCache) IsInterfaceNil() bool { if c == nil {