From f79d59fb89a58c9c511f1ca0e727a76e57ef04c4 Mon Sep 17 00:00:00 2001 From: Francisco Moura Date: Fri, 21 Jun 2024 16:56:19 -0300 Subject: [PATCH] feat(input-reader): Add input-reader to node Supervisor --- internal/inputreader/inputbox.go | 55 +++++++++++++++ internal/inputreader/inputreader.go | 19 +++--- internal/inputreader/inputreader_service.go | 76 +++++++++++++++++++++ internal/inputreader/inputreader_test.go | 52 +++++++------- internal/node/services.go | 13 ++++ 5 files changed, 180 insertions(+), 35 deletions(-) create mode 100644 internal/inputreader/inputbox.go create mode 100644 internal/inputreader/inputreader_service.go diff --git a/internal/inputreader/inputbox.go b/internal/inputreader/inputbox.go new file mode 100644 index 000000000..f3394de6f --- /dev/null +++ b/internal/inputreader/inputbox.go @@ -0,0 +1,55 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package inputreader + +import ( + "math/big" + + "github.com/cartesi/rollups-node/pkg/contracts/inputbox" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +// InputBox Wrapper +type InputBoxInputSource struct { + inputbox *inputbox.InputBox +} + +func NewInputBoxInputSource( + inputBoxAddress common.Address, + client *ethclient.Client, +) (*InputBoxInputSource, error) { + inputbox, err := inputbox.NewInputBox(inputBoxAddress, client) + if err != nil { + return nil, err + } + return &InputBoxInputSource{ + inputbox: inputbox, + }, nil +} + +func (i *InputBoxInputSource) RetrieveInputs( + opts *bind.FilterOpts, + appContract []common.Address, + index []*big.Int, +) ([]*inputbox.InputBoxInputAdded, error) { + + itr, err := i.inputbox.FilterInputAdded(opts, appContract, index) + if err != nil { + return nil, err + } + defer itr.Close() + + var events []*inputbox.InputBoxInputAdded + for itr.Next() { + inputAddedEvent := itr.Event + events = append(events, inputAddedEvent) + } + err = itr.Error() + if err != nil { + return nil, err + } + return events, nil +} diff --git a/internal/inputreader/inputreader.go b/internal/inputreader/inputreader.go index 3a6529629..55b79aa2d 100644 --- a/internal/inputreader/inputreader.go +++ b/internal/inputreader/inputreader.go @@ -39,12 +39,12 @@ type InputSource interface { // Interface for the node repository type InputReaderRepository interface { - InsertInputsAndUpdateMostRecentFinalizedBlockNumber( + InsertInputsAndUpdateMostRecentlyFinalizedBlock( ctx context.Context, inputs []*model.Input, blockNumber uint64, ) error - GetMostRecentFinalizedBlockNumber( + GetMostRecentlyFinalizedBlock( ctx context.Context, ) (uint64, error) } @@ -67,7 +67,7 @@ func (r InputReader) String() string { } // Creates a new InputReader. -func NewInputReader( +func newInputReader( client EthClient, inputSource InputSource, repository InputReaderRepository, @@ -90,7 +90,7 @@ func (r InputReader) Start( ready chan<- struct{}, ) error { // Check the last block processed by the the Input Reader - storedMostRecentFinalizedBlockNumber, err := r.repository.GetMostRecentFinalizedBlockNumber(ctx) + storedMostRecentFinalizedBlockNumber, err := r.repository.GetMostRecentlyFinalizedBlock(ctx) if err != nil { return err } @@ -158,14 +158,15 @@ func (r InputReader) readInputs( var inputs = []*model.Input{} for _, event := range inputsEvents { input := model.Input{ - Index: event.Index.Uint64(), - Status: "Enqueued", - Blob: event.Input, + Index: event.Index.Uint64(), + CompletionStatus: model.InputStatusNone, + Blob: event.Input, + BlockNumber: event.Raw.BlockNumber, } inputs = append(inputs, &input) } - err = r.repository.InsertInputsAndUpdateMostRecentFinalizedBlockNumber( + err = r.repository.InsertInputsAndUpdateMostRecentlyFinalizedBlock( ctx, inputs, *opts.End) @@ -199,7 +200,7 @@ func (r InputReader) watchForNewInputs( case <-headers: storedMostRecentFinalizedBlockNumber, err := r.repository. - GetMostRecentFinalizedBlockNumber(ctx) + GetMostRecentlyFinalizedBlock(ctx) if err != nil { return fmt.Errorf( "failed to retrieve known most recent finalized block from repo. %v", diff --git a/internal/inputreader/inputreader_service.go b/internal/inputreader/inputreader_service.go new file mode 100644 index 000000000..db6dccdf0 --- /dev/null +++ b/internal/inputreader/inputreader_service.go @@ -0,0 +1,76 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package inputreader + +import ( + "context" + + "github.com/cartesi/rollups-node/internal/repository" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +// Service to manage InputReader lifecycle +type InputReaderService struct { + blockchainHttpEndpoint string + postgresEndpoint string + inputBoxAddress common.Address + inputBoxBlockNumber uint64 + applicationAddress common.Address +} + +func NewInputReaderService( + blockchainHttpEndpoint string, + postgresEndpoint string, + inputBoxAddress common.Address, + inputBoxBlockNumber uint64, + applicationAddress common.Address, +) InputReaderService { + return InputReaderService{ + blockchainHttpEndpoint: blockchainHttpEndpoint, + postgresEndpoint: postgresEndpoint, + inputBoxAddress: inputBoxAddress, + inputBoxBlockNumber: inputBoxBlockNumber, + applicationAddress: applicationAddress, + } +} + +func (s InputReaderService) Start( + ctx context.Context, + ready chan<- struct{}, +) error { + + db, err := repository.Connect(ctx, s.postgresEndpoint) + + if err != nil { + return err + } + + client, err := ethclient.DialContext(ctx, s.blockchainHttpEndpoint) + + if err != nil { + return err + } + + inputBoxWrapper, err := NewInputBoxInputSource(s.inputBoxAddress, client) + + if err != nil { + return err + } + + reader := newInputReader( + client, + inputBoxWrapper, + db, + s.inputBoxAddress, + s.inputBoxBlockNumber, + s.applicationAddress, + ) + + return reader.Start(ctx, ready) +} + +func (s InputReaderService) String() string { + return "input-reader" +} diff --git a/internal/inputreader/inputreader_test.go b/internal/inputreader/inputreader_test.go index 901b72741..4a076191c 100644 --- a/internal/inputreader/inputreader_test.go +++ b/internal/inputreader/inputreader_test.go @@ -108,7 +108,7 @@ func (s *InputReaderSuite) SetupTest() { s.client = newMockEthClient() s.inputBox = newMockInputBox(s) s.repository = newMockRepository() - inputReader := NewInputReader( + inputReader := newInputReader( s.client, s.inputBox, s.repository, @@ -144,7 +144,7 @@ func (s *InputReaderSuite) TestItEventuallyBecomesReady() { case <-ready: s.repository.AssertNumberOfCalls( s.T(), - "InsertInputsAndUpdateMostRecentFinalizedBlockNumber", + "InsertInputsAndUpdateMostRecentlyFinalizedBlock", 1, ) case err := <-errChannel: @@ -180,9 +180,9 @@ func (s *InputReaderSuite) TestItFailsToUpdateMostRecentFinalizedBlockOnStart() ready := make(chan struct{}, 1) errChannel := make(chan error, 1) - s.repository.Unset("InsertInputsAndUpdateMostRecentFinalizedBlockNumber") + s.repository.Unset("InsertInputsAndUpdateMostRecentlyFinalizedBlock") s.repository.On( - "InsertInputsAndUpdateMostRecentFinalizedBlockNumber", + "InsertInputsAndUpdateMostRecentlyFinalizedBlock", mock.Anything, mock.Anything, mock.Anything, @@ -233,7 +233,7 @@ func (s *InputReaderSuite) TestItFailsToSubscribeForNewInputsOnStart() { func (s *InputReaderSuite) TestItReadsAllPastInputs() { // Set finalized block - inputReader := NewInputReader( + inputReader := newInputReader( s.client, s.inputBox, s.repository, @@ -276,7 +276,7 @@ func (s *InputReaderSuite) TestItReadsAllPastInputs() { s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 1) s.repository.AssertNumberOfCalls( s.T(), - "InsertInputsAndUpdateMostRecentFinalizedBlockNumber", + "InsertInputsAndUpdateMostRecentlyFinalizedBlock", 1, ) @@ -288,7 +288,7 @@ func (s *InputReaderSuite) TestItReadsInputsFromNewBlocks() { client := FakeEhtClient{} client.NewHeaders = []*types.Header{&header1} client.WaitGroup = &waitGroup - inputReader := NewInputReader( + inputReader := newInputReader( &client, s.inputBox, s.repository, @@ -346,15 +346,15 @@ func (s *InputReaderSuite) TestItReadsInputsFromNewBlocks() { ).Return(events_1, nil) // Prepare Repo - s.repository.Unset("GetMostRecentFinalizedBlockNumber") + s.repository.Unset("GetMostRecentlyFinalizedBlock") s.repository.On( - "GetMostRecentFinalizedBlockNumber", + "GetMostRecentlyFinalizedBlock", mock.Anything, ). Once(). Return(uint64(0x0), nil) s.repository.On( - "GetMostRecentFinalizedBlockNumber", + "GetMostRecentlyFinalizedBlock", mock.Anything, ). Once(). @@ -381,14 +381,14 @@ func (s *InputReaderSuite) TestItReadsInputsFromNewBlocks() { s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) s.repository.AssertNumberOfCalls( s.T(), - "InsertInputsAndUpdateMostRecentFinalizedBlockNumber", + "InsertInputsAndUpdateMostRecentlyFinalizedBlock", 2, ) } func (s *InputReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { - inputReader := NewInputReader( + inputReader := newInputReader( s.client, s.inputBox, s.repository, @@ -422,16 +422,16 @@ func (s *InputReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { ).Return(events_2, nil) // Prepare Repo - s.repository.Unset("GetMostRecentFinalizedBlockNumber") + s.repository.Unset("GetMostRecentlyFinalizedBlock") s.repository.On( - "GetMostRecentFinalizedBlockNumber", + "GetMostRecentlyFinalizedBlock", mock.Anything, ). Once(). Return(uint64(0x12), nil) - s.repository.Unset("InsertInputsAndUpdateMostRecentFinalizedBlockNumber") + s.repository.Unset("InsertInputsAndUpdateMostRecentlyFinalizedBlock") s.repository.On( - "InsertInputsAndUpdateMostRecentFinalizedBlockNumber", + "InsertInputsAndUpdateMostRecentlyFinalizedBlock", mock.Anything, mock.Anything, mock.Anything, @@ -461,14 +461,14 @@ func (s *InputReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 1) s.repository.AssertNumberOfCalls( s.T(), - "InsertInputsAndUpdateMostRecentFinalizedBlockNumber", + "InsertInputsAndUpdateMostRecentlyFinalizedBlock", 1, ) } -func (s *InputReaderSuite) TestItStartsWhenInputBoxBlockIsTheMostRecentFinalizedBlock() { +func (s *InputReaderSuite) TestItStartsWhenInputBoxBlockIsTheMostRecentlyFinalizedBlock() { - inputReader := NewInputReader( + inputReader := newInputReader( s.client, s.inputBox, s.repository, @@ -486,9 +486,9 @@ func (s *InputReaderSuite) TestItStartsWhenInputBoxBlockIsTheMostRecentFinalized ).Return(&header0, nil).Once() // Prepare Repo - s.repository.Unset("GetMostRecentFinalizedBlockNumber") + s.repository.Unset("GetMostRecentlyFinalizedBlock") s.repository.On( - "GetMostRecentFinalizedBlockNumber", + "GetMostRecentlyFinalizedBlock", mock.Anything, ). Once(). @@ -512,7 +512,7 @@ func (s *InputReaderSuite) TestItStartsWhenInputBoxBlockIsTheMostRecentFinalized s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) s.repository.AssertNumberOfCalls( s.T(), - "InsertInputsAndUpdateMostRecentFinalizedBlockNumber", + "InsertInputsAndUpdateMostRecentlyFinalizedBlock", 0, ) } @@ -650,10 +650,10 @@ type MockRepository struct { func newMockRepository() *MockRepository { repo := &MockRepository{} - repo.On("GetMostRecentFinalizedBlockNumber", + repo.On("GetMostRecentlyFinalizedBlock", mock.Anything, mock.Anything).Return(uint64(0), nil) - repo.On("InsertInputsAndUpdateMostRecentFinalizedBlockNumber", + repo.On("InsertInputsAndUpdateMostRecentlyFinalizedBlock", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -669,7 +669,7 @@ func (m *MockRepository) Unset(methodName string) { } } -func (m *MockRepository) InsertInputsAndUpdateMostRecentFinalizedBlockNumber( +func (m *MockRepository) InsertInputsAndUpdateMostRecentlyFinalizedBlock( ctx context.Context, inputs []*model.Input, blockNumber uint64, @@ -678,7 +678,7 @@ func (m *MockRepository) InsertInputsAndUpdateMostRecentFinalizedBlockNumber( return args.Error(0) } -func (m *MockRepository) GetMostRecentFinalizedBlockNumber( +func (m *MockRepository) GetMostRecentlyFinalizedBlock( ctx context.Context, ) (uint64, error) { args := m.Called(ctx) diff --git a/internal/node/services.go b/internal/node/services.go index cd5cd46af..42fecf370 100644 --- a/internal/node/services.go +++ b/internal/node/services.go @@ -8,8 +8,10 @@ import ( "log/slog" "os" + "github.com/cartesi/rollups-node/internal/inputreader" "github.com/cartesi/rollups-node/internal/node/config" "github.com/cartesi/rollups-node/internal/services" + "github.com/ethereum/go-ethereum/common" ) // We use an enum to define the ports of each service and avoid conflicts. @@ -125,6 +127,7 @@ func newSupervisorService(c config.NodeConfig, workDir string) services.Supervis s = append(s, newHttpService(c)) s = append(s, newPostgraphileService(c, workDir)) + s = append(s, newInputReaderService(c)) supervisor := services.SupervisorService{ Name: "rollups-node", @@ -164,3 +167,13 @@ func newPostgraphileService(c config.NodeConfig, workDir string) services.Comman s.WorkDir = workDir return s } + +func newInputReaderService(c config.NodeConfig) services.Service { + return inputreader.NewInputReaderService( + c.BlockchainHttpEndpoint.Value, + c.PostgresEndpoint.Value, + common.HexToAddress(c.ContractsInputBoxAddress), + uint64(c.ContractsInputBoxDeploymentBlockNumber), + common.HexToAddress(c.ContractsApplicationAddress), + ) +}