Skip to content

Commit

Permalink
release sync mode type. no functional changes. (#12794)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Nov 20, 2024
1 parent 4deb016 commit 2af372d
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,6 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, kvGetter
return nil, err
}
forward := remainder <= midpoint || currentStageProgress <= freshDumpSlot+clparams.SlotsPerDump
fmt.Println("forward", forward)
if forward {
compressed, err = kvGetter(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot))
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,9 +1456,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
bridgeStore = bridge.NewSnapshotStore(bridge.NewDbStore(db), borSn, chainConfig.Bor)
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewDbStore(db), borSn)
}
stages := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
stageList := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
heimdallClient, heimdallStore, bridgeStore, recents, signatures, logger)
sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)
sync := stagedsync.New(cfg.Sync, stageList, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ModeApplyingBlocks)

miner := stagedsync.NewMiningState(&cfg.Miner)
miningCancel := make(chan struct{})
Expand Down Expand Up @@ -1498,6 +1498,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
stagedsync.MiningUnwindOrder,
stagedsync.MiningPruneOrder,
logger,
stages.ModeBlockProduction,
)

return engine, vmConfig, sync, miningSync, miner
Expand Down
10 changes: 5 additions & 5 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore),
), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
logger)
logger, stages.ModeBlockProduction)

var ethashApi *ethash.API
if casted, ok := backend.engine.(*ethash.Ethash); ok {
Expand Down Expand Up @@ -774,7 +774,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
),
stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd),
stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger)
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.ModeBlockProduction)
// We start the mining step
if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil {
return nil, err
Expand Down Expand Up @@ -930,7 +930,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.syncPruneOrder = stagedsync.DefaultPruneOrder
}

backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)
backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger, stages.ModeApplyingBlocks)

hook := stages2.NewHook(backend.sentryCtx, backend.chainDB, backend.notifications, backend.stagedSync, backend.blockReader, backend.chainConfig, backend.logger, backend.sentriesClient.SetStatus)

Expand Down Expand Up @@ -958,7 +958,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

checkStateRoot := true
pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks)
backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)
executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)

Expand Down Expand Up @@ -1017,7 +1017,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.sentryCtx, stagedsync.StageSnapshotsCfg(
backend.chainDB, *backend.sentriesClient.ChainConfig, config.Sync, dirs, blockRetire, backend.downloaderClient,
blockReader, backend.notifications, backend.agg, false, false, false, backend.silkworm, config.Prune,
)), nil, nil, backend.logger)
)), nil, nil, backend.logger, stages.ModeApplyingBlocks)

