Skip to content

Commit

Permalink
agg: tables list method (#12939)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Dec 2, 2024
1 parent 0b31e6d commit 1b967b0
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 52 deletions.
3 changes: 1 addition & 2 deletions cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
block, pruneTo, unwind uint64
unwindEvery uint64
batchSizeStr string
reset, warmup, noCommit bool
reset, noCommit bool
resetPruneAt bool
bucket string
datadirCli, toChaindata string
Expand Down Expand Up @@ -116,7 +116,6 @@ func withUnwindEvery(cmd *cobra.Command) {

func withReset(cmd *cobra.Command) {
cmd.Flags().BoolVar(&reset, "reset", false, "reset given stage")
cmd.Flags().BoolVar(&warmup, "warmup", false, "warmup relevant tables by parallel random reads")
}

func withResetPruneAt(cmd *cobra.Command) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration/commands/reset_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var cmdResetState = &cobra.Command{
return
}

if err = reset2.ResetState(db, ctx, chain, "", log.Root()); err != nil {
if err = reset2.ResetState(db, agg, ctx, chain, "", log.Root()); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error(err.Error())
}
Expand Down
18 changes: 3 additions & 15 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error {

br, bw := blocksIO(db, logger)
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig, _ := fromdb.ChainConfig(db), fromdb.PruneMode(db)

return db.Update(ctx, func(tx kv.RwTx) error {
if reset {
Expand All @@ -655,7 +654,7 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}
}
dirs := datadir.New(datadirCli)
if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, logger); err != nil {
if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, logger); err != nil {
return fmt.Errorf("resetting blocks: %w", err)
}
ac := agg.BeginFilesRo()
Expand Down Expand Up @@ -701,7 +700,6 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
defer agg.Close()
br, bw := blocksIO(db, logger)
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig, _ := fromdb.ChainConfig(db), fromdb.PruneMode(db)

if integritySlow {
if err := db.View(ctx, func(tx kv.Tx) error {
Expand All @@ -721,7 +719,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {

return db.Update(ctx, func(tx kv.RwTx) error {
if reset {
if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, logger); err != nil {
if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, logger); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -1031,11 +1029,8 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
defer sn.Close()
defer borSn.Close()
defer agg.Close()
if warmup {
return reset2.WarmupExec(ctx, db)
}
if reset {
if err := reset2.ResetExec(ctx, db, chain, "", logger); err != nil {
if err := reset2.ResetExec(ctx, db, agg, chain, "", logger); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -1177,10 +1172,6 @@ func stageCustomTrace(db kv.RwDB, ctx context.Context, logger log.Logger) error
defer sn.Close()
defer borSn.Close()
defer agg.Close()
if warmup {
panic("not implemented")
//return reset2.WarmupExec(ctx, db)
}
if reset {
if err := reset2.Reset(ctx, db, stages.CustomTrace); err != nil {
return err
Expand Down Expand Up @@ -1218,9 +1209,6 @@ func stagePatriciaTrie(db kv.RwDB, ctx context.Context, logger log.Logger) error
defer agg.Close()
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)

if warmup {
return reset2.Warmup(ctx, db, log.LvlInfo, stages.Execution)
}
if reset {
return reset2.Reset(ctx, db, stages.Execution)
}
Expand Down
39 changes: 7 additions & 32 deletions core/rawdb/rawdbreset/reset_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/erigontech/erigon/turbo/services"
)

func ResetState(db kv.RwDB, ctx context.Context, chain string, tmpDir string, logger log.Logger) error {
func ResetState(db kv.RwDB, agg *state.Aggregator, ctx context.Context, chain string, tmpDir string, logger log.Logger) error {
// don't reset senders here
if err := db.Update(ctx, ResetTxLookup); err != nil {
return err
Expand All @@ -45,13 +45,13 @@ func ResetState(db kv.RwDB, ctx context.Context, chain string, tmpDir string, lo
return err
}

if err := ResetExec(ctx, db, chain, tmpDir, logger); err != nil {
if err := ResetExec(ctx, db, agg, chain, tmpDir, logger); err != nil {
return err
}
return nil
}

func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.FullBlockReader, bw *blockio.BlockWriter, dirs datadir.Dirs, cc chain.Config, logger log.Logger) error {
func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.FullBlockReader, bw *blockio.BlockWriter, dirs datadir.Dirs, logger log.Logger) error {
// keep Genesis
if err := rawdb.TruncateBlocks(context.Background(), tx, 1); err != nil {
return err
Expand Down Expand Up @@ -144,7 +144,7 @@ func ResetPolygonSync(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services
}
}

if err := ResetBlocks(tx, db, agg, br, bw, dirs, cc, logger); err != nil {
if err := ResetBlocks(tx, db, agg, br, bw, dirs, logger); err != nil {
return err
}

Expand All @@ -158,22 +158,12 @@ func ResetSenders(ctx context.Context, db kv.RwDB, tx kv.RwTx) error {
return clearStageProgress(tx, stages.Senders)
}

func WarmupExec(ctx context.Context, db kv.RwDB) (err error) {
for _, tbl := range stateBuckets {
backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads)
}
for _, tbl := range stateHistoryV3Buckets {
backup.WarmupTable(ctx, db, tbl, log.LvlInfo, backup.ReadAheadThreads)
}
return
}

func ResetExec(ctx context.Context, db kv.RwDB, chain string, tmpDir string, logger log.Logger) (err error) {
func ResetExec(ctx context.Context, db kv.RwDB, agg *state.Aggregator, chain string, tmpDir string, logger log.Logger) (err error) {
cleanupList := make([]string, 0)
cleanupList = append(cleanupList, stateBuckets...)
cleanupList = append(cleanupList, stateHistoryBuckets...)
cleanupList = append(cleanupList, stateHistoryV3Buckets...)
cleanupList = append(cleanupList, stateV3Buckets...)
cleanupList = append(cleanupList, agg.DomainTables(kv.AccountsDomain, kv.StorageDomain, kv.CodeDomain, kv.CommitmentDomain, kv.ReceiptDomain)...)
cleanupList = append(cleanupList, agg.InvertedIndexTables(kv.LogAddrIdxPos, kv.LogTopicIdxPos, kv.TracesFromIdxPos, kv.TracesToIdxPos)...)

return db.Update(ctx, func(tx kv.RwTx) error {
if err := clearStageProgress(tx, stages.Execution); err != nil {
Expand Down Expand Up @@ -212,21 +202,6 @@ var stateBuckets = []string{
kv.PlainContractCode, kv.ContractCode, kv.IncarnationMap,
}
var stateHistoryBuckets = []string{
kv.Receipts,
}
var stateHistoryV3Buckets = []string{
kv.TblAccountHistoryKeys, kv.TblAccountHistoryVals, kv.TblAccountIdx,
kv.TblStorageHistoryKeys, kv.TblStorageHistoryVals, kv.TblStorageIdx,
kv.TblCodeHistoryKeys, kv.TblCodeHistoryVals, kv.TblCodeIdx,
kv.TblLogAddressKeys, kv.TblLogAddressIdx,
kv.TblLogTopicsKeys, kv.TblLogTopicsIdx,
kv.TblTracesFromKeys, kv.TblTracesFromIdx,
kv.TblTracesToKeys, kv.TblTracesToIdx,
}
var stateV3Buckets = []string{
kv.TblAccountVals, kv.TblStorageVals, kv.TblCodeVals, kv.TblCommitmentVals, kv.TblReceiptVals,
kv.TblCommitmentHistoryKeys, kv.TblCommitmentHistoryVals, kv.TblCommitmentIdx,
kv.TblReceiptHistoryKeys, kv.TblReceiptHistoryVals, kv.TblReceiptIdx,
kv.TblPruningProgress,
kv.ChangeSets3,
}
Expand Down
4 changes: 2 additions & 2 deletions core/test/domains_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutDB(t *testing.T) {
domCtx.Close()
domains.Close()

err = reset2.ResetExec(ctx, db, networkname.Test, "", log.New())
err = reset2.ResetExec(ctx, db, agg, networkname.Test, "", log.New())
require.NoError(t, err)
// ======== reset domains end ========

Expand Down Expand Up @@ -411,7 +411,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutAnything(t *testing.T) {
domCtx.Close()
domains.Close()

err = reset2.ResetExec(ctx, db, networkname.Test, "", log.New())
err = reset2.ResetExec(ctx, db, agg, networkname.Test, "", log.New())
require.NoError(t, err)
// ======== reset domains end ========

Expand Down
13 changes: 13 additions & 0 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,19 @@ func (a *Aggregator) integrateDirtyFiles(sf AggV3StaticFiles, txNumFrom, txNumTo
}
}

func (a *Aggregator) DomainTables(domains ...kv.Domain) (tables []string) {
for _, domain := range domains {
tables = append(tables, a.d[domain].Tables()...)
}
return tables
}
func (a *Aggregator) InvertedIndexTables(indices ...kv.InvertedIdxPos) (tables []string) {
for _, idx := range indices {
tables = append(tables, a.iis[idx].Tables()...)
}
return tables
}

type flusher interface {
Flush(ctx context.Context, tx kv.RwTx) error
}
Expand Down
4 changes: 4 additions & 0 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ func (d *Domain) reCalcVisibleFiles(toTxNum uint64) {
d.History.reCalcVisibleFiles(toTxNum)
}

func (d *Domain) Tables() []string {
return append([]string{d.keysTable, d.valuesTable}, d.History.Tables()...)
}

func (d *Domain) Close() {
if d == nil {
return
Expand Down
4 changes: 4 additions & 0 deletions erigon-lib/state/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ func (h *History) closeWhatNotInList(fNames []string) {
}
}

func (h *History) Tables() []string {
return append([]string{h.keysTable, h.valuesTable}, h.InvertedIndex.Tables()...)
}

func (h *History) Close() {
if h == nil {
return
Expand Down
2 changes: 2 additions & 0 deletions erigon-lib/state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ func (ii *InvertedIndex) closeWhatNotInList(fNames []string) {
}
}

func (ii *InvertedIndex) Tables() []string { return []string{ii.keysTable, ii.valuesTable} }

func (ii *InvertedIndex) Close() {
if ii == nil {
return
Expand Down

0 comments on commit 1b967b0

Please sign in to comment.