Skip to content

Commit

Permalink
Merge branch 'main' into bloom_rm
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Dec 2, 2024
2 parents c05057e + 39d6c4a commit 60e963e
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 82 deletions.
60 changes: 4 additions & 56 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ type Aggregator struct {

// To keep DB small - need move data to small files ASAP.
// It means goroutine which creating small files - can't be locked by merge or indexing.
buildingFiles atomic.Bool
mergingFiles atomic.Bool
buildingOptionalIndices atomic.Bool
buildingFiles atomic.Bool
mergingFiles atomic.Bool

//warmupWorking atomic.Bool
ctx context.Context
Expand Down Expand Up @@ -485,54 +484,6 @@ func (a *Aggregator) LS() {
}
}

func (a *Aggregator) BuildOptionalMissedIndicesInBackground(ctx context.Context, workers int) {
if ok := a.buildingOptionalIndices.CompareAndSwap(false, true); !ok {
return
}
a.wg.Add(1)
go func() {
defer a.wg.Done()
defer a.buildingOptionalIndices.Store(false)
aggTx := a.BeginFilesRo()
defer aggTx.Close()
if err := aggTx.buildOptionalMissedIndices(ctx, workers); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, common2.ErrStopped) {
return
}
a.logger.Warn("[snapshots] BuildOptionalMissedIndicesInBackground", "err", err)
}
}()
}

func (a *Aggregator) BuildOptionalMissedIndices(ctx context.Context, workers int) error {
if ok := a.buildingOptionalIndices.CompareAndSwap(false, true); !ok {
return nil
}
defer a.buildingOptionalIndices.Store(false)
filesTx := a.BeginFilesRo()
defer filesTx.Close()
if err := filesTx.buildOptionalMissedIndices(ctx, workers); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, common2.ErrStopped) {
return nil
}
return err
}
return nil
}

func (ac *AggregatorRoTx) buildOptionalMissedIndices(ctx context.Context, workers int) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(workers)
ps := background.NewProgressSet()
for _, d := range ac.d {
d := d
if d != nil {
g.Go(func() error { return d.BuildOptionalMissedIndices(ctx, ps) })
}
}
return g.Wait()
}

func (a *Aggregator) BuildMissedIndices(ctx context.Context, workers int) error {
startIndexingTime := time.Now()
{
Expand Down Expand Up @@ -778,7 +729,7 @@ func (a *Aggregator) buildFiles(ctx context.Context, step uint64) error {

func (a *Aggregator) BuildFiles(toTxNum uint64) (err error) {
finished := a.BuildFilesInBackground(toTxNum)
if !(a.buildingFiles.Load() || a.mergingFiles.Load() || a.buildingOptionalIndices.Load()) {
if !(a.buildingFiles.Load() || a.mergingFiles.Load()) {
return nil
}

Expand All @@ -793,7 +744,7 @@ Loop:
fmt.Println("BuildFiles finished")
break Loop
case <-logEvery.C:
if !(a.buildingFiles.Load() || a.mergingFiles.Load() || a.buildingOptionalIndices.Load()) {
if !(a.buildingFiles.Load() || a.mergingFiles.Load()) {
break Loop
}
if a.HasBackgroundFilesBuild() {
Expand Down Expand Up @@ -1735,7 +1686,6 @@ func (a *Aggregator) BuildFilesInBackground(txNum uint64) chan struct{} {
break
}
}
a.BuildOptionalMissedIndicesInBackground(a.ctx, 1)

if dbg.NoMerge() {
close(fin)
Expand All @@ -1759,8 +1709,6 @@ func (a *Aggregator) BuildFilesInBackground(txNum uint64) chan struct{} {
}
a.logger.Warn("[snapshots] merge", "err", err)
}

a.BuildOptionalMissedIndicesInBackground(a.ctx, 1)
}()
}()
return fin
Expand Down
5 changes: 0 additions & 5 deletions erigon-lib/state/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,11 +946,6 @@ func collateAndMergeHistory(tb testing.TB, db kv.RwDB, h *History, txs uint64, d
}
}

hc := h.BeginFilesRo()
defer hc.Close()
err = hc.iit.BuildOptionalMissedIndices(ctx, background.NewProgressSet())
require.NoError(err)

err = tx.Commit()
require.NoError(err)
}
Expand Down
11 changes: 0 additions & 11 deletions erigon-lib/state/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,6 @@ func (r HistoryRanges) any() bool {
return r.history.needMerge || r.index.needMerge
}

func (dt *DomainRoTx) BuildOptionalMissedIndices(ctx context.Context, ps *background.ProgressSet) (err error) {
if err := dt.ht.iit.BuildOptionalMissedIndices(ctx, ps); err != nil {
return err
}
return nil
}

func (iit *InvertedIndexRoTx) BuildOptionalMissedIndices(ctx context.Context, ps *background.ProgressSet) (err error) {
return nil
}

// staticFilesInRange returns list of static files with txNum in specified range [startTxNum; endTxNum)
// files are in the descending order of endTxNum
func (dt *DomainRoTx) staticFilesInRange(r DomainRanges) (valuesFiles, indexFiles, historyFiles []*filesItem) {
Expand Down
4 changes: 0 additions & 4 deletions eth/stagedsync/stage_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,6 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
}

indexWorkers := estimate.IndexSnapshot.Workers()
if err := cfg.agg.BuildOptionalMissedIndices(ctx, indexWorkers); err != nil {
return err
}

diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "E3 Indexing"})
if err := cfg.agg.BuildMissedIndices(ctx, indexWorkers); err != nil {
return err
Expand Down
6 changes: 0 additions & 6 deletions turbo/app/snapshots_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,9 +1355,6 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {

logger.Info("Work on state history snapshots")
indexWorkers := estimate.IndexSnapshot.Workers()
if err = agg.BuildOptionalMissedIndices(ctx, indexWorkers); err != nil {
return err
}
if err = agg.BuildMissedIndices(ctx, indexWorkers); err != nil {
return err
}
Expand Down Expand Up @@ -1413,9 +1410,6 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
if err = agg.MergeLoop(ctx); err != nil {
return err
}
if err = agg.BuildOptionalMissedIndices(ctx, indexWorkers); err != nil {
return err
}
if err = agg.BuildMissedIndices(ctx, indexWorkers); err != nil {
return err
}
Expand Down

0 comments on commit 60e963e

Please sign in to comment.