From 1b967b09f211207ac657d45f2489d4808baf5162 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 2 Dec 2024 21:51:50 +0700 Subject: [PATCH] agg: tables list method (#12939) --- cmd/integration/commands/flags.go | 3 +- cmd/integration/commands/reset_state.go | 2 +- cmd/integration/commands/stages.go | 18 ++---------- core/rawdb/rawdbreset/reset_stages.go | 39 +++++-------------------- core/test/domains_restart_test.go | 4 +-- erigon-lib/state/aggregator.go | 13 +++++++++ erigon-lib/state/domain.go | 4 +++ erigon-lib/state/history.go | 4 +++ erigon-lib/state/inverted_index.go | 2 ++ 9 files changed, 37 insertions(+), 52 deletions(-) diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index 615281461af..a61fcaab2d3 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -32,7 +32,7 @@ var ( block, pruneTo, unwind uint64 unwindEvery uint64 batchSizeStr string - reset, warmup, noCommit bool + reset, noCommit bool resetPruneAt bool bucket string datadirCli, toChaindata string @@ -116,7 +116,6 @@ func withUnwindEvery(cmd *cobra.Command) { func withReset(cmd *cobra.Command) { cmd.Flags().BoolVar(&reset, "reset", false, "reset given stage") - cmd.Flags().BoolVar(&warmup, "warmup", false, "warmup relevant tables by parallel random reads") } func withResetPruneAt(cmd *cobra.Command) { diff --git a/cmd/integration/commands/reset_state.go b/cmd/integration/commands/reset_state.go index 4a4a61a5d65..f5b7785ba07 100644 --- a/cmd/integration/commands/reset_state.go +++ b/cmd/integration/commands/reset_state.go @@ -67,7 +67,7 @@ var cmdResetState = &cobra.Command{ return } - if err = reset2.ResetState(db, ctx, chain, "", log.Root()); err != nil { + if err = reset2.ResetState(db, agg, ctx, chain, "", log.Root()); err != nil { if !errors.Is(err, context.Canceled) { logger.Error(err.Error()) } diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index d94f33d7f24..a9326f52d25 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -646,7 +646,6 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error { br, bw := blocksIO(db, logger) _, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger) - chainConfig, _ := fromdb.ChainConfig(db), fromdb.PruneMode(db) return db.Update(ctx, func(tx kv.RwTx) error { if reset { @@ -655,7 +654,7 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error { } } dirs := datadir.New(datadirCli) - if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, logger); err != nil { + if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, logger); err != nil { return fmt.Errorf("resetting blocks: %w", err) } ac := agg.BeginFilesRo() @@ -701,7 +700,6 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error { defer agg.Close() br, bw := blocksIO(db, logger) _, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger) - chainConfig, _ := fromdb.ChainConfig(db), fromdb.PruneMode(db) if integritySlow { if err := db.View(ctx, func(tx kv.Tx) error { @@ -721,7 +719,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error { return db.Update(ctx, func(tx kv.RwTx) error { if reset { - if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, logger); err != nil { + if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, logger); err != nil { return err } return nil @@ -1031,11 +1029,8 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error { defer sn.Close() defer borSn.Close() defer agg.Close() - if warmup { - return reset2.WarmupExec(ctx, db) - } if reset { - if err := reset2.ResetExec(ctx, db, chain, "", logger); err != nil { + if err := reset2.ResetExec(ctx, db, agg, chain, "", logger); err != nil { return err } return nil @@ -1177,10 +1172,6 @@ func stageCustomTrace(db kv.RwDB, ctx context.Context, logger log.Logger) error defer sn.Close() defer borSn.Close() defer agg.Close() - if warmup { - panic("not implemented") - //return reset2.WarmupExec(ctx, db) - } if reset { if err := reset2.Reset(ctx, db, stages.CustomTrace); err != nil { return err @@ -1218,9 +1209,6 @@ func stagePatriciaTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error defer agg.Close() _, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger) - if warmup { - return reset2.Warmup(ctx, db, log.LvlInfo, stages.Execution) - } if reset { return reset2.Reset(ctx, db, stages.Execution) } diff --git a/core/rawdb/rawdbreset/reset_stages.go b/core/rawdb/rawdbreset/reset_stages.go index dcc4886027b..c7265c1a639 100644 --- a/core/rawdb/rawdbreset/reset_stages.go +++ b/core/rawdb/rawdbreset/reset_stages.go @@ -33,7 +33,7 @@ import ( "github.com/erigontech/erigon/turbo/services" ) -func ResetState(db kv.RwDB, ctx context.Context, chain string, tmpDir string, logger log.Logger) error { +func ResetState(db kv.RwDB, agg *state.Aggregator, ctx context.Context, chain string, tmpDir string, logger log.Logger) error { // don't reset senders here if err := db.Update(ctx, ResetTxLookup); err != nil { return err @@ -45,13 +45,13 @@ func ResetState(db kv.RwDB, ctx context.Context, chain string, tmpDir string, lo return err } - if err := ResetExec(ctx, db, chain, tmpDir, logger); err != nil { + if err := ResetExec(ctx, db, agg, chain, tmpDir, logger); err != nil { return err } return nil } -func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.FullBlockReader, bw *blockio.BlockWriter, dirs datadir.Dirs, cc chain.Config, logger log.Logger) error { +func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.FullBlockReader, bw *blockio.BlockWriter, dirs datadir.Dirs, logger log.Logger) error { // keep Genesis if err := rawdb.TruncateBlocks(context.Background(), tx, 1); err != nil { return err @@ -144,7 +144,7 @@ func ResetPolygonSync(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services } } - if err := ResetBlocks(tx, db, agg, br, bw, dirs, cc, logger); err != nil { + if err := ResetBlocks(tx, db, agg, br, bw, dirs, logger); err != nil { return err } @@ -158,22 +158,12 @@ func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error { return clearStageProgress(tx, stages.Senders) } -func WarmupExec(ctx context.Context, db kv.RwDB) (err error) { - for _, tbl := range stateBuckets { - backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads) - } - for _, tbl := range stateHistoryV3Buckets { - backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads) - } - return -} - -func ResetExec(ctx context.Context, db kv.RwDB, chain string, tmpDir string, logger log.Logger) (err error) { +func ResetExec(ctx context.Context, db kv.RwDB, agg *state.Aggregator, chain string, tmpDir string, logger log.Logger) (err error) { cleanupList := make([]string, 0) cleanupList = append(cleanupList, stateBuckets...) cleanupList = append(cleanupList, stateHistoryBuckets...) - cleanupList = append(cleanupList, stateHistoryV3Buckets...) - cleanupList = append(cleanupList, stateV3Buckets...) + cleanupList = append(cleanupList, agg.DomainTables(kv.AccountsDomain, kv.StorageDomain, kv.CodeDomain, kv.CommitmentDomain, kv.ReceiptDomain)...) + cleanupList = append(cleanupList, agg.InvertedIndexTables(kv.LogAddrIdxPos, kv.LogTopicIdxPos, kv.TracesFromIdxPos, kv.TracesToIdxPos)...) return db.Update(ctx, func(tx kv.RwTx) error { if err := clearStageProgress(tx, stages.Execution); err != nil { @@ -212,21 +202,6 @@ var stateBuckets = []string{ kv.PlainContractCode, kv.ContractCode, kv.IncarnationMap, } var stateHistoryBuckets = []string{ - kv.Receipts, -} -var stateHistoryV3Buckets = []string{ - kv.TblAccountHistoryKeys, kv.TblAccountHistoryVals, kv.TblAccountIdx, - kv.TblStorageHistoryKeys, kv.TblStorageHistoryVals, kv.TblStorageIdx, - kv.TblCodeHistoryKeys, kv.TblCodeHistoryVals, kv.TblCodeIdx, - kv.TblLogAddressKeys, kv.TblLogAddressIdx, - kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, - kv.TblTracesFromKeys, kv.TblTracesFromIdx, - kv.TblTracesToKeys, kv.TblTracesToIdx, -} -var stateV3Buckets = []string{ - kv.TblAccountVals, kv.TblStorageVals, kv.TblCodeVals, kv.TblCommitmentVals, kv.TblReceiptVals, - kv.TblCommitmentHistoryKeys, kv.TblCommitmentHistoryVals, kv.TblCommitmentIdx, - kv.TblReceiptHistoryKeys, kv.TblReceiptHistoryVals, kv.TblReceiptIdx, kv.TblPruningProgress, kv.ChangeSets3, } diff --git a/core/test/domains_restart_test.go b/core/test/domains_restart_test.go index 5fd7ea02572..952906fbb6b 100644 --- a/core/test/domains_restart_test.go +++ b/core/test/domains_restart_test.go @@ -241,7 +241,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutDB(t *testing.T) { domCtx.Close() domains.Close() - err = reset2.ResetExec(ctx, db, networkname.Test, "", log.New()) + err = reset2.ResetExec(ctx, db, agg, networkname.Test, "", log.New()) require.NoError(t, err) // ======== reset domains end ======== @@ -411,7 +411,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutAnything(t *testing.T) { domCtx.Close() domains.Close() - err = reset2.ResetExec(ctx, db, networkname.Test, "", log.New()) + err = reset2.ResetExec(ctx, db, agg, networkname.Test, "", log.New()) require.NoError(t, err) // ======== reset domains end ======== diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index 482ebbbbe45..28dca3ee8e6 100644 --- a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -857,6 +857,19 @@ func (a *Aggregator) integrateDirtyFiles(sf AggV3StaticFiles, txNumFrom, txNumTo } } +func (a *Aggregator) DomainTables(domains ...kv.Domain) (tables []string) { + for _, domain := range domains { + tables = append(tables, a.d[domain].Tables()...) + } + return tables +} +func (a *Aggregator) InvertedIndexTables(indices ...kv.InvertedIdxPos) (tables []string) { + for _, idx := range indices { + tables = append(tables, a.iis[idx].Tables()...) + } + return tables +} + type flusher interface { Flush(ctx context.Context, tx kv.RwTx) error } diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index 51b45eb1256..2d134eb89f9 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -450,6 +450,10 @@ func (d *Domain) reCalcVisibleFiles(toTxNum uint64) { d.History.reCalcVisibleFiles(toTxNum) } +func (d *Domain) Tables() []string { + return append([]string{d.keysTable, d.valuesTable}, d.History.Tables()...) +} + func (d *Domain) Close() { if d == nil { return diff --git a/erigon-lib/state/history.go b/erigon-lib/state/history.go index 2ab7c30d3ba..78ddeb764c2 100644 --- a/erigon-lib/state/history.go +++ b/erigon-lib/state/history.go @@ -284,6 +284,10 @@ func (h *History) closeWhatNotInList(fNames []string) { } } +func (h *History) Tables() []string { + return append([]string{h.keysTable, h.valuesTable}, h.InvertedIndex.Tables()...) +} + func (h *History) Close() { if h == nil { return diff --git a/erigon-lib/state/inverted_index.go b/erigon-lib/state/inverted_index.go index 7ce941d7510..83a96072511 100644 --- a/erigon-lib/state/inverted_index.go +++ b/erigon-lib/state/inverted_index.go @@ -332,6 +332,8 @@ func (ii *InvertedIndex) closeWhatNotInList(fNames []string) { } } +func (ii *InvertedIndex) Tables() []string { return []string{ii.keysTable, ii.valuesTable} } + func (ii *InvertedIndex) Close() { if ii == nil { return