From 27ab3f853d3661cae2e02fae75408e9956076080 Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Sat, 19 Oct 2019 11:30:44 +0300 Subject: [PATCH 1/8] * Fixed problem with pool capacity --- api/mock/cacherStub.go | 5 +++++ data/mock/cacherStub.go | 5 +++++ dataRetriever/mock/cacherMock.go | 9 ++++++++- dataRetriever/mock/cacherStub.go | 5 +++++ integrationTests/mock/cacherStub.go | 5 +++++ node/mock/cacherStub.go | 5 +++++ process/block/baseProcess.go | 29 ++++++++++++++++++++++++++- process/block/baseProcess_test.go | 6 ++++++ process/block/metablock.go | 11 +++++++--- process/block/shardblock.go | 8 +++++++- process/constants.go | 4 ++++ process/mock/cacherMock.go | 9 ++++++++- process/mock/cacherStub.go | 5 +++++ storage/fifocache/fifocacheSharded.go | 10 ++++++++- storage/interface.go | 2 ++ storage/lrucache/lrucache.go | 5 +++++ 16 files changed, 115 insertions(+), 8 deletions(-) diff --git a/api/mock/cacherStub.go b/api/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/api/mock/cacherStub.go +++ b/api/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/data/mock/cacherStub.go b/data/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/data/mock/cacherStub.go +++ b/data/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/dataRetriever/mock/cacherMock.go b/dataRetriever/mock/cacherMock.go index 554b2a24446..506571d2ddd 100644 --- a/dataRetriever/mock/cacherMock.go +++ b/dataRetriever/mock/cacherMock.go @@ -1,6 +1,9 @@ package mock -import "sync" +import ( + "math" + "sync" +) type CacherMock struct { mut sync.Mutex @@ -90,6 +93,10 @@ func (cm *CacherMock) Len() int { return len(cm.dataMap) } +func (cm *CacherMock) MaxSize() int { + return math.MaxInt32 +} + func (cm *CacherMock) RegisterHandler(func(key []byte)) { panic("implement me") } diff --git a/dataRetriever/mock/cacherStub.go b/dataRetriever/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/dataRetriever/mock/cacherStub.go +++ b/dataRetriever/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/integrationTests/mock/cacherStub.go b/integrationTests/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/integrationTests/mock/cacherStub.go +++ b/integrationTests/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/node/mock/cacherStub.go b/node/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/node/mock/cacherStub.go +++ b/node/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index 35332f2f535..f6851df7af7 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -18,6 +18,7 @@ import ( "github.com/ElrondNetwork/elrond-go/marshal" "github.com/ElrondNetwork/elrond-go/process" "github.com/ElrondNetwork/elrond-go/sharding" + "github.com/ElrondNetwork/elrond-go/storage" ) var log = logger.DefaultLogger() @@ -405,12 +406,21 @@ func (bp *baseProcessor) setLastNotarizedHeadersSlice(startHeaders map[uint32]da return nil } -func (bp *baseProcessor) requestHeadersIfMissing(sortedHdrs []data.HeaderHandler, shardId uint32, maxRound uint64) error { +func (bp *baseProcessor) requestHeadersIfMissing( + sortedHdrs []data.HeaderHandler, + shardId uint32, + maxRound uint64, + cacher storage.Cacher, +) error { + + allowedSize := uint64(float64(cacher.MaxSize()) * process.MaxOccupancyPercentageAllowed) + prevHdr, err := bp.getLastNotarizedHdr(shardId) if err != nil { return err } + lastNotarizedHdrNonce := prevHdr.GetNonce() highestHdr := prevHdr missingNonces := make([]uint64, 0) @@ -454,6 +464,11 @@ func (bp *baseProcessor) requestHeadersIfMissing(sortedHdrs []data.HeaderHandler return process.ErrNilRequestHeaderHandlerByNonce } + isHeaderOutOfRange := nonce > lastNotarizedHdrNonce+allowedSize + if isHeaderOutOfRange { + break + } + if requested >= process.MaxHeaderRequestsAllowed { break } @@ -617,3 +632,15 @@ func (bp *baseProcessor) sortHeaderHashesForCurrentBlockByNonce(usedInBlock bool return hdrsHashesForCurrentBlock } + +func (bp *baseProcessor) isHeaderOutOfRange(header data.HeaderHandler, cacher storage.Cacher) bool { + lastNotarizedHdr, err := bp.getLastNotarizedHdr(header.GetShardID()) + if err != nil { + return false + } + + allowedSize := uint64(float64(cacher.MaxSize()) * process.MaxOccupancyPercentageAllowed) + isHeaderOutOfRange := header.GetNonce() > lastNotarizedHdr.GetNonce()+allowedSize + + return isHeaderOutOfRange +} diff --git a/process/block/baseProcess_test.go b/process/block/baseProcess_test.go index 600ff02a799..1754e337ee3 100644 --- a/process/block/baseProcess_test.go +++ b/process/block/baseProcess_test.go @@ -132,6 +132,9 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { }, RegisterHandlerCalled: func(i func(key []byte)) {}, RemoveCalled: func(key []byte) {}, + MaxSizeCalled: func() int { + return 10000 + }, } }, MiniBlocksCalled: func() storage.Cacher { @@ -230,6 +233,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub { cs.KeysCalled = func() [][]byte { return nil } + cs.MaxSizeCalled = func() int { + return 10000 + } return cs }, HeadersNoncesCalled: func() dataRetriever.Uint64SyncMapCacher { diff --git a/process/block/metablock.go b/process/block/metablock.go index 8f7f34edb67..109cfbdb5e6 100644 --- a/process/block/metablock.go +++ b/process/block/metablock.go @@ -21,8 +21,8 @@ import ( // metaProcessor implements metaProcessor interface and actually it tries to execute block type metaProcessor struct { *baseProcessor - core serviceContainer.Core - dataPool dataRetriever.MetaPoolsHolder + core serviceContainer.Core + dataPool dataRetriever.MetaPoolsHolder //TODO: add txCoordinator process.TransactionCoordinator shardsHeadersNonce *sync.Map @@ -222,7 +222,7 @@ func (mp *metaProcessor) checkAndRequestIfShardHeadersMissing(round uint64) { sortedHdrs[j] = sortedHdrPerShard[i][j] } - err := mp.requestHeadersIfMissing(sortedHdrs, i, round) + err := mp.requestHeadersIfMissing(sortedHdrs, i, round, mp.dataPool.ShardHeaders()) if err != nil { log.Debug(err.Error()) continue @@ -827,6 +827,11 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { mp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() } + if mp.isHeaderOutOfRange(shardHeader, shardHeaderPool) { + shardHeaderPool.Remove(shardHeaderHash) + return + } + // request miniblocks for which metachain is destination for _, mb := range shardHeader.MiniBlockHeaders { if mb.ReceiverShardID == mp.shardCoordinator.SelfId() { diff --git a/process/block/shardblock.go b/process/block/shardblock.go index 957848475e1..fbe03e99b2e 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -405,7 +405,7 @@ func (sp *shardProcessor) checkAndRequestIfMetaHeadersMissing(round uint64) { sortedHdrs = append(sortedHdrs, hdr) } - err = sp.requestHeadersIfMissing(sortedHdrs, sharding.MetachainShardId, round) + err = sp.requestHeadersIfMissing(sortedHdrs, sharding.MetachainShardId, round, sp.dataPool.MetaBlocks()) if err != nil { log.Info(err.Error()) } @@ -1070,10 +1070,16 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() } + if sp.isHeaderOutOfRange(metaBlock, metaBlockPool) { + metaBlockPool.Remove(metaBlockHash) + return + } + lastNotarizedHdr, err := sp.getLastNotarizedHdr(sharding.MetachainShardId) if err != nil { return } + if metaBlock.GetNonce() <= lastNotarizedHdr.GetNonce() { return } diff --git a/process/constants.go b/process/constants.go index 789eef4beaf..197d22ef272 100644 --- a/process/constants.go +++ b/process/constants.go @@ -46,3 +46,7 @@ const MaxHeadersToRequestInAdvance = 10 // RoundModulusTrigger defines a round modulus on which a trigger for an action will be released const RoundModulusTrigger = 5 + +// MaxOccupancyPercentageAllowed defines the maximum occupancy percentage allowed to be used from the full pool capacity, for the +// received data which are not needed in the near future +const MaxOccupancyPercentageAllowed = float64(0.9) diff --git a/process/mock/cacherMock.go b/process/mock/cacherMock.go index 554b2a24446..506571d2ddd 100644 --- a/process/mock/cacherMock.go +++ b/process/mock/cacherMock.go @@ -1,6 +1,9 @@ package mock -import "sync" +import ( + "math" + "sync" +) type CacherMock struct { mut sync.Mutex @@ -90,6 +93,10 @@ func (cm *CacherMock) Len() int { return len(cm.dataMap) } +func (cm *CacherMock) MaxSize() int { + return math.MaxInt32 +} + func (cm *CacherMock) RegisterHandler(func(key []byte)) { panic("implement me") } diff --git a/process/mock/cacherStub.go b/process/mock/cacherStub.go index 57df4d4c423..a35caeff349 100644 --- a/process/mock/cacherStub.go +++ b/process/mock/cacherStub.go @@ -11,6 +11,7 @@ type CacherStub struct { RemoveOldestCalled func() KeysCalled func() [][]byte LenCalled func() int + MaxSizeCalled func() int RegisterHandlerCalled func(func(key []byte)) } @@ -54,6 +55,10 @@ func (cs *CacherStub) Len() int { return cs.LenCalled() } +func (cs *CacherStub) MaxSize() int { + return cs.MaxSizeCalled() +} + func (cs *CacherStub) RegisterHandler(handler func(key []byte)) { cs.RegisterHandlerCalled(handler) } diff --git a/storage/fifocache/fifocacheSharded.go b/storage/fifocache/fifocacheSharded.go index fa55be4ba35..a06e8f926b6 100644 --- a/storage/fifocache/fifocacheSharded.go +++ b/storage/fifocache/fifocacheSharded.go @@ -11,7 +11,9 @@ var log = logger.DefaultLogger() // FIFOShardedCache implements a First In First Out eviction cache type FIFOShardedCache struct { - cache *cmap.ConcurrentMap + cache *cmap.ConcurrentMap + maxsize int + mutAddedDataHandlers sync.RWMutex addedDataHandlers []func(key []byte) } @@ -21,6 +23,7 @@ func NewShardedCache(size int, shards int) (*FIFOShardedCache, error) { cache := cmap.New(size, shards) fifoShardedCache := &FIFOShardedCache{ cache: cache, + maxsize: size, mutAddedDataHandlers: sync.RWMutex{}, addedDataHandlers: make([]func(key []byte), 0), } @@ -122,6 +125,11 @@ func (c *FIFOShardedCache) Len() int { return c.cache.Count() } +// MaxSize returns the maximum number of items which can be stored in cache. +func (c *FIFOShardedCache) MaxSize() int { + return c.maxsize +} + // IsInterfaceNil returns true if there is no value under the interface func (c *FIFOShardedCache) IsInterfaceNil() bool { if c == nil { diff --git a/storage/interface.go b/storage/interface.go index 4fb8069d1b2..9ea9b7d48de 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -58,6 +58,8 @@ type Cacher interface { Keys() [][]byte // Len returns the number of items in the cache. Len() int + // MaxSize returns the maximum number of items which can be stored in the cache. + MaxSize() int // RegisterHandler registers a new handler to be called when a new data is added RegisterHandler(func(key []byte)) // IsInterfaceNil returns true if there is no value under the interface diff --git a/storage/lrucache/lrucache.go b/storage/lrucache/lrucache.go index c18a1e6ca81..9b4c394ee08 100644 --- a/storage/lrucache/lrucache.go +++ b/storage/lrucache/lrucache.go @@ -137,6 +137,11 @@ func (c *LRUCache) Len() int { return c.cache.Len() } +// MaxSize returns the maximum number of items which can be stored in cache. +func (c *LRUCache) MaxSize() int { + return c.maxsize +} + // IsInterfaceNil returns true if there is no value under the interface func (c *LRUCache) IsInterfaceNil() bool { if c == nil { From abf416c2c989cecadfc266441363d937c83cb61f Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Sat, 19 Oct 2019 11:37:40 +0300 Subject: [PATCH 2/8] * Refactored some mocks --- dataRetriever/mock/cacherMock.go | 3 +-- process/block/shardblock.go | 1 - process/mock/cacherMock.go | 3 +-- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/dataRetriever/mock/cacherMock.go b/dataRetriever/mock/cacherMock.go index 506571d2ddd..d168b0ec77b 100644 --- a/dataRetriever/mock/cacherMock.go +++ b/dataRetriever/mock/cacherMock.go @@ -1,7 +1,6 @@ package mock import ( - "math" "sync" ) @@ -94,7 +93,7 @@ func (cm *CacherMock) Len() int { } func (cm *CacherMock) MaxSize() int { - return math.MaxInt32 + return 10000 } func (cm *CacherMock) RegisterHandler(func(key []byte)) { diff --git a/process/block/shardblock.go b/process/block/shardblock.go index fbe03e99b2e..4732d0725a2 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -1079,7 +1079,6 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { if err != nil { return } - if metaBlock.GetNonce() <= lastNotarizedHdr.GetNonce() { return } diff --git a/process/mock/cacherMock.go b/process/mock/cacherMock.go index 506571d2ddd..d168b0ec77b 100644 --- a/process/mock/cacherMock.go +++ b/process/mock/cacherMock.go @@ -1,7 +1,6 @@ package mock import ( - "math" "sync" ) @@ -94,7 +93,7 @@ func (cm *CacherMock) Len() int { } func (cm *CacherMock) MaxSize() int { - return math.MaxInt32 + return 10000 } func (cm *CacherMock) RegisterHandler(func(key []byte)) { From db4141c7f56585eb943ea5aaeeee8a10cbe3f682 Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Sat, 19 Oct 2019 11:43:22 +0300 Subject: [PATCH 3/8] * Refactored some comments and imports --- dataRetriever/mock/cacherMock.go | 4 +--- process/constants.go | 4 ++-- process/mock/cacherMock.go | 4 +--- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/dataRetriever/mock/cacherMock.go b/dataRetriever/mock/cacherMock.go index d168b0ec77b..c6aaf7ba7ef 100644 --- a/dataRetriever/mock/cacherMock.go +++ b/dataRetriever/mock/cacherMock.go @@ -1,8 +1,6 @@ package mock -import ( - "sync" -) +import "sync" type CacherMock struct { mut sync.Mutex diff --git a/process/constants.go b/process/constants.go index 197d22ef272..2b8c5912b10 100644 --- a/process/constants.go +++ b/process/constants.go @@ -47,6 +47,6 @@ const MaxHeadersToRequestInAdvance = 10 // RoundModulusTrigger defines a round modulus on which a trigger for an action will be released const RoundModulusTrigger = 5 -// MaxOccupancyPercentageAllowed defines the maximum occupancy percentage allowed to be used from the full pool capacity, for the -// received data which are not needed in the near future +// MaxOccupancyPercentageAllowed defines the maximum occupancy percentage allowed to be used, +// from the full pool capacity, for the received data which are not needed in the near future const MaxOccupancyPercentageAllowed = float64(0.9) diff --git a/process/mock/cacherMock.go b/process/mock/cacherMock.go index d168b0ec77b..c6aaf7ba7ef 100644 --- a/process/mock/cacherMock.go +++ b/process/mock/cacherMock.go @@ -1,8 +1,6 @@ package mock -import ( - "sync" -) +import "sync" type CacherMock struct { mut sync.Mutex From 6ff6a236475f2d26b2e3e1bc2b59f64f65bdf85b Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Sun, 20 Oct 2019 00:19:03 +0300 Subject: [PATCH 4/8] * Fixed multiple requests for finality attesting headers * Fixed bug in requested/received reward transactions --- process/block/baseProcess.go | 27 +++++++++++++++---- process/block/export_test.go | 4 +++ process/block/metablock.go | 21 +++++++++++---- .../block/preprocess/rewardTxPreProcessor.go | 8 ++++-- .../block/preprocess/smartContractResults.go | 1 + process/block/shardblock.go | 23 +++++++++++----- 6 files changed, 66 insertions(+), 18 deletions(-) diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index f6851df7af7..90db032ccd9 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -39,11 +39,12 @@ type hdrInfo struct { } type hdrForBlock struct { - missingHdrs uint32 - missingFinalityAttestingHdrs uint32 - highestHdrNonce map[uint32]uint64 - mutHdrsForBlock sync.RWMutex - hdrHashAndInfo map[string]*hdrInfo + missingHdrs uint32 + missingFinalityAttestingHdrs uint32 + requestedFinalityAttestingHdrs map[uint32][]uint64 + highestHdrNonce map[uint32]uint64 + mutHdrsForBlock sync.RWMutex + hdrHashAndInfo map[string]*hdrInfo } type mapShardHeaders map[uint32][]data.HeaderHandler @@ -568,6 +569,7 @@ func (bp *baseProcessor) createBlockStarted() { bp.hdrsForCurrBlock.mutHdrsForBlock.Lock() bp.hdrsForCurrBlock.hdrHashAndInfo = make(map[string]*hdrInfo) bp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64) + bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64) bp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() } @@ -644,3 +646,18 @@ func (bp *baseProcessor) isHeaderOutOfRange(header data.HeaderHandler, cacher st return isHeaderOutOfRange } + +func (bp *baseProcessor) wasHeaderRequested(shardId uint32, nonce uint64) bool { + requestedNonces, ok := bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId] + if !ok { + return false + } + + for _, requestedNonce := range requestedNonces { + if requestedNonce == nonce { + return true + } + } + + return false +} diff --git a/process/block/export_test.go b/process/block/export_test.go index 6da9619fab4..b0a6e96da0d 100644 --- a/process/block/export_test.go +++ b/process/block/export_test.go @@ -107,6 +107,10 @@ func (mp *metaProcessor) AddHdrHashToRequestedList(hdr *block.Header, hdrHash [] mp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64, mp.shardCoordinator.NumberOfShards()) } + if mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs == nil { + mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64, mp.shardCoordinator.NumberOfShards()) + } + mp.hdrsForCurrBlock.hdrHashAndInfo[string(hdrHash)] = &hdrInfo{hdr: hdr, usedInBlock: true} mp.hdrsForCurrBlock.missingHdrs++ } diff --git a/process/block/metablock.go b/process/block/metablock.go index 109cfbdb5e6..e5e67db6099 100644 --- a/process/block/metablock.go +++ b/process/block/metablock.go @@ -80,6 +80,7 @@ func NewMetaProcessor(arguments ArgMetaProcessor) (*metaProcessor, error) { mp.hdrsForCurrBlock.hdrHashAndInfo = make(map[string]*hdrInfo) mp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64) + mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64) headerPool := mp.dataPool.ShardHeaders() headerPool.RegisterHandler(mp.receivedShardHeader) @@ -810,8 +811,6 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingHeaders() if mp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { log.Info(fmt.Sprintf("received %d missing finality attesting shard headers\n", missingFinalityAttestingShardHdrs)) - } else { - log.Info(fmt.Sprintf("requested %d missing finality attesting shard headers\n", mp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) } } @@ -845,6 +844,8 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { // to the block which should be processed func (mp *metaProcessor) requestMissingFinalityAttestingHeaders() uint32 { requestedBlockHeaders := uint32(0) + missingFinalityAttestingHeaders := uint32(0) + for shardId := uint32(0); shardId < mp.shardCoordinator.NumberOfShards(); shardId++ { highestHdrNonce := mp.hdrsForCurrBlock.highestHdrNonce[shardId] if highestHdrNonce == uint64(0) { @@ -860,8 +861,14 @@ func (mp *metaProcessor) requestMissingFinalityAttestingHeaders() uint32 { mp.dataPool.HeadersNonces()) if err != nil { - requestedBlockHeaders++ - go mp.onRequestHeaderHandlerByNonce(shardId, i) + missingFinalityAttestingHeaders++ + wasHeaderRequested := mp.wasHeaderRequested(shardId, i) + if !wasHeaderRequested { + requestedBlockHeaders++ + mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId] = append(mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId], i) + go mp.onRequestHeaderHandlerByNonce(shardId, i) + } + continue } @@ -869,7 +876,11 @@ func (mp *metaProcessor) requestMissingFinalityAttestingHeaders() uint32 { } } - return requestedBlockHeaders + if requestedBlockHeaders > 0 { + log.Info(fmt.Sprintf("requested %d missing finality attesting shard headers\n", requestedBlockHeaders)) + } + + return missingFinalityAttestingHeaders } func (mp *metaProcessor) requestShardHeaders(metaBlock *block.MetaBlock) (uint32, uint32) { diff --git a/process/block/preprocess/rewardTxPreProcessor.go b/process/block/preprocess/rewardTxPreProcessor.go index a9a7b07e421..6dae9adbae9 100644 --- a/process/block/preprocess/rewardTxPreProcessor.go +++ b/process/block/preprocess/rewardTxPreProcessor.go @@ -106,12 +106,13 @@ func (rtp *rewardTxPreprocessor) waitForRewardTxHashes(waitTime time.Duration) e // IsDataPrepared returns non error if all the requested reward transactions arrived and were saved into the pool func (rtp *rewardTxPreprocessor) IsDataPrepared(requestedRewardTxs int, haveTime func() time.Duration) error { if requestedRewardTxs > 0 { - log.Info(fmt.Sprintf("requested %d missing reward Txs\n", requestedRewardTxs)) + log.Info(fmt.Sprintf("requested %d missing reward txs\n", requestedRewardTxs)) err := rtp.waitForRewardTxHashes(haveTime()) rtp.rewardTxsForBlock.mutTxsForBlock.RLock() missingRewardTxs := rtp.rewardTxsForBlock.missingTxs + rtp.rewardTxsForBlock.missingTxs = 0 rtp.rewardTxsForBlock.mutTxsForBlock.RUnlock() - log.Info(fmt.Sprintf("received %d missing reward Txs\n", requestedRewardTxs-missingRewardTxs)) + log.Info(fmt.Sprintf("received %d missing reward txs\n", requestedRewardTxs-missingRewardTxs)) if err != nil { return err } @@ -286,7 +287,10 @@ func (rtp *rewardTxPreprocessor) receivedRewardTransaction(txHash []byte) { // CreateBlockStarted cleans the local cache map for processed/created reward transactions at this round func (rtp *rewardTxPreprocessor) CreateBlockStarted() { + _ = process.EmptyChannel(rtp.chReceivedAllRewardTxs) + rtp.rewardTxsForBlock.mutTxsForBlock.Lock() + rtp.rewardTxsForBlock.missingTxs = 0 rtp.rewardTxsForBlock.txHashAndInfo = make(map[string]*txInfo) rtp.rewardTxsForBlock.mutTxsForBlock.Unlock() } diff --git a/process/block/preprocess/smartContractResults.go b/process/block/preprocess/smartContractResults.go index 469f04fe575..cd43f421519 100644 --- a/process/block/preprocess/smartContractResults.go +++ b/process/block/preprocess/smartContractResults.go @@ -106,6 +106,7 @@ func (scr *smartContractResults) IsDataPrepared(requestedScrs int, haveTime func err := scr.waitForScrHashes(haveTime()) scr.scrForBlock.mutTxsForBlock.RLock() missingScrs := scr.scrForBlock.missingTxs + scr.scrForBlock.missingTxs = 0 scr.scrForBlock.mutTxsForBlock.RUnlock() log.Info(fmt.Sprintf("received %d missing scr\n", requestedScrs-missingScrs)) if err != nil { diff --git a/process/block/shardblock.go b/process/block/shardblock.go index 4732d0725a2..af523e90f73 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -93,6 +93,7 @@ func NewShardProcessor(arguments ArgShardProcessor) (*shardProcessor, error) { sp.hdrsForCurrBlock.hdrHashAndInfo = make(map[string]*hdrInfo) sp.hdrsForCurrBlock.highestHdrNonce = make(map[uint32]uint64) + sp.hdrsForCurrBlock.requestedFinalityAttestingHdrs = make(map[uint32][]uint64) sp.processedMiniBlocks = make(map[string]map[string]struct{}) metaBlockPool := sp.dataPool.MetaBlocks() @@ -1053,8 +1054,6 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders() if sp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { log.Info(fmt.Sprintf("received %d missing finality attesting meta headers\n", missingFinalityAttestingMetaHdrs)) - } else { - log.Info(fmt.Sprintf("requested %d missing finality attesting meta headers\n", sp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) } } @@ -1099,9 +1098,11 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { // which should be processed func (sp *shardProcessor) requestMissingFinalityAttestingHeaders() uint32 { requestedBlockHeaders := uint32(0) + missingFinalityAttestingHeaders := uint32(0) + highestHdrNonce := sp.hdrsForCurrBlock.highestHdrNonce[sharding.MetachainShardId] if highestHdrNonce == uint64(0) { - return requestedBlockHeaders + return missingFinalityAttestingHeaders } lastFinalityAttestingHeader := sp.hdrsForCurrBlock.highestHdrNonce[sharding.MetachainShardId] + uint64(sp.metaBlockFinality) @@ -1112,15 +1113,25 @@ func (sp *shardProcessor) requestMissingFinalityAttestingHeaders() uint32 { sp.dataPool.HeadersNonces()) if err != nil { - requestedBlockHeaders++ - go sp.onRequestHeaderHandlerByNonce(sharding.MetachainShardId, i) + missingFinalityAttestingHeaders++ + wasHeaderRequested := sp.wasHeaderRequested(sharding.MetachainShardId, i) + if !wasHeaderRequested { + requestedBlockHeaders++ + sp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[sharding.MetachainShardId] = append(sp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[sharding.MetachainShardId], i) + go sp.onRequestHeaderHandlerByNonce(sharding.MetachainShardId, i) + } + continue } sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] = &hdrInfo{hdr: metaBlock, usedInBlock: false} } - return requestedBlockHeaders + if requestedBlockHeaders > 0 { + log.Info(fmt.Sprintf("requested %d missing finality attesting meta headers\n", requestedBlockHeaders)) + } + + return missingFinalityAttestingHeaders } func (sp *shardProcessor) requestMetaHeaders(shardHeader *block.Header) (uint32, uint32) { From 9d880f529e70eafbe66a34ee2bb0da8345df5641 Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Mon, 21 Oct 2019 14:35:49 +0300 Subject: [PATCH 5/8] * Fixed bug with bad cast to MiniBlock instead *MiniBlock * Fixed a problem when too many headers were requested in advance * Improved the mechanism which requests miniblocks from meta blocks --- process/block/baseProcess.go | 14 +++++---- process/block/baseProcess_test.go | 6 ++++ process/block/metablock.go | 5 ++++ .../block/preprocess/rewardTxPreProcessor.go | 18 +++++++----- .../preprocess/rewardTxPreProcessor_test.go | 2 +- .../block/preprocess/smartContractResults.go | 29 ++++++++++++------- .../preprocess/smartContractResults_test.go | 2 +- process/block/preprocess/transactions.go | 25 ++++++++++------ process/block/preprocess/transactions_test.go | 2 +- process/block/shardblock.go | 20 +++++++++++++ process/coordinator/process.go | 4 +-- process/coordinator/process_test.go | 2 +- process/interface.go | 2 +- process/mock/preprocessorMock.go | 6 ++-- 14 files changed, 94 insertions(+), 43 deletions(-) diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index 90db032ccd9..0ce919a13e0 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -450,11 +450,15 @@ func (bp *baseProcessor) requestHeadersIfMissing( } // ask for headers, if there most probably should be - if maxRound > highestHdr.GetRound() { - nbHeaderRequests := maxRound - highestHdr.GetRound() - startNonce := highestHdr.GetNonce() + 1 - for nonce := startNonce; nonce < startNonce+nbHeaderRequests; nonce++ { - missingNonces = append(missingNonces, nonce) + roundDiff := int64(maxRound) - int64(highestHdr.GetRound()) + if roundDiff > 0 { + nonceDiff := int64(lastNotarizedHdrNonce) + process.MaxHeadersToRequestInAdvance - int64(highestHdr.GetNonce()) + if nonceDiff > 0 { + nbHeadersToRequestInAdvance := core.MinUint64(uint64(roundDiff), uint64(nonceDiff)) + startNonce := highestHdr.GetNonce() + 1 + for nonce := startNonce; nonce < startNonce+nbHeadersToRequestInAdvance; nonce++ { + missingNonces = append(missingNonces, nonce) + } } } diff --git a/process/block/baseProcess_test.go b/process/block/baseProcess_test.go index 1754e337ee3..b5fc2f2266e 100644 --- a/process/block/baseProcess_test.go +++ b/process/block/baseProcess_test.go @@ -157,6 +157,9 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { } cs.RegisterHandlerCalled = func(i func(key []byte)) {} cs.RemoveCalled = func(key []byte) {} + cs.LenCalled = func() int { + return 0 + } return cs }, HeadersCalled: func() storage.Cacher { @@ -165,6 +168,9 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { } cs.RemoveCalled = func(key []byte) { } + cs.LenCalled = func() int { + return 0 + } return cs }, } diff --git a/process/block/metablock.go b/process/block/metablock.go index e5e67db6099..76df685bf44 100644 --- a/process/block/metablock.go +++ b/process/block/metablock.go @@ -563,6 +563,11 @@ func (mp *metaProcessor) CommitBlock( mp.blockSizeThrottler.Succeed(header.Round) + log.Info(fmt.Sprintf("Pools len: MetaBlocks = %d ShardHeaders = %d\n", + mp.dataPool.MetaBlocks().Len(), + mp.dataPool.ShardHeaders().Len(), + )) + return nil } diff --git a/process/block/preprocess/rewardTxPreProcessor.go b/process/block/preprocess/rewardTxPreProcessor.go index 6dae9adbae9..0b5c36a0191 100644 --- a/process/block/preprocess/rewardTxPreProcessor.go +++ b/process/block/preprocess/rewardTxPreProcessor.go @@ -374,24 +374,26 @@ func (rtp *rewardTxPreprocessor) processRewardTransaction( } // RequestTransactionsForMiniBlock requests missing reward transactions for a certain miniblock -func (rtp *rewardTxPreprocessor) RequestTransactionsForMiniBlock(mb block.MiniBlock) int { - missingRewardTxsForMiniBlock := rtp.computeMissingRewardTxsForMiniBlock(mb) - rtp.onRequestRewardTx(mb.SenderShardID, missingRewardTxsForMiniBlock) +func (rtp *rewardTxPreprocessor) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + missingRewardTxsForMiniBlock := rtp.computeMissingRewardTxsForMiniBlock(miniBlock) + if len(missingRewardTxsForMiniBlock) > 0 { + rtp.onRequestRewardTx(miniBlock.SenderShardID, missingRewardTxsForMiniBlock) + } return len(missingRewardTxsForMiniBlock) } // computeMissingRewardTxsForMiniBlock computes missing reward transactions for a certain miniblock -func (rtp *rewardTxPreprocessor) computeMissingRewardTxsForMiniBlock(mb block.MiniBlock) [][]byte { +func (rtp *rewardTxPreprocessor) computeMissingRewardTxsForMiniBlock(miniBlock *block.MiniBlock) [][]byte { missingRewardTxs := make([][]byte, 0) - if mb.Type != block.RewardsBlock { + if miniBlock.Type != block.RewardsBlock { return missingRewardTxs } - for _, txHash := range mb.TxHashes { + for _, txHash := range miniBlock.TxHashes { tx, _ := process.GetTransactionHandlerFromPool( - mb.SenderShardID, - mb.ReceiverShardID, + miniBlock.SenderShardID, + miniBlock.ReceiverShardID, txHash, rtp.rewardTxPool, ) diff --git a/process/block/preprocess/rewardTxPreProcessor_test.go b/process/block/preprocess/rewardTxPreProcessor_test.go index 226b432cc05..70bb0d0f61d 100644 --- a/process/block/preprocess/rewardTxPreProcessor_test.go +++ b/process/block/preprocess/rewardTxPreProcessor_test.go @@ -454,7 +454,7 @@ func TestRewardTxPreprocessor_RequestTransactionsForMiniBlockShouldWork(t *testi ) txHashes := [][]byte{[]byte(txHash)} - mb1 := block.MiniBlock{ + mb1 := &block.MiniBlock{ TxHashes: txHashes, ReceiverShardID: 1, SenderShardID: 0, diff --git a/process/block/preprocess/smartContractResults.go b/process/block/preprocess/smartContractResults.go index cd43f421519..a0ce55d1a75 100644 --- a/process/block/preprocess/smartContractResults.go +++ b/process/block/preprocess/smartContractResults.go @@ -306,7 +306,7 @@ func (scr *smartContractResults) setMissingSCResultsForShard(senderShardID uint3 // computeMissingAndExistingSCResultsForShards calculates what smartContractResults are available and what are missing from block.Body func (scr *smartContractResults) computeMissingAndExistingSCResultsForShards(body block.Body) map[uint32][]*txsHashesInfo { - onlyScrFromOthersBody := block.Body{} + scrTxs := block.Body{} for _, mb := range body { if mb.Type != block.SmartContractResultBlock { continue @@ -315,10 +315,15 @@ func (scr *smartContractResults) computeMissingAndExistingSCResultsForShards(bod continue } - onlyScrFromOthersBody = append(onlyScrFromOthersBody, mb) + scrTxs = append(scrTxs, mb) } - missingTxsForShard := scr.computeExistingAndMissing(onlyScrFromOthersBody, &scr.scrForBlock, scr.chRcvAllScrs, block.SmartContractResultBlock, scr.scrPool) + missingTxsForShard := scr.computeExistingAndMissing( + scrTxs, + &scr.scrForBlock, + scr.chRcvAllScrs, + block.SmartContractResultBlock, + scr.scrPool) return missingTxsForShard } @@ -346,24 +351,26 @@ func (scr *smartContractResults) processSmartContractResult( } // RequestTransactionsForMiniBlock requests missing smartContractResults for a certain miniblock -func (scr *smartContractResults) RequestTransactionsForMiniBlock(mb block.MiniBlock) int { - missingScrsForMiniBlock := scr.computeMissingScrsForMiniBlock(mb) - scr.onRequestSmartContractResult(mb.SenderShardID, missingScrsForMiniBlock) +func (scr *smartContractResults) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + missingScrsForMiniBlock := scr.computeMissingScrsForMiniBlock(miniBlock) + if len(missingScrsForMiniBlock) > 0 { + scr.onRequestSmartContractResult(miniBlock.SenderShardID, missingScrsForMiniBlock) + } return len(missingScrsForMiniBlock) } // computeMissingScrsForMiniBlock computes missing smartContractResults for a certain miniblock -func (scr *smartContractResults) computeMissingScrsForMiniBlock(mb block.MiniBlock) [][]byte { +func (scr *smartContractResults) computeMissingScrsForMiniBlock(miniBlock *block.MiniBlock) [][]byte { missingSmartContractResults := make([][]byte, 0) - if mb.Type != block.SmartContractResultBlock { + if miniBlock.Type != block.SmartContractResultBlock { return missingSmartContractResults } - for _, txHash := range mb.TxHashes { + for _, txHash := range miniBlock.TxHashes { tx, _ := process.GetTransactionHandlerFromPool( - mb.SenderShardID, - mb.ReceiverShardID, + miniBlock.SenderShardID, + miniBlock.ReceiverShardID, txHash, scr.scrPool) diff --git a/process/block/preprocess/smartContractResults_test.go b/process/block/preprocess/smartContractResults_test.go index 22851d4d989..f85c5aa1a33 100644 --- a/process/block/preprocess/smartContractResults_test.go +++ b/process/block/preprocess/smartContractResults_test.go @@ -290,7 +290,7 @@ func TestScrsPreprocessor_RequestBlockTransactionFromMiniBlockFromNetwork(t *tes txHashes := make([][]byte, 0) txHashes = append(txHashes, txHash1) txHashes = append(txHashes, txHash2) - mb := block.MiniBlock{ReceiverShardID: shardId, TxHashes: txHashes, Type: block.SmartContractResultBlock} + mb := &block.MiniBlock{ReceiverShardID: shardId, TxHashes: txHashes, Type: block.SmartContractResultBlock} txsRequested := txs.RequestTransactionsForMiniBlock(mb) diff --git a/process/block/preprocess/transactions.go b/process/block/preprocess/transactions.go index dcfd418598e..3a10c264c72 100644 --- a/process/block/preprocess/transactions.go +++ b/process/block/preprocess/transactions.go @@ -313,7 +313,12 @@ func (txs *transactions) setMissingTxsForShard(senderShardID uint32, mbTxHashes // computeMissingAndExistingTxsForShards calculates what transactions are available and what are missing from block.Body func (txs *transactions) computeMissingAndExistingTxsForShards(body block.Body) map[uint32][]*txsHashesInfo { - missingTxsForShard := txs.computeExistingAndMissing(body, &txs.txsForCurrBlock, txs.chRcvAllTxs, block.TxBlock, txs.txPool) + missingTxsForShard := txs.computeExistingAndMissing( + body, + &txs.txsForCurrBlock, + txs.chRcvAllTxs, + block.TxBlock, + txs.txPool) return missingTxsForShard } @@ -347,24 +352,26 @@ func (txs *transactions) processAndRemoveBadTransaction( } // RequestTransactionsForMiniBlock requests missing transactions for a certain miniblock -func (txs *transactions) RequestTransactionsForMiniBlock(mb block.MiniBlock) int { - missingTxsForMiniBlock := txs.computeMissingTxsForMiniBlock(mb) - txs.onRequestTransaction(mb.SenderShardID, missingTxsForMiniBlock) +func (txs *transactions) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + missingTxsForMiniBlock := txs.computeMissingTxsForMiniBlock(miniBlock) + if len(missingTxsForMiniBlock) > 0 { + txs.onRequestTransaction(miniBlock.SenderShardID, missingTxsForMiniBlock) + } return len(missingTxsForMiniBlock) } // computeMissingTxsForMiniBlock computes missing transactions for a certain miniblock -func (txs *transactions) computeMissingTxsForMiniBlock(mb block.MiniBlock) [][]byte { - if mb.Type != block.TxBlock { +func (txs *transactions) computeMissingTxsForMiniBlock(miniBlock *block.MiniBlock) [][]byte { + if miniBlock.Type != block.TxBlock { return nil } missingTransactions := make([][]byte, 0) - for _, txHash := range mb.TxHashes { + for _, txHash := range miniBlock.TxHashes { tx, _ := process.GetTransactionHandlerFromPool( - mb.SenderShardID, - mb.ReceiverShardID, + miniBlock.SenderShardID, + miniBlock.ReceiverShardID, txHash, txs.txPool) diff --git a/process/block/preprocess/transactions_test.go b/process/block/preprocess/transactions_test.go index 8f17d899762..74ab91b93c7 100644 --- a/process/block/preprocess/transactions_test.go +++ b/process/block/preprocess/transactions_test.go @@ -420,7 +420,7 @@ func TestTransactionPreprocessor_RequestBlockTransactionFromMiniBlockFromNetwork txHashes := make([][]byte, 0) txHashes = append(txHashes, txHash1) txHashes = append(txHashes, txHash2) - mb := block.MiniBlock{ReceiverShardID: shardId, TxHashes: txHashes} + mb := &block.MiniBlock{ReceiverShardID: shardId, TxHashes: txHashes} txsRequested := txs.RequestTransactionsForMiniBlock(mb) assert.Equal(t, 2, txsRequested) } diff --git a/process/block/shardblock.go b/process/block/shardblock.go index af523e90f73..a65b9b33fc7 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -411,6 +411,20 @@ func (sp *shardProcessor) checkAndRequestIfMetaHeadersMissing(round uint64) { log.Info(err.Error()) } + lastNotarizedHdr, err := sp.getLastNotarizedHdr(sharding.MetachainShardId) + if err != nil { + log.Info(err.Error()) + } + + for i := 0; i < len(sortedHdrs); i++ { + isMetaBlockOutOfRange := sortedHdrs[i].GetNonce() > lastNotarizedHdr.GetNonce()+process.MaxHeadersToRequestInAdvance + if isMetaBlockOutOfRange { + break + } + + sp.txCoordinator.RequestMiniBlocks(sortedHdrs[i]) + } + return } @@ -729,6 +743,12 @@ func (sp *shardProcessor) CommitBlock( sp.blockSizeThrottler.Succeed(header.Round) + log.Info(fmt.Sprintf("Pools len: Headers = %d MetaBlocks = %d MiniBlocks = %d\n", + sp.dataPool.Headers().Len(), + sp.dataPool.MetaBlocks().Len(), + sp.dataPool.MiniBlocks().Len(), + )) + return nil } diff --git a/process/coordinator/process.go b/process/coordinator/process.go index 9002696b0d2..2162a7d07a1 100644 --- a/process/coordinator/process.go +++ b/process/coordinator/process.go @@ -417,7 +417,7 @@ func (tc *transactionCoordinator) CreateMbsAndProcessCrossShardTransactionsDstMe return miniBlocks, nrTxAdded, false } - requestedTxs := preproc.RequestTransactionsForMiniBlock(*miniBlock) + requestedTxs := preproc.RequestTransactionsForMiniBlock(miniBlock) if requestedTxs > 0 { continue } @@ -667,7 +667,7 @@ func (tc *transactionCoordinator) receivedMiniBlock(miniBlockHash []byte) { return } - miniBlock, ok := val.(block.MiniBlock) + miniBlock, ok := val.(*block.MiniBlock) if !ok { return } diff --git a/process/coordinator/process_test.go b/process/coordinator/process_test.go index 8ab45760274..96d940d7237 100644 --- a/process/coordinator/process_test.go +++ b/process/coordinator/process_test.go @@ -1104,7 +1104,7 @@ func TestTransactionCoordinator_receivedMiniBlockRequestTxs(t *testing.T) { senderShardId := uint32(1) receiverShardId := uint32(2) - miniBlock := block.MiniBlock{ + miniBlock := &block.MiniBlock{ SenderShardID: senderShardId, ReceiverShardID: receiverShardId, TxHashes: [][]byte{txHash1, txHash2, txHash3}, diff --git a/process/interface.go b/process/interface.go index 8d3b7eb45b8..92ab0d4d819 100644 --- a/process/interface.go +++ b/process/interface.go @@ -154,7 +154,7 @@ type PreProcessor interface { CreateMarshalizedData(txHashes [][]byte) ([][]byte, error) - RequestTransactionsForMiniBlock(mb block.MiniBlock) int + RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int ProcessMiniBlock(miniBlock *block.MiniBlock, haveTime func() bool, round uint64) error CreateAndProcessMiniBlock(sndShardId, dstShardId uint32, spaceRemained int, haveTime func() bool, round uint64) (*block.MiniBlock, error) CreateAndProcessMiniBlocks(maxTxSpaceRemained uint32, maxMbSpaceRemained uint32, round uint64, haveTime func() bool) (block.MiniBlockSlice, error) diff --git a/process/mock/preprocessorMock.go b/process/mock/preprocessorMock.go index ab03b54b001..dc31884fe20 100644 --- a/process/mock/preprocessorMock.go +++ b/process/mock/preprocessorMock.go @@ -17,7 +17,7 @@ type PreProcessorMock struct { ProcessBlockTransactionsCalled func(body block.Body, round uint64, haveTime func() bool) error RequestBlockTransactionsCalled func(body block.Body) int CreateMarshalizedDataCalled func(txHashes [][]byte) ([][]byte, error) - RequestTransactionsForMiniBlockCalled func(mb block.MiniBlock) int + RequestTransactionsForMiniBlockCalled func(miniBlock *block.MiniBlock) int ProcessMiniBlockCalled func(miniBlock *block.MiniBlock, haveTime func() bool, round uint64) error CreateAndProcessMiniBlocksCalled func(maxTxSpaceRemained uint32, maxMbSpaceRemained uint32, round uint64, haveTime func() bool) (block.MiniBlockSlice, error) CreateAndProcessMiniBlockCalled func(sndShardId, dstShardId uint32, spaceRemained int, haveTime func() bool, round uint64) (*block.MiniBlock, error) @@ -80,11 +80,11 @@ func (ppm *PreProcessorMock) CreateMarshalizedData(txHashes [][]byte) ([][]byte, return ppm.CreateMarshalizedDataCalled(txHashes) } -func (ppm *PreProcessorMock) RequestTransactionsForMiniBlock(mb block.MiniBlock) int { +func (ppm *PreProcessorMock) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { if ppm.RequestTransactionsForMiniBlockCalled == nil { return 0 } - return ppm.RequestTransactionsForMiniBlockCalled(mb) + return ppm.RequestTransactionsForMiniBlockCalled(miniBlock) } func (ppm *PreProcessorMock) ProcessMiniBlock(miniBlock *block.MiniBlock, haveTime func() bool, round uint64) error { From 558d44247ff59e9e15873f45942eb9da0ae689d9 Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Mon, 21 Oct 2019 16:14:33 +0300 Subject: [PATCH 6/8] * Fixed after review --- process/block/baseProcess.go | 60 ++++++++++++++++++++++---- process/block/export_test.go | 9 ++-- process/block/metablock.go | 74 +++++++++++++-------------------- process/block/metablock_test.go | 2 +- process/block/shardblock.go | 74 +++++++++++---------------------- 5 files changed, 113 insertions(+), 106 deletions(-) diff --git a/process/block/baseProcess.go b/process/block/baseProcess.go index 0ce919a13e0..60dfa838e6b 100644 --- a/process/block/baseProcess.go +++ b/process/block/baseProcess.go @@ -589,12 +589,12 @@ func (bp *baseProcessor) sortHeadersForCurrentBlockByNonce(usedInBlock bool) map hdrsForCurrentBlock := make(map[uint32][]data.HeaderHandler) bp.hdrsForCurrBlock.mutHdrsForBlock.RLock() - for _, hdrInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo { - if hdrInfo.usedInBlock != usedInBlock { + for _, headerInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo { + if headerInfo.usedInBlock != usedInBlock { continue } - hdrsForCurrentBlock[hdrInfo.hdr.GetShardID()] = append(hdrsForCurrentBlock[hdrInfo.hdr.GetShardID()], hdrInfo.hdr) + hdrsForCurrentBlock[headerInfo.hdr.GetShardID()] = append(hdrsForCurrentBlock[headerInfo.hdr.GetShardID()], headerInfo.hdr) } bp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() @@ -611,13 +611,13 @@ func (bp *baseProcessor) sortHeaderHashesForCurrentBlockByNonce(usedInBlock bool hdrsForCurrentBlockInfo := make(map[uint32][]*nonceAndHashInfo) bp.hdrsForCurrBlock.mutHdrsForBlock.RLock() - for metaBlockHash, hdrInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo { - if hdrInfo.usedInBlock != usedInBlock { + for metaBlockHash, headerInfo := range bp.hdrsForCurrBlock.hdrHashAndInfo { + if headerInfo.usedInBlock != usedInBlock { continue } - hdrsForCurrentBlockInfo[hdrInfo.hdr.GetShardID()] = append(hdrsForCurrentBlockInfo[hdrInfo.hdr.GetShardID()], - &nonceAndHashInfo{nonce: hdrInfo.hdr.GetNonce(), hash: []byte(metaBlockHash)}) + hdrsForCurrentBlockInfo[headerInfo.hdr.GetShardID()] = append(hdrsForCurrentBlockInfo[headerInfo.hdr.GetShardID()], + &nonceAndHashInfo{nonce: headerInfo.hdr.GetNonce(), hash: []byte(metaBlockHash)}) } bp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() @@ -665,3 +665,49 @@ func (bp *baseProcessor) wasHeaderRequested(shardId uint32, nonce uint64) bool { return false } + +// requestMissingFinalityAttestingHeaders requests the headers needed to accept the current selected headers for +// processing the current block. It requests the finality headers greater than the highest header, for given shard, +// related to the block which should be processed +func (bp *baseProcessor) requestMissingFinalityAttestingHeaders( + shardId uint32, + finality uint32, + getHeaderFromPoolWithNonce func(uint64, uint32) (data.HeaderHandler, []byte, error), +) uint32 { + requestedHeaders := uint32(0) + missingFinalityAttestingHeaders := uint32(0) + + highestHdrNonce := bp.hdrsForCurrBlock.highestHdrNonce[shardId] + if highestHdrNonce == uint64(0) { + return missingFinalityAttestingHeaders + } + + lastFinalityAttestingHeader := bp.hdrsForCurrBlock.highestHdrNonce[shardId] + uint64(finality) + for i := highestHdrNonce + 1; i <= lastFinalityAttestingHeader; i++ { + header, headerHash, err := getHeaderFromPoolWithNonce( + i, + shardId) + + if err != nil { + missingFinalityAttestingHeaders++ + wasHeaderRequested := bp.wasHeaderRequested(shardId, i) + if !wasHeaderRequested { + requestedHeaders++ + bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId] = append(bp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId], i) + go bp.onRequestHeaderHandlerByNonce(shardId, i) + } + + continue + } + + bp.hdrsForCurrBlock.hdrHashAndInfo[string(headerHash)] = &hdrInfo{hdr: header, usedInBlock: false} + } + + if requestedHeaders > 0 { + log.Info(fmt.Sprintf("requested %d missing finality attesting headers for shard %d\n", + requestedHeaders, + shardId)) + } + + return missingFinalityAttestingHeaders +} diff --git a/process/block/export_test.go b/process/block/export_test.go index b0a6e96da0d..a40673da7ea 100644 --- a/process/block/export_test.go +++ b/process/block/export_test.go @@ -135,11 +135,11 @@ func (mp *metaProcessor) ProcessBlockHeaders(header *block.MetaBlock, round uint return mp.processBlockHeaders(header, round, haveTime) } -func (mp *metaProcessor) RequestMissingFinalityAttestingHeaders() uint32 { +func (mp *metaProcessor) RequestMissingFinalityAttestingShardHeaders() uint32 { mp.hdrsForCurrBlock.mutHdrsForBlock.Lock() defer mp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() - return mp.requestMissingFinalityAttestingHeaders() + return mp.requestMissingFinalityAttestingShardHeaders() } func (bp *baseProcessor) NotarizedHdrs() map[uint32][]data.HeaderHandler { @@ -232,7 +232,10 @@ func (sp *shardProcessor) RequestMissingFinalityAttestingHeaders() uint32 { sp.hdrsForCurrBlock.mutHdrsForBlock.Lock() defer sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() - return sp.requestMissingFinalityAttestingHeaders() + return sp.requestMissingFinalityAttestingHeaders( + sharding.MetachainShardId, + sp.metaBlockFinality, + sp.getMetaHeaderFromPoolWithNonce) } func (sp *shardProcessor) CheckMetaHeadersValidityAndFinality() error { diff --git a/process/block/metablock.go b/process/block/metablock.go index 76df685bf44..541cef1aae4 100644 --- a/process/block/metablock.go +++ b/process/block/metablock.go @@ -276,13 +276,13 @@ func (mp *metaProcessor) removeBlockInfoFromPool(header *block.MetaBlock) error mp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for i := 0; i < len(header.ShardInfo); i++ { shardHeaderHash := header.ShardInfo[i].HeaderHash - hdrInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] + headerInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrMissingHeader } - shardBlock, ok := hdrInfo.hdr.(*block.Header) + shardBlock, ok := headerInfo.hdr.(*block.Header) if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrWrongTypeAssertion @@ -470,13 +470,13 @@ func (mp *metaProcessor) CommitBlock( mp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for i := 0; i < len(header.ShardInfo); i++ { shardHeaderHash := header.ShardInfo[i].HeaderHash - hdrInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] + headerInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrMissingHeader } - shardBlock, ok := hdrInfo.hdr.(*block.Header) + shardBlock, ok := headerInfo.hdr.(*block.Header) if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrWrongTypeAssertion @@ -624,13 +624,13 @@ func (mp *metaProcessor) saveLastNotarizedHeader(header *block.MetaBlock) error mp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for i := 0; i < len(header.ShardInfo); i++ { shardHeaderHash := header.ShardInfo[i].HeaderHash - hdrInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] + headerInfo, ok := mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrMissingHeader } - shardHdr, ok := hdrInfo.hdr.(*block.Header) + shardHdr, ok := headerInfo.hdr.(*block.Header) if !ok { mp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrWrongTypeAssertion @@ -813,7 +813,7 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { if mp.hdrsForCurrBlock.missingHdrs == 0 { missingFinalityAttestingShardHdrs := mp.hdrsForCurrBlock.missingFinalityAttestingHdrs - mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingHeaders() + mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingShardHeaders() if mp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { log.Info(fmt.Sprintf("received %d missing finality attesting shard headers\n", missingFinalityAttestingShardHdrs)) } @@ -844,48 +844,22 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { } } -// requestMissingFinalityAttestingHeaders requests the headers needed to accept the current selected headers for processing the -// current block. It requests the shardBlockFinality headers greater than the highest shard header, for each shard, related -// to the block which should be processed -func (mp *metaProcessor) requestMissingFinalityAttestingHeaders() uint32 { - requestedBlockHeaders := uint32(0) - missingFinalityAttestingHeaders := uint32(0) +// requestMissingFinalityAttestingShardHeaders requests the headers needed to accept the current selected headers for +// processing the current block. It requests the shardBlockFinality headers greater than the highest shard header, +// for each shard, related to the block which should be processed +func (mp *metaProcessor) requestMissingFinalityAttestingShardHeaders() uint32 { + missingFinalityAttestingShardHeaders := uint32(0) for shardId := uint32(0); shardId < mp.shardCoordinator.NumberOfShards(); shardId++ { - highestHdrNonce := mp.hdrsForCurrBlock.highestHdrNonce[shardId] - if highestHdrNonce == uint64(0) { - continue - } - - lastFinalityAttestingHeader := mp.hdrsForCurrBlock.highestHdrNonce[shardId] + uint64(mp.shardBlockFinality) - for i := highestHdrNonce + 1; i <= lastFinalityAttestingHeader; i++ { - shardHeader, shardHeaderHash, err := process.GetShardHeaderFromPoolWithNonce( - i, - shardId, - mp.dataPool.ShardHeaders(), - mp.dataPool.HeadersNonces()) - - if err != nil { - missingFinalityAttestingHeaders++ - wasHeaderRequested := mp.wasHeaderRequested(shardId, i) - if !wasHeaderRequested { - requestedBlockHeaders++ - mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId] = append(mp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[shardId], i) - go mp.onRequestHeaderHandlerByNonce(shardId, i) - } + missingFinalityAttestingHeaders := mp.requestMissingFinalityAttestingHeaders( + shardId, + mp.shardBlockFinality, + mp.getShardHeaderFromPoolWithNonce) - continue - } - - mp.hdrsForCurrBlock.hdrHashAndInfo[string(shardHeaderHash)] = &hdrInfo{hdr: shardHeader, usedInBlock: false} - } - } - - if requestedBlockHeaders > 0 { - log.Info(fmt.Sprintf("requested %d missing finality attesting shard headers\n", requestedBlockHeaders)) + missingFinalityAttestingShardHeaders += missingFinalityAttestingHeaders } - return missingFinalityAttestingHeaders + return missingFinalityAttestingShardHeaders } func (mp *metaProcessor) requestShardHeaders(metaBlock *block.MetaBlock) (uint32, uint32) { @@ -906,7 +880,7 @@ func (mp *metaProcessor) requestShardHeaders(metaBlock *block.MetaBlock) (uint32 } if mp.hdrsForCurrBlock.missingHdrs == 0 { - mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingHeaders() + mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingShardHeaders() } requestedHdrs := mp.hdrsForCurrBlock.missingHdrs @@ -1306,3 +1280,13 @@ func (mp *metaProcessor) IsInterfaceNil() bool { } return false } + +func (mp *metaProcessor) getShardHeaderFromPoolWithNonce(nonce uint64, shardId uint32) (data.HeaderHandler, []byte, error) { + shardHeader, shardHeaderHash, err := process.GetShardHeaderFromPoolWithNonce( + nonce, + shardId, + mp.dataPool.ShardHeaders(), + mp.dataPool.HeadersNonces()) + + return shardHeader, shardHeaderHash, err +} diff --git a/process/block/metablock_test.go b/process/block/metablock_test.go index 4b1eb1a5e74..706c9a9bebc 100644 --- a/process/block/metablock_test.go +++ b/process/block/metablock_test.go @@ -488,7 +488,7 @@ func TestMetaProcessor_RequestFinalMissingHeaderShouldPass(t *testing.T) { mp.SetHighestHdrNonceForCurrentBlock(0, 1) mp.SetHighestHdrNonceForCurrentBlock(1, 2) mp.SetHighestHdrNonceForCurrentBlock(2, 3) - res := mp.RequestMissingFinalityAttestingHeaders() + res := mp.RequestMissingFinalityAttestingShardHeaders() assert.Equal(t, res, uint32(3)) } diff --git a/process/block/shardblock.go b/process/block/shardblock.go index a65b9b33fc7..6a963c46760 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -861,13 +861,13 @@ func (sp *shardProcessor) addProcessedCrossMiniBlocksFromHeader(header *block.He sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for _, metaBlockHash := range header.MetaBlockHashes { - hdrInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] + headerInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] if !ok { sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrMissingHeader } - metaBlock, ok := hdrInfo.hdr.(*block.MetaBlock) + metaBlock, ok := headerInfo.hdr.(*block.MetaBlock) if !ok { sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return process.ErrWrongTypeAssertion @@ -924,12 +924,12 @@ func (sp *shardProcessor) getOrderedProcessedMetaBlocksFromMiniBlockHashes( processedCrossMiniBlocksHashes := make(map[string]bool) sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() - for metaBlockHash, hdrInfo := range sp.hdrsForCurrBlock.hdrHashAndInfo { - if !hdrInfo.usedInBlock { + for metaBlockHash, headerInfo := range sp.hdrsForCurrBlock.hdrHashAndInfo { + if !headerInfo.usedInBlock { continue } - metaBlock, ok := hdrInfo.hdr.(*block.MetaBlock) + metaBlock, ok := headerInfo.hdr.(*block.MetaBlock) if !ok { sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() return nil, process.ErrWrongTypeAssertion @@ -1071,7 +1071,10 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { // attesting something if sp.hdrsForCurrBlock.missingHdrs == 0 { missingFinalityAttestingMetaHdrs := sp.hdrsForCurrBlock.missingFinalityAttestingHdrs - sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders() + sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders( + sharding.MetachainShardId, + sp.metaBlockFinality, + sp.getMetaHeaderFromPoolWithNonce) if sp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { log.Info(fmt.Sprintf("received %d missing finality attesting meta headers\n", missingFinalityAttestingMetaHdrs)) } @@ -1113,47 +1116,6 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { sp.txCoordinator.RequestMiniBlocks(metaBlock) } -// requestMissingFinalityAttestingHeaders requests the headers needed to accept the current selected headers for processing the -// current block. It requests the metaBlockFinality headers greater than the highest meta header related to the block -// which should be processed -func (sp *shardProcessor) requestMissingFinalityAttestingHeaders() uint32 { - requestedBlockHeaders := uint32(0) - missingFinalityAttestingHeaders := uint32(0) - - highestHdrNonce := sp.hdrsForCurrBlock.highestHdrNonce[sharding.MetachainShardId] - if highestHdrNonce == uint64(0) { - return missingFinalityAttestingHeaders - } - - lastFinalityAttestingHeader := sp.hdrsForCurrBlock.highestHdrNonce[sharding.MetachainShardId] + uint64(sp.metaBlockFinality) - for i := highestHdrNonce + 1; i <= lastFinalityAttestingHeader; i++ { - metaBlock, metaBlockHash, err := process.GetMetaHeaderFromPoolWithNonce( - i, - sp.dataPool.MetaBlocks(), - sp.dataPool.HeadersNonces()) - - if err != nil { - missingFinalityAttestingHeaders++ - wasHeaderRequested := sp.wasHeaderRequested(sharding.MetachainShardId, i) - if !wasHeaderRequested { - requestedBlockHeaders++ - sp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[sharding.MetachainShardId] = append(sp.hdrsForCurrBlock.requestedFinalityAttestingHdrs[sharding.MetachainShardId], i) - go sp.onRequestHeaderHandlerByNonce(sharding.MetachainShardId, i) - } - - continue - } - - sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] = &hdrInfo{hdr: metaBlock, usedInBlock: false} - } - - if requestedBlockHeaders > 0 { - log.Info(fmt.Sprintf("requested %d missing finality attesting meta headers\n", requestedBlockHeaders)) - } - - return missingFinalityAttestingHeaders -} - func (sp *shardProcessor) requestMetaHeaders(shardHeader *block.Header) (uint32, uint32) { _ = process.EmptyChannel(sp.chRcvAllMetaHdrs) @@ -1170,7 +1132,10 @@ func (sp *shardProcessor) requestMetaHeaders(shardHeader *block.Header) (uint32, } if sp.hdrsForCurrBlock.missingHdrs == 0 { - sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders() + sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders( + sharding.MetachainShardId, + sp.metaBlockFinality, + sp.getMetaHeaderFromPoolWithNonce) } requestedHdrs := sp.hdrsForCurrBlock.missingHdrs @@ -1232,11 +1197,11 @@ func (sp *shardProcessor) getAllMiniBlockDstMeFromMeta(header *block.Header) (ma sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for _, metaBlockHash := range header.MetaBlockHashes { - hdrInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] + headerInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] if !ok { continue } - metaBlock, ok := hdrInfo.hdr.(*block.MetaBlock) + metaBlock, ok := headerInfo.hdr.(*block.MetaBlock) if !ok { continue } @@ -1718,3 +1683,12 @@ func (sp *shardProcessor) getMaxMiniBlocksSpaceRemained( return maxMbSpaceRemained } + +func (sp *shardProcessor) getMetaHeaderFromPoolWithNonce(nonce uint64, shardId uint32) (data.HeaderHandler, []byte, error) { + metaHeader, metaHeaderHash, err := process.GetMetaHeaderFromPoolWithNonce( + nonce, + sp.dataPool.MetaBlocks(), + sp.dataPool.HeadersNonces()) + + return metaHeader, metaHeaderHash, err +} From 70d0ce4d0f8dffed678c9c9204f590bda51c7d8d Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Mon, 21 Oct 2019 19:46:36 +0300 Subject: [PATCH 7/8] * Fix after review --- process/block/baseProcess_test.go | 27 ++++++++++++++----- process/block/metablock.go | 15 ++++++++--- process/block/metablock_test.go | 3 +++ .../block/preprocess/rewardTxPreProcessor.go | 4 +++ .../block/preprocess/smartContractResults.go | 4 +++ process/block/preprocess/transactions.go | 4 +++ process/block/shardblock.go | 16 ++++++++--- 7 files changed, 59 insertions(+), 14 deletions(-) diff --git a/process/block/baseProcess_test.go b/process/block/baseProcess_test.go index b5fc2f2266e..86a3d60dbba 100644 --- a/process/block/baseProcess_test.go +++ b/process/block/baseProcess_test.go @@ -70,6 +70,9 @@ func createShardedDataChacherNotifier( LenCalled: func() int { return 0 }, + MaxSizeCalled: func() int { + return 1000 + }, } }, RemoveSetOfDataFromPoolCalled: func(keys [][]byte, id string) {}, @@ -124,6 +127,9 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { LenCalled: func() int { return 0 }, + MaxSizeCalled: func() int { + return 1000 + }, PeekCalled: func(key []byte) (value interface{}, ok bool) { if reflect.DeepEqual(key, []byte("tx1_hash")) { return &transaction.Transaction{Nonce: 10}, true @@ -132,9 +138,6 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { }, RegisterHandlerCalled: func(i func(key []byte)) {}, RemoveCalled: func(key []byte) {}, - MaxSizeCalled: func() int { - return 10000 - }, } }, MiniBlocksCalled: func() storage.Cacher { @@ -160,6 +163,9 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { cs.LenCalled = func() int { return 0 } + cs.MaxSizeCalled = func() int { + return 300 + } return cs }, HeadersCalled: func() storage.Cacher { @@ -171,6 +177,9 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub { cs.LenCalled = func() int { return 0 } + cs.MaxSizeCalled = func() int { + return 1000 + } return cs }, } @@ -193,6 +202,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub { LenCalled: func() int { return 0 }, + MaxSizeCalled: func() int { + return 1000 + }, PeekCalled: func(key []byte) (value interface{}, ok bool) { if reflect.DeepEqual(key, []byte("tx1_hash")) { return &transaction.Transaction{Nonce: 10}, true @@ -216,6 +228,9 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub { cs.LenCalled = func() int { return 0 } + cs.MaxSizeCalled = func() int { + return 300 + } cs.RemoveCalled = func(key []byte) {} cs.KeysCalled = func() [][]byte { return nil @@ -235,13 +250,13 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub { cs.LenCalled = func() int { return 0 } + cs.MaxSizeCalled = func() int { + return 1000 + } cs.RemoveCalled = func(key []byte) {} cs.KeysCalled = func() [][]byte { return nil } - cs.MaxSizeCalled = func() int { - return 10000 - } return cs }, HeadersNoncesCalled: func() dataRetriever.Uint64SyncMapCacher { diff --git a/process/block/metablock.go b/process/block/metablock.go index 541cef1aae4..7fd91d91890 100644 --- a/process/block/metablock.go +++ b/process/block/metablock.go @@ -563,9 +563,11 @@ func (mp *metaProcessor) CommitBlock( mp.blockSizeThrottler.Succeed(header.Round) - log.Info(fmt.Sprintf("Pools len: MetaBlocks = %d ShardHeaders = %d\n", + log.Info(fmt.Sprintf("pools info: MetaBlocks = %d from %d, ShardHeaders = %d from %d\n", mp.dataPool.MetaBlocks().Len(), + mp.dataPool.MetaBlocks().MaxSize(), mp.dataPool.ShardHeaders().Len(), + mp.dataPool.ShardHeaders().MaxSize(), )) return nil @@ -814,8 +816,9 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { if mp.hdrsForCurrBlock.missingHdrs == 0 { missingFinalityAttestingShardHdrs := mp.hdrsForCurrBlock.missingFinalityAttestingHdrs mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingShardHeaders() - if mp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { - log.Info(fmt.Sprintf("received %d missing finality attesting shard headers\n", missingFinalityAttestingShardHdrs)) + if missingFinalityAttestingShardHdrs > mp.hdrsForCurrBlock.missingFinalityAttestingHdrs { + log.Info(fmt.Sprintf("received %d missing finality attesting shard headers\n", + missingFinalityAttestingShardHdrs-mp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) } } @@ -1281,7 +1284,11 @@ func (mp *metaProcessor) IsInterfaceNil() bool { return false } -func (mp *metaProcessor) getShardHeaderFromPoolWithNonce(nonce uint64, shardId uint32) (data.HeaderHandler, []byte, error) { +func (mp *metaProcessor) getShardHeaderFromPoolWithNonce( + nonce uint64, + shardId uint32, +) (data.HeaderHandler, []byte, error) { + shardHeader, shardHeaderHash, err := process.GetShardHeaderFromPoolWithNonce( nonce, shardId, diff --git a/process/block/metablock_test.go b/process/block/metablock_test.go index 706c9a9bebc..a410517ecb7 100644 --- a/process/block/metablock_test.go +++ b/process/block/metablock_test.go @@ -707,6 +707,9 @@ func TestMetaProcessor_CommitBlockOkValsShouldWork(t *testing.T) { cs.LenCalled = func() int { return 0 } + cs.MaxSizeCalled = func() int { + return 1000 + } return cs } diff --git a/process/block/preprocess/rewardTxPreProcessor.go b/process/block/preprocess/rewardTxPreProcessor.go index 0b5c36a0191..8dbe8f5a062 100644 --- a/process/block/preprocess/rewardTxPreProcessor.go +++ b/process/block/preprocess/rewardTxPreProcessor.go @@ -375,6 +375,10 @@ func (rtp *rewardTxPreprocessor) processRewardTransaction( // RequestTransactionsForMiniBlock requests missing reward transactions for a certain miniblock func (rtp *rewardTxPreprocessor) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + if miniBlock == nil { + return 0 + } + missingRewardTxsForMiniBlock := rtp.computeMissingRewardTxsForMiniBlock(miniBlock) if len(missingRewardTxsForMiniBlock) > 0 { rtp.onRequestRewardTx(miniBlock.SenderShardID, missingRewardTxsForMiniBlock) diff --git a/process/block/preprocess/smartContractResults.go b/process/block/preprocess/smartContractResults.go index a0ce55d1a75..65f29ff3418 100644 --- a/process/block/preprocess/smartContractResults.go +++ b/process/block/preprocess/smartContractResults.go @@ -352,6 +352,10 @@ func (scr *smartContractResults) processSmartContractResult( // RequestTransactionsForMiniBlock requests missing smartContractResults for a certain miniblock func (scr *smartContractResults) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + if miniBlock == nil { + return 0 + } + missingScrsForMiniBlock := scr.computeMissingScrsForMiniBlock(miniBlock) if len(missingScrsForMiniBlock) > 0 { scr.onRequestSmartContractResult(miniBlock.SenderShardID, missingScrsForMiniBlock) diff --git a/process/block/preprocess/transactions.go b/process/block/preprocess/transactions.go index 3a10c264c72..2462b5b7786 100644 --- a/process/block/preprocess/transactions.go +++ b/process/block/preprocess/transactions.go @@ -353,6 +353,10 @@ func (txs *transactions) processAndRemoveBadTransaction( // RequestTransactionsForMiniBlock requests missing transactions for a certain miniblock func (txs *transactions) RequestTransactionsForMiniBlock(miniBlock *block.MiniBlock) int { + if miniBlock == nil { + return 0 + } + missingTxsForMiniBlock := txs.computeMissingTxsForMiniBlock(miniBlock) if len(missingTxsForMiniBlock) > 0 { txs.onRequestTransaction(miniBlock.SenderShardID, missingTxsForMiniBlock) diff --git a/process/block/shardblock.go b/process/block/shardblock.go index 6a963c46760..e580b91690c 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -743,10 +743,13 @@ func (sp *shardProcessor) CommitBlock( sp.blockSizeThrottler.Succeed(header.Round) - log.Info(fmt.Sprintf("Pools len: Headers = %d MetaBlocks = %d MiniBlocks = %d\n", + log.Info(fmt.Sprintf("pools info: Headers = %d from %d, MetaBlocks = %d from %d, MiniBlocks = %d from %d\n", sp.dataPool.Headers().Len(), + sp.dataPool.Headers().MaxSize(), sp.dataPool.MetaBlocks().Len(), + sp.dataPool.MetaBlocks().MaxSize(), sp.dataPool.MiniBlocks().Len(), + sp.dataPool.MiniBlocks().MaxSize(), )) return nil @@ -1075,8 +1078,9 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { sharding.MetachainShardId, sp.metaBlockFinality, sp.getMetaHeaderFromPoolWithNonce) - if sp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { - log.Info(fmt.Sprintf("received %d missing finality attesting meta headers\n", missingFinalityAttestingMetaHdrs)) + if missingFinalityAttestingMetaHdrs > sp.hdrsForCurrBlock.missingFinalityAttestingHdrs { + log.Info(fmt.Sprintf("received %d missing finality attesting meta headers\n", + missingFinalityAttestingMetaHdrs-sp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) } } @@ -1684,7 +1688,11 @@ func (sp *shardProcessor) getMaxMiniBlocksSpaceRemained( return maxMbSpaceRemained } -func (sp *shardProcessor) getMetaHeaderFromPoolWithNonce(nonce uint64, shardId uint32) (data.HeaderHandler, []byte, error) { +func (sp *shardProcessor) getMetaHeaderFromPoolWithNonce( + nonce uint64, + shardId uint32, +) (data.HeaderHandler, []byte, error) { + metaHeader, metaHeaderHash, err := process.GetMetaHeaderFromPoolWithNonce( nonce, sp.dataPool.MetaBlocks(), From 490b7928502b19f76cb45f9a0766753d26ff4318 Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Mon, 21 Oct 2019 20:00:47 +0300 Subject: [PATCH 8/8] * Fixed print --- process/block/metablock.go | 6 ++---- process/block/shardblock.go | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/process/block/metablock.go b/process/block/metablock.go index 7fd91d91890..a8418ee0dec 100644 --- a/process/block/metablock.go +++ b/process/block/metablock.go @@ -814,11 +814,9 @@ func (mp *metaProcessor) receivedShardHeader(shardHeaderHash []byte) { } if mp.hdrsForCurrBlock.missingHdrs == 0 { - missingFinalityAttestingShardHdrs := mp.hdrsForCurrBlock.missingFinalityAttestingHdrs mp.hdrsForCurrBlock.missingFinalityAttestingHdrs = mp.requestMissingFinalityAttestingShardHeaders() - if missingFinalityAttestingShardHdrs > mp.hdrsForCurrBlock.missingFinalityAttestingHdrs { - log.Info(fmt.Sprintf("received %d missing finality attesting shard headers\n", - missingFinalityAttestingShardHdrs-mp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) + if mp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { + log.Info(fmt.Sprintf("received all missing finality attesting shard headers\n")) } } diff --git a/process/block/shardblock.go b/process/block/shardblock.go index e580b91690c..8a8231b60a8 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -1073,14 +1073,12 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { // attesting something if sp.hdrsForCurrBlock.missingHdrs == 0 { - missingFinalityAttestingMetaHdrs := sp.hdrsForCurrBlock.missingFinalityAttestingHdrs sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders( sharding.MetachainShardId, sp.metaBlockFinality, sp.getMetaHeaderFromPoolWithNonce) - if missingFinalityAttestingMetaHdrs > sp.hdrsForCurrBlock.missingFinalityAttestingHdrs { - log.Info(fmt.Sprintf("received %d missing finality attesting meta headers\n", - missingFinalityAttestingMetaHdrs-sp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) + if sp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { + log.Info(fmt.Sprintf("received all missing finality attesting meta headers\n")) } }