Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/evm reader smaller batches #536

Draft
wants to merge 4 commits into
base: next/2.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/cartesi-rollups-evm-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ var (
)

const (
CMD_NAME = "evm-reader"
devnetInputBoxDeploymentBlockNumber = uint64(16)
CMD_NAME = "evm-reader"
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -158,6 +157,7 @@ func run(cmd *cobra.Command, args []string) {
database,
c.EvmReaderRetryPolicyMaxRetries,
c.EvmReaderRetryPolicyMaxDelay,
c.EvmReaderMaxFetchSize,
)

// logs startup time
Expand Down
7 changes: 7 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ At the end of each epoch, the node will send claims to the blockchain.
* **Type:** `uint64`
* **Default:** `"7200"`

## `CARTESI_EVM_READER_MAX_FETCH_SIZE`

Maximum number of blocks that can be read on each input fetch request.

* **Type:** `uint`
* **Default:** `"10"`

## `CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY`

How seconds the retry policy will wait between retries.
Expand Down
91 changes: 51 additions & 40 deletions internal/evmreader/evmreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ func (e *SubscriptionError) Error() string {

// EvmReader reads inputs from the blockchain
type EvmReader struct {
client EthClient
wsClient EthWsClient
inputSource InputSource
repository EvmReaderRepository
config NodePersistentConfig
client EthClient
wsClient EthWsClient
inputSource InputSource
repository EvmReaderRepository
config NodePersistentConfig
maxFetchSize uint
}

func (r *EvmReader) String() string {
Expand All @@ -104,13 +105,15 @@ func NewEvmReader(
inputSource InputSource,
repository EvmReaderRepository,
config NodePersistentConfig,
maxFetchSize uint,
) EvmReader {
return EvmReader{
client: client,
wsClient: wsClient,
inputSource: inputSource,
repository: repository,
config: config,
client: client,
wsClient: wsClient,
inputSource: inputSource,
repository: repository,
config: config,
maxFetchSize: maxFetchSize,
}
}

Expand Down Expand Up @@ -177,6 +180,17 @@ func (r *EvmReader) checkForNewInputs(ctx Context) error {

groupedApps := r.classifyApplicationsByLastProcessedInput(apps)

step := uint64(r.maxFetchSize)

mostRecentHeader, err := r.fetchMostRecentHeader(
ctx,
r.config.DefaultBlock,
)
if err != nil {
return err
}
mostRecentBlockNumber := mostRecentHeader.Number.Uint64()

for lastProcessedBlock, apps := range groupedApps {

// Safeguard: Only check blocks starting from the block where the InputBox
Expand All @@ -185,42 +199,39 @@ func (r *EvmReader) checkForNewInputs(ctx Context) error {
lastProcessedBlock = r.config.InputBoxDeploymentBlock - 1
}

currentMostRecentFinalizedHeader, err := r.fetchMostRecentHeader(
ctx,
r.config.DefaultBlock,
)
if err != nil {
slog.Error("Error fetching most recent block",
"last default block",
r.config.DefaultBlock,
"error",
err)
continue
}
currentMostRecentFinalizedBlockNumber := currentMostRecentFinalizedHeader.Number.Uint64()
if mostRecentBlockNumber > lastProcessedBlock {

if currentMostRecentFinalizedBlockNumber > lastProcessedBlock {
// Check block range and split requests
start := lastProcessedBlock + 1
end := uint64(0)
for ; start <= mostRecentBlockNumber; start = start + step {

err = r.readInputs(ctx,
lastProcessedBlock+1,
currentMostRecentFinalizedBlockNumber,
apps,
)
if err != nil {
slog.Error("Error reading inputs",
"start",
lastProcessedBlock+1,
"end",
currentMostRecentFinalizedBlockNumber,
"error",
err)
continue
end = start + step - 1

if end > mostRecentBlockNumber {
end = mostRecentBlockNumber
}
err = r.readInputs(ctx,
start,
end,
apps,
)
if err != nil {
slog.Error("Error reading inputs",
"start",
start,
"end",
end,
"error",
err)
break
}
}
} else if lastProcessedBlock < currentMostRecentFinalizedBlockNumber {
} else if lastProcessedBlock < mostRecentBlockNumber {
slog.Warn(
"current most recent block is lower than the last processed one",
"most recent block",
currentMostRecentFinalizedBlockNumber,
mostRecentBlockNumber,
"last processed",
lastProcessedBlock,
)
Expand Down
Loading
Loading