Skip to content

Commit

Permalink
feat(input-reader): Add input-reader to node Supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
fmoura committed Jun 27, 2024
1 parent 4ccf3ce commit f79d59f
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 35 deletions.
55 changes: 55 additions & 0 deletions internal/inputreader/inputbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package inputreader

import (
"math/big"

"github.com/cartesi/rollups-node/pkg/contracts/inputbox"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
)

// InputBox Wrapper
type InputBoxInputSource struct {
inputbox *inputbox.InputBox
}

func NewInputBoxInputSource(
inputBoxAddress common.Address,
client *ethclient.Client,
) (*InputBoxInputSource, error) {
inputbox, err := inputbox.NewInputBox(inputBoxAddress, client)
if err != nil {
return nil, err
}
return &InputBoxInputSource{
inputbox: inputbox,
}, nil
}

func (i *InputBoxInputSource) RetrieveInputs(
opts *bind.FilterOpts,
appContract []common.Address,
index []*big.Int,
) ([]*inputbox.InputBoxInputAdded, error) {

itr, err := i.inputbox.FilterInputAdded(opts, appContract, index)
if err != nil {
return nil, err
}
defer itr.Close()

var events []*inputbox.InputBoxInputAdded
for itr.Next() {
inputAddedEvent := itr.Event
events = append(events, inputAddedEvent)
}
err = itr.Error()
if err != nil {
return nil, err
}
return events, nil
}
19 changes: 10 additions & 9 deletions internal/inputreader/inputreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type InputSource interface {

// Interface for the node repository
type InputReaderRepository interface {
InsertInputsAndUpdateMostRecentFinalizedBlockNumber(
InsertInputsAndUpdateMostRecentlyFinalizedBlock(
ctx context.Context,
inputs []*model.Input,
blockNumber uint64,
) error
GetMostRecentFinalizedBlockNumber(
GetMostRecentlyFinalizedBlock(
ctx context.Context,
) (uint64, error)
}
Expand All @@ -67,7 +67,7 @@ func (r InputReader) String() string {
}

// Creates a new InputReader.
func NewInputReader(
func newInputReader(
client EthClient,
inputSource InputSource,
repository InputReaderRepository,
Expand All @@ -90,7 +90,7 @@ func (r InputReader) Start(
ready chan<- struct{},
) error {
// Check the last block processed by the the Input Reader
storedMostRecentFinalizedBlockNumber, err := r.repository.GetMostRecentFinalizedBlockNumber(ctx)
storedMostRecentFinalizedBlockNumber, err := r.repository.GetMostRecentlyFinalizedBlock(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -158,14 +158,15 @@ func (r InputReader) readInputs(
var inputs = []*model.Input{}
for _, event := range inputsEvents {
input := model.Input{
Index: event.Index.Uint64(),
Status: "Enqueued",
Blob: event.Input,
Index: event.Index.Uint64(),
CompletionStatus: model.InputStatusNone,
Blob: event.Input,
BlockNumber: event.Raw.BlockNumber,
}
inputs = append(inputs, &input)
}

err = r.repository.InsertInputsAndUpdateMostRecentFinalizedBlockNumber(
err = r.repository.InsertInputsAndUpdateMostRecentlyFinalizedBlock(
ctx,
inputs,
*opts.End)
Expand Down Expand Up @@ -199,7 +200,7 @@ func (r InputReader) watchForNewInputs(
case <-headers:

storedMostRecentFinalizedBlockNumber, err := r.repository.
GetMostRecentFinalizedBlockNumber(ctx)
GetMostRecentlyFinalizedBlock(ctx)
if err != nil {
return fmt.Errorf(
"failed to retrieve known most recent finalized block from repo. %v",
Expand Down
76 changes: 76 additions & 0 deletions internal/inputreader/inputreader_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package inputreader

import (
"context"

"github.com/cartesi/rollups-node/internal/repository"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
)

// Service to manage InputReader lifecycle
type InputReaderService struct {
blockchainHttpEndpoint string
postgresEndpoint string
inputBoxAddress common.Address
inputBoxBlockNumber uint64
applicationAddress common.Address
}

func NewInputReaderService(
blockchainHttpEndpoint string,
postgresEndpoint string,
inputBoxAddress common.Address,
inputBoxBlockNumber uint64,
applicationAddress common.Address,
) InputReaderService {
return InputReaderService{
blockchainHttpEndpoint: blockchainHttpEndpoint,
postgresEndpoint: postgresEndpoint,
inputBoxAddress: inputBoxAddress,
inputBoxBlockNumber: inputBoxBlockNumber,
applicationAddress: applicationAddress,
}
}

func (s InputReaderService) Start(
ctx context.Context,
ready chan<- struct{},
) error {

db, err := repository.Connect(ctx, s.postgresEndpoint)

if err != nil {
return err
}

client, err := ethclient.DialContext(ctx, s.blockchainHttpEndpoint)

if err != nil {
return err
}

inputBoxWrapper, err := NewInputBoxInputSource(s.inputBoxAddress, client)

if err != nil {
return err
}

reader := newInputReader(
client,
inputBoxWrapper,
db,
s.inputBoxAddress,
s.inputBoxBlockNumber,
s.applicationAddress,
)

return reader.Start(ctx, ready)
}

func (s InputReaderService) String() string {
return "input-reader"
}
52 changes: 26 additions & 26 deletions internal/inputreader/inputreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *InputReaderSuite) SetupTest() {
s.client = newMockEthClient()
s.inputBox = newMockInputBox(s)
s.repository = newMockRepository()
inputReader := NewInputReader(
inputReader := newInputReader(
s.client,
s.inputBox,
s.repository,
Expand Down Expand Up @@ -144,7 +144,7 @@ func (s *InputReaderSuite) TestItEventuallyBecomesReady() {
case <-ready:
s.repository.AssertNumberOfCalls(
s.T(),
"InsertInputsAndUpdateMostRecentFinalizedBlockNumber",
"InsertInputsAndUpdateMostRecentlyFinalizedBlock",
1,
)
case err := <-errChannel:
Expand Down Expand Up @@ -180,9 +180,9 @@ func (s *InputReaderSuite) TestItFailsToUpdateMostRecentFinalizedBlockOnStart()
ready := make(chan struct{}, 1)
errChannel := make(chan error, 1)

s.repository.Unset("InsertInputsAndUpdateMostRecentFinalizedBlockNumber")
s.repository.Unset("InsertInputsAndUpdateMostRecentlyFinalizedBlock")
s.repository.On(
"InsertInputsAndUpdateMostRecentFinalizedBlockNumber",
"InsertInputsAndUpdateMostRecentlyFinalizedBlock",
mock.Anything,
mock.Anything,
mock.Anything,
Expand Down Expand Up @@ -233,7 +233,7 @@ func (s *InputReaderSuite) TestItFailsToSubscribeForNewInputsOnStart() {

func (s *InputReaderSuite) TestItReadsAllPastInputs() {
// Set finalized block
inputReader := NewInputReader(
inputReader := newInputReader(
s.client,
s.inputBox,
s.repository,
Expand Down Expand Up @@ -276,7 +276,7 @@ func (s *InputReaderSuite) TestItReadsAllPastInputs() {
s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 1)
s.repository.AssertNumberOfCalls(
s.T(),
"InsertInputsAndUpdateMostRecentFinalizedBlockNumber",
"InsertInputsAndUpdateMostRecentlyFinalizedBlock",
1,
)

Expand All @@ -288,7 +288,7 @@ func (s *InputReaderSuite) TestItReadsInputsFromNewBlocks() {
client := FakeEhtClient{}
client.NewHeaders = []*types.Header{&header1}
client.WaitGroup = &waitGroup
inputReader := NewInputReader(
inputReader := newInputReader(
&client,
s.inputBox,
s.repository,
Expand Down Expand Up @@ -346,15 +346,15 @@ func (s *InputReaderSuite) TestItReadsInputsFromNewBlocks() {
).Return(events_1, nil)

// Prepare Repo
s.repository.Unset("GetMostRecentFinalizedBlockNumber")
s.repository.Unset("GetMostRecentlyFinalizedBlock")
s.repository.On(
"GetMostRecentFinalizedBlockNumber",
"GetMostRecentlyFinalizedBlock",
mock.Anything,
).
Once().
Return(uint64(0x0), nil)
s.repository.On(
"GetMostRecentFinalizedBlockNumber",
"GetMostRecentlyFinalizedBlock",
mock.Anything,
).
Once().
Expand All @@ -381,14 +381,14 @@ func (s *InputReaderSuite) TestItReadsInputsFromNewBlocks() {
s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2)
s.repository.AssertNumberOfCalls(
s.T(),
"InsertInputsAndUpdateMostRecentFinalizedBlockNumber",
"InsertInputsAndUpdateMostRecentlyFinalizedBlock",
2,
)
}

func (s *InputReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() {

inputReader := NewInputReader(
inputReader := newInputReader(
s.client,
s.inputBox,
s.repository,
Expand Down Expand Up @@ -422,16 +422,16 @@ func (s *InputReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() {
).Return(events_2, nil)

// Prepare Repo
s.repository.Unset("GetMostRecentFinalizedBlockNumber")
s.repository.Unset("GetMostRecentlyFinalizedBlock")
s.repository.On(
"GetMostRecentFinalizedBlockNumber",
"GetMostRecentlyFinalizedBlock",
mock.Anything,
).
Once().
Return(uint64(0x12), nil)
s.repository.Unset("InsertInputsAndUpdateMostRecentFinalizedBlockNumber")
s.repository.Unset("InsertInputsAndUpdateMostRecentlyFinalizedBlock")
s.repository.On(
"InsertInputsAndUpdateMostRecentFinalizedBlockNumber",
"InsertInputsAndUpdateMostRecentlyFinalizedBlock",
mock.Anything,
mock.Anything,
mock.Anything,
Expand Down Expand Up @@ -461,14 +461,14 @@ func (s *InputReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() {
s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 1)
s.repository.AssertNumberOfCalls(
s.T(),
"InsertInputsAndUpdateMostRecentFinalizedBlockNumber",
"InsertInputsAndUpdateMostRecentlyFinalizedBlock",
1,
)
}

func (s *InputReaderSuite) TestItStartsWhenInputBoxBlockIsTheMostRecentFinalizedBlock() {
func (s *InputReaderSuite) TestItStartsWhenInputBoxBlockIsTheMostRecentlyFinalizedBlock() {

inputReader := NewInputReader(
inputReader := newInputReader(
s.client,
s.inputBox,
s.repository,
Expand All @@ -486,9 +486,9 @@ func (s *InputReaderSuite) TestItStartsWhenInputBoxBlockIsTheMostRecentFinalized
).Return(&header0, nil).Once()

// Prepare Repo
s.repository.Unset("GetMostRecentFinalizedBlockNumber")
s.repository.Unset("GetMostRecentlyFinalizedBlock")
s.repository.On(
"GetMostRecentFinalizedBlockNumber",
"GetMostRecentlyFinalizedBlock",
mock.Anything,
).
Once().
Expand All @@ -512,7 +512,7 @@ func (s *InputReaderSuite) TestItStartsWhenInputBoxBlockIsTheMostRecentFinalized
s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0)
s.repository.AssertNumberOfCalls(
s.T(),
"InsertInputsAndUpdateMostRecentFinalizedBlockNumber",
"InsertInputsAndUpdateMostRecentlyFinalizedBlock",
0,
)
}
Expand Down Expand Up @@ -650,10 +650,10 @@ type MockRepository struct {
func newMockRepository() *MockRepository {
repo := &MockRepository{}

repo.On("GetMostRecentFinalizedBlockNumber",
repo.On("GetMostRecentlyFinalizedBlock",
mock.Anything,
mock.Anything).Return(uint64(0), nil)
repo.On("InsertInputsAndUpdateMostRecentFinalizedBlockNumber",
repo.On("InsertInputsAndUpdateMostRecentlyFinalizedBlock",
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
Expand All @@ -669,7 +669,7 @@ func (m *MockRepository) Unset(methodName string) {
}
}

func (m *MockRepository) InsertInputsAndUpdateMostRecentFinalizedBlockNumber(
func (m *MockRepository) InsertInputsAndUpdateMostRecentlyFinalizedBlock(
ctx context.Context,
inputs []*model.Input,
blockNumber uint64,
Expand All @@ -678,7 +678,7 @@ func (m *MockRepository) InsertInputsAndUpdateMostRecentFinalizedBlockNumber(
return args.Error(0)
}

func (m *MockRepository) GetMostRecentFinalizedBlockNumber(
func (m *MockRepository) GetMostRecentlyFinalizedBlock(
ctx context.Context,
) (uint64, error) {
args := m.Called(ctx)
Expand Down
Loading

0 comments on commit f79d59f

Please sign in to comment.