diff --git a/core/state/statedb.go b/core/state/statedb.go index 3b706002e765..d641fb3b0425 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -175,13 +175,13 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. -func (s *StateDB) StartPrefetcher(namespace string) { +func (s *StateDB) StartPrefetcher(namespace string, opts ...PrefetcherOption) { if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil } if s.snap != nil { - s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, opts...) } } diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 45fac913dd0f..275f20b94b69 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/libevm/options" "github.com/ava-labs/libevm/log" "github.com/ava-labs/libevm/metrics" ) @@ -49,9 +50,11 @@ type triePrefetcher struct { storageDupMeter metrics.Meter storageSkipMeter metrics.Meter storageWasteMeter metrics.Meter + + options []PrefetcherOption } -func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { +func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...PrefetcherOption) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ db: db, @@ -67,6 +70,8 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), + + options: opts, } return p } @@ -99,6 +104,7 @@ func (p *triePrefetcher) close() { } } } + p.releaseWorkerPools() // Clear out all fetchers (will crash on a second call, deliberate) p.fetchers = nil } @@ -122,6 +128,8 @@ func (p *triePrefetcher) copy() *triePrefetcher { storageDupMeter: p.storageDupMeter, storageSkipMeter: p.storageSkipMeter, storageWasteMeter: p.storageWasteMeter, + + options: p.options, } // If the prefetcher is already a copy, duplicate the data if p.fetches != nil { @@ -150,7 +158,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm id := p.trieID(owner, root) fetcher := p.fetchers[id] if fetcher == nil { - fetcher = newSubfetcher(p.db, p.root, owner, root, addr) + fetcher = newSubfetcher(p.db, p.root, owner, root, addr, p.options...) p.fetchers[id] = fetcher } fetcher.schedule(keys) @@ -226,11 +234,13 @@ type subfetcher struct { seen map[string]struct{} // Tracks the entries already loaded dups int // Number of duplicate preload tasks used [][]byte // Tracks the entries used in the end + + pool *subfetcherPool } // newSubfetcher creates a goroutine to prefetch state items belonging to a // particular root hash. -func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher { +func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, opts ...PrefetcherOption) *subfetcher { sf := &subfetcher{ db: db, state: state, @@ -243,6 +253,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo copy: make(chan chan Trie), seen: make(map[string]struct{}), } + options.As[prefetcherConfig](opts...).applyTo(sf) go sf.loop() return sf } @@ -294,7 +305,10 @@ func (sf *subfetcher) abort() { // out of tasks or its underlying trie is retrieved for committing. func (sf *subfetcher) loop() { // No matter how the loop stops, signal anyone waiting that it's terminated - defer close(sf.term) + defer func() { + sf.pool.wait() + close(sf.term) + }() // Start by opening the trie and stop processing if it fails if sf.owner == (common.Hash{}) { @@ -325,14 +339,14 @@ func (sf *subfetcher) loop() { sf.lock.Unlock() // Prefetch any tasks until the loop is interrupted - for i, task := range tasks { + for _, task := range tasks { select { - case <-sf.stop: - // If termination is requested, add any leftover back and return - sf.lock.Lock() - sf.tasks = append(sf.tasks, tasks[i:]...) - sf.lock.Unlock() - return + //libevm:start + // + // The <-sf.stop case has been removed, in keeping with the equivalent change below. Future geth + // versions also remove it so our modification here can be undone when merging upstream. + // + //libevm:end case ch := <-sf.copy: // Somebody wants a copy of the current trie, grant them @@ -344,9 +358,9 @@ func (sf *subfetcher) loop() { sf.dups++ } else { if len(task) == common.AddressLength { - sf.trie.GetAccount(common.BytesToAddress(task)) + sf.pool.GetAccount(common.BytesToAddress(task)) } else { - sf.trie.GetStorage(sf.addr, task) + sf.pool.GetStorage(sf.addr, task) } sf.seen[string(task)] = struct{}{} } @@ -358,8 +372,26 @@ func (sf *subfetcher) loop() { ch <- sf.db.CopyTrie(sf.trie) case <-sf.stop: - // Termination is requested, abort and leave remaining tasks - return + //libevm:start + // + // This is copied, with alteration, from ethereum/go-ethereum#29519 + // and can be deleted once we update to include that change. + + // Termination is requested, abort if no more tasks are pending. If + // there are some, exhaust them first. + sf.lock.Lock() + done := len(sf.tasks) == 0 + sf.lock.Unlock() + + if done { + return + } + + select { + case sf.wake <- struct{}{}: + default: + } + //libevm:end } } } diff --git a/core/state/trie_prefetcher.libevm.go b/core/state/trie_prefetcher.libevm.go new file mode 100644 index 000000000000..abee575be31a --- /dev/null +++ b/core/state/trie_prefetcher.libevm.go @@ -0,0 +1,126 @@ +// Copyright 2024 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +package state + +import ( + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/libevm/options" + "github.com/ava-labs/libevm/libevm/sync" + "github.com/ava-labs/libevm/log" +) + +// A PrefetcherOption configures behaviour of trie prefetching. +type PrefetcherOption = options.Option[prefetcherConfig] + +type prefetcherConfig struct { + newWorkers func() WorkerPool +} + +// A WorkerPool executes functions asynchronously. Done() is called to signal +// that the pool is no longer needed and that Execute() is guaranteed to not be +// called again. +type WorkerPool interface { + Execute(func()) + Done() +} + +// WithWorkerPools configures trie prefetching to execute asynchronously. The +// provided constructor is called once for each trie being fetched but it MAY +// return the same pool. +func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption { + return options.Func[prefetcherConfig](func(c *prefetcherConfig) { + c.newWorkers = ctor + }) +} + +type subfetcherPool struct { + workers WorkerPool + tries sync.Pool[Trie] + wg sync.WaitGroup +} + +// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided +// with a [PrefetcherOption]. +func (c *prefetcherConfig) applyTo(sf *subfetcher) { + sf.pool = &subfetcherPool{ + tries: sync.Pool[Trie]{ + // Although the workers may be shared between all subfetchers, each + // MUST have its own Trie pool. + New: func() Trie { + return sf.db.CopyTrie(sf.trie) + }, + }, + } + if c.newWorkers != nil { + sf.pool.workers = c.newWorkers() + } +} + +// releaseWorkerPools calls Done() on all [WorkerPool]s. This MUST only be +// called after [subfetcher.abort] returns on ALL fetchers as a pool is allowed +// to be shared between them. This is because we guarantee in the public API +// that no further calls will be made to Execute() after a call to Done(). +func (p *triePrefetcher) releaseWorkerPools() { + for _, f := range p.fetchers { + if w := f.pool.workers; w != nil { + w.Done() + } + } +} + +func (p *subfetcherPool) wait() { + p.wg.Wait() +} + +// execute runs the provided function with a copy of the subfetcher's Trie. +// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was +// configured with a [WorkerPool] then it is used for function execution, +// otherwise `fn` is just called directly. +func (p *subfetcherPool) execute(fn func(Trie)) { + p.wg.Add(1) + do := func() { + t := p.tries.Get() + fn(t) + p.tries.Put(t) + p.wg.Done() + } + + if w := p.workers; w != nil { + w.Execute(do) + } else { + do() + } +} + +// GetAccount optimistically pre-fetches an account, dropping the returned value +// and logging errors. See [subfetcherPool.execute] re worker pools. +func (p *subfetcherPool) GetAccount(addr common.Address) { + p.execute(func(t Trie) { + if _, err := t.GetAccount(addr); err != nil { + log.Error("account prefetching failed", "address", addr, "err", err) + } + }) +} + +// GetStorage is the storage equivalent of [subfetcherPool.GetAccount]. +func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) { + p.execute(func(t Trie) { + if _, err := t.GetStorage(addr, key); err != nil { + log.Error("storage prefetching failed", "address", addr, "key", key, "err", err) + } + }) +} diff --git a/core/state/trie_prefetcher.libevm_test.go b/core/state/trie_prefetcher.libevm_test.go new file mode 100644 index 000000000000..884bfba56770 --- /dev/null +++ b/core/state/trie_prefetcher.libevm_test.go @@ -0,0 +1,80 @@ +// Copyright 2024 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +package state + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/ava-labs/libevm/common" +) + +type synchronisingWorkerPool struct { + t *testing.T + executed, unblock chan struct{} + done bool + preconditionsToStopPrefetcher int +} + +var _ WorkerPool = (*synchronisingWorkerPool)(nil) + +func (p *synchronisingWorkerPool) Execute(fn func()) { + fn() + select { + case <-p.executed: + default: + close(p.executed) + } + + <-p.unblock + assert.False(p.t, p.done, "Done() called before Execute() returns") + p.preconditionsToStopPrefetcher++ +} + +func (p *synchronisingWorkerPool) Done() { + p.done = true + p.preconditionsToStopPrefetcher++ +} + +func TestStopPrefetcherWaitsOnWorkers(t *testing.T) { + pool := &synchronisingWorkerPool{ + t: t, + executed: make(chan struct{}), + unblock: make(chan struct{}), + } + opt := WithWorkerPools(func() WorkerPool { return pool }) + + db := filledStateDB() + db.prefetcher = newTriePrefetcher(db.db, db.originalRoot, "", opt) + db.prefetcher.prefetch(common.Hash{}, common.Hash{}, common.Address{}, [][]byte{{}}) + + go func() { + <-pool.executed + // Sleep otherwise there is a small chance that we close pool.unblock + // between db.StopPrefetcher() returning and the assertion. + time.Sleep(time.Second) + close(pool.unblock) + }() + + <-pool.executed + db.StopPrefetcher() + // If this errors then either Execute() hadn't returned or Done() wasn't + // called. + assert.Equalf(t, 2, pool.preconditionsToStopPrefetcher, "%T.StopPrefetcher() returned early", db) +} diff --git a/libevm/sync/sync.go b/libevm/sync/sync.go new file mode 100644 index 000000000000..991a3a875ee7 --- /dev/null +++ b/libevm/sync/sync.go @@ -0,0 +1,52 @@ +// Copyright 2024 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +// Package sync extends the standard library's sync package. +package sync + +import "sync" + +// Aliases of stdlib sync's types to avoid having to import it alongside this +// package. +type ( + Cond = sync.Cond + Locker = sync.Locker + Map = sync.Map + Mutex = sync.Mutex + Once = sync.Once + RWMutex = sync.RWMutex + WaitGroup = sync.WaitGroup +) + +// A Pool is a type-safe wrapper around [sync.Pool]. +type Pool[T any] struct { + New func() T + pool sync.Pool + once Once +} + +// Get is equivalent to [sync.Pool.Get]. +func (p *Pool[T]) Get() T { + p.once.Do(func() { // Do() guarantees at least once, not just only once + p.pool.New = func() any { return p.New() } + }) + return p.pool.Get().(T) //nolint:forcetypeassert +} + +// Put is equivalent to [sync.Pool.Put]. +func (p *Pool[T]) Put(t T) { + p.pool.Put(t) +}