Skip to content

Commit

Permalink
Merge pull request #1120 from zhiqiangxu/fix_MarkEstimate
Browse files Browse the repository at this point in the history
fix bug: should use Lock when mutating the flag
  • Loading branch information
temaniarpit27 authored Jan 19, 2024
2 parents 99e15b1 + d0c7045 commit b15e2dc
Show file tree
Hide file tree
Showing 16 changed files with 83 additions and 74 deletions.
6 changes: 1 addition & 5 deletions cmd/evm/testdata/3/readme.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
<<<<<<< HEAD
These files examplify a transition where a transaction (executed on block 5) requests
=======
These files exemplify a transition where a transaction (executed on block 5) requests
>>>>>>> bed84606583893fdb698cc1b5058cc47b4dbd837
the blockhash for block `1`.
the blockhash for block `1`.
6 changes: 1 addition & 5 deletions cmd/evm/testdata/4/readme.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
<<<<<<< HEAD
These files examplify a transition where a transaction (executed on block 5) requests
=======
These files exemplify a transition where a transaction (executed on block 5) requests
>>>>>>> bed84606583893fdb698cc1b5058cc47b4dbd837
the blockhash for block `4`, but where the hash for that block is missing.
the blockhash for block `4`, but where the hash for that block is missing.
It's expected that executing these should cause `exit` with errorcode `4`.
30 changes: 12 additions & 18 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,14 @@ type BlockChain struct {
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
parallelProcessor Processor // Parallel block transaction processor interface
forker *ForkChoice
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
parallelProcessor Processor // Parallel block transaction processor interface
parallelSpeculativeProcesses int // Number of parallel speculative processes
forker *ForkChoice
vmConfig vm.Config

// Bor related changes
borReceiptsCache *lru.Cache[common.Hash, *types.Receipt] // Cache for the most recent bor receipt receipts per block
Expand Down Expand Up @@ -408,7 +409,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// The first thing the node will do is reconstruct the verification data for
// the head block (ethash cache or clique voting snapshot). Might as well do
// it in advance.
bc.engine.VerifyHeader(bc, bc.CurrentHeader())
// BOR - commented out intentionally
// bc.engine.VerifyHeader(bc, bc.CurrentHeader())

// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for hash := range BadHashes {
Expand Down Expand Up @@ -481,7 +483,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}

// NewParallelBlockChain , similar to NewBlockChain, creates a new blockchain object, but with a parallel state processor
func NewParallelBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis, overrides *ChainOverrides, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(header *types.Header) bool, txLookupLimit *uint64, checker ethereum.ChainValidator) (*BlockChain, error) {
func NewParallelBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis, overrides *ChainOverrides, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(header *types.Header) bool, txLookupLimit *uint64, checker ethereum.ChainValidator, numprocs int) (*BlockChain, error) {
bc, err := NewBlockChain(db, cacheConfig, genesis, overrides, engine, vmConfig, shouldPreserve, txLookupLimit, checker)

if err != nil {
Expand All @@ -500,6 +502,7 @@ func NewParallelBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis
}

bc.parallelProcessor = NewParallelStateProcessor(chainConfig, bc, engine)
bc.parallelSpeculativeProcesses = numprocs

return bc, nil
}
Expand Down Expand Up @@ -2139,15 +2142,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}

statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt atomic.Bool
Expand Down
37 changes: 14 additions & 23 deletions core/blockstm/mvhashmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,34 +121,23 @@ func (mv *MVHashMap) Write(k Key, v Version, data interface{}) {
return
})

cells.rw.RLock()
ci, ok := cells.tm.Get(v.TxnIndex)
cells.rw.RUnlock()

if ok {
cells.rw.Lock()
if ci, ok := cells.tm.Get(v.TxnIndex); !ok {
cells.tm.Put(v.TxnIndex, &WriteCell{
flag: FlagDone,
incarnation: v.Incarnation,
data: data,
})
} else {
if ci.(*WriteCell).incarnation > v.Incarnation {
panic(fmt.Errorf("existing transaction value does not have lower incarnation: %v, %v",
k, v.TxnIndex))
}

ci.(*WriteCell).flag = FlagDone
ci.(*WriteCell).incarnation = v.Incarnation
ci.(*WriteCell).data = data
} else {
cells.rw.Lock()
if ci, ok = cells.tm.Get(v.TxnIndex); !ok {
cells.tm.Put(v.TxnIndex, &WriteCell{
flag: FlagDone,
incarnation: v.Incarnation,
data: data,
})
} else {
ci.(*WriteCell).flag = FlagDone
ci.(*WriteCell).incarnation = v.Incarnation
ci.(*WriteCell).data = data
}
cells.rw.Unlock()
}
cells.rw.Unlock()
}