// these range extractors set the db to the local db instead of the chain db
// TODO this needs be refactored away by having a retire/merge component per
Expand Down
15 changes: 6 additions & 9 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,7 @@ func ExecV3(ctx context.Context,
// TODO: e35 doesn't support parallel-exec yet
parallel = false //nolint

batchSize := cfg.batchSize
chainDb := cfg.db
blockReader := cfg.blockReader
engine := cfg.engine
chainConfig := cfg.chainConfig
totalGasUsed := uint64(0)
start := time.Now()
Expand All @@ -162,7 +159,7 @@ func ExecV3(ctx context.Context,
if !useExternalTx {
if !parallel {
var err error
applyTx, err = chainDb.BeginRw(ctx) //nolint
applyTx, err = cfg.db.BeginRw(ctx) //nolint
if err != nil {
return err
}
Expand Down Expand Up @@ -279,7 +276,7 @@ func ExecV3(ctx context.Context,
}
} else {
var _nothing bool
if err := chainDb.View(ctx, func(tx kv.Tx) (err error) {
if err := cfg.db.View(ctx, func(tx kv.Tx) (err error) {
if _nothing, err = nothingToExec(applyTx); err != nil {
return err
} else if _nothing {
Expand Down Expand Up @@ -347,7 +344,7 @@ func ExecV3(ctx context.Context,
applyWorker.ResetState(rs, accumulator)
defer applyWorker.LogLRUStats()

commitThreshold := batchSize.Bytes()
commitThreshold := cfg.batchSize.Bytes()
progress := NewProgress(blockNum, commitThreshold, workerCount, false, execStage.LogPrefix(), logger)
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
Expand All @@ -364,7 +361,7 @@ func ExecV3(ctx context.Context,
if parallel {
pe := &parallelExecutor{
execStage: execStage,
chainDb: chainDb,
chainDb: cfg.db,
applyWorker: applyWorker,
applyTx: applyTx,
outputTxNum: &outputTxNum,
Expand Down Expand Up @@ -468,7 +465,7 @@ Loop:
inputBlockNum.Store(blockNum)
executor.domains().SetBlockNum(blockNum)

b, err = blockWithSenders(ctx, chainDb, executor.tx(), blockReader, blockNum)
b, err = blockWithSenders(ctx, cfg.db, executor.tx(), blockReader, blockNum)
if err != nil {
return err
}
Expand All @@ -490,7 +487,7 @@ Loop:
return f(n)
}
totalGasUsed += b.GasUsed()
blockContext := core.NewEVMBlockContext(header, getHashFn, engine, cfg.author /* author */, chainConfig)
blockContext := core.NewEVMBlockContext(header, getHashFn, cfg.engine, cfg.author /* author */, chainConfig)
// print type of engine
if parallel {
if err := executor.status(ctx, commitThreshold); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion eth/stagedsync/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type StageState struct {
CurrentSyncCycle CurrentSyncCycleInfo
}

func (s *StageState) LogPrefix() string { return s.state.LogPrefix() }
func (s *StageState) LogPrefix() string { return s.state.LogPrefix() }
func (s *StageState) SyncMode() stages.Mode { return s.state.mode }

// Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution.
func (s *StageState) Update(db kv.Putter, newBlockNum uint64) error {
Expand Down
2 changes: 2 additions & 0 deletions eth/stagedsync/stagedsynctest/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness {
stagedsync.DefaultUnwindOrder,
stagedsync.DefaultPruneOrder,
logger,
stages.ModeApplyingBlocks,
)
miningSyncStages := stagedsync.MiningStages(
ctx,
Expand All @@ -109,6 +110,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness {
stagedsync.MiningUnwindOrder,
stagedsync.MiningPruneOrder,
logger,
stages.ModeBlockProduction,
)
validatorKey, err := crypto.GenerateKey()
require.NoError(t, err)
Expand Down
42 changes: 42 additions & 0 deletions eth/stagedsync/stages/sync_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package stages

import (
"fmt"
)

type Mode int8 // in which staged sync can run

const (
ModeUnknown Mode = iota
ModeBlockProduction
ModeForkValidation
ModeApplyingBlocks
)

func (m Mode) String() string {
switch m {
case ModeBlockProduction:
return "ModeBlockProduction"
case ModeForkValidation:
return "ModeForkValidation"
case ModeApplyingBlocks:
return "ModeApplyingBlocks"
default:
return "UnknownMode"
}
}

func ModeFromString(s string) Mode { //nolint
switch s {
case "ModeBlockProduction":
return ModeBlockProduction
case "ModeForkValidation":
return ModeForkValidation
case "ModeApplyingBlocks":
return ModeApplyingBlocks
case "UnknownMode":
return ModeUnknown
default:
panic(fmt.Sprintf("unexpected mode string: %s", s))

Check failure on line 40 in eth/stagedsync/stages/sync_mode.go

View workflow job for this annotation

GitHub Actions / lint

fmt.Sprintf can be replaced with string concatenation (perfsprint)
}
}
4 changes: 3 additions & 1 deletion eth/stagedsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Sync struct {
logPrefixes []string
logger log.Logger
stagesIdsList []string
mode stages.Mode
}

type Timing struct {
Expand Down Expand Up @@ -207,7 +208,7 @@ func (s *Sync) SetCurrentStage(id stages.SyncStage) error {
return fmt.Errorf("stage not found with id: %v", id)
}

func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger) *Sync {
func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger, mode stages.Mode) *Sync {
unwindStages := make([]*Stage, len(stagesList))
for i, stageIndex := range unwindOrder {
for _, s := range stagesList {
Expand Down Expand Up @@ -243,6 +244,7 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune
logPrefixes: logPrefixes,
logger: logger,
stagesIdsList: stagesIdsList,
mode: mode,
}
}

Expand Down
22 changes: 11 additions & 11 deletions eth/stagedsync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestStagesSuccess(t *testing.T) {
},
},
}
state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestDisabledStages(t *testing.T) {
},
},
}
state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestErroredStage(t *testing.T) {
},
},
}
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err)
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) {
},
},
}
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestUnwind(t *testing.T) {
},
},
}
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) {
},
},
}
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -430,12 +430,12 @@ func TestSyncDoTwice(t *testing.T) {
},
}

state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.NoError(t, err)

state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks)
_, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.NoError(t, err)

Expand Down Expand Up @@ -488,14 +488,14 @@ func TestStateSyncInterruptRestart(t *testing.T) {
},
}

state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err)

expectedErr = nil

state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ModeApplyingBlocks)
_, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.NoError(t, err)

Expand Down Expand Up @@ -567,7 +567,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) {
},
},
}
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ModeApplyingBlocks)
db, tx := memdb.NewTestTx(t)
_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
assert.Error(t, errInterrupted, err)
Expand Down
8 changes: 4 additions & 4 deletions turbo/stages/mock/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, dirs.Tmp, nil, 0, mock.TxPool, nil, mock.BlockReader),
stagedsync.StageMiningFinishCfg(mock.DB, *mock.ChainConfig, mock.Engine, miner, miningCancel, mock.BlockReader, latestBlockBuiltStore),
), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
logger)
logger, stages.ModeBlockProduction)
// We start the mining step
if err := stages2.MiningStep(ctx, mock.DB, proposingSync, tmpdir, logger); err != nil {
return nil, err
Expand Down Expand Up @@ -545,13 +545,13 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
), stagedsync.StageTxLookupCfg(mock.DB, prune, dirs.Tmp, mock.ChainConfig.Bor, mock.BlockReader), stagedsync.StageFinishCfg(mock.DB, dirs.Tmp, forkValidator), !withPosDownloader),
stagedsync.DefaultUnwindOrder,
stagedsync.DefaultPruneOrder,
logger,
logger, stages.ModeApplyingBlocks,
)

cfg.Genesis = gspec
pipelineStages := stages2.NewPipelineStages(mock.Ctx, db, &cfg, p2p.Config{}, mock.sentriesClient, mock.Notifications,
snapDownloader, mock.BlockReader, blockRetire, mock.agg, nil, forkValidator, logger, checkStateRoot)
mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks)

mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx)

Expand Down Expand Up @@ -587,7 +587,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
),
stagedsync.MiningUnwindOrder,
stagedsync.MiningPruneOrder,
logger,
logger, stages.ModeBlockProduction,
)

cfg.Genesis = gspec
Expand Down
1 change: 1 addition & 0 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
stagedsync.StateUnwindOrder,
nil, /* pruneOrder */
logger,
stages.ModeForkValidation,
)
}

Expand Down

0 comments on commit 2af372d

Please sign in to comment.