From e89a44ec1e2abe1ef2eb7dc3567a3cc1958b2968 Mon Sep 17 00:00:00 2001 From: Francisco Moura Date: Mon, 29 Jul 2024 18:58:47 -0300 Subject: [PATCH] fixup! feat(evm-reader): Add max input fetch size --- docs/config.md | 2 +- internal/evmreader/evmreader.go | 28 ++--- internal/evmreader/evmreader_test.go | 140 ++++++++++++++++++++++ internal/node/config/generate/Config.toml | 2 +- 4 files changed, 156 insertions(+), 16 deletions(-) diff --git a/docs/config.md b/docs/config.md index c6d261825..e71a8ce62 100644 --- a/docs/config.md +++ b/docs/config.md @@ -233,7 +233,7 @@ At the end of each epoch, the node will send claims to the blockchain. ## `CARTESI_EVM_READER_MAX_FETCH_SIZE` -How many blocks should be read per each input fetch. +Maximum number of blocks that can be read on each input fetch request. * **Type:** `uint` * **Default:** `"10"` diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index 2b52b8c1e..5a6af9bbd 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -182,6 +182,20 @@ func (r *EvmReader) checkForNewInputs(ctx Context) error { step := uint64(r.maxFetchSize) + mostRecentHeader, err := r.fetchMostRecentHeader( + ctx, + r.config.DefaultBlock, + ) + if err != nil { + // slog.Error("Error fetching most recent block", + // "last default block", + // r.config.DefaultBlock, + // "error", + // err) + return err + } + mostRecentBlockNumber := mostRecentHeader.Number.Uint64() + for lastProcessedBlock, apps := range groupedApps { // Safeguard: Only check blocks starting from the block where the InputBox @@ -190,20 +204,6 @@ func (r *EvmReader) checkForNewInputs(ctx Context) error { lastProcessedBlock = r.config.InputBoxDeploymentBlock - 1 } - mostRecentHeader, err := r.fetchMostRecentHeader( - ctx, - r.config.DefaultBlock, - ) - if err != nil { - slog.Error("Error fetching most recent block", - "last default block", - r.config.DefaultBlock, - "error", - err) - continue - } - mostRecentBlockNumber := mostRecentHeader.Number.Uint64() - if mostRecentBlockNumber > lastProcessedBlock { // Check block range and split requests diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go index d81d309ad..105109eda 100644 --- a/internal/evmreader/evmreader_test.go +++ b/internal/evmreader/evmreader_test.go @@ -664,6 +664,146 @@ func (s *EvmReaderSuite) TestItReadsInputsInSmallerBatches() { ) } +func (s *EvmReaderSuite) TestItReadsInputsInSmallerBatchesWithError() { + + waitGroup := sync.WaitGroup{} + wsClient := FakeWSEhtClient{} + wsClient.NewHeaders = []*Header{&header2} + wsClient.WaitGroup = &waitGroup + inputReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + model.NodePersistentConfig{ + InputBoxDeploymentBlock: 0x10, + DefaultBlock: model.DefaultBlockStatusLatest, + }, + 2, + ) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header3, nil).Once() + + // Prepare sequence of inputs + // Batch 0x11 - 0x12 for application 0x2E663fe9aE92275242406A185AA4fC8174339D3E + s.inputBox.Unset("RetrieveInputs") + events_0 := []inputbox.InputBoxInputAdded{inputAddedEvent0, inputAddedEvent1} + currentMostRecentFinalizedBlockNumber_0 := uint64(0x12) + retrieveInputsOpts_0 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x11, + End: ¤tMostRecentFinalizedBlockNumber_0, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_0, + []common.Address{common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E")}, + mock.Anything, + ).Return(events_0, nil) + + // Batch 0x13 - 0x14 for application 0x2E663fe9aE92275242406A185AA4fC8174339D3E with error + currentMostRecentFinalizedBlockNumber_1 := uint64(0x14) + retrieveInputsOpts_1 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x13, + End: ¤tMostRecentFinalizedBlockNumber_1, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_1, + []common.Address{common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E")}, + mock.Anything, + ).Return([]inputbox.InputBoxInputAdded{}, fmt.Errorf("An error")) + + // Batch 0x12 - 0x13 for application 0xFFFF3fe9aE92275242406A185AA4fC817433EEEE" + currentMostRecentFinalizedBlockNumber_2 := uint64(0x13) + retrieveInputsOpts_2 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x12, + End: ¤tMostRecentFinalizedBlockNumber_2, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_2, + []common.Address{common.HexToAddress("0xFFFF3fe9aE92275242406A185AA4fC817433EEEE")}, + mock.Anything, + ).Return([]inputbox.InputBoxInputAdded{}, nil) + + // Batch 0x14 - 0x15 for application 0xFFFF3fe9aE92275242406A185AA4fC817433EEEE" + currentMostRecentFinalizedBlockNumber_3 := uint64(0x15) + retrieveInputsOpts_3 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x14, + End: ¤tMostRecentFinalizedBlockNumber_3, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_3, + []common.Address{common.HexToAddress("0xFFFF3fe9aE92275242406A185AA4fC817433EEEE")}, + mock.Anything, + ).Return([]inputbox.InputBoxInputAdded{}, nil) + + // Batch 0x16 - 0x17 for application 0xFFFF3fe9aE92275242406A185AA4fC817433EEEE" + currentMostRecentFinalizedBlockNumber_4 := uint64(0x16) + retrieveInputsOpts_4 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x16, + End: ¤tMostRecentFinalizedBlockNumber_4, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_4, + []common.Address{common.HexToAddress("0xFFFF3fe9aE92275242406A185AA4fC817433EEEE")}, + mock.Anything, + ).Return([]inputbox.InputBoxInputAdded{}, nil) + + // Prepare Repo + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{ + { + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + LastProcessedBlock: 0x10, + }, + { + ContractAddress: common.HexToAddress("0xFFFF3fe9aE92275242406A185AA4fC817433EEEE"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + waitGroup.Add(1) + go func() { + errChannel <- inputReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + waitGroup.Wait() + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 5) + s.repository.AssertNumberOfCalls( + s.T(), + "InsertInputsAndUpdateLastProcessedBlock", + 4, + ) +} + // Mock EthClient type MockEthClient struct { mock.Mock diff --git a/internal/node/config/generate/Config.toml b/internal/node/config/generate/Config.toml index 99d2211ba..c250bac41 100644 --- a/internal/node/config/generate/Config.toml +++ b/internal/node/config/generate/Config.toml @@ -60,7 +60,7 @@ How seconds the retry policy will wait between retries.""" default = "10" go-type = "uint" description = """ -How many blocks should be read per each input fetch.""" +Maximum number of blocks that can be read on each input fetch request.""" # # Blockchain