Skip to content

Commit

Permalink
Merge pull request #3919 from ElrondNetwork/fix-tx-sorting
Browse files Browse the repository at this point in the history
Fixed the epoch activation in transactions pre-processors
  • Loading branch information
iulianpascalau authored Mar 20, 2022
2 parents 93fbdac + 34619b1 commit 6fa07c6
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 23 deletions.
9 changes: 6 additions & 3 deletions process/block/preprocess/basePreProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type txsForBlock struct {
txHashAndInfo map[string]*txInfo
}

// basePreProcess is the base struct for all pre-processors
// beware of calling basePreProcess.epochConfirmed in all extensions of this struct if the flags from the basePreProcess are
// used in those extensions instances
type basePreProcess struct {
gasTracker
hasher hashing.Hasher
Expand Down Expand Up @@ -245,7 +248,7 @@ func (bpp *basePreProcess) saveTransactionToStorage(
forBlock.mutTxsForBlock.RUnlock()

if txInfoFromMap == nil || txInfoFromMap.tx == nil {
log.Warn("basePreProcess.saveTransactionToStorage", "type", dataUnit, "txHash", txHash,"error", process.ErrMissingTransaction.Error())
log.Warn("basePreProcess.saveTransactionToStorage", "type", dataUnit, "txHash", txHash, "error", process.ErrMissingTransaction.Error())
return
}

Expand Down Expand Up @@ -478,8 +481,8 @@ func (bpp *basePreProcess) updateGasConsumedWithGasRefundedAndGasPenalized(
gasInfo.totalGasConsumedInSelfShard -= gasToBeSubtracted
}

// EpochConfirmed is called whenever a new epoch is confirmed
func (bpp *basePreProcess) EpochConfirmed(epoch uint32, _ uint64) {
// epochConfirmed is called whenever a new epoch is confirmed from the structs that extend this instance
func (bpp *basePreProcess) epochConfirmed(epoch uint32, _ uint64) {
bpp.flagOptimizeGasUsedInCrossMiniBlocks.SetValue(epoch >= bpp.optimizeGasUsedInCrossMiniBlocksEnableEpoch)
log.Debug("basePreProcess: optimize gas used in cross mini blocks", "enabled", bpp.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
bpp.flagFrontRunningProtection.SetValue(epoch >= bpp.frontRunningProtectionEnableEpoch)
Expand Down
6 changes: 3 additions & 3 deletions process/block/preprocess/smartContractResults.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func NewSmartContractResultPreprocessor(
balanceComputation: balanceComputation,
accounts: accounts,
pubkeyConverter: pubkeyConverter,

optimizeGasUsedInCrossMiniBlocksEnableEpoch: optimizeGasUsedInCrossMiniBlocksEnableEpoch,
}

Expand Down Expand Up @@ -646,7 +647,6 @@ func (scr *smartContractResults) isMiniBlockCorrect(mbType block.Type) bool {
}

// EpochConfirmed is called whenever a new epoch is confirmed
func (scr *smartContractResults) EpochConfirmed(epoch uint32, _ uint64) {
scr.flagOptimizeGasUsedInCrossMiniBlocks.SetValue(epoch >= scr.optimizeGasUsedInCrossMiniBlocksEnableEpoch)
log.Debug("smartContractResults: optimize gas used in cross mini blocks", "enabled", scr.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
func (scr *smartContractResults) EpochConfirmed(epoch uint32, timestamp uint64) {
scr.epochConfirmed(epoch, timestamp)
}
19 changes: 19 additions & 0 deletions process/block/preprocess/smartContractResults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,3 +1395,22 @@ func TestSmartContractResults_GetAllCurrentUsedTxs(t *testing.T) {
retMap := scrPreproc.GetAllCurrentUsedTxs()
assert.NotNil(t, retMap)
}

func TestSmartContractResults_EpochConfirmed(t *testing.T) {
t.Parallel()

srcs := smartContractResults{
basePreProcess: &basePreProcess{
optimizeGasUsedInCrossMiniBlocksEnableEpoch: 1,
},
}

srcs.EpochConfirmed(0, 0)
assert.False(t, srcs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())

srcs.EpochConfirmed(1, 0)
assert.True(t, srcs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())

srcs.EpochConfirmed(2, 0)
assert.True(t, srcs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
}
13 changes: 7 additions & 6 deletions process/block/preprocess/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func NewTransactionPreprocessor(
balanceComputation: args.BalanceComputation,
accounts: args.Accounts,
pubkeyConverter: args.PubkeyConverter,

optimizeGasUsedInCrossMiniBlocksEnableEpoch: args.OptimizeGasUsedInCrossMiniBlocksEnableEpoch,
frontRunningProtectionEnableEpoch: args.FrontRunningProtectionEnableEpoch,
}
Expand Down Expand Up @@ -735,6 +736,7 @@ func (txs *transactions) CreateBlockStarted() {
txs.scheduledTxsExecutionHandler.Init()
}

// AddTxsFromMiniBlocks will add the transactions from the provided miniblocks into the internal cache
func (txs *transactions) AddTxsFromMiniBlocks(miniBlocks block.MiniBlockSlice) {
for _, mb := range miniBlocks {
if !txs.isMiniBlockCorrect(mb.Type) {
Expand Down Expand Up @@ -777,7 +779,7 @@ func (txs *transactions) AddTransactions(txHandlers []data.TransactionHandler) {
senderShardID := txs.getShardFromAddress(tx.GetSndAddr())
receiverShardID := txs.getShardFromAddress(tx.GetRcvAddr())
txShardInfoToSet := &txShardInfo{senderShardID: senderShardID, receiverShardID: receiverShardID}
txHash, err:= core.CalculateHash(txs.marshalizer, txs.hasher, tx)
txHash, err := core.CalculateHash(txs.marshalizer, txs.hasher, tx)
if err != nil {
log.Warn("transactions.AddTransactions CalculateHash", "error", err.Error())
continue
Expand Down Expand Up @@ -1299,7 +1301,7 @@ func (txs *transactions) splitMiniBlockBasedOnMaxGasLimitIfNeeded(miniBlock *blo
gasLimitInReceiverShard := uint64(0)

for _, txHash := range miniBlock.TxHashes {
txInfo, ok := txs.txsForCurrBlock.txHashAndInfo[string(txHash)]
txInfoInstance, ok := txs.txsForCurrBlock.txHashAndInfo[string(txHash)]
if !ok {
log.Warn("transactions.splitMiniBlockIfNeeded: missing tx", "hash", txHash)
currentMiniBlock.TxHashes = append(currentMiniBlock.TxHashes, txHash)
Expand All @@ -1309,7 +1311,7 @@ func (txs *transactions) splitMiniBlockBasedOnMaxGasLimitIfNeeded(miniBlock *blo
_, gasProvidedByTxInReceiverShard, err := txs.computeGasProvidedByTx(
miniBlock.SenderShardID,
miniBlock.ReceiverShardID,
txInfo.tx,
txInfoInstance.tx,
txHash)
if err != nil {
log.Warn("transactions.splitMiniBlockIfNeeded: failed to compute gas consumed by tx", "hash", txHash, "error", err.Error())
Expand Down Expand Up @@ -1564,9 +1566,8 @@ func (txs *transactions) GetAllCurrentUsedTxs() map[string]data.TransactionHandl
}

// EpochConfirmed is called whenever a new epoch is confirmed
func (txs *transactions) EpochConfirmed(epoch uint32, _ uint64) {
txs.flagOptimizeGasUsedInCrossMiniBlocks.SetValue(epoch >= txs.optimizeGasUsedInCrossMiniBlocksEnableEpoch)
log.Debug("transactions: optimize gas used in cross mini blocks", "enabled", txs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
func (txs *transactions) EpochConfirmed(epoch uint32, timestamp uint64) {
txs.epochConfirmed(epoch, timestamp)

txs.flagScheduledMiniBlocks.SetValue(epoch >= txs.scheduledMiniBlocksEnableEpoch)
log.Debug("transactions: scheduled mini blocks", "enabled", txs.flagScheduledMiniBlocks.IsSet())
Expand Down
111 changes: 104 additions & 7 deletions process/block/preprocess/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package preprocess

import (
"bytes"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -12,12 +13,14 @@ import (

"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/core/atomic"
"github.com/ElrondNetwork/elrond-go-core/core/pubkeyConverter"
"github.com/ElrondNetwork/elrond-go-core/data"
"github.com/ElrondNetwork/elrond-go-core/data/block"
"github.com/ElrondNetwork/elrond-go-core/data/rewardTx"
"github.com/ElrondNetwork/elrond-go-core/data/smartContractResult"
"github.com/ElrondNetwork/elrond-go-core/data/transaction"
"github.com/ElrondNetwork/elrond-go-core/hashing"
"github.com/ElrondNetwork/elrond-go-core/hashing/blake2b"
"github.com/ElrondNetwork/elrond-go-core/hashing/sha256"
"github.com/ElrondNetwork/elrond-go-core/marshal"
"github.com/ElrondNetwork/elrond-go/common"
Expand Down Expand Up @@ -480,7 +483,7 @@ func TestTransactionPreprocessor_ReceivedTransactionShouldEraseRequested(t *test

txs := createGoodPreprocessor(dataPool)

//add 3 tx hashes on requested list
// add 3 tx hashes on requested list
txHash1 := []byte("tx hash 1")
txHash2 := []byte("tx hash 2")
txHash3 := []byte("tx hash 3")
Expand All @@ -491,15 +494,15 @@ func TestTransactionPreprocessor_ReceivedTransactionShouldEraseRequested(t *test

txs.SetMissingTxs(3)

//received txHash2
// received txHash2
txs.ReceivedTransaction(txHash2, &txcache.WrappedTransaction{Tx: &transaction.Transaction{}})

assert.True(t, txs.IsTxHashRequested(txHash1))
assert.False(t, txs.IsTxHashRequested(txHash2))
assert.True(t, txs.IsTxHashRequested(txHash3))
}

//------- GetAllTxsFromMiniBlock
// ------- GetAllTxsFromMiniBlock

func computeHash(data interface{}, marshalizer marshal.Marshalizer, hasher hashing.Hasher) []byte {
buff, _ := marshalizer.Marshal(data)
Expand All @@ -522,7 +525,7 @@ func TestTransactionPreprocessor_GetAllTxsFromMiniBlockShouldWork(t *testing.T)
}
transactionsHashes := make([][]byte, len(txsSlice))

//add defined transactions to sender-destination cacher
// add defined transactions to sender-destination cacher
for idx, tx := range txsSlice {
transactionsHashes[idx] = computeHash(tx, marshalizer, hasher)

Expand All @@ -534,7 +537,7 @@ func TestTransactionPreprocessor_GetAllTxsFromMiniBlockShouldWork(t *testing.T)
)
}

//add some random data
// add some random data
txRandom := &transaction.Transaction{Nonce: 4}
dataPool.Transactions().AddData(
computeHash(txRandom, marshalizer, hasher),
Expand All @@ -557,9 +560,9 @@ func TestTransactionPreprocessor_GetAllTxsFromMiniBlockShouldWork(t *testing.T)
assert.Equal(t, len(txsSlice), len(txsRetrieved))
assert.Equal(t, len(txsSlice), len(txHashesRetrieved))
for idx, tx := range txsSlice {
//txReceived should be all txs in the same order
// txReceived should be all txs in the same order
assert.Equal(t, txsRetrieved[idx], tx)
//verify corresponding transaction hashes
// verify corresponding transaction hashes
assert.Equal(t, txHashesRetrieved[idx], computeHash(tx, marshalizer, hasher))
}
}
Expand Down Expand Up @@ -1770,3 +1773,97 @@ func TestTransactions_AddTransactions(t *testing.T) {
require.Equal(t, 2, numTxsSaved)
})
}

func TestSortTransactionsBySenderAndNonceWithFrontRunningProtection_TestnetBids(t *testing.T) {
txPreproc := transactions{
basePreProcess: &basePreProcess{
hasher: blake2b.NewBlake2b(),
},
}

addresses := []string{
"erd1lr7k9z8l6lgud6709pr3lnm84mfnqqrj40rq66n4rtassfyvcl8starqtf",
"erd1pvr8n50q9tqvng03c450d3ac4pz5dt0gxedvvf80rj9r77s3ds0swj33ea",
"erd1xls5cejdna07m3jptt43trhhcw39hz5xe673d6lmfnapmcxz9a3s88ycvk",
"erd18ljvzsj74ehku7ej80lm35jsxdcxxrwc9t5swkgkyzayep52qe2sujv9xj",
"erd1qrzudpvn7xmqvx8w0sc726arp4rpuxxw5zk87rjh9yy3v09knjas9w9077",
"erd18dp32dj2gm626uhtd3mezkd24phzev2gmef06y9fs4f94uyy4swsllu24j",
"erd19rywmefgq6m0ddmwv9uc23ns7q8s236hag2qp9h8aps0cnxf9qnsnyavkx",
"erd1hshz86ke95z58920xl59jnakv5ppmsfarwtump6scjjcyfr9zxwsd0cy8y",
"erd13l5pgsz32u2t7mpanr9hyalahn2newj6ew85s8pgaln5kglm5s3s7w657h",
}
bch32, _ := pubkeyConverter.NewBech32PubkeyConverter(32, log)
txs := make([]*txcache.WrappedTransaction, 0)

for idx, addr := range addresses {
addrBytes, _ := bch32.Decode(addr)
txs = append(txs, &txcache.WrappedTransaction{
Tx: &transaction.Transaction{Nonce: 2, SndAddr: addrBytes}, TxHash: []byte(fmt.Sprintf("hash%d", idx)),
})
}

numWinsForAddresses := make(map[string]int)
numCalls := 10000
for i := 0; i < numCalls; i++ {
randomness := make([]byte, 32)
_, _ = rand.Read(randomness)
txPreproc.sortTransactionsBySenderAndNonceWithFrontRunningProtection(txs, randomness)
winner := bch32.Encode(txs[0].Tx.GetSndAddr())
numWinsForAddresses[winner]++
}

expectedWinsPerSender := numCalls / len(addresses)
allowedDifferencePercent := 10
allowedDelta := allowedDifferencePercent * expectedWinsPerSender / 100
minWins := expectedWinsPerSender - allowedDelta
maxWins := expectedWinsPerSender + allowedDelta

log.Info("test parameters",
"num calls", numCalls,
"expected wins per sender", expectedWinsPerSender,
"delta", allowedDelta,
"min wins", minWins,
"max wins", maxWins)

for addr, wins := range numWinsForAddresses {
log.Info("address wins", "address", addr, "num wins", wins)
assert.True(t, minWins <= wins && wins <= maxWins)
}
}

func TestTransactions_EpochConfirmed(t *testing.T) {
t.Parallel()

txs := transactions{
basePreProcess: &basePreProcess{
frontRunningProtectionEnableEpoch: 1,
optimizeGasUsedInCrossMiniBlocksEnableEpoch: 2,
},
scheduledMiniBlocksEnableEpoch: 3,
}

txs.EpochConfirmed(0, 0)
assert.False(t, txs.flagFrontRunningProtection.IsSet())
assert.False(t, txs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
assert.False(t, txs.flagScheduledMiniBlocks.IsSet())

txs.EpochConfirmed(1, 0)
assert.True(t, txs.flagFrontRunningProtection.IsSet())
assert.False(t, txs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
assert.False(t, txs.flagScheduledMiniBlocks.IsSet())

txs.EpochConfirmed(2, 0)
assert.True(t, txs.flagFrontRunningProtection.IsSet())
assert.True(t, txs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
assert.False(t, txs.flagScheduledMiniBlocks.IsSet())

txs.EpochConfirmed(3, 0)
assert.True(t, txs.flagFrontRunningProtection.IsSet())
assert.True(t, txs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
assert.True(t, txs.flagScheduledMiniBlocks.IsSet())

txs.EpochConfirmed(4, 0)
assert.True(t, txs.flagFrontRunningProtection.IsSet())
assert.True(t, txs.flagOptimizeGasUsedInCrossMiniBlocks.IsSet())
assert.True(t, txs.flagScheduledMiniBlocks.IsSet())
}
8 changes: 4 additions & 4 deletions process/block/shardblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,15 +1454,15 @@ func (sp *shardProcessor) addProcessedCrossMiniBlocksFromHeader(header data.Head

sp.hdrsForCurrBlock.mutHdrsForBlock.RLock()
for _, metaBlockHash := range shardHeader.GetMetaBlockHashes() {
headerInfo, ok := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)]
if !ok {
headerInfo, found := sp.hdrsForCurrBlock.hdrHashAndInfo[string(metaBlockHash)]
if !found {
sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock()
return fmt.Errorf("%w : addProcessedCrossMiniBlocksFromHeader metaBlockHash = %s",
process.ErrMissingHeader, logger.DisplayByteSlice(metaBlockHash))
}

metaBlock, ok := headerInfo.hdr.(*block.MetaBlock)
if !ok {
metaBlock, isMetaBlock := headerInfo.hdr.(*block.MetaBlock)
if !isMetaBlock {
sp.hdrsForCurrBlock.mutHdrsForBlock.RUnlock()
return process.ErrWrongTypeAssertion
}
Expand Down

0 comments on commit 6fa07c6

Please sign in to comment.