From 2af372dd9ea0767171473145bbd66dae01338fd6 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 20 Nov 2024 14:20:43 +0700 Subject: [PATCH] release sync mode type. no functional changes. (#12794) --- .../historical_states_reader.go | 1 - cmd/integration/commands/stages.go | 5 ++- eth/backend.go | 10 ++--- eth/stagedsync/exec3.go | 15 +++---- eth/stagedsync/stage.go | 3 +- eth/stagedsync/stagedsynctest/harness.go | 2 + eth/stagedsync/stages/sync_mode.go | 42 +++++++++++++++++++ eth/stagedsync/sync.go | 4 +- eth/stagedsync/sync_test.go | 22 +++++----- turbo/stages/mock/mock_sentry.go | 8 ++-- turbo/stages/stageloop.go | 1 + 11 files changed, 79 insertions(+), 34 deletions(-) create mode 100644 eth/stagedsync/stages/sync_mode.go diff --git a/cl/persistence/state/historical_states_reader/historical_states_reader.go b/cl/persistence/state/historical_states_reader/historical_states_reader.go index 6bc33e3b005..919c35faae2 100644 --- a/cl/persistence/state/historical_states_reader/historical_states_reader.go +++ b/cl/persistence/state/historical_states_reader/historical_states_reader.go @@ -442,7 +442,6 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, kvGetter return nil, err } forward := remainder <= midpoint || currentStageProgress <= freshDumpSlot+clparams.SlotsPerDump - fmt.Println("forward", forward) if forward { compressed, err = kvGetter(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot)) if err != nil { diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 005acab9c4f..95bc42ad302 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1456,9 +1456,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, bridgeStore = bridge.NewSnapshotStore(bridge.NewDbStore(db), borSn, chainConfig.Bor) heimdallStore = heimdall.NewSnapshotStore(heimdall.NewDbStore(db), borSn) } - stages := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil, + stageList := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil, heimdallClient, heimdallStore, bridgeStore, recents, signatures, logger) - sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger) + sync := stagedsync.New(cfg.Sync, stageList, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ModeApplyingBlocks) miner := stagedsync.NewMiningState(&cfg.Miner) miningCancel := make(chan struct{}) @@ -1498,6 +1498,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, + stages.ModeBlockProduction, ) return engine, vmConfig, sync, miningSync, miner diff --git a/eth/backend.go b/eth/backend.go index 89e8244cff8..57233f8e5a5 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -723,7 +723,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader), stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore), ), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, - logger) + logger, stages.ModeBlockProduction) var ethashApi *ethash.API if casted, ok := backend.engine.(*ethash.Ethash); ok { @@ -774,7 +774,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger ), stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd), stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader), - stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger) + stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.ModeBlockProduction) // We start the mining step if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil { return nil, err @@ -930,7 +930,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger backend.syncPruneOrder = stagedsync.DefaultPruneOrder } - backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger) + backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger, stages.ModeApplyingBlocks) hook := stages2.NewHook(backend.sentryCtx, backend.chainDB, backend.notifications, backend.stagedSync, backend.blockReader, backend.chainConfig, backend.logger, backend.sentriesClient.SetStatus) @@ -958,7 +958,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger checkStateRoot := true pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot) - backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger) + backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks) backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx) executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer) @@ -1017,7 +1017,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger backend.sentryCtx, stagedsync.StageSnapshotsCfg( backend.chainDB, *backend.sentriesClient.ChainConfig, config.Sync, dirs, blockRetire, backend.downloaderClient, blockReader, backend.notifications, backend.agg, false, false, false, backend.silkworm, config.Prune, - )), nil, nil, backend.logger) + )), nil, nil, backend.logger, stages.ModeApplyingBlocks) // these range extractors set the db to the local db instead of the chain db // TODO this needs be refactored away by having a retire/merge component per diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 798af97fa42..5be4d62c25d 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -144,10 +144,7 @@ func ExecV3(ctx context.Context, // TODO: e35 doesn't support parallel-exec yet parallel = false //nolint - batchSize := cfg.batchSize - chainDb := cfg.db blockReader := cfg.blockReader - engine := cfg.engine chainConfig := cfg.chainConfig totalGasUsed := uint64(0) start := time.Now() @@ -162,7 +159,7 @@ func ExecV3(ctx context.Context, if !useExternalTx { if !parallel { var err error - applyTx, err = chainDb.BeginRw(ctx) //nolint + applyTx, err = cfg.db.BeginRw(ctx) //nolint if err != nil { return err } @@ -279,7 +276,7 @@ func ExecV3(ctx context.Context, } } else { var _nothing bool - if err := chainDb.View(ctx, func(tx kv.Tx) (err error) { + if err := cfg.db.View(ctx, func(tx kv.Tx) (err error) { if _nothing, err = nothingToExec(applyTx); err != nil { return err } else if _nothing { @@ -347,7 +344,7 @@ func ExecV3(ctx context.Context, applyWorker.ResetState(rs, accumulator) defer applyWorker.LogLRUStats() - commitThreshold := batchSize.Bytes() + commitThreshold := cfg.batchSize.Bytes() progress := NewProgress(blockNum, commitThreshold, workerCount, false, execStage.LogPrefix(), logger) logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -364,7 +361,7 @@ func ExecV3(ctx context.Context, if parallel { pe := ¶llelExecutor{ execStage: execStage, - chainDb: chainDb, + chainDb: cfg.db, applyWorker: applyWorker, applyTx: applyTx, outputTxNum: &outputTxNum, @@ -468,7 +465,7 @@ Loop: inputBlockNum.Store(blockNum) executor.domains().SetBlockNum(blockNum) - b, err = blockWithSenders(ctx, chainDb, executor.tx(), blockReader, blockNum) + b, err = blockWithSenders(ctx, cfg.db, executor.tx(), blockReader, blockNum) if err != nil { return err } @@ -490,7 +487,7 @@ Loop: return f(n) } totalGasUsed += b.GasUsed() - blockContext := core.NewEVMBlockContext(header, getHashFn, engine, cfg.author /* author */, chainConfig) + blockContext := core.NewEVMBlockContext(header, getHashFn, cfg.engine, cfg.author /* author */, chainConfig) // print type of engine if parallel { if err := executor.status(ctx, commitThreshold); err != nil { diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index f9a4b27dbe2..a67f9b812b3 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -70,7 +70,8 @@ type StageState struct { CurrentSyncCycle CurrentSyncCycleInfo } -func (s *StageState) LogPrefix() string { return s.state.LogPrefix() } +func (s *StageState) LogPrefix() string { return s.state.LogPrefix() } +func (s *StageState) SyncMode() stages.Mode { return s.state.mode } // Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution. func (s *StageState) Update(db kv.Putter, newBlockNum uint64) error { diff --git a/eth/stagedsync/stagedsynctest/harness.go b/eth/stagedsync/stagedsynctest/harness.go index d0e362b3bf1..e0715ec5bb3 100644 --- a/eth/stagedsync/stagedsynctest/harness.go +++ b/eth/stagedsync/stagedsynctest/harness.go @@ -93,6 +93,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness { stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, + stages.ModeApplyingBlocks, ) miningSyncStages := stagedsync.MiningStages( ctx, @@ -109,6 +110,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness { stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, + stages.ModeBlockProduction, ) validatorKey, err := crypto.GenerateKey() require.NoError(t, err) diff --git a/eth/stagedsync/stages/sync_mode.go b/eth/stagedsync/stages/sync_mode.go new file mode 100644 index 00000000000..f20c9ce7e67 --- /dev/null +++ b/eth/stagedsync/stages/sync_mode.go @@ -0,0 +1,42 @@ +package stages + +import ( + "fmt" +) + +type Mode int8 // in which staged sync can run + +const ( + ModeUnknown Mode = iota + ModeBlockProduction + ModeForkValidation + ModeApplyingBlocks +) + +func (m Mode) String() string { + switch m { + case ModeBlockProduction: + return "ModeBlockProduction" + case ModeForkValidation: + return "ModeForkValidation" + case ModeApplyingBlocks: + return "ModeApplyingBlocks" + default: + return "UnknownMode" + } +} + +func ModeFromString(s string) Mode { //nolint + switch s { + case "ModeBlockProduction": + return ModeBlockProduction + case "ModeForkValidation": + return ModeForkValidation + case "ModeApplyingBlocks": + return ModeApplyingBlocks + case "UnknownMode": + return ModeUnknown + default: + panic(fmt.Sprintf("unexpected mode string: %s", s)) + } +} diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go index 6b82fe8ddcc..f566c826cef 100644 --- a/eth/stagedsync/sync.go +++ b/eth/stagedsync/sync.go @@ -49,6 +49,7 @@ type Sync struct { logPrefixes []string logger log.Logger stagesIdsList []string + mode stages.Mode } type Timing struct { @@ -207,7 +208,7 @@ func (s *Sync) SetCurrentStage(id stages.SyncStage) error { return fmt.Errorf("stage not found with id: %v", id) } -func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger) *Sync { +func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger, mode stages.Mode) *Sync { unwindStages := make([]*Stage, len(stagesList)) for i, stageIndex := range unwindOrder { for _, s := range stagesList { @@ -243,6 +244,7 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune logPrefixes: logPrefixes, logger: logger, stagesIdsList: stagesIdsList, + mode: mode, } } diff --git a/eth/stagedsync/sync_test.go b/eth/stagedsync/sync_test.go index ed9c8db253a..8a7932cfe13 100644 --- a/eth/stagedsync/sync_test.go +++ b/eth/stagedsync/sync_test.go @@ -59,7 +59,7 @@ func TestStagesSuccess(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -99,7 +99,7 @@ func TestDisabledStages(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -139,7 +139,7 @@ func TestErroredStage(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err) @@ -207,7 +207,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -285,7 +285,7 @@ func TestUnwind(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -374,7 +374,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -430,12 +430,12 @@ func TestSyncDoTwice(t *testing.T) { }, } - state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) - state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks) _, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -488,14 +488,14 @@ func TestStateSyncInterruptRestart(t *testing.T) { }, } - state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err) expectedErr = nil - state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks) _, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -567,7 +567,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.Error(t, errInterrupted, err) diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index fa91fb61114..6968ac99600 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -508,7 +508,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, dirs.Tmp, nil, 0, mock.TxPool, nil, mock.BlockReader), stagedsync.StageMiningFinishCfg(mock.DB, *mock.ChainConfig, mock.Engine, miner, miningCancel, mock.BlockReader, latestBlockBuiltStore), ), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, - logger) + logger, stages.ModeBlockProduction) // We start the mining step if err := stages2.MiningStep(ctx, mock.DB, proposingSync, tmpdir, logger); err != nil { return nil, err @@ -545,13 +545,13 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK ), stagedsync.StageTxLookupCfg(mock.DB, prune, dirs.Tmp, mock.ChainConfig.Bor, mock.BlockReader), stagedsync.StageFinishCfg(mock.DB, dirs.Tmp, forkValidator), !withPosDownloader), stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, - logger, + logger, stages.ModeApplyingBlocks, ) cfg.Genesis = gspec pipelineStages := stages2.NewPipelineStages(mock.Ctx, db, &cfg, p2p.Config{}, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, mock.agg, nil, forkValidator, logger, checkStateRoot) - mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger) + mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks) mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx) @@ -587,7 +587,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK ), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, - logger, + logger, stages.ModeBlockProduction, ) cfg.Genesis = gspec diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index ef7d73c5c98..e680913d28d 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -763,6 +763,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config stagedsync.StateUnwindOrder, nil, /* pruneOrder */ logger, + stages.ModeForkValidation, ) }