Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional fix for externalCL integration (prevent MDBX_MAP_FULL) #12922

Merged
merged 1 commit into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion erigon-lib/kv/mdbx/kv_mdbx_temporary.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewTemporaryMdbx(ctx context.Context, tempdir string) (kv.RwDB, error) {
return &TemporaryMdbx{}, err
}

db, err := New(kv.ChainDB, log.Root()).InMem(path).GrowthStep(64 * datasize.MB).Open(ctx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reverts the change from the previous PR

db, err := New(kv.ChainDB, log.Root()).InMem(path).Open(ctx)
if err != nil {
return &TemporaryMdbx{}, err
}
Expand Down
80 changes: 8 additions & 72 deletions turbo/engineapi/engine_block_downloader/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package engine_block_downloader

import (
"bytes"
"context"
"encoding/binary"
"fmt"
Expand All @@ -35,7 +34,6 @@ import (
"github.com/erigontech/erigon-lib/etl"
execution "github.com/erigontech/erigon-lib/gointerfaces/executionproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/dbutils"

"github.com/erigontech/erigon-lib/rlp"
"github.com/erigontech/erigon/core/rawdb"
Expand Down Expand Up @@ -164,10 +162,11 @@ func (e *EngineBlockDownloader) waitForEndOfHeadersDownload(ctx context.Context)
}

// waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome.
func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, fromHash libcommon.Hash, err error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return value fromHash will not be used, the function was changed accordingly

func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, err error) {
var lastValidHash libcommon.Hash
var badChainError error // TODO(yperbasis): this is not set anywhere
var foundPow bool
var found bool

headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
var h types.Header
Expand All @@ -185,8 +184,8 @@ func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uin
lastValidHash = h.ParentHash
// If we are in PoW range then block validation is not required anymore.
if foundPow {
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
if !found {
found = true
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
Expand All @@ -195,15 +194,15 @@ func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uin

foundPow = h.Difficulty.Sign() != 0
if foundPow {
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
if !found {
found = true
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
return saveHeader(tx, &h, h.Hash())
}
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
if !found {
found = true
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
Expand Down Expand Up @@ -238,66 +237,3 @@ func saveHeader(db kv.RwTx, header *types.Header, hash libcommon.Hash) error {
}
return nil
}

func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx kv.Tx, fromBlock uint64, fromHash libcommon.Hash, toBlock uint64) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was removed, as its functionality was moved to the function downloadAndLoadBodiesSyncronously which now inserts blocks via the same API as Caplin, without the need to accumulate them in the tempDb (and potentially cause MDBX_MAP_FULL)

blockBatchSize := 500
blockWrittenLogSize := 20_000
// We divide them in batches
blocksBatch := []*types.Block{}

headersCursors, err := tx.Cursor(kv.Headers)
if err != nil {
return err
}
inserted := uint64(0)

log.Info("Beginning downloaded blocks insertion")
// Start by seeking headers
for k, v, err := headersCursors.Seek(dbutils.HeaderKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() {
if err != nil {
return err
}
if len(blocksBatch) == blockBatchSize {
if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil {
return err
}
currentHeader := e.chainRW.CurrentHeader(ctx)
lastBlockNumber := blocksBatch[len(blocksBatch)-1].NumberU64()
isForkChoiceNeeded := currentHeader == nil || lastBlockNumber > currentHeader.Number.Uint64()
inserted += uint64(len(blocksBatch))
if inserted >= uint64(e.syncCfg.LoopBlockLimit) {
lastHash := blocksBatch[len(blocksBatch)-1].Hash()
if isForkChoiceNeeded {
if _, _, _, err := e.chainRW.UpdateForkChoice(ctx, lastHash, lastHash, lastHash); err != nil {
return err
}
}
inserted = 0
}
blocksBatch = blocksBatch[:0]
}
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(v), header); err != nil {
e.logger.Error("Invalid block header RLP", "err", err)
return nil
}
number := header.Number.Uint64()
if number > toBlock {
return e.chainRW.InsertBlocksAndWait(ctx, blocksBatch)
}
hash := header.Hash()
body, err := rawdb.ReadBodyWithTransactions(tx, hash, number)
if err != nil {
return err
}
if body == nil {
return fmt.Errorf("missing body at block=%d", number)
}
blocksBatch = append(blocksBatch, types.NewBlockFromStorage(hash, header, body.Transactions, body.Uncles, body.Withdrawals))
if number%uint64(blockWrittenLogSize) == 0 {
e.logger.Info("[insertHeadersAndBodies] Written blocks", "progress", number, "to", toBlock)
}
}
return e.chainRW.InsertBlocksAndWait(ctx, blocksBatch)

}
27 changes: 21 additions & 6 deletions turbo/engineapi/engine_block_downloader/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/dbg"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon/core/rawdb"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/dataflow"
"github.com/erigontech/erigon/eth/stagedsync/stages"
"github.com/erigontech/erigon/turbo/stages/bodydownload"
Expand Down Expand Up @@ -80,6 +80,9 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co
prevProgress := bodyProgress
var noProgressCount uint = 0 // How many time the progress was printed without actual progress
var totalDelivered uint64 = 0
blockBatchSize := 500
// We divide them in batches
blocksBatch := []*types.Block{}

loopBody := func() (bool, error) {
// loopCount is used here to ensure we don't get caught in a constant loop of making requests
Expand Down Expand Up @@ -146,14 +149,20 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co
return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock)
}

if len(blocksBatch) == blockBatchSize {
if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil {
return false, fmt.Errorf("InsertBlock: %w", err)
}
blocksBatch = blocksBatch[:0]
}
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
ok, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
rawBlock := types.RawBlock{Header: header, Body: rawBody}
block, err := rawBlock.AsBlock()
if err != nil {
return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
}
if ok {
dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared)
return false, fmt.Errorf("Could not construct block: %w", err)
}
blocksBatch = append(blocksBatch, block)
dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared)

if blockHeight > bodyProgress {
bodyProgress = blockHeight
Expand Down Expand Up @@ -202,6 +211,12 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co
return err
}
}
if len(blocksBatch) > 0 {
if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil {
return fmt.Errorf("InsertBlock: %w", err)
}
blocksBatch = blocksBatch[:0]
}

if stopped {
return libcommon.ErrStopped
Expand Down
7 changes: 1 addition & 6 deletions turbo/engineapi/engine_block_downloader/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (e *EngineBlockDownloader) download(ctx context.Context, hashToDownload lib
return
}
}
startBlock, endBlock, startHash, err := e.loadDownloadedHeaders(memoryMutation)
startBlock, endBlock, err := e.loadDownloadedHeaders(memoryMutation)
if err != nil {
e.logger.Warn("[EngineBlockDownloader] Could not load headers", "err", err)
e.status.Store(headerdownload.Idle)
Expand All @@ -102,11 +102,6 @@ func (e *EngineBlockDownloader) download(ctx context.Context, hashToDownload lib
return
}
tx.Rollback() // Discard the original db tx
if err := e.insertHeadersAndBodies(ctx, tmpTx, startBlock, startHash, endBlock); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call is removed, since insertion of blocks was performed in the call to downloadAndLoadBodiesSyncronously

e.logger.Warn("[EngineBlockDownloader] Could not insert headers and bodies", "err", err)
e.status.Store(headerdownload.Idle)
return
}
e.logger.Info("[EngineBlockDownloader] Finished downloading blocks", "from", startBlock-1, "to", endBlock)
if block == nil {
e.status.Store(headerdownload.Idle)
Expand Down
Loading