Skip to content

Commit

Permalink
Merge branch 'development' into EN-4543-Improve-probable-highest-nonc…
Browse files Browse the repository at this point in the history
…e-reset-and-sync-status

# Conflicts:
#	process/constants.go
#	process/sync/baseSync.go
  • Loading branch information
SebastianMarian committed Oct 17, 2019
2 parents d95d6e1 + 6a38fb3 commit b787395
Show file tree
Hide file tree
Showing 30 changed files with 169 additions and 75 deletions.
4 changes: 2 additions & 2 deletions dataRetriever/dataPool/metaDataPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func NewMetaDataPool(
}, nil
}

// MetaChainBlocks returns the holder for meta blocks
func (mdp *metaDataPool) MetaChainBlocks() storage.Cacher {
// MetaBlocks returns the holder for meta blocks
func (mdp *metaDataPool) MetaBlocks() storage.Cacher {
return mdp.metaBlocks
}

Expand Down
6 changes: 3 additions & 3 deletions dataRetriever/dataPool/metaDataPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func TestNewMetaDataPool_NilUnsingedPoolNoncesShouldErr(t *testing.T) {
func TestNewMetaDataPool_ConfigOk(t *testing.T) {
t.Parallel()

metaChainBlocks := &mock.CacherStub{}
metaBlocks := &mock.CacherStub{}
shardHeaders := &mock.CacherStub{}
miniBlocks := &mock.CacherStub{}
hdrsNonces := &mock.Uint64SyncMapCacherStub{}
transactions := &mock.ShardedDataStub{}
unsigned := &mock.ShardedDataStub{}

tdp, err := dataPool.NewMetaDataPool(
metaChainBlocks,
metaBlocks,
miniBlocks,
shardHeaders,
hdrsNonces,
Expand All @@ -128,7 +128,7 @@ func TestNewMetaDataPool_ConfigOk(t *testing.T) {

assert.Nil(t, err)
//pointer checking
assert.True(t, metaChainBlocks == tdp.MetaChainBlocks())
assert.True(t, metaBlocks == tdp.MetaBlocks())
assert.True(t, shardHeaders == tdp.ShardHeaders())
assert.True(t, miniBlocks == tdp.MiniBlocks())
assert.True(t, hdrsNonces == tdp.HeadersNonces())
Expand Down
16 changes: 16 additions & 0 deletions dataRetriever/dataPool/nonceSyncMapCacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (nspc *nonceSyncMapCacher) Remove(nonce uint64, shardId uint32) {
}

syncMap.Delete(shardId)
nspc.removeNonceFromCacheIfSyncMapIsEmpty(nonce, syncMap)
}

// RegisterHandler registers a new handler to be called when a new data is added
Expand Down Expand Up @@ -166,3 +167,18 @@ func (nspc *nonceSyncMapCacher) IsInterfaceNil() bool {
}
return false
}

func (nspc *nonceSyncMapCacher) removeNonceFromCacheIfSyncMapIsEmpty(nonce uint64, syncMap *ShardIdHashSyncMap) {
isSyncMapEmpty := true
syncMap.Range(func(shardId uint32, hash []byte) bool {
if hash != nil {
isSyncMapEmpty = false
return false
}
return true
})

if isSyncMapEmpty {
nspc.cacher.Remove(nspc.nonceConverter.ToByteSlice(nonce))
}
}
1 change: 1 addition & 0 deletions dataRetriever/dataPool/nonceSyncMapCacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func TestNonceSyncMapCacher_RemoveShardIdWhenExists(t *testing.T) {
PeekCalled: func(key []byte) (interface{}, bool) {
return syncMap, true
},
RemoveCalled: func(key []byte) {},
},
mock.NewNonceHashConverterMock())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (rcf *resolversContainerFactory) createMetaChainHeaderResolver(identifier s
hdrNonceStore := rcf.store.GetStorer(dataRetriever.MetaHdrNonceHashDataUnit)
resolver, err := resolvers.NewHeaderResolver(
resolverSender,
rcf.dataPools.MetaChainBlocks(),
rcf.dataPools.MetaBlocks(),
rcf.dataPools.HeadersNonces(),
hdrStorer,
hdrNonceStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func createDataPools() dataRetriever.MetaPoolsHolder {
MiniBlocksCalled: func() storage.Cacher {
return &mock.CacherStub{}
},
MetaChainBlocksCalled: func() storage.Cacher {
MetaBlocksCalled: func() storage.Cacher {
return &mock.CacherStub{}
},
HeadersNoncesCalled: func() dataRetriever.Uint64SyncMapCacher {
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ type PoolsHolder interface {

// MetaPoolsHolder defines getter for data pools for metachain
type MetaPoolsHolder interface {
MetaChainBlocks() storage.Cacher
MetaBlocks() storage.Cacher
MiniBlocks() storage.Cacher
ShardHeaders() storage.Cacher
HeadersNonces() Uint64SyncMapCacher
Expand Down
6 changes: 3 additions & 3 deletions dataRetriever/mock/metaPoolsHolderStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type MetaPoolsHolderStub struct {
MetaChainBlocksCalled func() storage.Cacher
MetaBlocksCalled func() storage.Cacher
MiniBlocksCalled func() storage.Cacher
ShardHeadersCalled func() storage.Cacher
HeadersNoncesCalled func() dataRetriever.Uint64SyncMapCacher
Expand All @@ -22,8 +22,8 @@ func (mphs *MetaPoolsHolderStub) UnsignedTransactions() dataRetriever.ShardedDat
return mphs.UnsignedTransactionsCalled()
}

func (mphs *MetaPoolsHolderStub) MetaChainBlocks() storage.Cacher {
return mphs.MetaChainBlocksCalled()
func (mphs *MetaPoolsHolderStub) MetaBlocks() storage.Cacher {
return mphs.MetaBlocksCalled()
}

func (mphs *MetaPoolsHolderStub) MiniBlocks() storage.Cacher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestMetaHeadersAreRequstedOnlyFromMetachain(t *testing.T) {
}

chanReceived := make(chan struct{}, 1000)
node4Meta.MetaDataPool.MetaChainBlocks().Put(metaHdrHashFromMetachain, metaHdrFromMetachain)
node4Meta.MetaDataPool.MetaBlocks().Put(metaHdrHashFromMetachain, metaHdrFromMetachain)
node1Shard0.ShardDataPool.MetaBlocks().Clear()
node1Shard0.ShardDataPool.MetaBlocks().RegisterHandler(func(key []byte) {
chanReceived <- struct{}{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestInterceptedMetaBlockVerifiedWithCorrectConsensusGroup(t *testing.T) {

// all nodes in metachain do not have the block in pool as interceptor does not validate it with a wrong consensus
for _, metaNode := range nodesMap[sharding.MetachainShardId] {
v, ok := metaNode.MetaDataPool.MetaChainBlocks().Get(headerHash)
v, ok := metaNode.MetaDataPool.MetaBlocks().Get(headerHash)
assert.True(t, ok)
assert.Equal(t, header, v)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestHeadersAreResolvedByMetachainAndShard(t *testing.T) {
metaHeaderBytes, _ := integrationTests.TestMarshalizer.Marshal(metaHdr)
metaHeaderHash := integrationTests.TestHasher.Compute(string(metaHeaderBytes))
for i := 0; i < numMetaNodes; i++ {
nodes[i+1].MetaDataPool.MetaChainBlocks().HasOrAdd(metaHeaderHash, metaHdr)
nodes[i+1].MetaDataPool.MetaBlocks().HasOrAdd(metaHeaderHash, metaHdr)
}

for i := 0; i < maxNumRequests; i++ {
Expand All @@ -156,7 +156,7 @@ func TestHeadersAreResolvedByMetachainAndShard(t *testing.T) {
metaHeaderBytes2, _ := integrationTests.TestMarshalizer.Marshal(metaHdr2)
metaHeaderHash2 := integrationTests.TestHasher.Compute(string(metaHeaderBytes2))
for i := 0; i < numMetaNodes; i++ {
nodes[i+1].MetaDataPool.MetaChainBlocks().HasOrAdd(metaHeaderHash2, metaHdr2)
nodes[i+1].MetaDataPool.MetaBlocks().HasOrAdd(metaHeaderHash2, metaHdr2)

syncMap := &dataPool.ShardIdHashSyncMap{}
syncMap.Store(sharding.MetachainShardId, metaHeaderHash2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ func createMetaNetNode(
tn.accntState = accntAdapter
tn.shardId = sharding.MetachainShardId

dPool.MetaChainBlocks().RegisterHandler(func(key []byte) {
dPool.MetaBlocks().RegisterHandler(func(key []byte) {
atomic.AddInt32(&tn.metachainHdrRecv, 1)
})
dPool.ShardHeaders().RegisterHandler(func(key []byte) {
Expand Down
2 changes: 1 addition & 1 deletion integrationTests/sync/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func emptyShardDataPool(sdp dataRetriever.PoolsHolder) {

func emptyMetaDataPool(holder dataRetriever.MetaPoolsHolder) {
holder.HeadersNonces().Clear()
holder.MetaChainBlocks().Clear()
holder.MetaBlocks().Clear()
holder.MiniBlocks().Clear()
holder.ShardHeaders().Clear()
}
Expand Down
6 changes: 3 additions & 3 deletions integrationTests/testProcessorNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ func (tpn *TestProcessorNode) addHandlersForCounters() {

if tpn.ShardCoordinator.SelfId() == sharding.MetachainShardId {
tpn.MetaDataPool.ShardHeaders().RegisterHandler(hdrHandlers)
tpn.MetaDataPool.MetaChainBlocks().RegisterHandler(metaHandlers)
tpn.MetaDataPool.MetaBlocks().RegisterHandler(metaHandlers)
} else {
txHandler := func(key []byte) {
atomic.AddInt32(&tpn.CounterTxRecv, 1)
Expand Down Expand Up @@ -768,7 +768,7 @@ func (tpn *TestProcessorNode) GetBlockBody(header *dataBlock.Header) (dataBlock.

// GetMetaHeader returns the first *dataBlock.MetaBlock stored in datapools having the nonce provided as parameter
func (tpn *TestProcessorNode) GetMetaHeader(nonce uint64) (*dataBlock.MetaBlock, error) {
invalidCachers := tpn.MetaDataPool == nil || tpn.MetaDataPool.MetaChainBlocks() == nil || tpn.MetaDataPool.HeadersNonces() == nil
invalidCachers := tpn.MetaDataPool == nil || tpn.MetaDataPool.MetaBlocks() == nil || tpn.MetaDataPool.HeadersNonces() == nil
if invalidCachers {
return nil, errors.New("invalid data pool")
}
Expand All @@ -783,7 +783,7 @@ func (tpn *TestProcessorNode) GetMetaHeader(nonce uint64) (*dataBlock.MetaBlock,
return nil, errors.New(fmt.Sprintf("no hash-nonce hash in HeadersNonces for nonce %d", nonce))
}

headerObject, ok := tpn.MetaDataPool.MetaChainBlocks().Get(headerHash)
headerObject, ok := tpn.MetaDataPool.MetaBlocks().Get(headerHash)
if !ok {
return nil, errors.New(fmt.Sprintf("no header found for hash %s", hex.EncodeToString(headerHash)))
}
Expand Down
2 changes: 1 addition & 1 deletion integrationTests/testProcessorNodeWithMultisigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func VerifyNodesHaveHeaders(

for _, metaNode := range nodesMap[sharding.MetachainShardId] {
if shHeader == sharding.MetachainShardId {
v, ok = metaNode.MetaDataPool.MetaChainBlocks().Get(headerHash)
v, ok = metaNode.MetaDataPool.MetaBlocks().Get(headerHash)
} else {
v, ok = metaNode.MetaDataPool.ShardHeaders().Get(headerHash)
}
Expand Down
6 changes: 3 additions & 3 deletions node/mock/metaPoolsHolderStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type MetaPoolsHolderStub struct {
MetaChainBlocksCalled func() storage.Cacher
MetaBlocksCalled func() storage.Cacher
MiniBlocksCalled func() storage.Cacher
ShardHeadersCalled func() storage.Cacher
HeadersNoncesCalled func() dataRetriever.Uint64SyncMapCacher
Expand All @@ -22,8 +22,8 @@ func (mphs *MetaPoolsHolderStub) UnsignedTransactions() dataRetriever.ShardedDat
return mphs.UnsignedTransactionsCalled()
}

func (mphs *MetaPoolsHolderStub) MetaChainBlocks() storage.Cacher {
return mphs.MetaChainBlocksCalled()
func (mphs *MetaPoolsHolderStub) MetaBlocks() storage.Cacher {
return mphs.MetaBlocksCalled()
}

func (mphs *MetaPoolsHolderStub) MiniBlocks() storage.Cacher {
Expand Down
7 changes: 6 additions & 1 deletion process/block/baseProcess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub {
HasCalled: func(nonce uint64, shardId uint32) bool {
return true
},
RemoveCalled: func(nonce uint64, shardId uint32) {},
}
},
MetaBlocksCalled: func() storage.Cacher {
Expand All @@ -130,6 +131,7 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub {
return nil, false
},
RegisterHandlerCalled: func(i func(key []byte)) {},
RemoveCalled: func(key []byte) {},
}
},
MiniBlocksCalled: func() storage.Cacher {
Expand Down Expand Up @@ -158,6 +160,8 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub {
cs := &mock.CacherStub{}
cs.RegisterHandlerCalled = func(i func(key []byte)) {
}
cs.RemoveCalled = func(key []byte) {
}
return cs
},
}
Expand All @@ -166,7 +170,7 @@ func initDataPool(testHash []byte) *mock.PoolsHolderStub {

func initMetaDataPool() *mock.MetaPoolsHolderStub {
mdp := &mock.MetaPoolsHolderStub{
MetaChainBlocksCalled: func() storage.Cacher {
MetaBlocksCalled: func() storage.Cacher {
return &mock.CacherStub{
GetCalled: func(key []byte) (value interface{}, ok bool) {
if reflect.DeepEqual(key, []byte("tx1_hash")) {
Expand All @@ -187,6 +191,7 @@ func initMetaDataPool() *mock.MetaPoolsHolderStub {
return nil, false
},
RegisterHandlerCalled: func(i func(key []byte)) {},
RemoveCalled: func(key []byte) {},
}
},
MiniBlocksCalled: func() storage.Cacher {
Expand Down
14 changes: 9 additions & 5 deletions process/block/metablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,14 +447,18 @@ func (mp *metaProcessor) CommitBlock(

headerNoncePool := mp.dataPool.HeadersNonces()
if headerNoncePool == nil {
err = process.ErrNilDataPoolHolder
err = process.ErrNilHeadersNoncesDataPool
return err
}

//TODO: Should be analyzed if put in pool is really necessary or not (right now there is no action of removing them)
syncMap := &dataPool.ShardIdHashSyncMap{}
syncMap.Store(headerHandler.GetShardID(), headerHash)
headerNoncePool.Merge(headerHandler.GetNonce(), syncMap)
metaBlockPool := mp.dataPool.MetaBlocks()
if metaBlockPool == nil {
err = process.ErrNilMetaBlockPool
return err
}

headerNoncePool.Remove(header.GetNonce(), header.GetShardID())
metaBlockPool.Remove(headerHash)

body, ok := bodyHandler.(*block.MetaBlockBody)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion process/block/metablock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func TestMetaProcessor_CommitBlockNilNoncesDataPoolShouldErr(t *testing.T) {
blkc := createTestBlockchain()
err := mp.CommitBlock(blkc, hdr, body)

assert.Equal(t, process.ErrNilDataPoolHolder, err)
assert.Equal(t, process.ErrNilHeadersNoncesDataPool, err)
}

func TestMetaProcessor_CommitBlockNoTxInPoolShouldErr(t *testing.T) {
Expand Down
19 changes: 14 additions & 5 deletions process/block/shardblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,14 +601,18 @@ func (sp *shardProcessor) CommitBlock(

headerNoncePool := sp.dataPool.HeadersNonces()
if headerNoncePool == nil {
err = process.ErrNilDataPoolHolder
err = process.ErrNilHeadersNoncesDataPool
return err
}

//TODO: Should be analyzed if put in pool is really necessary or not (right now there is no action of removing them)
syncMap := &dataPool.ShardIdHashSyncMap{}
syncMap.Store(headerHandler.GetShardID(), headerHash)
headerNoncePool.Merge(headerHandler.GetNonce(), syncMap)
headersPool := sp.dataPool.Headers()
if headersPool == nil {
err = process.ErrNilHeadersDataPool
return err
}

headerNoncePool.Remove(header.GetNonce(), header.GetShardID())
headersPool.Remove(headerHash)

body, ok := bodyHandler.(block.Body)
if !ok {
Expand Down Expand Up @@ -1077,6 +1081,11 @@ func (sp *shardProcessor) receivedMetaBlock(metaBlockHash []byte) {
return
}

isMetaBlockOutOfRange := metaBlock.GetNonce() > lastNotarizedHdr.GetNonce()+process.MaxHeadersToRequestInAdvance
if isMetaBlockOutOfRange {
return
}

sp.txCoordinator.RequestMiniBlocks(metaBlock)
}

Expand Down
2 changes: 1 addition & 1 deletion process/block/shardblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,7 @@ func TestShardProcessor_CommitBlockNilNoncesDataPoolShouldErr(t *testing.T) {
blkc := createTestBlockchain()
err := sp.CommitBlock(blkc, hdr, body)

assert.Equal(t, process.ErrNilDataPoolHolder, err)
assert.Equal(t, process.ErrNilHeadersNoncesDataPool, err)
}

func TestShardProcessor_CommitBlockNoTxInPoolShouldErr(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions process/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ const MaxNoncesDifference = 5
// TODO - calculate exactly in case of the VM, for every VM to have a similar constant, operations / seconds
const MaxGasLimitPerMiniBlock = uint64(100000)
const MaxRequestsWithTimeoutAllowed = 3

// MaxHeadersToRequestInAdvance defines the maximum number of headers which will be requested in advance if they are missing
const MaxHeadersToRequestInAdvance = 10
2 changes: 1 addition & 1 deletion process/factory/metachain/interceptorsContainerFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (icf *interceptorsContainerFactory) generateMetablockInterceptor() ([]strin

interceptor, err := interceptors.NewMetachainHeaderInterceptor(
icf.marshalizer,
icf.dataPool.MetaChainBlocks(),
icf.dataPool.MetaBlocks(),
icf.dataPool.HeadersNonces(),
hdrValidator,
icf.multiSigner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func createDataPools() dataRetriever.MetaPoolsHolder {
MiniBlocksCalled: func() storage.Cacher {
return &mock.CacherStub{}
},
MetaChainBlocksCalled: func() storage.Cacher {
MetaBlocksCalled: func() storage.Cacher {
return &mock.CacherStub{}
},
HeadersNoncesCalled: func() dataRetriever.Uint64SyncMapCacher {
Expand Down
Loading

0 comments on commit b787395

Please sign in to comment.