Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use goroutines manager for trie commit #6611

Draft
wants to merge 10 commits into
base: refactor-trie-set-hash
Choose a base branch
from
14 changes: 9 additions & 5 deletions common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,9 @@ type Trie interface {
Update(key, value []byte) error
Delete(key []byte)
RootHash() ([]byte, error)
Commit() error
Commit(collector TrieHashesCollector) error
Recreate(root []byte) (Trie, error)
RecreateFromEpoch(options RootHashHolder) (Trie, error)
String() string
GetObsoleteHashes() [][]byte
GetDirtyHashes() (ModifiedHashes, error)
GetOldRoot() []byte
GetSerializedNodes([]byte, uint64) ([][]byte, uint64, error)
GetSerializedNode([]byte) ([]byte, error)
GetAllLeavesOnChannel(allLeavesChan *TrieIteratorChannels, ctx context.Context, rootHash []byte, keyBuilder KeyBuilder, trieLeafParser TrieLeafParser) error
Expand Down Expand Up @@ -410,3 +406,11 @@ type AtomicBytesSlice interface {
Append(data [][]byte)
Get() [][]byte
}

// TrieHashesCollector defines the methods needed for collecting trie hashes
type TrieHashesCollector interface {
AddDirtyHash(hash []byte)
GetDirtyHashes() ModifiedHashes
AddObsoleteHashes(oldRootHash []byte, oldHashes [][]byte)
GetCollectedData() ([]byte, ModifiedHashes, ModifiedHashes)
}
3 changes: 2 additions & 1 deletion integrationTests/benchmarks/loadFromTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
disabledStatistics "github.com/multiversx/mx-chain-go/common/statistics/disabled"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/storage"
"github.com/multiversx/mx-chain-go/storage/database"
"github.com/multiversx/mx-chain-go/storage/storageunit"
Expand Down Expand Up @@ -120,7 +121,7 @@ func insertKeysIntoTrie(t *testing.T, tr common.Trie, numTrieLevels int, numChil

_, depth, _ := tr.Get(key)
require.Equal(t, uint32(numTrieLevels), depth+1)
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
return key
}

Expand Down
3 changes: 2 additions & 1 deletion integrationTests/longTests/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiversx/mx-chain-core-go/hashing/blake2b"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/storage"
"github.com/multiversx/mx-chain-go/trie"
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestWriteContinuouslyInTree(t *testing.T) {
if i%written == 0 {
endTime := time.Now()
diff := endTime.Sub(startTime)
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
fmt.Printf("Written %d, total %d in %f s\n", written, i, diff.Seconds())
startTime = time.Now()
}
Expand Down
17 changes: 11 additions & 6 deletions integrationTests/state/genesisState/genesisState_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type testPair struct {
const generate32ByteSlices = 0
const generate32HexByteSlices = 1

type trieWithToString interface {
common.Trie
ToString() string
}

func TestCreationOfTheGenesisState(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
Expand Down Expand Up @@ -85,7 +90,7 @@ func TestExtensionNodeToBranchEdgeCaseSet1(t *testing.T) {
_ = tr1.Update([]byte(key3), []byte(val))

fmt.Println()
strTr1 := tr1.String()
strTr1 := tr1.(trieWithToString).ToString()
fmt.Println(strTr1)

hash1, _ := tr1.RootHash()
Expand All @@ -98,7 +103,7 @@ func TestExtensionNodeToBranchEdgeCaseSet1(t *testing.T) {
fmt.Printf("root hash2: %s\n", base64.StdEncoding.EncodeToString(hash2))

fmt.Println()
strTr2 := tr2.String()
strTr2 := tr2.(trieWithToString).ToString()
fmt.Println(strTr2)

assert.Equal(t, hash1, hash2)
Expand Down Expand Up @@ -126,7 +131,7 @@ func TestExtensionNodeToBranchEdgeCaseSet2(t *testing.T) {
_ = tr1.Update([]byte(key4), []byte(val))

fmt.Println()
strTr1 := tr1.String()
strTr1 := tr1.(trieWithToString).ToString()
fmt.Println(strTr1)

hash1, _ := tr1.RootHash()
Expand All @@ -140,7 +145,7 @@ func TestExtensionNodeToBranchEdgeCaseSet2(t *testing.T) {
_ = tr2.Update([]byte(key6), []byte(val))

fmt.Println()
strTr2 := tr2.String()
strTr2 := tr2.(trieWithToString).ToString()
fmt.Println(strTr2)

hash2, _ := tr2.RootHash()
Expand Down Expand Up @@ -299,12 +304,12 @@ func printTestDebugLines(

fmt.Println()
fmt.Println("Reference trie:")
strRefTrie := referenceTrie.String()
strRefTrie := referenceTrie.(trieWithToString).ToString()
fmt.Println(strRefTrie)

fmt.Println()
fmt.Println("Actual trie:")
strTr := tr.String()
strTr := tr.(trieWithToString).ToString()
fmt.Println(strTr)
}

Expand Down
10 changes: 8 additions & 2 deletions integrationTests/state/stateTrie/stateTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/multiversx/mx-chain-go/sharding"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/factory"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/iteratorChannelsProvider"
"github.com/multiversx/mx-chain-go/state/lastSnapshotMarker"
"github.com/multiversx/mx-chain-go/state/storagePruningManager"
Expand Down Expand Up @@ -279,7 +280,7 @@ func TestTrieDB_RecreateFromStorageShouldWork(t *testing.T) {

_ = tr1.Update(key, value)
h1, _ := tr1.RootHash()
err := tr1.Commit()
err := tr1.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

tr2, err := tr1.Recreate(h1)
Expand Down Expand Up @@ -961,6 +962,11 @@ func TestAccountsDB_ExecALotOfBalanceTxOKorNOK(t *testing.T) {
integrationTests.PrintShardAccount(acntDest.(state.UserAccountHandler), "Destination")
}

type trieWithToString interface {
common.Trie
ToString() string
}

func BenchmarkCreateOneMillionAccountsWithMockDB(b *testing.B) {
nrOfAccounts := 1000000
balance := 1500000
Expand Down Expand Up @@ -994,7 +1000,7 @@ func BenchmarkCreateOneMillionAccountsWithMockDB(b *testing.B) {
core.ConvertBytes(rtm.Sys),
)

_ = tr.String()
_ = tr.(trieWithToString).ToString()
}

func BenchmarkCreateOneMillionAccounts(b *testing.B) {
Expand Down
7 changes: 4 additions & 3 deletions integrationTests/state/stateTrieClose/stateTrieClose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/goroutines"
Expand All @@ -28,7 +29,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
for i := 0; i < numLeavesToAdd; i++ {
_ = tr.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())
time.Sleep(time.Second * 2) // allow the commit go routines to finish completely as to not alter the further counters

gc := goroutines.NewGoCounter(goroutines.TestsRelevantGoRoutines)
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
assert.Nil(t, err)

_ = tr.Update([]byte("god"), []byte("puppy"))
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())

rootHash, _ = tr.RootHash()
leavesChannel1 = &common.TrieIteratorChannels{
Expand All @@ -91,7 +92,7 @@ func TestPatriciaMerkleTrie_Close(t *testing.T) {
assert.Nil(t, err)

_ = tr.Update([]byte("eggod"), []byte("cat"))
_ = tr.Commit()
_ = tr.Commit(hashesCollector.NewDisabledHashesCollector())

rootHash, _ = tr.RootHash()
leavesChannel2 := &common.TrieIteratorChannels{
Expand Down
5 changes: 3 additions & 2 deletions integrationTests/state/stateTrieSync/stateTrieSync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/multiversx/mx-chain-go/integrationTests"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/state/syncer"
"github.com/multiversx/mx-chain-go/storage"
Expand Down Expand Up @@ -103,7 +104,7 @@ func testNodeRequestInterceptTrieNodesWithMessenger(t *testing.T, version int) {
_ = resolverTrie.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}

_ = resolverTrie.Commit()
_ = resolverTrie.Commit(hashesCollector.NewDisabledHashesCollector())
rootHash, _ := resolverTrie.RootHash()

numLeaves := getNumLeaves(t, resolverTrie, rootHash)
Expand Down Expand Up @@ -224,7 +225,7 @@ func testNodeRequestInterceptTrieNodesWithMessengerNotSyncingShouldErr(t *testin
_ = resolverTrie.Update([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}

_ = resolverTrie.Commit()
_ = resolverTrie.Commit(hashesCollector.NewDisabledHashesCollector())
rootHash, _ := resolverTrie.RootHash()

numLeaves := getNumLeaves(t, resolverTrie, rootHash)
Expand Down
5 changes: 3 additions & 2 deletions integrationTests/vm/txsFee/migrateDataTrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/multiversx/mx-chain-go/integrationTests/vm"
"github.com/multiversx/mx-chain-go/sharding"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -110,7 +111,7 @@ func TestMigrateDataTrieBuiltInFunc(t *testing.T) {
migrateDataTrie(t, testContext, sndAddr, gasPrice, gasLimit, vmcommon.Ok)
testGasConsumed(t, testContext, gasLimit, 20000)

err = dtr.Commit()
err = dtr.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

rootHash, err = dtr.RootHash()
Expand Down Expand Up @@ -162,7 +163,7 @@ func TestMigrateDataTrieBuiltInFunc(t *testing.T) {
migrateDataTrie(t, testContext, sndAddr, gasPrice, gasLimit, vmcommon.Ok)
numMigrateDataTrieCalls++

err = dtr.Commit()
err = dtr.Commit(hashesCollector.NewDisabledHashesCollector())
require.Nil(t, err)

rootHash, err = dtr.RootHash()
Expand Down
45 changes: 14 additions & 31 deletions state/accountsDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/common/holders"
"github.com/multiversx/mx-chain-go/state/hashesCollector"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/trie/keyBuilder"
"github.com/multiversx/mx-chain-go/trie/statistics"
Expand Down Expand Up @@ -779,22 +780,26 @@ func (adb *AccountsDB) commit() ([]byte, error) {
log.Trace("accountsDB.Commit started")
adb.entries = make([]JournalEntry, 0)

oldHashes := make(common.ModifiedHashes)
newHashes := make(common.ModifiedHashes)
hc := hashesCollector.NewDisabledHashesCollector()
if adb.mainTrie.GetStorageManager().IsPruningEnabled() {
hc = hashesCollector.NewDataTrieHashesCollector()
}

// Step 1. commit all data tries
dataTries := adb.dataTries.GetAll()
for i := 0; i < len(dataTries); i++ {
err := adb.commitTrie(dataTries[i], oldHashes, newHashes)
err := dataTries[i].Commit(hc)
if err != nil {
return nil, err
}
}
adb.dataTries.Reset()

oldRoot := adb.mainTrie.GetOldRoot()

// Step 2. commit main trie
err := adb.commitTrie(adb.mainTrie, oldHashes, newHashes)
if adb.mainTrie.GetStorageManager().IsPruningEnabled() {
hc = hashesCollector.NewHashesCollector(hc)
}
err := adb.mainTrie.Commit(hc)
if err != nil {
return nil, err
}
Expand All @@ -805,7 +810,7 @@ func (adb *AccountsDB) commit() ([]byte, error) {
return nil, err
}

err = adb.markForEviction(oldRoot, newRoot, oldHashes, newHashes)
err = adb.markForEviction(newRoot, hc)
if err != nil {
return nil, err
}
Expand All @@ -820,36 +825,14 @@ func (adb *AccountsDB) commit() ([]byte, error) {
}

func (adb *AccountsDB) markForEviction(
oldRoot []byte,
newRoot []byte,
oldHashes common.ModifiedHashes,
newHashes common.ModifiedHashes,
collector common.TrieHashesCollector,
) error {
if !adb.mainTrie.GetStorageManager().IsPruningEnabled() {
return nil
}

return adb.storagePruningManager.MarkForEviction(oldRoot, newRoot, oldHashes, newHashes)
}

func (adb *AccountsDB) commitTrie(tr common.Trie, oldHashes common.ModifiedHashes, newHashes common.ModifiedHashes) error {
if adb.mainTrie.GetStorageManager().IsPruningEnabled() {
oldTrieHashes := tr.GetObsoleteHashes()
newTrieHashes, err := tr.GetDirtyHashes()
if err != nil {
return err
}

for _, hash := range oldTrieHashes {
oldHashes[string(hash)] = struct{}{}
}

for hash := range newTrieHashes {
newHashes[hash] = struct{}{}
}
}

return tr.Commit()
return adb.storagePruningManager.MarkForEviction(newRoot, collector)
}

// RootHash returns the main trie's root hash
Expand Down
Loading
Loading