Skip to content

Commit

Permalink
feat: e2e privacy integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Alok committed Apr 11, 2024
1 parent dbae325 commit a4379d2
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 88 deletions.
37 changes: 23 additions & 14 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ var (
Value: contracts.TestnetContracts.PreconfCommitmentStore,
})

optionBlockTrackerContractAddr = altsrc.NewStringFlag(&cli.StringFlag{
Name: "blocktracker-contract-addr",
Usage: "address of the block tracker contract",
EnvVars: []string{"MEV_ORACLE_BLOCKTRACKER_CONTRACT_ADDR"},
//Value: contracts.TestnetContracts.BlockTracker,
})

optionPgHost = altsrc.NewStringFlag(&cli.StringFlag{
Name: "pg-host",
Usage: "PostgreSQL host",
Expand Down Expand Up @@ -196,6 +203,7 @@ func main() {
optionSettlementRPCUrl,
optionOracleContractAddr,
optionPreconfContractAddr,
optionBlockTrackerContractAddr,
optionPgHost,
optionPgPort,
optionPgUser,
Expand Down Expand Up @@ -264,20 +272,21 @@ func launchOracleWithConfig(c *cli.Context) error {
logger.Info("key signer account", "address", keySigner.GetAddress().Hex(), "url", keySigner.String())

nd, err := node.NewNode(&node.Options{
Logger: logger,
KeySigner: keySigner,
HTTPPort: c.Int(optionHTTPPort.Name),
L1RPCUrl: c.String(optionL1RPCUrl.Name),
SettlementRPCUrl: c.String(optionSettlementRPCUrl.Name),
OracleContractAddr: common.HexToAddress(c.String(optionOracleContractAddr.Name)),
PreconfContractAddr: common.HexToAddress(c.String(optionPreconfContractAddr.Name)),
PgHost: c.String(optionPgHost.Name),
PgPort: c.Int(optionPgPort.Name),
PgUser: c.String(optionPgUser.Name),
PgPassword: c.String(optionPgPassword.Name),
PgDbname: c.String(optionPgDbname.Name),
LaggerdMode: c.Int(optionLaggerdMode.Name),
OverrideWinners: c.StringSlice(optionOverrideWinners.Name),
Logger: logger,
KeySigner: keySigner,
HTTPPort: c.Int(optionHTTPPort.Name),
L1RPCUrl: c.String(optionL1RPCUrl.Name),
SettlementRPCUrl: c.String(optionSettlementRPCUrl.Name),
OracleContractAddr: common.HexToAddress(c.String(optionOracleContractAddr.Name)),
PreconfContractAddr: common.HexToAddress(c.String(optionPreconfContractAddr.Name)),
BlockTrackerContractAddr: common.HexToAddress(c.String(optionBlockTrackerContractAddr.Name)),
PgHost: c.String(optionPgHost.Name),
PgPort: c.Int(optionPgPort.Name),
PgUser: c.String(optionPgUser.Name),
PgPassword: c.String(optionPgPassword.Name),
PgDbname: c.String(optionPgDbname.Name),
LaggerdMode: c.Int(optionLaggerdMode.Name),
OverrideWinners: c.StringSlice(optionOverrideWinners.Name),
})
if err != nil {
return fmt.Errorf("failed starting node: %w", err)
Expand Down
55 changes: 31 additions & 24 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
Expand All @@ -16,7 +17,8 @@ import (

// EVMClient is an interface for interacting with an Ethereum client for event subscription.
type EVMClient interface {
SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
BlockNumber(ctx context.Context) (uint64, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
}

// ProgressStore is an interface for storing the last block number processed by the event listener.
Expand Down Expand Up @@ -234,38 +236,43 @@ func (l *Listener) Start(ctx context.Context) <-chan struct{} {
contracts = append(contracts, addr)
}

q := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(lastBlock + 1)),
ToBlock: nil,
Addresses: contracts,
}

logChan := make(chan types.Log)

sub, err := l.evmClient.SubscribeFilterLogs(ctx, q, logChan)
if err != nil {
l.logger.Error("failed to subscribe to logs", "error", err)
return
}
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
sub.Unsubscribe()
return
case err := <-sub.Err():
l.logger.Error("subscription error", "error", err)
return
case logMsg := <-logChan:
// process log
l.publishLogEvent(ctx, logMsg)
case <-ticker.C:
blockNumber, err := l.evmClient.BlockNumber(ctx)
if err != nil {
l.logger.Error("failed to get block number", "error", err)
return
}

if blockNumber > lastBlock {
q := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(lastBlock + 1)),
ToBlock: big.NewInt(int64(blockNumber)),
Addresses: contracts,
}

logs, err := l.evmClient.FilterLogs(ctx, q)
if err != nil {
l.logger.Error("failed to filter logs", "error", err)
return
}

for _, logMsg := range logs {
// process log
l.publishLogEvent(ctx, logMsg)
}

if logMsg.BlockNumber > lastBlock {
if err := l.progressStore.SetLastBlock(logMsg.BlockNumber); err != nil {
if err := l.progressStore.SetLastBlock(blockNumber); err != nil {
l.logger.Error("failed to set last block", "error", err)
return
}
lastBlock = logMsg.BlockNumber
lastBlock = blockNumber
}
}
}
Expand Down
146 changes: 96 additions & 50 deletions pkg/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,27 +88,44 @@ func TestEventHandler(t *testing.T) {
func TestEventManager(t *testing.T) {
t.Parallel()

b := bidderregistry.BidderregistryBidderRegistered{
Bidder: common.HexToAddress("0xabcd"),
PrepaidAmount: big.NewInt(1000),
WindowNumber: big.NewInt(99),
bidders := []bidderregistry.BidderregistryBidderRegistered{
{
Bidder: common.HexToAddress("0xabcd"),
PrepaidAmount: big.NewInt(1000),
WindowNumber: big.NewInt(99),
},
{
Bidder: common.HexToAddress("0xcdef"),
PrepaidAmount: big.NewInt(2000),
WindowNumber: big.NewInt(100),
},
}

handlerTriggered := make(chan struct{})
count := 0

handlerTriggered1 := make(chan struct{})
handlerTriggered2 := make(chan struct{})

evtHdlr := events.NewEventHandler(
"BidderRegistered",
func(ev *bidderregistry.BidderregistryBidderRegistered) error {
defer close(handlerTriggered)

if ev.Bidder.Hex() != b.Bidder.Hex() {
return fmt.Errorf("expected bidder %s, got %s", b.Bidder.Hex(), ev.Bidder.Hex())
if count >= len(bidders) {
return fmt.Errorf("unexpected event")
}
if ev.PrepaidAmount.Cmp(b.PrepaidAmount) != 0 {
return fmt.Errorf("expected prepaid amount %d, got %d", b.PrepaidAmount, ev.PrepaidAmount)
if ev.Bidder.Hex() != bidders[count].Bidder.Hex() {
return fmt.Errorf("expected bidder %s, got %s", bidders[count].Bidder.Hex(), ev.Bidder.Hex())
}
if ev.WindowNumber.Cmp(b.WindowNumber) != 0 {
return fmt.Errorf("expected window number %d, got %d", b.WindowNumber, ev.WindowNumber)
if ev.PrepaidAmount.Cmp(bidders[count].PrepaidAmount) != 0 {
return fmt.Errorf("expected prepaid amount %d, got %d", bidders[count].PrepaidAmount, ev.PrepaidAmount)
}
if ev.WindowNumber.Cmp(bidders[count].WindowNumber) != 0 {
return fmt.Errorf("expected window number %d, got %d", bidders[count].WindowNumber, ev.WindowNumber)
}
count++
if count == 1 {
close(handlerTriggered1)
} else {
close(handlerTriggered2)
}
return nil
},
Expand All @@ -119,10 +136,40 @@ func TestEventManager(t *testing.T) {
t.Fatal(err)
}

data1, err := bidderABI.Events["BidderRegistered"].Inputs.NonIndexed().Pack(
bidders[0].PrepaidAmount,
bidders[0].WindowNumber,
)
if err != nil {
t.Fatal(err)
}

data2, err := bidderABI.Events["BidderRegistered"].Inputs.NonIndexed().Pack(
bidders[1].PrepaidAmount,
bidders[1].WindowNumber,
)
if err != nil {
t.Fatal(err)
}

evmClient := &testEVMClient{
logsSub: make(chan struct{}),
sub: &testSub{
errC: make(chan error),
logs: []types.Log{
{
Topics: []common.Hash{
bidderABI.Events["BidderRegistered"].ID,
common.HexToHash(bidders[0].Bidder.Hex()),
},
Data: data1,
BlockNumber: 1,
},
{
Topics: []common.Hash{
bidderABI.Events["BidderRegistered"].ID,
common.HexToHash(bidders[1].Bidder.Hex()),
},
Data: data2,
BlockNumber: 2,
},
},
}

Expand All @@ -149,34 +196,22 @@ func TestEventManager(t *testing.T) {

defer sub.Unsubscribe()

<-evmClient.logsSub

data, err := bidderABI.Events["BidderRegistered"].Inputs.NonIndexed().Pack(
b.PrepaidAmount,
b.WindowNumber,
)
if err != nil {
t.Fatal(err)
}
evmClient.SetBlockNumber(1)

evmClient.logs <- types.Log{
Topics: []common.Hash{
bidderABI.Events["BidderRegistered"].ID,
common.HexToHash(b.Bidder.Hex()),
},
Data: data,
BlockNumber: 1,
select {
case <-handlerTriggered1:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for handler to be triggered")
}

evmClient.SetBlockNumber(2)
select {
case <-handlerTriggered:
case err := <-sub.Err():
t.Fatal("handler not triggered", err)
case <-handlerTriggered2:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for handler to be triggered")
}

if b, err := store.LastBlock(); err != nil || b != 1 {
if b, err := store.LastBlock(); err != nil || b != 2 {
t.Fatalf("expected block number 1, got %d", store.blockNumber)
}

Expand All @@ -185,29 +220,40 @@ func TestEventManager(t *testing.T) {
}

type testEVMClient struct {
logs chan<- types.Log
logsSub chan struct{}
sub *testSub
mu sync.Mutex
blockNum uint64
logs []types.Log
}

type testSub struct {
errC chan error
func (t *testEVMClient) SetBlockNumber(blockNum uint64) {
t.mu.Lock()
defer t.mu.Unlock()

t.blockNum = blockNum
}

func (t *testSub) Unsubscribe() {}
func (t *testEVMClient) BlockNumber(context.Context) (uint64, error) {
t.mu.Lock()
defer t.mu.Unlock()

func (t *testSub) Err() <-chan error {
return t.errC
return t.blockNum, nil
}

func (t *testEVMClient) SubscribeFilterLogs(
func (t *testEVMClient) FilterLogs(
ctx context.Context,
q ethereum.FilterQuery,
ch chan<- types.Log,
) (ethereum.Subscription, error) {
defer close(t.logsSub)
t.logs = ch
return t.sub, nil
) ([]types.Log, error) {
t.mu.Lock()
defer t.mu.Unlock()

logs := make([]types.Log, 0, len(t.logs))
for _, log := range t.logs {
if log.BlockNumber >= q.FromBlock.Uint64() && log.BlockNumber <= q.ToBlock.Uint64() {
logs = append(logs, log)
}
}

return logs, nil
}

type testStore struct {
Expand Down

0 comments on commit a4379d2

Please sign in to comment.