From 140578d836590e70e613b7769833a55465c485cf Mon Sep 17 00:00:00 2001 From: Sebastian Marian Date: Fri, 4 Oct 2019 20:06:29 +0300 Subject: [PATCH] * Fixed after review --- core/computers.go | 8 +- process/block/export_test.go | 20 ++-- process/block/shardblock.go | 179 +++++++++++++++++-------------- process/block/shardblock_test.go | 20 ++-- process/common.go | 11 ++ process/common_test.go | 19 ++++ process/interface.go | 2 +- 7 files changed, 160 insertions(+), 99 deletions(-) diff --git a/core/computers.go b/core/computers.go index e06d1742b98..d04b880699f 100644 --- a/core/computers.go +++ b/core/computers.go @@ -1,6 +1,6 @@ package core -// MaxInt32 returns the maximum number between two given +// MaxInt32 returns the maximum of two given numbers func MaxInt32(a int32, b int32) int32 { if a > b { return a @@ -8,7 +8,7 @@ func MaxInt32(a int32, b int32) int32 { return b } -// MinInt32 returns the minimum number between two given +// MinInt32 returns the minimum of two given numbers func MinInt32(a int32, b int32) int32 { if a < b { return a @@ -16,7 +16,7 @@ func MinInt32(a int32, b int32) int32 { return b } -// MaxUint32 returns the maximum number between two given +// MaxUint32 returns the maximum of two given numbers func MaxUint32(a uint32, b uint32) uint32 { if a > b { return a @@ -24,7 +24,7 @@ func MaxUint32(a uint32, b uint32) uint32 { return b } -// MinUint32 returns the minimum number between two given +// MinUint32 returns the minimum of two given numbers func MinUint32(a uint32, b uint32) uint32 { if a < b { return a diff --git a/process/block/export_test.go b/process/block/export_test.go index 08856ae8b10..1e4b5b3db2e 100644 --- a/process/block/export_test.go +++ b/process/block/export_test.go @@ -44,8 +44,8 @@ func (sp *shardProcessor) CreateMiniBlocks(noShards uint32, maxItemsInBlock uint return sp.createMiniBlocks(noShards, maxItemsInBlock, round, haveTime) } -func (sp *shardProcessor) GetProcessedMetaBlocksFromHeader(header *block.Header) ([]data.HeaderHandler, error) { - return sp.getProcessedMetaBlocksFromHeader(header) +func (sp *shardProcessor) GetOrderedProcessedMetaBlocksFromHeader(header *block.Header) ([]data.HeaderHandler, error) { + return sp.getOrderedProcessedMetaBlocksFromHeader(header) } func (sp *shardProcessor) RemoveProcessedMetaBlocksFromPool(processedMetaHdrs []data.HeaderHandler) error { @@ -245,8 +245,8 @@ func (sp *shardProcessor) GetHashAndHdrStruct(header data.HeaderHandler, hash [] return &hashAndHdr{header, hash} } -func (sp *shardProcessor) RequestFinalMissingHeaders() uint32 { - return sp.requestFinalMissingHeaders() +func (sp *shardProcessor) RequestMissingFinalityAttestingHeaders() uint32 { + return sp.requestMissingFinalityAttestingHeaders() } func (sp *shardProcessor) CheckMetaHeadersValidityAndFinality() error { @@ -270,8 +270,8 @@ func (bp *baseProcessor) SetBlockSizeThrottler(blockSizeThrottler process.BlockS bp.blockSizeThrottler = blockSizeThrottler } -func (sp *shardProcessor) SetCurrHighestMetaHdrNonce(value uint64) { - sp.currHighestMetaHdrNonce = value +func (sp *shardProcessor) SetHighestHdrNonceForCurrentBlock(value uint64) { + sp.hdrsForCurrBlock.highestHdrNonce = value } func (sp *shardProcessor) DisplayLogInfo( @@ -324,3 +324,11 @@ func (sp *shardProcessor) CalculateRoundDuration( ) uint64 { return sp.calculateRoundDuration(lastBlockTimestamp, currentBlockTimestamp, lastBlockRound, currentBlockRound) } + +func (sp *shardProcessor) CreateBlockStarted() { + sp.createBlockStarted() +} + +func (sp *shardProcessor) AddProcessedCrossMiniBlocksFromHeader(header *block.Header) error { + return sp.addProcessedCrossMiniBlocksFromHeader(header) +} diff --git a/process/block/shardblock.go b/process/block/shardblock.go index 92457724ea6..b5e3c76b3f8 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -32,10 +32,11 @@ type hdrInfo struct { } type hdrForBlock struct { - missingHdrs uint32 - missingFinalHdrs uint32 - mutHdrsForBlock sync.RWMutex - hdrHashAndInfo map[string]*hdrInfo + missingHdrs uint32 + missingFinalityAttestingHdrs uint32 + highestHdrNonce uint64 + mutHdrsForBlock sync.RWMutex + hdrHashAndInfo map[string]*hdrInfo } // shardProcessor implements shardProcessor interface and actually it tries to execute block @@ -44,9 +45,8 @@ type shardProcessor struct { dataPool dataRetriever.PoolsHolder metaBlockFinality int - chRcvAllMetaHdrs chan bool - hdrsForCurrBlock hdrForBlock - currHighestMetaHdrNonce uint64 + chRcvAllMetaHdrs chan bool + hdrsForCurrBlock hdrForBlock processedMiniBlocks map[string]map[string]struct{} mutProcessedMiniBlocks sync.RWMutex @@ -205,9 +205,9 @@ func (sp *shardProcessor) ProcessBlock( return err } - sp.CreateBlockStarted() + sp.createBlockStarted() sp.txCoordinator.RequestBlockTransactions(body) - requestedMetaHdrs, requestedFinalMetaHdrs := sp.requestMetaHeaders(header) + requestedMetaHdrs, requestedFinalityAttestingMetaHdrs := sp.requestMetaHeaders(header) if haveTime() < 0 { return process.ErrTimeIsOut @@ -218,14 +218,17 @@ func (sp *shardProcessor) ProcessBlock( return err } - if requestedMetaHdrs > 0 || requestedFinalMetaHdrs > 0 { - log.Info(fmt.Sprintf("requested %d missing meta headers and %d final meta headers\n", requestedMetaHdrs, requestedFinalMetaHdrs)) + if requestedMetaHdrs > 0 || requestedFinalityAttestingMetaHdrs > 0 { + log.Info(fmt.Sprintf("requested %d missing meta headers and %d finality attesting meta headers\n", + requestedMetaHdrs, + requestedFinalityAttestingMetaHdrs)) + err = sp.waitForMetaHdrHashes(haveTime()) sp.hdrsForCurrBlock.mutHdrsForBlock.Lock() missingHdrs := sp.hdrsForCurrBlock.missingHdrs sp.hdrsForCurrBlock.missingHdrs = 0 - sp.hdrsForCurrBlock.missingFinalHdrs = 0 + sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = 0 sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() if requestedMetaHdrs > 0 { @@ -261,7 +264,7 @@ func (sp *shardProcessor) ProcessBlock( } }() - processedMetaHdrs, err := sp.getProcessedMetaBlocksFromMiniBlocks(body) + processedMetaHdrs, err := sp.getOrderedProcessedMetaBlocksFromMiniBlocks(body) if err != nil { return err } @@ -320,7 +323,7 @@ func (sp *shardProcessor) checkMetaHeadersValidityAndFinality() error { return err } - usedMetaHdrs, err := sp.sortHdrsForCurrentBlock(true) + usedMetaHdrs, err := sp.sortMetaHeadersForCurrentBlockByNonce(true) if err != nil { return err } @@ -352,7 +355,7 @@ func (sp *shardProcessor) checkMetaHdrFinality(header data.HeaderHandler) error return process.ErrNilBlockHeader } - finalMetaHdrs, err := sp.sortHdrsForCurrentBlock(false) + finalityAttestingMetaHdrs, err := sp.sortMetaHeadersForCurrentBlockByNonce(false) if err != nil { return err } @@ -360,14 +363,14 @@ func (sp *shardProcessor) checkMetaHdrFinality(header data.HeaderHandler) error lastVerifiedHdr := header // verify if there are "K" block after current to make this one final nextBlocksVerified := 0 - for _, metaHdr := range finalMetaHdrs { + for _, metaHdr := range finalityAttestingMetaHdrs { if nextBlocksVerified >= sp.metaBlockFinality { break } // found a header with the next nonce if metaHdr.Nonce == lastVerifiedHdr.GetNonce()+1 { - err := sp.isHdrConstructionValid(metaHdr, lastVerifiedHdr) + err = sp.isHdrConstructionValid(metaHdr, lastVerifiedHdr) if err != nil { log.Debug(err.Error()) continue @@ -629,7 +632,7 @@ func (sp *shardProcessor) restoreMetaBlockIntoPool(miniBlockHashes map[string]ui // as long as the transactions limit for the block has not been reached and there is still time to add transactions func (sp *shardProcessor) CreateBlockBody(round uint64, haveTime func() bool) (data.BodyHandler, error) { log.Debug(fmt.Sprintf("started creating block body in round %d\n", round)) - sp.CreateBlockStarted() + sp.createBlockStarted() sp.blockSizeThrottler.ComputeMaxItems() miniBlocks, err := sp.createMiniBlocks(sp.shardCoordinator.NumberOfShards(), sp.blockSizeThrottler.MaxItemsToAdd(), round, haveTime) @@ -722,7 +725,12 @@ func (sp *shardProcessor) CommitBlock( log.LogIfError(errNotCritical) } - processedMetaHdrs, err := sp.getProcessedMetaBlocksFromHeader(header) + processedMetaHdrs, err := sp.getOrderedProcessedMetaBlocksFromHeader(header) + if err != nil { + return err + } + + err = sp.addProcessedCrossMiniBlocksFromHeader(header) if err != nil { return err } @@ -824,9 +832,7 @@ func (sp *shardProcessor) getHighestHdrForOwnShardFromMetachain( ownShIdHdrs := make([]data.HeaderHandler, 0) - sort.Slice(processedHdrs, func(i, j int) bool { - return processedHdrs[i].GetNonce() < processedHdrs[j].GetNonce() - }) + process.SortHeadersByNonce(processedHdrs) for i := 0; i < len(processedHdrs); i++ { hdr, ok := processedHdrs[i].(*block.MetaBlock) @@ -846,9 +852,7 @@ func (sp *shardProcessor) getHighestHdrForOwnShardFromMetachain( ownShIdHdrs = append(ownShIdHdrs, &block.Header{}) } - sort.Slice(ownShIdHdrs, func(i, j int) bool { - return ownShIdHdrs[i].GetNonce() < ownShIdHdrs[j].GetNonce() - }) + process.SortHeadersByNonce(ownShIdHdrs) ownShIdHdrsHashes := make([][]byte, 0) for i := 0; i < len(ownShIdHdrs); i++ { @@ -891,8 +895,8 @@ func (sp *shardProcessor) getHighestHdrForShardFromMetachain(shardId uint32, hdr return ownShIdHdr, nil } -// getProcessedMetaBlocksFromHeader returns all the meta blocks fully processed -func (sp *shardProcessor) getProcessedMetaBlocksFromHeader(header *block.Header) ([]data.HeaderHandler, error) { +// getOrderedProcessedMetaBlocksFromHeader returns all the meta blocks fully processed +func (sp *shardProcessor) getOrderedProcessedMetaBlocksFromHeader(header *block.Header) ([]data.HeaderHandler, error) { if header == nil { return nil, process.ErrNilBlockHeader } @@ -904,11 +908,24 @@ func (sp *shardProcessor) getProcessedMetaBlocksFromHeader(header *block.Header) log.Debug(fmt.Sprintf("cross mini blocks in body: %d\n", len(miniBlockHashes))) - processedMetaBlocks, processedCrossMiniBlocksHashes, err := sp.getProcessedMetaBlocksFromMiniBlockHashes(miniBlockHashes) + processedMetaBlocks, err := sp.getOrderedProcessedMetaBlocksFromMiniBlockHashes(miniBlockHashes) if err != nil { return nil, err } + return processedMetaBlocks, nil +} + +func (sp *shardProcessor) addProcessedCrossMiniBlocksFromHeader(header *block.Header) error { + if header == nil { + return process.ErrNilBlockHeader + } + + miniBlockHashes := make(map[int][]byte, 0) + for i := 0; i < len(header.MiniBlockHeaders); i++ { + miniBlockHashes[i] = header.MiniBlockHeaders[i].Hash + } + sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() for metaBlockHash, hdrInfo := range sp.hdrsForCurrBlock.hdrHashAndInfo { if !hdrInfo.usedInBlock { @@ -918,23 +935,28 @@ func (sp *shardProcessor) getProcessedMetaBlocksFromHeader(header *block.Header) metaBlock, ok := hdrInfo.hdr.(*block.MetaBlock) if !ok { sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() - return nil, process.ErrWrongTypeAssertion + return process.ErrWrongTypeAssertion } crossMiniBlockHashes := metaBlock.GetMiniBlockHeadersWithDst(sp.shardCoordinator.SelfId()) - for hash := range crossMiniBlockHashes { - if processedCrossMiniBlocksHashes[hash] { - sp.addProcessedMiniBlock([]byte(metaBlockHash), []byte(hash)) + for key, miniBlockHash := range miniBlockHashes { + _, ok = crossMiniBlockHashes[string(miniBlockHash)] + if !ok { + continue } + + sp.addProcessedMiniBlock([]byte(metaBlockHash), miniBlockHash) + + delete(miniBlockHashes, key) } } sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() - return processedMetaBlocks, nil + return nil } -// getProcessedMetaBlocks returns all the meta blocks fully processed -func (sp *shardProcessor) getProcessedMetaBlocksFromMiniBlocks( +// getOrderedProcessedMetaBlocksFromMiniBlocks returns all the meta blocks fully processed ordered +func (sp *shardProcessor) getOrderedProcessedMetaBlocksFromMiniBlocks( usedMiniBlocks []*block.MiniBlock, ) ([]data.HeaderHandler, error) { @@ -954,14 +976,14 @@ func (sp *shardProcessor) getProcessedMetaBlocksFromMiniBlocks( } log.Debug(fmt.Sprintf("cross mini blocks in body: %d\n", len(miniBlockHashes))) - processedMetaBlocks, _, err := sp.getProcessedMetaBlocksFromMiniBlockHashes(miniBlockHashes) + processedMetaBlocks, err := sp.getOrderedProcessedMetaBlocksFromMiniBlockHashes(miniBlockHashes) return processedMetaBlocks, err } -func (sp *shardProcessor) getProcessedMetaBlocksFromMiniBlockHashes( +func (sp *shardProcessor) getOrderedProcessedMetaBlocksFromMiniBlockHashes( miniBlockHashes map[int][]byte, -) ([]data.HeaderHandler, map[string]bool, error) { +) ([]data.HeaderHandler, error) { processedMetaHdrs := make([]data.HeaderHandler, 0) processedCrossMiniBlocksHashes := make(map[string]bool) @@ -975,7 +997,7 @@ func (sp *shardProcessor) getProcessedMetaBlocksFromMiniBlockHashes( metaBlock, ok := hdrInfo.hdr.(*block.MetaBlock) if !ok { sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() - return nil, nil, process.ErrWrongTypeAssertion + return nil, process.ErrWrongTypeAssertion } log.Debug(fmt.Sprintf("meta header nonce: %d\n", metaBlock.Nonce)) @@ -1012,13 +1034,9 @@ func (sp *shardProcessor) getProcessedMetaBlocksFromMiniBlockHashes( } sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock() - if len(processedMetaHdrs) > 1 { - sort.Slice(processedMetaHdrs, func(i, j int) bool { - return processedMetaHdrs[i].GetNonce() < processedMetaHdrs[j].GetNonce() - }) - } + process.SortHeadersByNonce(processedMetaHdrs) - return processedMetaHdrs, processedCrossMiniBlocksHashes, nil + return processedMetaHdrs, nil } func (sp *shardProcessor) removeProcessedMetaBlocksFromPool(processedMetaHdrs []data.HeaderHandler) error { @@ -1102,33 +1120,36 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { sp.hdrsForCurrBlock.mutHdrsForBlock.Lock() - if sp.hdrsForCurrBlock.missingHdrs > 0 || sp.hdrsForCurrBlock.missingFinalHdrs > 0 { + haveMissingMetaHeaders := sp.hdrsForCurrBlock.missingHdrs > 0 || sp.hdrsForCurrBlock.missingFinalityAttestingHdrs > 0 + if haveMissingMetaHeaders { hdrInfoForHash := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)] - if hdrInfoForHash != nil && (hdrInfoForHash.hdr == nil || hdrInfoForHash.hdr.IsInterfaceNil()) { + receivedMissingMetaHeader := hdrInfoForHash != nil && (hdrInfoForHash.hdr == nil || hdrInfoForHash.hdr.IsInterfaceNil()) + if receivedMissingMetaHeader { hdrInfoForHash.hdr = metaBlock sp.hdrsForCurrBlock.missingHdrs-- - if metaBlock.Nonce > sp.currHighestMetaHdrNonce { - sp.currHighestMetaHdrNonce = metaBlock.Nonce + if metaBlock.Nonce > sp.hdrsForCurrBlock.highestHdrNonce { + sp.hdrsForCurrBlock.highestHdrNonce = metaBlock.Nonce } } + // attesting something if sp.hdrsForCurrBlock.missingHdrs == 0 { - missingFinalHdrs := sp.hdrsForCurrBlock.missingFinalHdrs - sp.hdrsForCurrBlock.missingFinalHdrs = sp.requestFinalMissingHeaders() - if sp.hdrsForCurrBlock.missingFinalHdrs == 0 { - log.Info(fmt.Sprintf("received %d missing final meta headers\n", missingFinalHdrs)) + missingFinalityAttestingMetaHdrs := sp.hdrsForCurrBlock.missingFinalityAttestingHdrs + sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders() + if sp.hdrsForCurrBlock.missingFinalityAttestingHdrs == 0 { + log.Info(fmt.Sprintf("received %d missing finality attesting meta headers\n", missingFinalityAttestingMetaHdrs)) } else { - log.Info(fmt.Sprintf("requested %d missing final meta headers\n", sp.hdrsForCurrBlock.missingFinalHdrs)) + log.Info(fmt.Sprintf("requested %d missing finality attesting meta headers\n", sp.hdrsForCurrBlock.missingFinalityAttestingHdrs)) } } - missingHdrs := sp.hdrsForCurrBlock.missingHdrs - missingFinalHdrs := sp.hdrsForCurrBlock.missingFinalHdrs + missingMetaHdrs := sp.hdrsForCurrBlock.missingHdrs + missingFinalityAttestingMetaHdrs := sp.hdrsForCurrBlock.missingFinalityAttestingHdrs sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() - allMissingNeededHdrsReceived := missingHdrs == 0 && missingFinalHdrs == 0 - if allMissingNeededHdrsReceived { + allMissingMetaHeadersReceived := missingMetaHdrs == 0 && missingFinalityAttestingMetaHdrs == 0 + if allMissingMetaHeadersReceived { sp.chRcvAllMetaHdrs <- true } } else { @@ -1149,13 +1170,13 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) { sp.txCoordinator.RequestMiniBlocks(metaBlock) } -// requestFinalMissingHeaders requests the headers needed to accept the current selected headers for processing the +// requestMissingFinalityAttestingHeaders requests the headers needed to accept the current selected headers for processing the // current block. It requests the metaBlockFinality headers greater than the highest meta header related to the block // which should be processed -func (sp *shardProcessor) requestFinalMissingHeaders() uint32 { +func (sp *shardProcessor) requestMissingFinalityAttestingHeaders() uint32 { requestedBlockHeaders := uint32(0) - for i := sp.currHighestMetaHdrNonce + 1; i <= sp.currHighestMetaHdrNonce+uint64(sp.metaBlockFinality); i++ { - if sp.currHighestMetaHdrNonce == uint64(0) { + for i := sp.hdrsForCurrBlock.highestHdrNonce + 1; i <= sp.hdrsForCurrBlock.highestHdrNonce+uint64(sp.metaBlockFinality); i++ { + if sp.hdrsForCurrBlock.highestHdrNonce == uint64(0) { continue } @@ -1192,19 +1213,18 @@ func (sp *shardProcessor) requestMetaHeaders(header *block.Header) (uint32, uint } if sp.hdrsForCurrBlock.missingHdrs == 0 { - sp.hdrsForCurrBlock.missingFinalHdrs = sp.requestFinalMissingHeaders() + sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = sp.requestMissingFinalityAttestingHeaders() } requestedHdrs := sp.hdrsForCurrBlock.missingHdrs - requestedFinalHdrs := sp.hdrsForCurrBlock.missingFinalHdrs + requestedFinalityAttestingHdrs := sp.hdrsForCurrBlock.missingFinalityAttestingHdrs sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() - return requestedHdrs, requestedFinalHdrs + return requestedHdrs, requestedFinalityAttestingHdrs } func (sp *shardProcessor) computeMissingAndExistingMetaHeaders(header *block.Header) [][]byte { missingHeadersHashes := make([][]byte, 0) - sp.currHighestMetaHdrNonce = uint64(0) sp.hdrsForCurrBlock.mutHdrsForBlock.Lock() for i := 0; i < len(header.MetaBlockHashes); i++ { @@ -1220,8 +1240,8 @@ func (sp *shardProcessor) computeMissingAndExistingMetaHeaders(header *block.Hea sp.hdrsForCurrBlock.hdrHashAndInfo[string(header.MetaBlockHashes[i])] = &hdrInfo{hdr: hdr, usedInBlock: true} - if hdr.Nonce > sp.currHighestMetaHdrNonce { - sp.currHighestMetaHdrNonce = hdr.Nonce + if hdr.Nonce > sp.hdrsForCurrBlock.highestHdrNonce { + sp.hdrsForCurrBlock.highestHdrNonce = hdr.Nonce } } sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() @@ -1495,12 +1515,12 @@ func (sp *shardProcessor) createMiniBlocks( return nil, process.ErrNilTransactionPool } - destMeMiniBlocks, txs, hdrs, err := sp.createAndProcessCrossMiniBlocksDstMe(noShards, maxItemsInBlock, round, haveTime) + destMeMiniBlocks, nbTxs, nbHdrs, err := sp.createAndProcessCrossMiniBlocksDstMe(noShards, maxItemsInBlock, round, haveTime) if err != nil { log.Info(err.Error()) } - processedMetaHdrs, errNotCritical := sp.getProcessedMetaBlocksFromMiniBlocks(destMeMiniBlocks) + processedMetaHdrs, errNotCritical := sp.getOrderedProcessedMetaBlocksFromMiniBlocks(destMeMiniBlocks) if errNotCritical != nil { log.Debug(errNotCritical.Error()) } @@ -1510,16 +1530,16 @@ func (sp *shardProcessor) createMiniBlocks( return nil, err } - log.Info(fmt.Sprintf("processed %d miniblocks and %d txs with destination in self shard\n", len(destMeMiniBlocks), txs)) + log.Info(fmt.Sprintf("processed %d miniblocks and %d txs with destination in self shard\n", len(destMeMiniBlocks), nbTxs)) if len(destMeMiniBlocks) > 0 { miniBlocks = append(miniBlocks, destMeMiniBlocks...) } - maxTxSpaceRemained := int32(maxItemsInBlock) - int32(txs) + maxTxSpaceRemained := int32(maxItemsInBlock) - int32(nbTxs) maxMbSpaceRemained := sp.getMaxMiniBlocksSpaceRemained( maxItemsInBlock, - uint32(len(destMeMiniBlocks))+hdrs, + uint32(len(destMeMiniBlocks))+nbHdrs, uint32(len(miniBlocks))) mbFromMe := sp.txCoordinator.CreateMbsAndProcessTransactionsFromMe( @@ -1587,7 +1607,7 @@ func (sp *shardProcessor) CreateBlockHeader(bodyHandler data.BodyHandler, round sp.appStatusHandler.SetUInt64Value(core.MetricNumTxInBlock, uint64(totalTxCount)) sp.appStatusHandler.SetUInt64Value(core.MetricNumMiniBlocks, uint64(len(body))) - header.MetaBlockHashes = sp.sortHdrsHashesForCurrentBlock(true) + header.MetaBlockHashes = sp.sortMetaHeaderHashesForCurrentBlockByNonce(true) sp.blockSizeThrottler.Add( round, @@ -1743,17 +1763,19 @@ func (sp *shardProcessor) getMaxMiniBlocksSpaceRemained( return maxMbSpaceRemained } -func (sp *shardProcessor) CreateBlockStarted() { +func (sp *shardProcessor) createBlockStarted() { sp.txCoordinator.CreateBlockStarted() sp.hdrsForCurrBlock.mutHdrsForBlock.Lock() sp.hdrsForCurrBlock.missingHdrs = 0 - sp.hdrsForCurrBlock.missingFinalHdrs = 0 + sp.hdrsForCurrBlock.missingFinalityAttestingHdrs = 0 + sp.hdrsForCurrBlock.highestHdrNonce = 0 sp.hdrsForCurrBlock.hdrHashAndInfo = make(map[string]*hdrInfo) sp.hdrsForCurrBlock.mutHdrsForBlock.Unlock() } -func (sp *shardProcessor) sortHdrsForCurrentBlock(usedInBlock bool) ([]*block.MetaBlock, error) { +//TODO: remove bool parameter and give instead the set to sort +func (sp *shardProcessor) sortMetaHeadersForCurrentBlockByNonce(usedInBlock bool) ([]*block.MetaBlock, error) { hdrsForCurrentBlock := make([]*block.MetaBlock, 0) sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() @@ -1781,7 +1803,8 @@ func (sp *shardProcessor) sortHdrsForCurrentBlock(usedInBlock bool) ([]*block.Me return hdrsForCurrentBlock, nil } -func (sp *shardProcessor) sortHdrsHashesForCurrentBlock(usedInBlock bool) [][]byte { +//TODO: remove bool parameter and give instead the set to sort +func (sp *shardProcessor) sortMetaHeaderHashesForCurrentBlockByNonce(usedInBlock bool) [][]byte { hdrsForCurrentBlockInfo := make([]*nonceAndHashInfo, 0) sp.hdrsForCurrBlock.mutHdrsForBlock.RLock() diff --git a/process/block/shardblock_test.go b/process/block/shardblock_test.go index f6c0a1d0db1..d1e8da10797 100644 --- a/process/block/shardblock_test.go +++ b/process/block/shardblock_test.go @@ -1344,8 +1344,8 @@ func TestShardProcessor_IsMetaHeaderFinalShouldPass(t *testing.T) { assert.True(t, res) } -//-------- requestFinalMissingHeaders -func TestShardProcessor_RequestFinalMissingHeaders(t *testing.T) { +//-------- requestMissingFinalityAttestingHeaders +func TestShardProcessor_RequestMissingFinalityAttestingHeaders(t *testing.T) { t.Parallel() tdp := mock.NewPoolsHolderMock() @@ -1353,8 +1353,8 @@ func TestShardProcessor_RequestFinalMissingHeaders(t *testing.T) { arguments.DataPool = tdp sp, _ := blproc.NewShardProcessor(arguments) - sp.SetCurrHighestMetaHdrNonce(1) - res := sp.RequestFinalMissingHeaders() + sp.SetHighestHdrNonceForCurrentBlock(1) + res := sp.RequestMissingFinalityAttestingHeaders() assert.Equal(t, res > 0, true) } @@ -2964,7 +2964,7 @@ func TestShardProcessor_GetProcessedMetaBlockFromPoolShouldWork(t *testing.T) { blockHeader := &block.Header{MetaBlockHashes: hashes, MiniBlockHeaders: mbHeaders} - _, err := bp.GetProcessedMetaBlocksFromHeader(blockHeader) + err := bp.AddProcessedCrossMiniBlocksFromHeader(blockHeader) assert.Nil(t, err) //check WasMiniBlockProcessed for remaining metablocks @@ -3317,7 +3317,7 @@ func TestShardProcessor_RemoveAndSaveLastNotarizedMetaHdrNoDstMB(t *testing.T) { blockHeader := &block.Header{} // test header not in pool and defer called - processedMetaHdrs, err := sp.GetProcessedMetaBlocksFromHeader(blockHeader) + processedMetaHdrs, err := sp.GetOrderedProcessedMetaBlocksFromHeader(blockHeader) assert.Nil(t, err) err = sp.SaveLastNotarizedHeader(sharding.MetachainShardId, processedMetaHdrs) @@ -3339,7 +3339,7 @@ func TestShardProcessor_RemoveAndSaveLastNotarizedMetaHdrNoDstMB(t *testing.T) { hashes = append(hashes, currHash) blockHeader = &block.Header{MetaBlockHashes: hashes, MiniBlockHeaders: mbHeaders} - processedMetaHdrs, err = sp.GetProcessedMetaBlocksFromHeader(blockHeader) + processedMetaHdrs, err = sp.GetOrderedProcessedMetaBlocksFromHeader(blockHeader) assert.Equal(t, process.ErrWrongTypeAssertion, err) err = sp.SaveLastNotarizedHeader(sharding.MetachainShardId, processedMetaHdrs) @@ -3365,7 +3365,7 @@ func TestShardProcessor_RemoveAndSaveLastNotarizedMetaHdrNoDstMB(t *testing.T) { hashes = append(hashes, prevHash) blockHeader = &block.Header{MetaBlockHashes: hashes, MiniBlockHeaders: mbHeaders} - processedMetaHdrs, err = sp.GetProcessedMetaBlocksFromHeader(blockHeader) + processedMetaHdrs, err = sp.GetOrderedProcessedMetaBlocksFromHeader(blockHeader) assert.Nil(t, err) err = sp.SaveLastNotarizedHeader(sharding.MetachainShardId, processedMetaHdrs) @@ -3520,7 +3520,7 @@ func TestShardProcessor_RemoveAndSaveLastNotarizedMetaHdrNotAllMBFinished(t *tes hashes = append(hashes, prevHash) blockHeader := &block.Header{MetaBlockHashes: hashes, MiniBlockHeaders: mbHeaders} - processedMetaHdrs, err := sp.GetProcessedMetaBlocksFromHeader(blockHeader) + processedMetaHdrs, err := sp.GetOrderedProcessedMetaBlocksFromHeader(blockHeader) assert.Nil(t, err) err = sp.SaveLastNotarizedHeader(sharding.MetachainShardId, processedMetaHdrs) @@ -3660,7 +3660,7 @@ func TestShardProcessor_RemoveAndSaveLastNotarizedMetaHdrAllMBFinished(t *testin hashes = append(hashes, prevHash) blockHeader := &block.Header{MetaBlockHashes: hashes, MiniBlockHeaders: mbHeaders} - processedMetaHdrs, err := sp.GetProcessedMetaBlocksFromHeader(blockHeader) + processedMetaHdrs, err := sp.GetOrderedProcessedMetaBlocksFromHeader(blockHeader) assert.Nil(t, err) assert.Equal(t, 2, len(processedMetaHdrs)) diff --git a/process/common.go b/process/common.go index 5cca6b46cd1..840667a59df 100644 --- a/process/common.go +++ b/process/common.go @@ -1,6 +1,8 @@ package process import ( + "sort" + "github.com/ElrondNetwork/elrond-go/data" "github.com/ElrondNetwork/elrond-go/data/block" "github.com/ElrondNetwork/elrond-go/data/transaction" @@ -552,3 +554,12 @@ func getHeaderHashFromStorageWithNonce( return hash, nil } + +// SortHeadersByNonce will sort a given list of headers by nonce +func SortHeadersByNonce(headers []data.HeaderHandler) { + if len(headers) > 1 { + sort.Slice(headers, func(i, j int) bool { + return headers[i].GetNonce() < headers[j].GetNonce() + }) + } +} diff --git a/process/common_test.go b/process/common_test.go index c8baa447161..42bad1ed51f 100644 --- a/process/common_test.go +++ b/process/common_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/ElrondNetwork/elrond-go/data" "github.com/ElrondNetwork/elrond-go/data/block" "github.com/ElrondNetwork/elrond-go/data/transaction" "github.com/ElrondNetwork/elrond-go/dataRetriever" @@ -2110,3 +2111,21 @@ func TestGetTransactionHandlerFromStorageShouldWork(t *testing.T) { assert.Nil(t, err) assert.Equal(t, txFromPool, tx) } + +func TestSortHeadersByNonceShouldWork(t *testing.T) { + headers := []data.HeaderHandler{ + &block.Header{Nonce: 3}, + &block.Header{Nonce: 2}, + &block.Header{Nonce: 1}, + } + + assert.Equal(t, uint64(3), headers[0].GetNonce()) + assert.Equal(t, uint64(2), headers[1].GetNonce()) + assert.Equal(t, uint64(1), headers[2].GetNonce()) + + process.SortHeadersByNonce(headers) + + assert.Equal(t, uint64(1), headers[0].GetNonce()) + assert.Equal(t, uint64(2), headers[1].GetNonce()) + assert.Equal(t, uint64(3), headers[2].GetNonce()) +} diff --git a/process/interface.go b/process/interface.go index 3c124ceebbb..f0b6c932381 100644 --- a/process/interface.go +++ b/process/interface.go @@ -399,7 +399,7 @@ type PoolsCleaner interface { IsInterfaceNil() bool } -// InterceptorThrottler can determine if the a new go routine can start +// InterceptorThrottler can determine if a new go routine can start type InterceptorThrottler interface { CanProcess() bool StartProcessing()