Skip to content

Commit

Permalink
feat(core/state): async trie prefetching (#76)
Browse files Browse the repository at this point in the history
## Why this should be merged

Performs trie prefetching concurrently, required for equivalent
performance with `coreth` / `subnet-evm` implementations.

## How this works

`StateDB.StartPrefetcher()` accepts variadic options (for backwards
compatibility of function signatures). An option to specify a
`WorkerPool` is provided which, if present, is used to call
`Trie.Get{Account,Storage}()`; the pool is responsible for concurrency
but does not need to be able to wait on the work as that is handled by
this change.

## How this was tested

Unit test demonstrating hand-off of work to a `WorkerPool` as well as
API-guaranteed ordering of events.
  • Loading branch information
ARR4N authored Nov 26, 2024
1 parent 44068c8 commit 4feb960
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 17 deletions.
4 changes: 2 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
func (s *StateDB) StartPrefetcher(namespace string, opts ...PrefetcherOption) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, opts...)
}
}

Expand Down
62 changes: 47 additions & 15 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/ava-labs/libevm/common"
"github.com/ava-labs/libevm/libevm/options"
"github.com/ava-labs/libevm/log"
"github.com/ava-labs/libevm/metrics"
)
Expand Down Expand Up @@ -49,9 +50,11 @@ type triePrefetcher struct {
storageDupMeter metrics.Meter
storageSkipMeter metrics.Meter
storageWasteMeter metrics.Meter

options []PrefetcherOption
}

func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...PrefetcherOption) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
Expand All @@ -67,6 +70,8 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),

