Skip to content

Commit

Permalink
Added polling mechanism to sequencer tracker (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman authored Apr 9, 2024
1 parent 4e76ab1 commit 2002ac3
Show file tree
Hide file tree
Showing 21 changed files with 431 additions and 454 deletions.
9 changes: 0 additions & 9 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,3 @@ packages:
SequencerTracker:
config:
filename: sequencer_tracker.generated.go
github.com/0xPolygon/cdk-data-availability/types:
config:
interfaces:
EthClient:
config:
filename: eth_client.generated.go
EthClientFactory:
config:
filename: eth_client_factory.generated.go
25 changes: 17 additions & 8 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/0xPolygon/cdk-data-availability/services/status"
"github.com/0xPolygon/cdk-data-availability/services/sync"
"github.com/0xPolygon/cdk-data-availability/synchronizer"
"github.com/0xPolygon/cdk-data-availability/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
_ "github.com/lib/pq"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -99,11 +99,13 @@ func start(cliCtx *cli.Context) error {
log.Fatal(err)
}

// derive address
selfAddr := crypto.PubkeyToAddress(pk.PublicKey)

// ensure synchro/reorg start block is set
err = synchronizer.InitStartBlock(storage, types.NewEthClientFactory(), c.L1)
err = synchronizer.InitStartBlock(
storage,
etm,
c.L1.GenesisBlock,
common.HexToAddress(c.L1.PolygonValidiumAddress),
)
if err != nil {
log.Fatal(err)
}
Expand All @@ -114,7 +116,7 @@ func start(cliCtx *cli.Context) error {
go sequencerTracker.Start(cliCtx.Context)
cancelFuncs = append(cancelFuncs, sequencerTracker.Stop)

detector, err := synchronizer.NewReorgDetector(c.L1.RpcURL, 1*time.Second)
detector, err := synchronizer.NewReorgDetector(c.L1.RpcURL, time.Second)
if err != nil {
log.Fatal(err)
}
Expand All @@ -125,8 +127,15 @@ func start(cliCtx *cli.Context) error {

cancelFuncs = append(cancelFuncs, detector.Stop)

batchSynchronizer, err := synchronizer.NewBatchSynchronizer(c.L1, selfAddr,
storage, detector.Subscribe(), etm, sequencerTracker, client.NewFactory())
batchSynchronizer, err := synchronizer.NewBatchSynchronizer(
c.L1,
crypto.PubkeyToAddress(pk.PublicKey),
storage,
detector.Subscribe(),
etm,
sequencerTracker,
client.NewFactory(),
)
if err != nil {
log.Fatal(err)
}
Expand Down
16 changes: 8 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ type Config struct {

// L1Config is a struct that defines L1 contract and service settings
type L1Config struct {
WsURL string `mapstructure:"WsURL"`
RpcURL string `mapstructure:"RpcURL"`
PolygonValidiumAddress string `mapstructure:"PolygonValidiumAddress"`
DataCommitteeAddress string `mapstructure:"DataCommitteeAddress"`
Timeout types.Duration `mapstructure:"Timeout"`
RetryPeriod types.Duration `mapstructure:"RetryPeriod"`
BlockBatchSize uint `mapstructure:"BlockBatchSize"`
TrackSequencer bool `mapstructure:"TrackSequencer"`
RpcURL string `mapstructure:"RpcURL"`
PolygonValidiumAddress string `mapstructure:"PolygonValidiumAddress"`
DataCommitteeAddress string `mapstructure:"DataCommitteeAddress"`
Timeout types.Duration `mapstructure:"Timeout"`
RetryPeriod types.Duration `mapstructure:"RetryPeriod"`
BlockBatchSize uint `mapstructure:"BlockBatchSize"`
TrackSequencer bool `mapstructure:"TrackSequencer"`
TrackSequencerPollInterval types.Duration `mapstructure:"TrackSequencerPollInterval"`

// GenesisBlock represents the block number where PolygonValidium contract is deployed on L1
GenesisBlock uint64 `mapstructure:"GenesisBlock"`
Expand Down
6 changes: 1 addition & 5 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@ func Test_Defaults(t *testing.T) {
path string
expectedValue interface{}
}{
{
path: "L1.WsURL",
expectedValue: "ws://127.0.0.1:8546",
},
{
path: "L1.RpcURL",
expectedValue: "http://127.0.0.1:8545",
expectedValue: "ws://127.0.0.1:8546",
},
{
path: "L1.PolygonValidiumAddress",
Expand Down
4 changes: 2 additions & 2 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ const DefaultValues = `
PrivateKey = {Path = "/pk/test-member.keystore", Password = "testonly"}
[L1]
WsURL = "ws://127.0.0.1:8546"
RpcURL = "http://127.0.0.1:8545"
RpcURL = "ws://127.0.0.1:8546"
PolygonValidiumAddress = "0x8dAF17A20c9DBA35f005b6324F493785D239719d"
DataCommitteeAddress = "0x68B1D87F95878fE05B998F19b66F4baba5De1aed"
Timeout = "1m"
RetryPeriod = "5s"
BlockBatchSize = "64"
GenesisBlock = "0"
TrackSequencer = true
TrackSequencerPollInterval = "1m"
[Log]
Environment = "development" # "production" or "development"
Expand Down
4 changes: 2 additions & 2 deletions docs/running.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ services:
PrivateKey = {Path = "/pk/test-member.keystore", Password = "testonly"} # CHANGE THIS (the password): according to the private key file password
[L1]
WsURL = "ws://URLofYourL1Node:8546" # CHANGE THIS: use the URL of your L1 node
RpcURL = "http://URLofYourL1Node:8545" # CHANGE THIS: use the URL of your L1 node
RpcURL = "http://URLofYourL1Node:8545" # CHANGE THIS: use the URL of your L1 node, can be http(s) or ws(s)
PolygonValidiumAddress = "0x8dAF17A20c9DBA35f005b6324F493785D239719d" # CHANGE THIS: Address of the Validium smart contract
DataCommitteeAddress = "0x68B1D87F95878fE05B998F19b66F4baba5De1aed" # CHANGE THIS: Address of the data availability committee smart contract
Timeout = "3m"
RetryPeriod = "5s"
BlockBatchSize = 32
TrackSequencer = true
TrackSequencerPollInterval = "1m"
[Log]
Environment = "development" # "production" or "development"
Expand Down
31 changes: 22 additions & 9 deletions etherman/etherman.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ type DataCommittee struct {

// Etherman defines functions that should be implemented by Etherman
type Etherman interface {
GetTx(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error)

GetCurrentDataCommittee() (*DataCommittee, error)
GetCurrentDataCommitteeMembers() ([]DataCommitteeMember, error)
GetTx(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error)
TrustedSequencer(ctx context.Context) (common.Address, error)
WatchSetTrustedSequencer(
ctx context.Context,
Expand All @@ -44,7 +48,6 @@ type Etherman interface {
ctx context.Context,
events chan *polygonvalidium.PolygonvalidiumSetTrustedSequencerURL,
) (event.Subscription, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
FilterSequenceBatches(
opts *bind.FilterOpts,
numBatch []uint64,
Expand All @@ -63,9 +66,9 @@ func New(ctx context.Context, cfg config.L1Config) (Etherman, error) {
ctx, cancel := context.WithTimeout(ctx, cfg.Timeout.Duration)
defer cancel()

ethClient, err := ethclient.DialContext(ctx, cfg.WsURL)
ethClient, err := ethclient.DialContext(ctx, cfg.RpcURL)
if err != nil {
log.Errorf("error connecting to %s: %+v", cfg.WsURL, err)
log.Errorf("error connecting to %s: %+v", cfg.RpcURL, err)
return nil, err
}

Expand Down Expand Up @@ -97,6 +100,21 @@ func (e *etherman) GetTx(ctx context.Context, txHash common.Hash) (*types.Transa
return e.EthClient.TransactionByHash(ctx, txHash)
}

// HeaderByNumber returns header by number from the eth client
func (e *etherman) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
return e.EthClient.HeaderByNumber(ctx, number)
}

// BlockByNumber returns a block by the given number
func (e *etherman) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
return e.EthClient.BlockByNumber(ctx, number)
}

// CodeAt returns the contract code of the given account.
func (e *etherman) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) {
return e.EthClient.CodeAt(ctx, account, blockNumber)
}

// TrustedSequencer gets trusted sequencer address
func (e *etherman) TrustedSequencer(ctx context.Context) (common.Address, error) {
return e.CDKValidium.TrustedSequencer(&bind.CallOpts{
Expand Down Expand Up @@ -129,11 +147,6 @@ func (e *etherman) WatchSetTrustedSequencerURL(
return e.CDKValidium.WatchSetTrustedSequencerURL(&bind.WatchOpts{Context: ctx}, events)
}

// HeaderByNumber returns header by number from the eth client
func (e *etherman) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
return e.EthClient.HeaderByNumber(ctx, number)
}

// FilterSequenceBatches retrieves filtered batches on CDK validium
func (e *etherman) FilterSequenceBatches(opts *bind.FilterOpts,
numBatch []uint64) (*polygonvalidium.PolygonvalidiumSequenceBatchesIterator, error) {
Expand Down
161 changes: 0 additions & 161 deletions mocks/eth_client.generated.go

This file was deleted.

Loading

0 comments on commit 2002ac3

Please sign in to comment.