diff --git a/consensus/spos/commonSubround/subroundStartRound.go b/consensus/spos/commonSubround/subroundStartRound.go index 217ae51ad20..886e3d48359 100644 --- a/consensus/spos/commonSubround/subroundStartRound.go +++ b/consensus/spos/commonSubround/subroundStartRound.go @@ -201,7 +201,16 @@ func (sr *SubroundStartRound) indexRoundIfNeeded(pubKeys []string) { shardId := sr.ShardCoordinator().SelfId() signersIndexes := sr.NodesCoordinator().GetValidatorsIndexes(pubKeys) round := sr.Rounder().Index() - go sr.indexer.SaveRoundInfo(round, shardId, signersIndexes, false) + + roundInfo := indexer.RoundInfo{ + Index: uint64(round), + SignersIndexes: signersIndexes, + BlockWasProposed: false, + ShardId: shardId, + Timestamp: time.Duration(sr.RoundTimeStamp.Unix()), + } + + go sr.indexer.SaveRoundInfo(roundInfo) } func (sr *SubroundStartRound) generateNextConsensusGroup(roundIndex int64) error { diff --git a/core/indexer/data.go b/core/indexer/data.go index ef7011046f4..2bce4546333 100644 --- a/core/indexer/data.go +++ b/core/indexer/data.go @@ -52,9 +52,11 @@ type ValidatorsPublicKeys struct { // RoundInfo is a structure containing block signers and shard id type RoundInfo struct { - SignersIndexes []uint64 `json:"signersIndexes"` - BlockWasProposed bool `json:"blockWasProposed"` - ShardId uint32 `json:"shardId"` + Index uint64 `json:"-"` + SignersIndexes []uint64 `json:"signersIndexes"` + BlockWasProposed bool `json:"blockWasProposed"` + ShardId uint32 `json:"shardId"` + Timestamp time.Duration `json:"timestamp"` } // TPS is a structure containing all the fields that need to diff --git a/core/indexer/elasticsearch.go b/core/indexer/elasticsearch.go index 0da6da2956b..ae8de78fd35 100644 --- a/core/indexer/elasticsearch.go +++ b/core/indexer/elasticsearch.go @@ -120,7 +120,7 @@ func NewElasticIndexer( return nil, err } - err = indexer.checkAndCreateIndex(roundIndex, nil) + err = indexer.checkAndCreateIndex(roundIndex, timestampMapping()) if err != nil { return nil, err } @@ -241,15 +241,9 @@ func (ei *elasticIndexer) SaveBlock( } // SaveRoundInfo will save data about a round on elastic search -func (ei *elasticIndexer) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) { +func (ei *elasticIndexer) SaveRoundInfo(roundInfo RoundInfo) { var buff bytes.Buffer - roundInfo := RoundInfo{ - SignersIndexes: signersIndexes, - BlockWasProposed: blockWasProposed, - ShardId: shardId, - } - marshalizedRoundInfo, err := ei.marshalizer.Marshal(roundInfo) if err != nil { ei.logger.Warn("could not marshal signers indexes") @@ -261,7 +255,7 @@ func (ei *elasticIndexer) SaveRoundInfo(round int64, shardId uint32, signersInde req := esapi.IndexRequest{ Index: roundIndex, - DocumentID: strconv.Itoa(int(round)), + DocumentID: strconv.FormatInt(int64(roundInfo.Index), 10), Body: bytes.NewReader(buff.Bytes()), Refresh: "true", } diff --git a/core/indexer/interface.go b/core/indexer/interface.go index 9d9b327ec1d..ad29e9aa896 100644 --- a/core/indexer/interface.go +++ b/core/indexer/interface.go @@ -9,7 +9,7 @@ import ( // This could be an elasticsearch index, a MySql database or any other external services. type Indexer interface { SaveBlock(body data.BodyHandler, header data.HeaderHandler, txPool map[string]data.TransactionHandler, signersIndexes []uint64) - SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) + SaveRoundInfo(roundInfo RoundInfo) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) SaveValidatorsPubKeys(validatorsPubKeys map[uint32][][]byte) IsInterfaceNil() bool diff --git a/core/indexer/nilIndexer.go b/core/indexer/nilIndexer.go index ad252de9f1f..a63e08462c9 100644 --- a/core/indexer/nilIndexer.go +++ b/core/indexer/nilIndexer.go @@ -20,7 +20,7 @@ func (ni *NilIndexer) SaveBlock(body data.BodyHandler, header data.HeaderHandler } // SaveRoundInfo will do nothing -func (ni *NilIndexer) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) { +func (ni *NilIndexer) SaveRoundInfo(info RoundInfo) { return } diff --git a/core/mock/indexerMock.go b/core/mock/indexerMock.go index e1adb30ed97..7f43e764569 100644 --- a/core/mock/indexerMock.go +++ b/core/mock/indexerMock.go @@ -1,6 +1,7 @@ package mock import ( + "github.com/ElrondNetwork/elrond-go/core/indexer" "github.com/ElrondNetwork/elrond-go/core/statistics" "github.com/ElrondNetwork/elrond-go/data" "github.com/ElrondNetwork/elrond-go/data/block" @@ -19,7 +20,7 @@ func (im *IndexerMock) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) { panic("implement me") } -func (im *IndexerMock) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) { +func (im *IndexerMock) SaveRoundInfo(roundInfo indexer.RoundInfo) { panic("implement me") } diff --git a/node/mock/indexerMock.go b/node/mock/indexerMock.go index e1adb30ed97..7f43e764569 100644 --- a/node/mock/indexerMock.go +++ b/node/mock/indexerMock.go @@ -1,6 +1,7 @@ package mock import ( + "github.com/ElrondNetwork/elrond-go/core/indexer" "github.com/ElrondNetwork/elrond-go/core/statistics" "github.com/ElrondNetwork/elrond-go/data" "github.com/ElrondNetwork/elrond-go/data/block" @@ -19,7 +20,7 @@ func (im *IndexerMock) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) { panic("implement me") } -func (im *IndexerMock) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) { +func (im *IndexerMock) SaveRoundInfo(roundInfo indexer.RoundInfo) { panic("implement me") } diff --git a/process/block/export_test.go b/process/block/export_test.go index 17ae9410ae3..651a70c4834 100644 --- a/process/block/export_test.go +++ b/process/block/export_test.go @@ -310,3 +310,12 @@ func (sp *shardProcessor) IsMiniBlockProcessed(metaBlockHash []byte, miniBlockHa func (sp *shardProcessor) AddProcessedMiniBlock(metaBlockHash []byte, miniBlockHash []byte) { sp.addProcessedMiniBlock(metaBlockHash, miniBlockHash) } + +func (sp *shardProcessor) CalculateRoundDuration( + lastBlockTimestamp uint64, + currentBlockTimestamp uint64, + lastBlockRound uint64, + currentBlockRound uint64, +) uint64 { + return sp.calculateRoundDuration(lastBlockTimestamp, currentBlockTimestamp, lastBlockRound, currentBlockRound) +} diff --git a/process/block/shardblock.go b/process/block/shardblock.go index 07631e23735..d542ba699b8 100644 --- a/process/block/shardblock.go +++ b/process/block/shardblock.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ElrondNetwork/elrond-go/core" + "github.com/ElrondNetwork/elrond-go/core/indexer" "github.com/ElrondNetwork/elrond-go/core/serviceContainer" "github.com/ElrondNetwork/elrond-go/data" "github.com/ElrondNetwork/elrond-go/data/block" @@ -528,9 +529,16 @@ func (sp *shardProcessor) indexBlockIfNeeded( } signersIndexes := sp.nodesCoordinator.GetValidatorsIndexes(pubKeys) + roundInfo := indexer.RoundInfo{ + Index: header.GetRound(), + SignersIndexes: signersIndexes, + BlockWasProposed: true, + ShardId: shardId, + Timestamp: time.Duration(header.GetTimeStamp()), + } go sp.core.Indexer().SaveBlock(body, header, txPool, signersIndexes) - go sp.core.Indexer().SaveRoundInfo(int64(header.GetRound()), shardId, signersIndexes, true) + go sp.core.Indexer().SaveRoundInfo(roundInfo) if lastBlockHeader == nil { return @@ -539,15 +547,44 @@ func (sp *shardProcessor) indexBlockIfNeeded( lastBlockRound := lastBlockHeader.GetRound() currentBlockRound := header.GetRound() + roundDuration := sp.calculateRoundDuration(lastBlockHeader.GetTimeStamp(), header.GetTimeStamp(), lastBlockRound, currentBlockRound) for i := lastBlockRound + 1; i < currentBlockRound; i++ { publicKeys, err := sp.nodesCoordinator.GetValidatorsPublicKeys(lastBlockHeader.GetRandSeed(), i, shardId) if err != nil { continue } signersIndexes = sp.nodesCoordinator.GetValidatorsIndexes(publicKeys) - go sp.core.Indexer().SaveRoundInfo(int64(i), shardId, signersIndexes, true) + roundInfo = indexer.RoundInfo{ + Index: i, + SignersIndexes: signersIndexes, + BlockWasProposed: true, + ShardId: shardId, + Timestamp: time.Duration(header.GetTimeStamp() - ((currentBlockRound - i) * roundDuration)), + } + + go sp.core.Indexer().SaveRoundInfo(roundInfo) + } +} + +func (sp *shardProcessor) calculateRoundDuration( + lastBlockTimestamp uint64, + currentBlockTimestamp uint64, + lastBlockRound uint64, + currentBlockRound uint64, +) uint64 { + if lastBlockTimestamp >= currentBlockTimestamp { + log.Error("last block timestamp is greater or equals than current block timestamp") + return 0 } + if lastBlockRound >= currentBlockRound { + log.Error("last block round is greater or equals than current block round") + return 0 + } + + diffTimeStamp := currentBlockTimestamp - lastBlockTimestamp + diffRounds := currentBlockRound - lastBlockRound + return diffTimeStamp / diffRounds } // RestoreBlockIntoPools restores the TxBlock and MetaBlock into associated pools diff --git a/process/block/shardblock_test.go b/process/block/shardblock_test.go index d2750dbc509..4154123adbb 100644 --- a/process/block/shardblock_test.go +++ b/process/block/shardblock_test.go @@ -4191,3 +4191,18 @@ func TestShardProcessor_RestoreMetaBlockIntoPoolVerifyMiniblocks(t *testing.T) { assert.Nil(t, err) assert.True(t, sp.IsMiniBlockProcessed(metaHash, testMBHash)) } + +func TestNewShardProcessor_CalculateRoundDuration(t *testing.T) { + t.Parallel() + + arguments := CreateMockArguments() + sp, _ := blproc.NewShardProcessor(arguments) + lastBlockTimestamp := uint64(80) + currentBlockTimestamp := uint64(100) + lastBlockRound := uint64(5) + currentBlockRound := uint64(10) + expectedRoundDuration := uint64(4) + + roundDuration := sp.CalculateRoundDuration(lastBlockTimestamp, currentBlockTimestamp, lastBlockRound, currentBlockRound) + assert.Equal(t, expectedRoundDuration, roundDuration) +} diff --git a/process/mock/indexerMock.go b/process/mock/indexerMock.go index 5bb645a3d81..2d2552a6da5 100644 --- a/process/mock/indexerMock.go +++ b/process/mock/indexerMock.go @@ -1,6 +1,7 @@ package mock import ( + "github.com/ElrondNetwork/elrond-go/core/indexer" "github.com/ElrondNetwork/elrond-go/core/statistics" "github.com/ElrondNetwork/elrond-go/data" ) @@ -20,7 +21,7 @@ func (im *IndexerMock) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) { panic("implement me") } -func (im *IndexerMock) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) { +func (im *IndexerMock) SaveRoundInfo(roundInfo indexer.RoundInfo) { return }