options: opts,
}
return p
}
Expand Down Expand Up @@ -99,6 +104,7 @@ func (p *triePrefetcher) close() {
}
}
}
p.releaseWorkerPools()
// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}
Expand All @@ -122,6 +128,8 @@ func (p *triePrefetcher) copy() *triePrefetcher {
storageDupMeter: p.storageDupMeter,
storageSkipMeter: p.storageSkipMeter,
storageWasteMeter: p.storageWasteMeter,

options: p.options,
}
// If the prefetcher is already a copy, duplicate the data
if p.fetches != nil {
Expand Down Expand Up @@ -150,7 +158,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
id := p.trieID(owner, root)
fetcher := p.fetchers[id]
if fetcher == nil {
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
fetcher = newSubfetcher(p.db, p.root, owner, root, addr, p.options...)
p.fetchers[id] = fetcher
}
fetcher.schedule(keys)
Expand Down Expand Up @@ -226,11 +234,13 @@ type subfetcher struct {
seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
used [][]byte // Tracks the entries used in the end

pool *subfetcherPool
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, opts ...PrefetcherOption) *subfetcher {
sf := &subfetcher{
db: db,
state: state,
Expand All @@ -243,6 +253,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo
copy: make(chan chan Trie),
seen: make(map[string]struct{}),
}
options.As[prefetcherConfig](opts...).applyTo(sf)
go sf.loop()
return sf
}
Expand Down Expand Up @@ -294,7 +305,10 @@ func (sf *subfetcher) abort() {
// out of tasks or its underlying trie is retrieved for committing.
func (sf *subfetcher) loop() {
// No matter how the loop stops, signal anyone waiting that it's terminated
defer close(sf.term)
defer func() {
sf.pool.wait()
close(sf.term)
}()

// Start by opening the trie and stop processing if it fails
if sf.owner == (common.Hash{}) {
Expand Down Expand Up @@ -325,14 +339,14 @@ func (sf *subfetcher) loop() {
sf.lock.Unlock()

// Prefetch any tasks until the loop is interrupted
for i, task := range tasks {
for _, task := range tasks {
select {
case <-sf.stop:
// If termination is requested, add any leftover back and return
sf.lock.Lock()
sf.tasks = append(sf.tasks, tasks[i:]...)
sf.lock.Unlock()
return
//libevm:start
//
// The <-sf.stop case has been removed, in keeping with the equivalent change below. Future geth
// versions also remove it so our modification here can be undone when merging upstream.
//
//libevm:end

case ch := <-sf.copy:
// Somebody wants a copy of the current trie, grant them
Expand All @@ -344,9 +358,9 @@ func (sf *subfetcher) loop() {
sf.dups++
} else {
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
sf.pool.GetAccount(common.BytesToAddress(task))
} else {
sf.trie.GetStorage(sf.addr, task)
sf.pool.GetStorage(sf.addr, task)
}
sf.seen[string(task)] = struct{}{}
}
Expand All @@ -358,8 +372,26 @@ func (sf *subfetcher) loop() {
ch <- sf.db.CopyTrie(sf.trie)

case <-sf.stop:
// Termination is requested, abort and leave remaining tasks
return
//libevm:start
//
// This is copied, with alteration, from ethereum/go-ethereum#29519
// and can be deleted once we update to include that change.

// Termination is requested, abort if no more tasks are pending. If
// there are some, exhaust them first.
sf.lock.Lock()
done := len(sf.tasks) == 0
sf.lock.Unlock()

if done {
return
}

select {
case sf.wake <- struct{}{}:
default:
}
//libevm:end
}
}
}
126 changes: 126 additions & 0 deletions core/state/trie_prefetcher.libevm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2024 the libevm authors.
//
// The libevm additions to go-ethereum are free software: you can redistribute
// them and/or modify them under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The libevm additions are distributed in the hope that they will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see
// <http://www.gnu.org/licenses/>.

package state

import (
"github.com/ava-labs/libevm/common"
"github.com/ava-labs/libevm/libevm/options"
"github.com/ava-labs/libevm/libevm/sync"
"github.com/ava-labs/libevm/log"
)

// A PrefetcherOption configures behaviour of trie prefetching.
type PrefetcherOption = options.Option[prefetcherConfig]

type prefetcherConfig struct {
newWorkers func() WorkerPool
}

// A WorkerPool executes functions asynchronously. Done() is called to signal
// that the pool is no longer needed and that Execute() is guaranteed to not be
// called again.
type WorkerPool interface {
Execute(func())
Done()
}

// WithWorkerPools configures trie prefetching to execute asynchronously. The
// provided constructor is called once for each trie being fetched but it MAY
// return the same pool.
func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
return options.Func[prefetcherConfig](func(c *prefetcherConfig) {
c.newWorkers = ctor
})
}

type subfetcherPool struct {
workers WorkerPool
tries sync.Pool[Trie]
wg sync.WaitGroup
}

// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided
// with a [PrefetcherOption].
func (c *prefetcherConfig) applyTo(sf *subfetcher) {
sf.pool = &subfetcherPool{
tries: sync.Pool[Trie]{
// Although the workers may be shared between all subfetchers, each
// MUST have its own Trie pool.
New: func() Trie {
return sf.db.CopyTrie(sf.trie)
},
},
}
if c.newWorkers != nil {
sf.pool.workers = c.newWorkers()
}
}

// releaseWorkerPools calls Done() on all [WorkerPool]s. This MUST only be
// called after [subfetcher.abort] returns on ALL fetchers as a pool is allowed
// to be shared between them. This is because we guarantee in the public API
// that no further calls will be made to Execute() after a call to Done().
func (p *triePrefetcher) releaseWorkerPools() {
for _, f := range p.fetchers {
if w := f.pool.workers; w != nil {
w.Done()
}
}
}

func (p *subfetcherPool) wait() {
p.wg.Wait()
}

// execute runs the provided function with a copy of the subfetcher's Trie.
// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was
// configured with a [WorkerPool] then it is used for function execution,
// otherwise `fn` is just called directly.
func (p *subfetcherPool) execute(fn func(Trie)) {
p.wg.Add(1)
do := func() {
t := p.tries.Get()
fn(t)
p.tries.Put(t)
p.wg.Done()
}

if w := p.workers; w != nil {
w.Execute(do)
} else {
do()
}
}

// GetAccount optimistically pre-fetches an account, dropping the returned value
// and logging errors. See [subfetcherPool.execute] re worker pools.
func (p *subfetcherPool) GetAccount(addr common.Address) {
p.execute(func(t Trie) {
if _, err := t.GetAccount(addr); err != nil {
log.Error("account prefetching failed", "address", addr, "err", err)
}
})
}

// GetStorage is the storage equivalent of [subfetcherPool.GetAccount].
func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) {
p.execute(func(t Trie) {
if _, err := t.GetStorage(addr, key); err != nil {
log.Error("storage prefetching failed", "address", addr, "key", key, "err", err)
}
})
}
80 changes: 80 additions & 0 deletions core/state/trie_prefetcher.libevm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 the libevm authors.
//
// The libevm additions to go-ethereum are free software: you can redistribute
// them and/or modify them under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The libevm additions are distributed in the hope that they will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see
// <http://www.gnu.org/licenses/>.

package state

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/ava-labs/libevm/common"
)

type synchronisingWorkerPool struct {
t *testing.T
executed, unblock chan struct{}
done bool
preconditionsToStopPrefetcher int
}

var _ WorkerPool = (*synchronisingWorkerPool)(nil)

func (p *synchronisingWorkerPool) Execute(fn func()) {
fn()
select {
case <-p.executed:
default:
close(p.executed)
}

<-p.unblock
assert.False(p.t, p.done, "Done() called before Execute() returns")
p.preconditionsToStopPrefetcher++
}

func (p *synchronisingWorkerPool) Done() {
p.done = true
p.preconditionsToStopPrefetcher++
}

func TestStopPrefetcherWaitsOnWorkers(t *testing.T) {
pool := &synchronisingWorkerPool{
t: t,
executed: make(chan struct{}),
unblock: make(chan struct{}),
}
opt := WithWorkerPools(func() WorkerPool { return pool })

db := filledStateDB()
db.prefetcher = newTriePrefetcher(db.db, db.originalRoot, "", opt)
db.prefetcher.prefetch(common.Hash{}, common.Hash{}, common.Address{}, [][]byte{{}})

go func() {
<-pool.executed
// Sleep otherwise there is a small chance that we close pool.unblock
// between db.StopPrefetcher() returning and the assertion.
time.Sleep(time.Second)
close(pool.unblock)
}()

<-pool.executed
db.StopPrefetcher()
// If this errors then either Execute() hadn't returned or Done() wasn't
// called.
assert.Equalf(t, 2, pool.preconditionsToStopPrefetcher, "%T.StopPrefetcher() returned early", db)
}
Loading

0 comments on commit 4feb960

Please sign in to comment.