Skip to content

Commit

Permalink
Merge pull request #512 from ElrondNetwork/EN-4247-save-sorted-rounds…
Browse files Browse the repository at this point in the history
…-info-in-es

EN-4247 : add timestamp in round info
  • Loading branch information
bogdan-rosianu authored Oct 4, 2019
2 parents e35fd4b + 2f8ec02 commit 061dac9
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 20 deletions.
11 changes: 10 additions & 1 deletion consensus/spos/commonSubround/subroundStartRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,16 @@ func (sr *SubroundStartRound) indexRoundIfNeeded(pubKeys []string) {
shardId := sr.ShardCoordinator().SelfId()
signersIndexes := sr.NodesCoordinator().GetValidatorsIndexes(pubKeys)
round := sr.Rounder().Index()
go sr.indexer.SaveRoundInfo(round, shardId, signersIndexes, false)

roundInfo := indexer.RoundInfo{
Index: uint64(round),
SignersIndexes: signersIndexes,
BlockWasProposed: false,
ShardId: shardId,
Timestamp: time.Duration(sr.RoundTimeStamp.Unix()),
}

go sr.indexer.SaveRoundInfo(roundInfo)
}

func (sr *SubroundStartRound) generateNextConsensusGroup(roundIndex int64) error {
Expand Down
8 changes: 5 additions & 3 deletions core/indexer/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ type ValidatorsPublicKeys struct {

// RoundInfo is a structure containing block signers and shard id
type RoundInfo struct {
SignersIndexes []uint64 `json:"signersIndexes"`
BlockWasProposed bool `json:"blockWasProposed"`
ShardId uint32 `json:"shardId"`
Index uint64 `json:"-"`
SignersIndexes []uint64 `json:"signersIndexes"`
BlockWasProposed bool `json:"blockWasProposed"`
ShardId uint32 `json:"shardId"`
Timestamp time.Duration `json:"timestamp"`
}

// TPS is a structure containing all the fields that need to
Expand Down
12 changes: 3 additions & 9 deletions core/indexer/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewElasticIndexer(
return nil, err
}

err = indexer.checkAndCreateIndex(roundIndex, nil)
err = indexer.checkAndCreateIndex(roundIndex, timestampMapping())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -241,15 +241,9 @@ func (ei *elasticIndexer) SaveBlock(
}

// SaveRoundInfo will save data about a round on elastic search
func (ei *elasticIndexer) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) {
func (ei *elasticIndexer) SaveRoundInfo(roundInfo RoundInfo) {
var buff bytes.Buffer

roundInfo := RoundInfo{
SignersIndexes: signersIndexes,
BlockWasProposed: blockWasProposed,
ShardId: shardId,
}

marshalizedRoundInfo, err := ei.marshalizer.Marshal(roundInfo)
if err != nil {
ei.logger.Warn("could not marshal signers indexes")
Expand All @@ -261,7 +255,7 @@ func (ei *elasticIndexer) SaveRoundInfo(round int64, shardId uint32, signersInde

req := esapi.IndexRequest{
Index: roundIndex,
DocumentID: strconv.Itoa(int(round)),
DocumentID: strconv.FormatInt(int64(roundInfo.Index), 10),
Body: bytes.NewReader(buff.Bytes()),
Refresh: "true",
}
Expand Down
2 changes: 1 addition & 1 deletion core/indexer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// This could be an elasticsearch index, a MySql database or any other external services.
type Indexer interface {
SaveBlock(body data.BodyHandler, header data.HeaderHandler, txPool map[string]data.TransactionHandler, signersIndexes []uint64)
SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool)
SaveRoundInfo(roundInfo RoundInfo)
UpdateTPS(tpsBenchmark statistics.TPSBenchmark)
SaveValidatorsPubKeys(validatorsPubKeys map[uint32][][]byte)
IsInterfaceNil() bool
Expand Down
2 changes: 1 addition & 1 deletion core/indexer/nilIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (ni *NilIndexer) SaveBlock(body data.BodyHandler, header data.HeaderHandler
}

// SaveRoundInfo will do nothing
func (ni *NilIndexer) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) {
func (ni *NilIndexer) SaveRoundInfo(info RoundInfo) {
return
}

Expand Down
3 changes: 2 additions & 1 deletion core/mock/indexerMock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mock

import (
"github.com/ElrondNetwork/elrond-go/core/indexer"
"github.com/ElrondNetwork/elrond-go/core/statistics"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/block"
Expand All @@ -19,7 +20,7 @@ func (im *IndexerMock) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) {
panic("implement me")
}

func (im *IndexerMock) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) {
func (im *IndexerMock) SaveRoundInfo(roundInfo indexer.RoundInfo) {
panic("implement me")
}

Expand Down
3 changes: 2 additions & 1 deletion node/mock/indexerMock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mock

import (
"github.com/ElrondNetwork/elrond-go/core/indexer"
"github.com/ElrondNetwork/elrond-go/core/statistics"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/block"
Expand All @@ -19,7 +20,7 @@ func (im *IndexerMock) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) {
panic("implement me")
}

func (im *IndexerMock) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) {
func (im *IndexerMock) SaveRoundInfo(roundInfo indexer.RoundInfo) {
panic("implement me")
}

Expand Down
9 changes: 9 additions & 0 deletions process/block/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,12 @@ func (sp *shardProcessor) IsMiniBlockProcessed(metaBlockHash []byte, miniBlockHa
func (sp *shardProcessor) AddProcessedMiniBlock(metaBlockHash []byte, miniBlockHash []byte) {
sp.addProcessedMiniBlock(metaBlockHash, miniBlockHash)
}

func (sp *shardProcessor) CalculateRoundDuration(
lastBlockTimestamp uint64,
currentBlockTimestamp uint64,
lastBlockRound uint64,
currentBlockRound uint64,
) uint64 {
return sp.calculateRoundDuration(lastBlockTimestamp, currentBlockTimestamp, lastBlockRound, currentBlockRound)
}
41 changes: 39 additions & 2 deletions process/block/shardblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/indexer"
"github.com/ElrondNetwork/elrond-go/core/serviceContainer"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/block"
Expand Down Expand Up @@ -528,9 +529,16 @@ func (sp *shardProcessor) indexBlockIfNeeded(
}

signersIndexes := sp.nodesCoordinator.GetValidatorsIndexes(pubKeys)
roundInfo := indexer.RoundInfo{
Index: header.GetRound(),
SignersIndexes: signersIndexes,
BlockWasProposed: true,
ShardId: shardId,
Timestamp: time.Duration(header.GetTimeStamp()),
}

go sp.core.Indexer().SaveBlock(body, header, txPool, signersIndexes)
go sp.core.Indexer().SaveRoundInfo(int64(header.GetRound()), shardId, signersIndexes, true)
go sp.core.Indexer().SaveRoundInfo(roundInfo)

if lastBlockHeader == nil {
return
Expand All @@ -539,15 +547,44 @@ func (sp *shardProcessor) indexBlockIfNeeded(
lastBlockRound := lastBlockHeader.GetRound()
currentBlockRound := header.GetRound()

roundDuration := sp.calculateRoundDuration(lastBlockHeader.GetTimeStamp(), header.GetTimeStamp(), lastBlockRound, currentBlockRound)
for i := lastBlockRound + 1; i < currentBlockRound; i++ {
publicKeys, err := sp.nodesCoordinator.GetValidatorsPublicKeys(lastBlockHeader.GetRandSeed(), i, shardId)
if err != nil {
continue
}
signersIndexes = sp.nodesCoordinator.GetValidatorsIndexes(publicKeys)
go sp.core.Indexer().SaveRoundInfo(int64(i), shardId, signersIndexes, true)
roundInfo = indexer.RoundInfo{
Index: i,
SignersIndexes: signersIndexes,
BlockWasProposed: true,
ShardId: shardId,
Timestamp: time.Duration(header.GetTimeStamp() - ((currentBlockRound - i) * roundDuration)),
}

go sp.core.Indexer().SaveRoundInfo(roundInfo)
}
}

func (sp *shardProcessor) calculateRoundDuration(
lastBlockTimestamp uint64,
currentBlockTimestamp uint64,
lastBlockRound uint64,
currentBlockRound uint64,
) uint64 {
if lastBlockTimestamp >= currentBlockTimestamp {
log.Error("last block timestamp is greater or equals than current block timestamp")
return 0
}
if lastBlockRound >= currentBlockRound {
log.Error("last block round is greater or equals than current block round")
return 0
}

diffTimeStamp := currentBlockTimestamp - lastBlockTimestamp
diffRounds := currentBlockRound - lastBlockRound

return diffTimeStamp / diffRounds
}

// RestoreBlockIntoPools restores the TxBlock and MetaBlock into associated pools
Expand Down
15 changes: 15 additions & 0 deletions process/block/shardblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4191,3 +4191,18 @@ func TestShardProcessor_RestoreMetaBlockIntoPoolVerifyMiniblocks(t *testing.T) {
assert.Nil(t, err)
assert.True(t, sp.IsMiniBlockProcessed(metaHash, testMBHash))
}

func TestNewShardProcessor_CalculateRoundDuration(t *testing.T) {
t.Parallel()

arguments := CreateMockArguments()
sp, _ := blproc.NewShardProcessor(arguments)
lastBlockTimestamp := uint64(80)
currentBlockTimestamp := uint64(100)
lastBlockRound := uint64(5)
currentBlockRound := uint64(10)
expectedRoundDuration := uint64(4)

roundDuration := sp.CalculateRoundDuration(lastBlockTimestamp, currentBlockTimestamp, lastBlockRound, currentBlockRound)
assert.Equal(t, expectedRoundDuration, roundDuration)
}
3 changes: 2 additions & 1 deletion process/mock/indexerMock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mock

import (
"github.com/ElrondNetwork/elrond-go/core/indexer"
"github.com/ElrondNetwork/elrond-go/core/statistics"
"github.com/ElrondNetwork/elrond-go/data"
)
Expand All @@ -20,7 +21,7 @@ func (im *IndexerMock) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) {
panic("implement me")
}

func (im *IndexerMock) SaveRoundInfo(round int64, shardId uint32, signersIndexes []uint64, blockWasProposed bool) {
func (im *IndexerMock) SaveRoundInfo(roundInfo indexer.RoundInfo) {
return
}

Expand Down

0 comments on commit 061dac9

Please sign in to comment.