Skip to content

Commit

Permalink
erigon snapshots: clean func (#11086)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Jul 9, 2024
1 parent 3406442 commit 78bd2bc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 44 deletions.
33 changes: 13 additions & 20 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/RoaringBitmap/roaring/roaring64"
"github.com/c2h5oh/datasize"
"github.com/tidwall/btree"
rand2 "golang.org/x/exp/rand"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -409,8 +410,8 @@ func (a *Aggregator) Files() []string {
return ac.Files()
}
func (a *Aggregator) LS() {
for _, d := range a.d {
d.dirtyFiles.Walk(func(items []*filesItem) bool {
doLS := func(dirtyFiles *btree.BTreeG[*filesItem]) {
dirtyFiles.Walk(func(items []*filesItem) bool {
for _, item := range items {
if item.decompressor == nil {
continue
Expand All @@ -420,27 +421,19 @@ func (a *Aggregator) LS() {
return true
})
}

a.dirtyFilesLock.Lock()
defer a.dirtyFilesLock.Unlock()
for _, d := range a.d {
doLS(d.dirtyFiles)
doLS(d.History.dirtyFiles)
doLS(d.History.InvertedIndex.dirtyFiles)
}
for _, d := range a.iis {
d.dirtyFiles.Walk(func(items []*filesItem) bool {
for _, item := range items {
if item.decompressor == nil {
continue
}
log.Info("[agg] ", "f", item.decompressor.FileName(), "words", item.decompressor.Count())
}
return true
})
doLS(d.dirtyFiles)
}
for _, d := range a.ap {
d.dirtyFiles.Walk(func(items []*filesItem) bool {
for _, item := range items {
if item.decompressor == nil {
continue
}
log.Info("[agg] ", "f", item.decompressor.FileName(), "words", item.decompressor.Count())
}
return true
})
doLS(d.dirtyFiles)
}
}

Expand Down
2 changes: 2 additions & 0 deletions erigon-lib/state/appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ func (ap *Appendable) BuildMissedAccessors(ctx context.Context, g *errgroup.Grou
}

func (ap *Appendable) openDirtyFiles() error {
fmt.Printf("[dbg] dirtyFiles.Len() %d\n", ap.dirtyFiles.Len())

var invalidFileItems []*filesItem
invalidFileItemsLock := sync.Mutex{}
ap.dirtyFiles.Walk(func(items []*filesItem) bool {
Expand Down
39 changes: 15 additions & 24 deletions turbo/app/snapshots_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,11 @@ func doIntegrity(cliCtx *cli.Context) error {

cfg := ethconfig.NewSnapCfg(true, false, true, true)

blockSnaps, borSnaps, caplinSnaps, blockRetire, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
_, _, _, blockRetire, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
if err != nil {
return err
}
defer blockSnaps.Close()
defer borSnaps.Close()
defer caplinSnaps.Close()
defer agg.Close()
defer clean()

blockReader, _ := blockRetire.IO()
for _, chk := range integrity.AllChecks {
Expand Down Expand Up @@ -643,14 +640,11 @@ func doIndicesCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {

cfg := ethconfig.NewSnapCfg(true, false, true, true)
chainConfig := fromdb.ChainConfig(chainDB)
blockSnaps, borSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
_, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
if err != nil {
return err
}
defer blockSnaps.Close()
defer borSnaps.Close()
defer caplinSnaps.Close()
defer agg.Close()
defer clean()

if err := br.BuildMissedIndicesIfNeed(ctx, "Indexing", nil, chainConfig); err != nil {
return err
Expand All @@ -676,16 +670,11 @@ func doLS(cliCtx *cli.Context, dirs datadir.Dirs) error {
chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer chainDB.Close()
cfg := ethconfig.NewSnapCfg(true, false, true, true)
blockSnaps, borSnaps, caplinSnaps, _, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
blockSnaps, borSnaps, caplinSnaps, _, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
if err != nil {
return err
}
defer blockSnaps.Close()
defer borSnaps.Close()
defer caplinSnaps.Close()
defer agg.Close()

agg.Close()
defer clean()

blockSnaps.LS()
borSnaps.LS()
Expand All @@ -697,7 +686,7 @@ func doLS(cliCtx *cli.Context, dirs datadir.Dirs) error {

func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.Dirs, chainDB kv.RwDB, logger log.Logger) (
blockSnaps *freezeblocks.RoSnapshots, borSnaps *freezeblocks.BorRoSnapshots, csn *freezeblocks.CaplinSnapshots,
br *freezeblocks.BlockRetire, agg *libstate.Aggregator, err error,
br *freezeblocks.BlockRetire, agg *libstate.Aggregator, clean func(), err error,
) {
blockSnaps = freezeblocks.NewRoSnapshots(cfg, dirs.Snap, 0, logger)
if err = blockSnaps.ReopenFolder(); err != nil {
Expand Down Expand Up @@ -750,6 +739,12 @@ func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.D
blockSnapBuildSema := semaphore.NewWeighted(int64(dbg.BuildSnapshotAllowance))
agg.SetSnapshotBuildSema(blockSnapBuildSema)
br = freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, chainDB, chainConfig, nil, blockSnapBuildSema, logger)
clean = func() {
defer blockSnaps.Close()
defer borSnaps.Close()
defer csn.Close()
defer agg.Close()
}
return
}

Expand Down Expand Up @@ -872,21 +867,17 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
defer db.Close()

cfg := ethconfig.NewSnapCfg(true, false, true, true)
blockSnaps, borSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, db, logger)
blockSnaps, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, db, logger)
if err != nil {
return err
}
defer clean()

// The `erigon retire` command is designed to maximize resource utilization, whereas Erigon itself minimizes background impact (because it is not in a rush).
agg.SetCollateAndBuildWorkers(estimate.StateV3Collate.Workers())
agg.SetMergeWorkers(estimate.AlmostAllCPUs())
agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())

defer blockSnaps.Close()
defer borSnaps.Close()
defer caplinSnaps.Close()
defer agg.Close()

chainConfig := fromdb.ChainConfig(db)
if err := br.BuildMissedIndicesIfNeed(ctx, "retire", nil, chainConfig); err != nil {
return err
Expand Down

0 comments on commit 78bd2bc

Please sign in to comment.