Skip to content

Commit

Permalink
fastnode: fix some pbss saving&rewind issues; (#2830)
Browse files Browse the repository at this point in the history
  • Loading branch information
galaio authored Dec 30, 2024
1 parent e36ed32 commit 76689de
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 40 deletions.
80 changes: 41 additions & 39 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1354,49 +1354,51 @@ func (bc *BlockChain) Stop() {
}
bc.snaps.Release()
}
if bc.triedb.Scheme() == rawdb.PathScheme {
// Ensure that the in-memory trie nodes are journaled to disk properly.
if err := bc.triedb.Journal(bc.CurrentBlock().Root); err != nil {
log.Info("Failed to journal in-memory trie nodes", "err", err)
}
} else {
// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
// - HEAD: So we don't need to reprocess any blocks in the general case
// - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.triedb
var once sync.Once
for _, offset := range []uint64{0, 1, state.TriesInMemory - 1} {
if number := bc.CurrentBlock().Number.Uint64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
if err := triedb.Commit(recent.Root(), true); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
} else {
rawdb.WriteSafePointBlockNumber(bc.db, recent.NumberU64())
once.Do(func() {
rawdb.WriteHeadBlockHash(bc.db.BlockStore(), recent.Hash())
})
}
}
if !bc.NoTries() {
if bc.triedb.Scheme() == rawdb.PathScheme {
// Ensure that the in-memory trie nodes are journaled to disk properly.
if err := bc.triedb.Journal(bc.CurrentBlock().Root); err != nil {
log.Info("Failed to journal in-memory trie nodes", "err", err)
}
} else {
// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
// - HEAD: So we don't need to reprocess any blocks in the general case
// - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.triedb
var once sync.Once
for _, offset := range []uint64{0, 1, state.TriesInMemory - 1} {
if number := bc.CurrentBlock().Number.Uint64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
if err := triedb.Commit(recent.Root(), true); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
} else {
rawdb.WriteSafePointBlockNumber(bc.db, recent.NumberU64())
once.Do(func() {
rawdb.WriteHeadBlockHash(bc.db.BlockStore(), recent.Hash())
})
}
}

if snapBase != (common.Hash{}) {
log.Info("Writing snapshot state to disk", "root", snapBase)
if err := triedb.Commit(snapBase, true); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
} else {
rawdb.WriteSafePointBlockNumber(bc.db, bc.CurrentBlock().Number.Uint64())
if snapBase != (common.Hash{}) {
log.Info("Writing snapshot state to disk", "root", snapBase)
if err := triedb.Commit(snapBase, true); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
} else {
rawdb.WriteSafePointBlockNumber(bc.db, bc.CurrentBlock().Number.Uint64())
}
}
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
if _, size, _, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
}
}
}
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
if _, size, _, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
}
}
}
// Allow tracers to clean-up and release resources.
Expand Down
13 changes: 12 additions & 1 deletion core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"errors"
"math/big"

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -359,7 +361,16 @@ func (bc *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
// HasState checks if state trie is fully present in the database or not.
func (bc *BlockChain) HasState(hash common.Hash) bool {
if bc.NoTries() {
return bc.snaps != nil && bc.snaps.Snapshot(hash) != nil
if bc.snaps != nil {
return bc.snaps.Snapshot(hash) != nil
}
// snaps is nil when the blockchain creates
found, err := snapshot.PreCheckSnapshot(bc.db, hash)
if err != nil {
log.Warn("Check HasState in NoTries mode failed", "root", hash, "err", err)
return false
}
return found
}
_, err := bc.statedb.OpenTrie(hash)
return err == nil
Expand Down
18 changes: 18 additions & 0 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,24 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou
return current, generator, nil
}

func PreCheckSnapshot(db ethdb.KeyValueStore, root common.Hash) (bool, error) {
baseRoot := rawdb.ReadSnapshotRoot(db)
if baseRoot == (common.Hash{}) {
return false, errors.New("missing or corrupted snapshot")
}
if baseRoot == root {
return true, nil
}
var found bool
err := iterateJournal(db, func(parent common.Hash, current common.Hash, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte) error {
if current == root {
found = true
}
return nil
})
return found, err
}

// loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, root common.Hash, cache int, recovery bool, noBuild bool, withoutTrie bool) (snapshot, bool, error) {
// If snapshotting is disabled (initial sync in progress), don't do anything,
Expand Down

0 comments on commit 76689de

Please sign in to comment.