Skip to content

Commit

Permalink
Merge pull request #5 from 0xPolygon/Feature/synchronizer
Browse files Browse the repository at this point in the history
Feature/synchronizer
  • Loading branch information
christophercampbell authored Aug 9, 2023
2 parents 4ace9ee + ccab482 commit e1f21ce
Show file tree
Hide file tree
Showing 20 changed files with 703 additions and 84 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ ARCH := $(shell arch)

ifeq ($(ARCH),x86_64)
ARCH = amd64
else
else
ifeq ($(ARCH),aarch64)
ARCH = arm64
endif
endif

GOBASE := $(shell pwd)
GOBIN := $(GOBASE)/dist
GOENVVARS := GOBIN=$(GOBIN) CGO_ENABLED=0 GOOS=linux GOARCH=$(ARCH)
Expand Down
30 changes: 26 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import (
"fmt"
"os"
"os/signal"
"time"

dataavailability "github.com/0xPolygon/supernets2-data-availability"
"github.com/0xPolygon/supernets2-data-availability/config"
"github.com/0xPolygon/supernets2-data-availability/db"
"github.com/0xPolygon/supernets2-data-availability/dummyinterfaces"
"github.com/0xPolygon/supernets2-data-availability/services/datacom"
"github.com/0xPolygon/supernets2-data-availability/services/sync"
"github.com/0xPolygon/supernets2-data-availability/synchronizer"
dbConf "github.com/0xPolygon/supernets2-node/db"
"github.com/0xPolygon/supernets2-node/jsonrpc"
"github.com/0xPolygon/supernets2-node/log"
"github.com/ethereum/go-ethereum/crypto"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -73,17 +76,36 @@ func start(cliCtx *cli.Context) error {
if err != nil {
log.Fatal(err)
}
// derive address
selfAddr := crypto.PubkeyToAddress(pk.PublicKey)

sequencerTracker, err := datacom.NewSequencerTracker(c.L1)
var cancelFuncs []context.CancelFunc

sequencerTracker, err := synchronizer.NewSequencerTracker(c.L1)
if err != nil {
log.Fatal(err)
}
go sequencerTracker.Start()
cancelFuncs = append(cancelFuncs, sequencerTracker.Stop)

var cancelFuncs []context.CancelFunc
detector, err := synchronizer.NewReorgDetector(c.L1.RpcURL, 1*time.Second)
if err != nil {
log.Fatal(err)
}

// Register the tracker's shutdown method
cancelFuncs = append(cancelFuncs, sequencerTracker.Stop)
err = detector.Start()
if err != nil {
log.Fatal(err)
}

cancelFuncs = append(cancelFuncs, detector.Stop)

batchSynchronizer, err := synchronizer.NewBatchSynchronizer(c.L1, selfAddr, storage, detector.Subscribe())
if err != nil {
log.Fatal(err)
}
go batchSynchronizer.Start()
cancelFuncs = append(cancelFuncs, batchSynchronizer.Stop)

// Register services
server := jsonrpc.NewServer(
Expand Down
10 changes: 6 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ type Config struct {

// L1Config is a struct that defines L1 contract and service settings
type L1Config struct {
WsURL string `mapstructure:"WsURL"`
Contract string `mapstructure:"Contract"`
Timeout types.Duration `mapstructure:"Timeout"`
RetryPeriod types.Duration `mapstructure:"RetryPeriod"`
WsURL string `mapstructure:"WsURL"`
RpcURL string `mapstructure:"RpcURL"`
Supernets2Address string `mapstructure:"Supernets2Address"`
DataCommitteeAddress string `mapstructure:"DataCommitteeAddress"`
Timeout types.Duration `mapstructure:"Timeout"`
RetryPeriod types.Duration `mapstructure:"RetryPeriod"`
}

// Load loads the configuration baseed on the cli context
Expand Down
8 changes: 6 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ func Test_Defaults(t *testing.T) {
}{
{
path: "L1.WsURL",
expectedValue: "ws://localhost:8546",
expectedValue: "ws://127.0.0.1:8546",
},
{
path: "L1.Contract",
path: "L1.RpcURL",
expectedValue: "http://127.0.0.1:8545",
},
{
path: "L1.Supernets2Address",
expectedValue: "0x0DCd1Bf9A1b36cE34237eEaFef220932846BCD82",
},
{
Expand Down
6 changes: 4 additions & 2 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ const DefaultValues = `
PrivateKey = {Path = "/pk/test-member.keystore", Password = "testonly"}
[L1]
WsURL = "ws://localhost:8546"
Contract = "0x0DCd1Bf9A1b36cE34237eEaFef220932846BCD82"
WsURL = "ws://127.0.0.1:8546"
RpcURL = "http://127.0.0.1:8545"
Supernets2Address = "0x0DCd1Bf9A1b36cE34237eEaFef220932846BCD82"
DataCommitteeAddress = "0x0"
Timeout = "1m"
RetryPeriod = "5s"
Expand Down
71 changes: 63 additions & 8 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/0xPolygon/supernets2-node/jsonrpc/types"
"github.com/0xPolygon/supernets2-node/state"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)
Expand All @@ -25,12 +26,12 @@ func New(pg *pgxpool.Pool) *DB {
}

// BeginStateTransaction begins a DB transaction. The caller is responsible for committing or rolling back the transaction
func (d *DB) BeginStateTransaction(ctx context.Context) (pgx.Tx, error) {
return d.pg.Begin(ctx)
func (db *DB) BeginStateTransaction(ctx context.Context) (pgx.Tx, error) {
return db.pg.Begin(ctx)
}

// StoreOffChainData stores and array of key valeus in the Db
func (p *DB) StoreOffChainData(ctx context.Context, od []offchaindata.OffChainData, dbTx pgx.Tx) error {
// StoreOffChainData stores and array of key values in the Db
func (db *DB) StoreOffChainData(ctx context.Context, od []offchaindata.OffChainData, dbTx pgx.Tx) error {
const storeOffChainDataSQL = `
INSERT INTO data_node.offchain_data (key, value)
VALUES ($1, $2)
Expand All @@ -50,21 +51,75 @@ func (p *DB) StoreOffChainData(ctx context.Context, od []offchaindata.OffChainDa
}

// GetOffChainData returns the value identified by the key
func (p *DB) GetOffChainData(ctx context.Context, key common.Hash, dbTx pgx.Tx) (types.ArgBytes, error) {
func (db *DB) GetOffChainData(ctx context.Context, key common.Hash, dbTx pgx.Tx) (types.ArgBytes, error) {
const getOffchainDataSQL = `
SELECT value
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`
var (
valueStr string
hexValue string
)

if err := dbTx.QueryRow(ctx, getOffchainDataSQL, key.Hex()).Scan(&valueStr); err != nil {
if err := dbTx.QueryRow(ctx, getOffchainDataSQL, key.Hex()).Scan(&hexValue); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, state.ErrStateNotSynchronized
}
return nil, err
}
return common.Hex2Bytes(valueStr), nil
return common.FromHex(hexValue), nil
}

// Exists checks if a key exists in offchain data table
func (db *DB) Exists(ctx context.Context, key common.Hash) bool {
var keyExists = "SELECT COUNT(*) FROM data_node.offchain_data WHERE key = $1"
var (
count uint
)

if err := db.pg.QueryRow(ctx, keyExists, key.Hex()).Scan(&count); err != nil {
return false
}
return count > 0
}

// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer
func (db *DB) GetLastProcessedBlock(ctx context.Context) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT max(block) FROM data_node.sync_info;"
var (
lastBlock uint64
)

if err := db.pg.QueryRow(ctx, getLastProcessedBlockSQL).Scan(&lastBlock); err != nil {
return 0, err
}
return lastBlock, nil
}

// ResetLastProcessedBlock removes all sync_info for blocks greater than `block`
func (db *DB) ResetLastProcessedBlock(ctx context.Context, block uint64) (uint64, error) {
const resetLastProcessedBlock = "DELETE FROM data_node.sync_info WHERE block > $1"
var (
ct pgconn.CommandTag
err error
)
if ct, err = db.pg.Exec(ctx, resetLastProcessedBlock, block); err != nil {
return 0, err
}
return uint64(ct.RowsAffected()), nil
}

// StoreLastProcessedBlock stores a record of a block processed by the synchronizer
func (db *DB) StoreLastProcessedBlock(ctx context.Context, block uint64, dbTx pgx.Tx) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_info (block)
VALUES ($1)
ON CONFLICT (block) DO UPDATE
SET processed = NOW();
`

if _, err := dbTx.Exec(ctx, storeLastProcessedBlockSQL, block); err != nil {
return err
}
return nil
}
12 changes: 12 additions & 0 deletions db/migrations/0002.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +migrate Down
DROP TABLE IF EXISTS data_node.sync_info CASCADE;

-- +migrate Up
CREATE TABLE data_node.sync_info
(
block BIGINT PRIMARY KEY,
processed TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- init with the genesis block
INSERT INTO data_node.sync_info (block) VALUES (0);
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ require (
github.com/rubenv/sql-migrate v1.4.0
github.com/spf13/viper v1.16.0
github.com/stretchr/testify v1.8.4
github.com/umbracle/ethgo v0.1.4-0.20230712173909-df37dddf16f0
github.com/urfave/cli/v2 v2.25.5
)

require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down Expand Up @@ -99,8 +101,9 @@ require (
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/umbracle/ethgo v0.1.3 // indirect
github.com/umbracle/fastrlp v0.0.0-20220527094140-59d5dd30e722 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.40.0 // indirect
github.com/valyala/fastjson v1.4.1 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.9.0 // indirect
Expand Down
Loading

0 comments on commit e1f21ce

Please sign in to comment.