Skip to content

Commit

Permalink
Merge pull request #4533 from ElrondNetwork/intercepted-trie-nodes-re…
Browse files Browse the repository at this point in the history
…factor

Intercepted trie nodes are no longer saved as unmarshalled entities in cacher
  • Loading branch information
gabi-vuls authored Oct 2, 2022
2 parents a7f97d6 + b46027b commit 78a0427
Show file tree
Hide file tree
Showing 24 changed files with 905 additions and 91 deletions.
5 changes: 3 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -903,8 +903,9 @@
[TrieSync]
NumConcurrentTrieSyncers = 200
MaxHardCapForMissingNodes = 5000
#available versions: 1 and 2. 1 is the initial version, 2 is updated, more efficient version
TrieSyncerVersion = 2
#available versions: 1, 2 and 3. 1 is the initial version, 2 is updated, more efficient version employing 2 lists
#the 3-rd one uses depth-first algorithm which keeps the memory consumption low
TrieSyncerVersion = 3
CheckNodesOnDisk = false

[Resolvers]
Expand Down
1 change: 0 additions & 1 deletion common/statistics/resourceMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func GetRuntimeStatistics() []interface{} {
return []interface{}{
"timestamp", time.Now().Unix(),
"num go", runtime.NumGoroutine(),
"alloc", core.ConvertBytes(memStats.Alloc),
"heap alloc", core.ConvertBytes(memStats.HeapAlloc),
"heap idle", core.ConvertBytes(memStats.HeapIdle),
"heap inuse", core.ConvertBytes(memStats.HeapInuse),
Expand Down
40 changes: 34 additions & 6 deletions integrationTests/state/stateTrieSync/stateTrieSync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ func createTestProcessorNodeAndTrieStorage(
}

func TestNode_RequestInterceptTrieNodesWithMessenger(t *testing.T) {
t.Run("test with double lists version", func(t *testing.T) {
testNodeRequestInterceptTrieNodesWithMessenger(t, 2)
})
t.Run("test with depth version", func(t *testing.T) {
testNodeRequestInterceptTrieNodesWithMessenger(t, 3)
})
}

func testNodeRequestInterceptTrieNodesWithMessenger(t *testing.T, version int) {
if testing.Short() {
t.Skip("this is not a short test")
}
Expand Down Expand Up @@ -120,7 +129,7 @@ func TestNode_RequestInterceptTrieNodesWithMessenger(t *testing.T) {
MaxHardCapForMissingNodes: 10000,
CheckNodesOnDisk: false,
}
trieSyncer, _ := trie.NewDoubleListTrieSyncer(arg)
trieSyncer, _ := trie.CreateTrieSyncer(arg, version)

ctxPrint, cancel := context.WithCancel(context.Background())
go printStatistics(ctxPrint, tss)
Expand Down Expand Up @@ -179,6 +188,15 @@ func printStatistics(ctx context.Context, stats common.SizeSyncStatisticsHandler
}

func TestNode_RequestInterceptTrieNodesWithMessengerNotSyncingShouldErr(t *testing.T) {
t.Run("test with double lists version", func(t *testing.T) {
testNodeRequestInterceptTrieNodesWithMessengerNotSyncingShouldErr(t, 2)
})
t.Run("test with depth version", func(t *testing.T) {
testNodeRequestInterceptTrieNodesWithMessengerNotSyncingShouldErr(t, 3)
})
}

func testNodeRequestInterceptTrieNodesWithMessengerNotSyncingShouldErr(t *testing.T, version int) {
if testing.Short() {
t.Skip("this is not a short test")
}
Expand Down Expand Up @@ -251,7 +269,7 @@ func TestNode_RequestInterceptTrieNodesWithMessengerNotSyncingShouldErr(t *testi
MaxHardCapForMissingNodes: 10000,
CheckNodesOnDisk: false,
}
trieSyncer, _ := trie.NewDoubleListTrieSyncer(arg)
trieSyncer, _ := trie.CreateTrieSyncer(arg, version)

ctxPrint, cancel := context.WithCancel(context.Background())
go printStatistics(ctxPrint, tss)
Expand Down Expand Up @@ -280,18 +298,28 @@ func TestMultipleDataTriesSyncSmallValues(t *testing.T) {
t.Skip("this is not a short test")
}

testMultipleDataTriesSync(t, 1000, 50, 32)
t.Run("test with double lists version", func(t *testing.T) {
testMultipleDataTriesSync(t, 1000, 50, 32, 2)
})
t.Run("test with depth version", func(t *testing.T) {
testMultipleDataTriesSync(t, 1000, 50, 32, 3)
})
}

func TestMultipleDataTriesSyncLargeValues(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
}

testMultipleDataTriesSync(t, 3, 3, 1<<21)
t.Run("test with double lists version", func(t *testing.T) {
testMultipleDataTriesSync(t, 3, 3, 1<<21, 2)
})
t.Run("test with depth version", func(t *testing.T) {
testMultipleDataTriesSync(t, 3, 3, 1<<21, 3)
})
}

func testMultipleDataTriesSync(t *testing.T, numAccounts int, numDataTrieLeaves int, valSize int) {
func testMultipleDataTriesSync(t *testing.T, numAccounts int, numDataTrieLeaves int, valSize int, version int) {
if testing.Short() {
t.Skip("this is not a short test")
}
Expand Down Expand Up @@ -354,7 +382,7 @@ func testMultipleDataTriesSync(t *testing.T, numAccounts int, numDataTrieLeaves
Cacher: nRequester.DataPool.TrieNodes(),
MaxTrieLevelInMemory: 200,
MaxHardCapForMissingNodes: 5000,
TrieSyncerVersion: 2,
TrieSyncerVersion: version,
CheckNodesOnDisk: false,
},
ShardId: shardID,
Expand Down
2 changes: 1 addition & 1 deletion integrationTests/testProcessorNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2847,7 +2847,7 @@ func (tpn *TestProcessorNode) initRoundHandler() {
}

func (tpn *TestProcessorNode) initRequestedItemsHandler() {
tpn.RequestedItemsHandler = timecache.NewTimeCache(roundDuration)
tpn.RequestedItemsHandler = timecache.NewTimeCache(time.Second)
}

func (tpn *TestProcessorNode) initBlockTracker() {
Expand Down
1 change: 1 addition & 0 deletions node/nodeRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ func (nr *nodeRunner) CreateManagedProcessComponents(
}

log.Trace("creating time cache for requested items components")
// TODO consider lowering this (perhaps to 1 second) and use a common const
requestedItemsHandler := timecache.NewTimeCache(
time.Duration(uint64(time.Millisecond) * coreComponents.GenesisNodesSetup().GetRoundDuration()))

Expand Down
11 changes: 7 additions & 4 deletions process/block/preprocess/gasComputation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/ElrondNetwork/elrond-go/process"
)

const initialAllocation = 1000

var _ process.GasHandler = (*gasComputation)(nil)

type gasComputation struct {
Expand Down Expand Up @@ -91,18 +93,19 @@ func (gc *gasComputation) Init() {
}

// Reset method resets tx hashes with gas provided, refunded and penalized since last reset
// TODO remove this call from basePreProcess.handleProcessTransactionInit
func (gc *gasComputation) Reset(key []byte) {
gc.mutGasProvided.Lock()
gc.txHashesWithGasProvidedSinceLastReset[string(key)] = make([][]byte, 0)
gc.txHashesWithGasProvidedAsScheduledSinceLastReset[string(key)] = make([][]byte, 0)
gc.txHashesWithGasProvidedSinceLastReset[string(key)] = make([][]byte, 0, initialAllocation)
gc.txHashesWithGasProvidedAsScheduledSinceLastReset[string(key)] = make([][]byte, 0, initialAllocation)
gc.mutGasProvided.Unlock()

gc.mutGasRefunded.Lock()
gc.txHashesWithGasRefundedSinceLastReset[string(key)] = make([][]byte, 0)
gc.txHashesWithGasRefundedSinceLastReset[string(key)] = make([][]byte, 0, initialAllocation)
gc.mutGasRefunded.Unlock()

gc.mutGasPenalized.Lock()
gc.txHashesWithGasPenalizedSinceLastReset[string(key)] = make([][]byte, 0)
gc.txHashesWithGasPenalizedSinceLastReset[string(key)] = make([][]byte, 0, initialAllocation)
gc.mutGasPenalized.Unlock()
}

Expand Down
12 changes: 3 additions & 9 deletions process/interceptors/factory/interceptedTrieNodeDataFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@ package factory
import (
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go-core/hashing"
"github.com/ElrondNetwork/elrond-go-core/marshal"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/trie"
)

var _ process.InterceptedDataFactory = (*interceptedTrieNodeDataFactory)(nil)

type interceptedTrieNodeDataFactory struct {
marshalizer marshal.Marshalizer
hasher hashing.Hasher
hasher hashing.Hasher
}

// NewInterceptedTrieNodeDataFactory creates an instance of interceptedTrieNodeDataFactory
Expand All @@ -26,22 +24,18 @@ func NewInterceptedTrieNodeDataFactory(
if check.IfNil(argument.CoreComponents) {
return nil, process.ErrNilCoreComponentsHolder
}
if check.IfNil(argument.CoreComponents.InternalMarshalizer()) {
return nil, process.ErrNilMarshalizer
}
if check.IfNil(argument.CoreComponents.Hasher()) {
return nil, process.ErrNilHasher
}

return &interceptedTrieNodeDataFactory{
marshalizer: argument.CoreComponents.InternalMarshalizer(),
hasher: argument.CoreComponents.Hasher(),
hasher: argument.CoreComponents.Hasher(),
}, nil
}

// Create creates instances of InterceptedData by unmarshalling provided buffer
func (sidf *interceptedTrieNodeDataFactory) Create(buff []byte) (process.InterceptedData, error) {
return trie.NewInterceptedTrieNode(buff, sidf.marshalizer, sidf.hasher)
return trie.NewInterceptedTrieNode(buff, sidf.hasher)
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,6 @@ func TestNewInterceptedTrieNodeDataFactory_NilArgumentsShouldErr(t *testing.T) {
assert.Equal(t, process.ErrNilArgumentStruct, err)
}

func TestNewInterceptedTrieNodeDataFactory_NilMarshalizerShouldErr(t *testing.T) {
t.Parallel()

coreComponents, cryptoComponents := createMockComponentHolders()
coreComponents.IntMarsh = nil
arg := createMockArgument(coreComponents, cryptoComponents)

itn, err := NewInterceptedTrieNodeDataFactory(arg)
assert.Nil(t, itn)
assert.Equal(t, process.ErrNilMarshalizer, err)
}

func TestNewInterceptedTrieNodeDataFactory_NilHasherShouldErr(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 3 additions & 0 deletions state/syncer/baseAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ElrondNetwork/elrond-go-core/hashing"
"github.com/ElrondNetwork/elrond-go-core/marshal"
"github.com/ElrondNetwork/elrond-go/common"
"github.com/ElrondNetwork/elrond-go/common/statistics"
"github.com/ElrondNetwork/elrond-go/state"
"github.com/ElrondNetwork/elrond-go/storage"
"github.com/ElrondNetwork/elrond-go/trie"
Expand Down Expand Up @@ -147,6 +148,7 @@ func (b *baseAccountsSyncer) printStatistics(ssh common.SizeSyncStatisticsHandle
"peak processing speed", peakSpeed,
"average processing speed", averageSpeed,
)
log.Debug("trie sync node statistics", statistics.GetRuntimeStatistics()...)
return
case <-time.After(timeBetweenStatisticsPrints):
bytesReceivedDelta := ssh.NumBytesReceived() - lastDataReceived
Expand Down Expand Up @@ -174,6 +176,7 @@ func (b *baseAccountsSyncer) printStatistics(ssh common.SizeSyncStatisticsHandle
"iterations", ssh.NumIterations(),
"CPU time", ssh.ProcessingTime(),
"processing speed", speed)
log.Debug("trie sync node statistics", statistics.GetRuntimeStatistics()...)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion trie/branchNode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ func TestBranchNode_loadChildren(t *testing.T) {
nodes, _ := getEncodedTrieNodesAndHashes(tr)
nodesCacher, _ := lrucache.NewCache(100)
for i := range nodes {
n, _ := NewInterceptedTrieNode(nodes[i], marsh, hasher)
n, _ := NewInterceptedTrieNode(nodes[i], hasher)
nodesCacher.Put(n.hash, n, len(n.GetSerialized()))
}

Expand Down
Loading

0 comments on commit 78a0427

Please sign in to comment.