func (mv *MVHashMap) ReadStorage(k Key, fallBack func() any) any {
Expand All @@ -166,13 +155,13 @@ func (mv *MVHashMap) MarkEstimate(k Key, txIdx int) {
panic(fmt.Errorf("path must already exist"))
})

cells.rw.RLock()
cells.rw.Lock()
if ci, ok := cells.tm.Get(txIdx); !ok {
panic(fmt.Sprintf("should not happen - cell should be present for path. TxIdx: %v, path, %x, cells keys: %v", txIdx, k, cells.tm.Keys()))
} else {
ci.(*WriteCell).flag = FlagEstimate
}
cells.rw.RUnlock()
cells.rw.Unlock()
}

func (mv *MVHashMap) Delete(k Key, txIdx int) {
Expand Down Expand Up @@ -233,8 +222,8 @@ func (mv *MVHashMap) Read(k Key, txIdx int) (res MVReadResult) {
}

cells.rw.RLock()

fk, fv := cells.tm.Floor(txIdx - 1)
cells.rw.RUnlock()

if fk != nil && fv != nil {
c := fv.(*WriteCell)
Expand All @@ -253,6 +242,8 @@ func (mv *MVHashMap) Read(k Key, txIdx int) (res MVReadResult) {
}
}

cells.rw.RUnlock()

return
}

Expand Down
4 changes: 2 additions & 2 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
backupStateDB := statedb.Copy()

profile := false
result, err := blockstm.ExecuteParallel(tasks, profile, metadata, cfg.ParallelSpeculativeProcesses, interruptCtx)
result, err := blockstm.ExecuteParallel(tasks, profile, metadata, p.bc.parallelSpeculativeProcesses, interruptCtx)

if err == nil && profile && result.Deps != nil {
_, weight := result.Deps.LongestPath(*result.Stats)
Expand Down Expand Up @@ -398,7 +398,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
t.totalUsedGas = usedGas
}

_, err = blockstm.ExecuteParallel(tasks, false, metadata, cfg.ParallelSpeculativeProcesses, interruptCtx)
_, err = blockstm.ExecuteParallel(tasks, false, metadata, p.bc.parallelSpeculativeProcesses, interruptCtx)

break
}
Expand Down
44 changes: 40 additions & 4 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
// Config includes all the configurations for pruning.
type Config struct {
Datadir string // The directory of the state database
Cachedir string // The directory of state clean cache
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
}

Expand Down Expand Up @@ -261,7 +262,7 @@ func (p *Pruner) Prune(root common.Hash) error {
}

