Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce indexer module #285

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ jobs:

ADDITIONAL_MODULES=(
"${GITHUB_WORKSPACE}/external/geth"
"${GITHUB_WORKSPACE}/indexer"
)

ALL_MODULES=$(printf "%s\n" "${WORKSPACE_MODULES}" "${ADDITIONAL_MODULES[@]}")
Expand Down
1 change: 1 addition & 0 deletions indexer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# indexer
387 changes: 387 additions & 0 deletions indexer/cmd/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,387 @@
package main

import (
"context"
"encoding/hex"
"fmt"
"log/slog"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
kant777 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/primev/mev-commit/indexer/pkg/ethclient"
"github.com/primev/mev-commit/indexer/pkg/store"
)

const TimeLayOut = "2006-01-02T15:04:05.000Z"

type Config struct {
EthClient ethclient.EthereumClient
Storage store.Storage
IndexInterval time.Duration
AccountAddresses []string
MinBlocksToFetchAccountAddresses uint
TimeoutToFetchAccountAddresses time.Duration
}

type BlockchainIndexer struct {
ethClient ethclient.EthereumClient
storage store.Storage
forwardBlockChan chan *types.Block
backwardBlockChan chan *types.Block
txChan chan *types.Transaction
indexInterval time.Duration
lastForwardIndexedBlock *big.Int
lastBackwardIndexedBlock *big.Int
logger *slog.Logger
accountAddresses []string
blockCounter uint
minBlocksToFetchAccountAddresses uint
timeoutToFetchAccountAddresses time.Duration
}

func NewBlockchainIndexer(config Config) *BlockchainIndexer {
return &BlockchainIndexer{
ethClient: config.EthClient,
storage: config.Storage,
forwardBlockChan: make(chan *types.Block, 100),
backwardBlockChan: make(chan *types.Block, 100),
txChan: make(chan *types.Transaction, 100),
indexInterval: config.IndexInterval,
logger: slog.Default(),
accountAddresses: config.AccountAddresses,
blockCounter: 0,
minBlocksToFetchAccountAddresses: config.MinBlocksToFetchAccountAddresses,
timeoutToFetchAccountAddresses: config.TimeoutToFetchAccountAddresses,
}
}

func (bi *BlockchainIndexer) Start(ctx context.Context) error {
if err := bi.storage.CreateIndices(ctx); err != nil {
return fmt.Errorf("failed to create indices: %w", err)
}

latestBlockNumber, err := bi.ethClient.BlockNumber(ctx)
bi.logger.Info("latest block number", "block number", latestBlockNumber)
if err != nil {
return fmt.Errorf("failed to get latest block number: %w", err)
}

if err = bi.initializeForwardIndex(ctx, latestBlockNumber.Uint64()); err != nil {
return err
}

if err = bi.initializeBackwardIndex(ctx, latestBlockNumber.Uint64()); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latestBlockNumber should be passed as big.Int, otherwise IsUint64() should be used before converting to Uint64().

return err
}

go bi.fetchForwardBlocks(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why the fetch and store blocks run asynchronously and are connected by a buffered channel if they are mapped 1:1. If one of them is slower/faster, the buffer channel will only help for a while. I would suggest doing block fetching and storing in one loop under one gorutine.

go bi.processForwardBlocks(ctx)
go bi.fetchBackwardBlocks(ctx)
go bi.processBackwardBlocks(ctx)
go bi.IndexAccountBalances(ctx)

<-ctx.Done()
return ctx.Err()
}

func (bi *BlockchainIndexer) initializeForwardIndex(ctx context.Context, latestBlockNumber uint64) error {
lastForwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "forward")
if err != nil {
return fmt.Errorf("failed to get last forward indexed block: %w", err)
}

bi.logger.Info("last indexed block", "blockNumber", lastForwardIndexedBlock, "direction", "forward")

if lastForwardIndexedBlock == nil || lastForwardIndexedBlock.Sign() == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the lastForwardIndexedBlock == nil check needed here?

bi.lastForwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber - 1)
} else {
bi.lastForwardIndexedBlock = lastForwardIndexedBlock
}

return nil
}

func (bi *BlockchainIndexer) initializeBackwardIndex(ctx context.Context, latestBlockNumber uint64) error {
lastBackwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "backward")
if err != nil {
return fmt.Errorf("failed to get last backward indexed block: %w", err)
}

bi.logger.Info("last indexed block", "blockNumber", lastBackwardIndexedBlock, "direction", "backward")

