Skip to content

Commit

Permalink
commit every N blocks in BorHeimdallForward (#12706)
Browse files Browse the repository at this point in the history
step2 of #11094. 
step1:  #12097
  • Loading branch information
stevemilk authored Nov 19, 2024
1 parent d502471 commit 439f28b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
15 changes: 7 additions & 8 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,9 +787,8 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log

heimdallClient := engine.(*bor.Bor).HeimdallClient

var tx kv.RwTx
if reset {
if err := reset2.ResetBorHeimdall(ctx, tx, db); err != nil {
if err := reset2.ResetBorHeimdall(ctx, nil, db); err != nil {
return err
}
return nil
Expand All @@ -800,7 +799,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log
defer borSn.Close()
defer agg.Close()

stageState := stage(sync, tx, nil, stages.BorHeimdall)
stageState := stage(sync, nil, db, stages.BorHeimdall)

snapshotsMaxBlock := borSn.BlocksAvailable()
if unwind <= snapshotsMaxBlock {
Expand All @@ -813,11 +812,11 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log

unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, heimdallStore, bridgeStore, nil, nil, nil, nil, nil, false, unwindTypes)
if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil {
if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, nil, cfg); err != nil {
return err
}

stageProgress, err := stagedsync.BorHeimdallStageProgress(tx, cfg)
stageProgress, err := stagedsync.BorHeimdallStageProgress(nil, cfg)
if err != nil {
return fmt.Errorf("re-read bor heimdall progress: %w", err)
}
Expand All @@ -843,12 +842,12 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log
}
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, nil, nil, recents, signatures, false, unwindTypes)

stageState := stage(sync, tx, nil, stages.BorHeimdall)
if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil {
stageState := stage(sync, nil, db, stages.BorHeimdall)
if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, nil, cfg, logger); err != nil {
return err
}

stageProgress, err := stagedsync.BorHeimdallStageProgress(tx, cfg)
stageProgress, err := stagedsync.BorHeimdallStageProgress(nil, cfg)
if err != nil {
return fmt.Errorf("re-read bor heimdall progress: %w", err)
}
Expand Down
52 changes: 35 additions & 17 deletions eth/stagedsync/stage_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,9 @@ func BorHeimdallForward(
defer logTimer.Stop()

logger.Info(fmt.Sprintf("[%s] Processing sync events...", s.LogPrefix()), "from", lastBlockNum+1, "to", headNumber)

var nextEventRecord *heimdall.EventRecordWithTime

// sometimes via config eveents are skipped from particular blocks and
// sometimes via config events are skipped from particular blocks and
// pushed into the next one, when this happens we need to skip validation
// as the times won't match the expected window. In practice it only affects
// these blocks: 14949120,14949184, 14953472, 14953536, 14953600, 14953664,
Expand All @@ -252,7 +251,24 @@ func BorHeimdallForward(
// this becomes more prevalent this will need to be re-thought
var skipCount int

// allow committing every N blocks to avoid long transaction and potential progress lost when running `./build/bin/integration stage_bor_heimdall` forward operation manually,
// for more details see the use case: https://github.com/erigontech/erigon/pull/12706#issuecomment-2477818677,
// N=1000 is not verified to be most optimal value, but works fine in unit tests
var commitBatchLimit = 1_000
var commitCnt int

// newTx==true means a batch has been committed and should init a fresh new tx to handle next batch
newTx := false
for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
if !useExternalTx && newTx {
newTx = false
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback() // rollback nil tx is supported
chainReader = NewChainReaderImpl(cfg.chainConfig, tx, cfg.blockReader, logger)
}
select {
default:
case <-logTimer.C:
Expand Down Expand Up @@ -283,7 +299,7 @@ func BorHeimdallForward(
}

// Whitelist whitelistService is called to check if the bor chainReader is
// on the cannonical chainReader according to milestones
// on the canonical chainReader according to milestones
if whitelistService != nil && !whitelistService.IsValidChain(blockNum, []*types.Header{header}) {
logger.Debug(
fmt.Sprintf("[%s] Verification failed for header", s.LogPrefix()),
Expand Down Expand Up @@ -350,7 +366,7 @@ func BorHeimdallForward(

snapInitTime = snapInitTime + time.Since(snapStart)

if err = persistValidatorSets(
err = persistValidatorSets(
snap,
u,
tx,
Expand All @@ -363,7 +379,8 @@ func BorHeimdallForward(
cfg.snapDb,
logger,
s.LogPrefix(),
); err != nil {
)
if err != nil {
return fmt.Errorf("can't persist validator sets: %w", err)
}
}
Expand Down Expand Up @@ -426,17 +443,19 @@ func BorHeimdallForward(
fetchTime += callTime
syncEventTime = syncEventTime + time.Since(syncEventStart)

}

if err = s.Update(tx, headNumber); err != nil {
return err
}

lastStateSyncEventID, _, _ = cfg.blockReader.LastEventId(ctx, tx)

if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
commitCnt++
if !useExternalTx {
if commitCnt >= commitBatchLimit || blockNum == headNumber {
if err = s.Update(tx, blockNum); err != nil {
return err
}
lastStateSyncEventID, _, _ = cfg.blockReader.LastEventId(ctx, tx)
if err = tx.Commit(); err != nil {
return err
}
commitCnt = 0
newTx = true
}
}
}

Expand All @@ -455,7 +474,6 @@ func BorHeimdallForward(
"waypoint time", waypointTime,
"process time", time.Since(processStart),
)

return
}

Expand Down

0 comments on commit 439f28b

Please sign in to comment.