if stateBloomRoot != (common.Hash{}) {
return RecoverPruning(p.config.Datadir, p.db)
return RecoverPruning(p.config.Datadir, p.db, p.config.Cachedir)
}
// If the target state root is not specified, use the HEAD-127 as the
// target. The reason for picking it is:
Expand All @@ -286,8 +287,8 @@ func (p *Pruner) Prune(root common.Hash) error {
// is the presence of root can indicate the presence of the
// entire trie.
if !rawdb.HasLegacyTrieNode(p.db, root) {
// The special case is for clique based networks(goerli
// and some other private networks), it's possible that two
// The special case is for clique based networks(goerli and
// some other private networks), it's possible that two
// consecutive blocks will have same root. In this case snapshot
// difflayer won't be created. So HEAD-127 may not paired with
// head-127 layer. Instead the paired layer is higher than the
Expand Down Expand Up @@ -324,6 +325,12 @@ func (p *Pruner) Prune(root common.Hash) error {
log.Info("Selecting user-specified state as the pruning target", "root", root)
}
}
// Before start the pruning, delete the clean trie cache first.
// It's necessary otherwise in the next restart we will hit the
// deleted state root in the "clean cache" so that the incomplete
// state is picked for usage.
deleteCleanTrieCache(p.config.Cachedir)

// All the state roots of the middle layer should be forcibly pruned,
// otherwise the dangling state will be left.
middleRoots := make(map[common.Hash]struct{})
Expand Down Expand Up @@ -368,7 +375,7 @@ func (p *Pruner) Prune(root common.Hash) error {
// pruning can be resumed. What's more if the bloom filter is constructed, the
// pruning **has to be resumed**. Otherwise a lot of dangling nodes may be left
// in the disk.
func RecoverPruning(datadir string, db ethdb.Database) error {
func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) error {
stateBloomPath, stateBloomRoot, err := findBloomFilter(datadir)
if err != nil {
return err
Expand Down Expand Up @@ -409,6 +416,12 @@ func RecoverPruning(datadir string, db ethdb.Database) error {

log.Info("Loaded state bloom filter", "path", stateBloomPath)

// Before start the pruning, delete the clean trie cache first.
// It's necessary otherwise in the next restart we will hit the
// deleted state root in the "clean cache" so that the incomplete
// state is picked for usage.
deleteCleanTrieCache(trieCachePath)

// All the state roots of the middle layers should be forcibly pruned,
// otherwise the dangling state will be left.
var (
Expand Down Expand Up @@ -451,6 +464,7 @@ func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
if err != nil {
return err
}

accIter, err := t.NodeIterator(nil)
if err != nil {
return err
Expand All @@ -477,6 +491,7 @@ func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
if err != nil {
return err
}

storageIter, err := storageTrie.NodeIterator(nil)
if err != nil {
return err
Expand Down Expand Up @@ -536,3 +551,24 @@ func findBloomFilter(datadir string) (string, common.Hash, error) {

return stateBloomPath, stateBloomRoot, nil
}

const warningLog = `
WARNING!
The clean trie cache is not found. Please delete it by yourself after the
pruning. Remember don't start the Geth without deleting the clean trie cache
otherwise the entire database may be damaged!
Check the command description "geth snapshot prune-state --help" for more details.
`

func deleteCleanTrieCache(path string) {
if !common.FileExist(path) {
log.Warn(warningLog)
return
}

os.RemoveAll(path)
log.Info("Deleted trie clean cache", "path", path)
}
4 changes: 0 additions & 4 deletions core/vm/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ type Config struct {
NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls)
EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages
ExtraEips []int // Additional EIPS that are to be enabled

// parallel EVM configs
ParallelEnable bool
ParallelSpeculativeProcesses int
}

// ScopeContext contains the things that are per-call, such as stack and memory,
Expand Down
4 changes: 2 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, ""); err != nil {
log.Error("Failed to recover state", "error", err)
}

Expand Down Expand Up @@ -236,7 +236,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// check if Parallel EVM is enabled
// if enabled, use parallel state processor
if config.ParallelEVM.Enable {
eth.blockchain, err = core.NewParallelBlockChain(chainDb, cacheConfig, config.Genesis, &overrides, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit, checker)
eth.blockchain, err = core.NewParallelBlockChain(chainDb, cacheConfig, config.Genesis, &overrides, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit, checker, config.ParallelEVM.SpeculativeProcesses)
} else {
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, config.Genesis, &overrides, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit, checker)
}
Expand Down
2 changes: 1 addition & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func BenchmarkBorMiningBlockSTMMetadata(b *testing.B) {
db2 := rawdb.NewMemoryDatabase()
back.genesis.MustCommit(db2)

chain, _ := core.NewParallelBlockChain(db2, nil, back.genesis, nil, engine, vm.Config{ParallelEnable: true, ParallelSpeculativeProcesses: 8}, nil, nil, nil)
chain, _ := core.NewParallelBlockChain(db2, nil, back.genesis, nil, engine, vm.Config{}, nil, nil, nil, 8)
defer chain.Stop()

// Ignore empty commit here for less noise.
Expand Down
2 changes: 1 addition & 1 deletion packaging/templates/package_scripts/control
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Source: bor
Version: 1.2.0-beta
Version: 1.2.1
Section: develop
Priority: standard
Maintainer: Polygon <[email protected]>
Expand Down
2 changes: 1 addition & 1 deletion packaging/templates/package_scripts/control.arm64
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Source: bor
Version: 1.2.0-beta
Version: 1.2.1
Section: develop
Priority: standard
Maintainer: Polygon <[email protected]>
Expand Down
2 changes: 1 addition & 1 deletion packaging/templates/package_scripts/control.profile.amd64
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Source: bor-profile
Version: 1.2.0-beta
Version: 1.2.1
Section: develop
Priority: standard
Maintainer: Polygon <[email protected]>
Expand Down
2 changes: 1 addition & 1 deletion packaging/templates/package_scripts/control.profile.arm64
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Source: bor-profile
Version: 1.2.0-beta
Version: 1.2.1
Section: develop
Priority: standard
Maintainer: Polygon <[email protected]>
Expand Down
2 changes: 1 addition & 1 deletion packaging/templates/package_scripts/control.validator
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Source: bor-profile
Version: 1.2.0-beta
Version: 1.2.1
Section: develop
Priority: standard
Maintainer: Polygon <[email protected]>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Source: bor-profile
Version: 1.2.0-beta
Version: 1.2.1
Section: develop
Priority: standard
Maintainer: Polygon <[email protected]>
Expand Down
8 changes: 4 additions & 4 deletions params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
)

const (
VersionMajor = 1 // Major version component of the current release
VersionMinor = 2 // Minor version component of the current release
VersionPatch = 0 // Patch version component of the current release
VersionMeta = "beta" // Version metadata to append to the version string
VersionMajor = 1 // Major version component of the current release
VersionMinor = 2 // Minor version component of the current release
VersionPatch = 1 // Patch version component of the current release
VersionMeta = "" // Version metadata to append to the version string
)

var GitCommit string
Expand Down

0 comments on commit b15e2dc

Please sign in to comment.