if lastBackwardIndexedBlock == nil || lastBackwardIndexedBlock.Sign() == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the lastBackwardIndexedBlock == nil check needed here?

bi.lastBackwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber)
} else {
bi.lastBackwardIndexedBlock = lastBackwardIndexedBlock
}

return nil
}

func (bi *BlockchainIndexer) fetchForwardBlocks(ctx context.Context) {
ticker := time.NewTicker(bi.indexInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
latestBlockNumber, err := bi.ethClient.BlockNumber(ctx)
if err != nil {
bi.logger.Error("failed to get latest block number", "error", err)
continue
}

for blockNum := new(big.Int).Add(bi.lastForwardIndexedBlock, big.NewInt(1)); blockNum.Cmp(latestBlockNumber) <= 0; blockNum.Add(blockNum, big.NewInt(5)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we improve readability of this for loop?

endBlockNum := new(big.Int).Add(blockNum, big.NewInt(4))
if endBlockNum.Cmp(latestBlockNumber) > 0 {
endBlockNum.Set(latestBlockNumber)
}

var blockNums []*big.Int
for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) <= 0; bn.Add(bn, big.NewInt(1)) {
blockNums = append(blockNums, new(big.Int).Set(bn))
}

blocks, err := bi.fetchBlocks(ctx, blockNums)
if err != nil {
bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err)
continue
}

for _, block := range blocks {
bi.forwardBlockChan <- block
bi.lastForwardIndexedBlock.Set(block.Number())
bi.blockCounter++
}
}
}
}
}

func (bi *BlockchainIndexer) fetchBackwardBlocks(ctx context.Context) {
ticker := time.NewTicker(bi.indexInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if bi.lastBackwardIndexedBlock.Sign() <= 0 {
return
}
zeroBigNum := big.NewInt(0)
blockNum := new(big.Int).Sub(bi.lastBackwardIndexedBlock, big.NewInt(1))

for i := 0; blockNum.Cmp(zeroBigNum) >= 0; i++ {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the for loop i variable needed?

endBlockNum := new(big.Int).Sub(blockNum, big.NewInt(4))
if endBlockNum.Cmp(zeroBigNum) < 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comparison can be simplified with using the Sign() method instead.

endBlockNum.Set(zeroBigNum)
}

var blockNums []*big.Int
for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) >= 0; bn.Sub(bn, big.NewInt(1)) {
blockNums = append(blockNums, new(big.Int).Set(bn))
}

blocks, err := bi.fetchBlocks(ctx, blockNums)
if err != nil {
bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err)
break
}

for _, block := range blocks {
bi.backwardBlockChan <- block
bi.lastBackwardIndexedBlock.Set(block.Number())
if block.Number().Cmp(zeroBigNum) == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comparison can be simplified with using the Sign() method instead.

bi.logger.Info("done fetching backward blocks...")
return
}
}
blockNum.Sub(endBlockNum, big.NewInt(1))
}
}
}
}

func (bi *BlockchainIndexer) processForwardBlocks(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case block := <-bi.forwardBlockChan:
if err := bi.indexBlock(ctx, block); err != nil {
bi.logger.Error("failed to index block", "error", err)
}
if err := bi.indexTransactions(ctx, block); err != nil {
bi.logger.Error("failed to index transactions", "error", err)
}
}
}
}

func (bi *BlockchainIndexer) processBackwardBlocks(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case block := <-bi.backwardBlockChan:
if err := bi.indexBlock(ctx, block); err != nil {
bi.logger.Error("failed to index block", "error", err)
}
if err := bi.indexTransactions(ctx, block); err != nil {
bi.logger.Error("failed to index transactions", "error", err)
}
if block.Number().Cmp(big.NewInt(0)) == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comparison can be simplified with using the Sign() method instead.

bi.logger.Info("done processing backward blocks...")
return
}
}
}
}

func (bi *BlockchainIndexer) IndexAccountBalances(ctx context.Context) {
timer := time.NewTimer(bi.timeoutToFetchAccountAddresses)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return
case <-timer.C:
if err := bi.indexBalances(ctx, 0); err != nil {
return
}
bi.blockCounter = 0
timer.Reset(bi.timeoutToFetchAccountAddresses)
default:
if bi.blockCounter >= bi.minBlocksToFetchAccountAddresses {
if err := bi.indexBalances(ctx, bi.lastForwardIndexedBlock.Uint64()); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bi.lastForwardIndexedBlock should be passed as big.Int, otherwise IsUint64() should be used before converting to Uint64().

return
}
bi.blockCounter = 0
timer.Reset(bi.timeoutToFetchAccountAddresses)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the timer reset here?

}
}
}
}

func (bi *BlockchainIndexer) indexBalances(ctx context.Context, blockNumber uint64) error {
addresses, err := bi.storage.GetAddresses(ctx)
if err != nil {
return err
}

addresses = append(addresses, bi.accountAddresses...)

addrs := make([]common.Address, len(addresses))
for i, address := range addresses {
addrs[i] = common.HexToAddress(address)
}

accBalances, err := bi.ethClient.AccountBalances(ctx, addrs, blockNumber)
if err != nil {
return err
}

return bi.storage.IndexAccountBalances(ctx, accBalances)
}

func (bi *BlockchainIndexer) indexBlock(ctx context.Context, block *types.Block) error {
timestamp := time.UnixMilli(int64(block.Time())).UTC().Format(TimeLayOut)
indexBlock := &store.IndexBlock{
Number: block.NumberU64(),
Hash: block.Hash().Hex(),
ParentHash: block.ParentHash().Hex(),
Root: block.Root().Hex(),
Nonce: block.Nonce(),
Timestamp: timestamp,
Transactions: len(block.Transactions()),
BaseFee: block.BaseFee().Uint64(),
GasLimit: block.GasLimit(),
GasUsed: block.GasUsed(),
Difficulty: block.Difficulty().Uint64(),
ExtraData: hex.EncodeToString(block.Extra()),
}

return bi.storage.IndexBlock(ctx, indexBlock)
}

func (bi *BlockchainIndexer) indexTransactions(ctx context.Context, block *types.Block) error {
var transactions []*store.IndexTransaction
var txHashes []string

for _, tx := range block.Transactions() {
from, err := types.Sender(types.NewCancunSigner(tx.ChainId()), tx)
if err != nil {
return fmt.Errorf("failed to derive sender: %w", err)
}

v, r, s := tx.RawSignatureValues()
timestamp := tx.Time().UTC().Format(TimeLayOut)
transaction := &store.IndexTransaction{
Hash: tx.Hash().Hex(),
From: from.Hex(),
Gas: tx.Gas(),
Nonce: tx.Nonce(),
BlockHash: block.Hash().Hex(),
BlockNumber: block.NumberU64(),
ChainId: tx.ChainId().String(),
V: v.String(),
kant777 marked this conversation as resolved.
Show resolved Hide resolved
R: r.String(),
S: s.String(),
Input: hex.EncodeToString(tx.Data()),
Timestamp: timestamp,
}

if tx.To() != nil {
transaction.To = tx.To().Hex()
}
if tx.GasPrice() != nil {
transaction.GasPrice = tx.GasPrice().Uint64()
}
if tx.GasTipCap() != nil {
transaction.GasTipCap = tx.GasTipCap().Uint64()
}
if tx.GasFeeCap() != nil {
transaction.GasFeeCap = tx.GasFeeCap().Uint64()
}
if tx.Value() != nil {
transaction.Value = tx.Value().String()
}

transactions = append(transactions, transaction)
txHashes = append(txHashes, tx.Hash().Hex())
}

receipts, err := bi.fetchReceipts(ctx, txHashes)
if err != nil {
return fmt.Errorf("failed to fetch transaction receipts: %w", err)
}

for _, tx := range transactions {
if receipt, ok := receipts[tx.Hash]; ok {
tx.Status = receipt.Status
tx.GasUsed = receipt.GasUsed
tx.CumulativeGasUsed = receipt.CumulativeGasUsed
tx.ContractAddress = receipt.ContractAddress.Hex()
tx.TransactionIndex = receipt.TransactionIndex
tx.ReceiptBlockHash = receipt.BlockHash.Hex()
tx.ReceiptBlockNumber = receipt.BlockNumber.Uint64()
}
}

return bi.storage.IndexTransactions(ctx, transactions)
}

func (bi *BlockchainIndexer) fetchReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest removing these methods, as they are just wrappers for synonymous bi.ethClient methods with no added value.

return bi.ethClient.TxReceipts(ctx, txHashes)
}

func (bi *BlockchainIndexer) fetchBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) {
return bi.ethClient.GetBlocks(ctx, blockNums)
}
Loading
Loading