Skip to content

Commit

Permalink
feature: Support for multiple endpoints by chainID (#189)
Browse files Browse the repository at this point in the history
* new web3 endpoint pool to handle multiple web3 endpoint by chainID balancing between them
* deeper integration of multiple endpoints implementing bind.ContractBackend interface
* limit every web3 client context timeout to 2 seconds to keep the pool healty
* support retries in every web3 client method
* token holder scan parallelized, holder providers simplifyed, fixing GetEndpoint web3Pool
  • Loading branch information
lucasmenendez authored Apr 11, 2024
1 parent e434723 commit 4de4c85
Show file tree
Hide file tree
Showing 16 changed files with 886 additions and 458 deletions.
6 changes: 3 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Census3APIConf struct {
Port int
DataDir string
GroupKey string
Web3Providers web3.NetworkEndpoints
Web3Providers *web3.Web3Pool
HolderProviders map[uint64]providers.HolderProvider
AdminToken string
}
Expand All @@ -49,7 +49,7 @@ type census3API struct {
endpoint *api.API
censusDB *censusdb.CensusDB
queue *queue.BackgroundQueue
w3p web3.NetworkEndpoints
w3p *web3.Web3Pool
storage storagelayer.Storage
downloader *downloader.Downloader
holderProviders map[uint64]providers.HolderProvider
Expand Down Expand Up @@ -146,7 +146,7 @@ func (capi *census3API) getAPIInfo(msg *api.APIdata, ctx *httprouter.HTTPContext
info := &APIInfo{
SupportedChains: []SupportedChain{},
}
for _, provider := range capi.w3p {
for _, provider := range capi.w3p.SupportedNetworks() {
info.SupportedChains = append(info.SupportedChains, SupportedChain{
ChainID: provider.ChainID,
ShortName: provider.ShortName,
Expand Down
100 changes: 30 additions & 70 deletions cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
"github.com/vocdoni/census3/db"
"github.com/vocdoni/census3/internal"
"github.com/vocdoni/census3/scanner"
"github.com/vocdoni/census3/scanner/providers"
"github.com/vocdoni/census3/scanner/providers/farcaster"
"github.com/vocdoni/census3/scanner/providers/gitcoin"
gitcoinDB "github.com/vocdoni/census3/scanner/providers/gitcoin/db"
"github.com/vocdoni/census3/scanner/providers/manager"
"github.com/vocdoni/census3/scanner/providers/poap"
"github.com/vocdoni/census3/scanner/providers/web3"
"go.vocdoni.io/dvote/log"
Expand All @@ -33,6 +33,7 @@ type Census3Config struct {
poapAPIEndpoint, poapAuthToken string
gitcoinEndpoint string
gitcoinCooldown time.Duration
scannerConcurrentTokens int
scannerCoolDown time.Duration
adminToken string
initialTokens string
Expand Down Expand Up @@ -60,6 +61,7 @@ func main() {
var strWeb3Providers string
flag.StringVar(&strWeb3Providers, "web3Providers", "", "the list of URL's of available web3 providers")
flag.DurationVar(&config.scannerCoolDown, "scannerCoolDown", 120*time.Second, "the time to wait before next scanner iteration")
flag.IntVar(&config.scannerConcurrentTokens, "scannerConcurrentTokens", 5, "the number of tokens to scan concurrently")
flag.StringVar(&config.adminToken, "adminToken", "", "the admin UUID token for the API")
flag.StringVar(&config.initialTokens, "initialTokens", "", "path of the initial tokens json file")
flag.BoolVar(&config.farcaster, "farcaster", false, "enables farcaster support")
Expand Down Expand Up @@ -112,6 +114,10 @@ func main() {
panic(err)
}
config.listOfWeb3Providers = strings.Split(pviper.GetString("web3Providers"), ",")
if err := pviper.BindPFlag("scannerConcurrentTokens", flag.Lookup("scannerConcurrentTokens")); err != nil {
panic(err)
}
config.scannerConcurrentTokens = pviper.GetInt("scannerConcurrentTokens")
if err := pviper.BindPFlag("scannerCoolDown", flag.Lookup("scannerCoolDown")); err != nil {
panic(err)
}
Expand All @@ -135,84 +141,45 @@ func main() {
log.Fatal("no web3 providers defined")
}
// check if the web3 providers are valid
w3p, err := web3.InitNetworkEndpoints(config.listOfWeb3Providers)
w3p, err := web3.NewWeb3Pool()
if err != nil {
log.Fatal(err)
}
for _, uri := range config.listOfWeb3Providers {
if err := w3p.AddEndpoint(uri); err != nil {
log.Fatal(err)
}
}
// init the database
database, err := db.Init(config.dataDir, "census3.sql")
if err != nil {
log.Fatal(err)
}

// start the holder scanner with the database and the providers
hc := scanner.NewScanner(database, w3p, config.scannerCoolDown)

// init the provider manager
pm := manager.NewProviderManager()
// init the web3 token providers
erc20Provider := new(web3.ERC20HolderProvider)
if err := erc20Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
log.Fatal(err)
return
}
erc721Provider := new(web3.ERC721HolderProvider)
if err := erc721Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
log.Fatal(err)
return
}
erc777Provider := new(web3.ERC777HolderProvider)
if err := erc777Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
log.Fatal(err)
return
}

// set the providers in the scanner and the API
if err := hc.SetProviders(erc20Provider, erc721Provider, erc777Provider); err != nil {
log.Fatal(err)
return
}
apiProviders := map[uint64]providers.HolderProvider{
erc20Provider.Type(): erc20Provider,
erc721Provider.Type(): erc721Provider,
erc777Provider.Type(): erc777Provider,
}
web3ProviderConf := web3.Web3ProviderConfig{Endpoints: w3p}
pm.AddProvider(new(web3.ERC20HolderProvider).Type(), web3ProviderConf)
pm.AddProvider(new(web3.ERC721HolderProvider).Type(), web3ProviderConf)
pm.AddProvider(new(web3.ERC777HolderProvider).Type(), web3ProviderConf)
// init POAP external provider
if config.poapAPIEndpoint != "" {
poapProvider := new(poap.POAPHolderProvider)
if err := poapProvider.Init(poap.POAPConfig{
pm.AddProvider(new(poap.POAPHolderProvider).Type(), poap.POAPConfig{
APIEndpoint: config.poapAPIEndpoint,
AccessToken: config.poapAuthToken,
}); err != nil {
log.Fatal(err)
return
}
if err := hc.SetProviders(poapProvider); err != nil {
log.Fatal(err)
return
}
apiProviders[poapProvider.Type()] = poapProvider
})
}
if config.gitcoinEndpoint != "" {
gitcoinDatabase, err := gitcoinDB.Init(config.dataDir, "gitcoinpassport.sql")
if err != nil {
log.Fatal(err)
}
// init Gitcoin external provider
gitcoinProvider := new(gitcoin.GitcoinPassport)
if err := gitcoinProvider.Init(gitcoin.GitcoinPassportConf{
pm.AddProvider(new(gitcoin.GitcoinPassport).Type(), gitcoin.GitcoinPassportConf{
APIEndpoint: config.gitcoinEndpoint,
Cooldown: config.gitcoinCooldown,
DB: gitcoinDatabase,
}); err != nil {
log.Fatal(err)
return
}
if err := hc.SetProviders(gitcoinProvider); err != nil {
log.Fatal(err)
return
}
apiProviders[gitcoinProvider.Type()] = gitcoinProvider
})
}

// if farcaster is enabled, init the farcaster database and the provider
var farcasterDB *farcaster.DB
if config.farcaster {
Expand All @@ -221,21 +188,13 @@ func main() {
if err != nil {
log.Fatal(err)
}
farcasterProvider := new(farcaster.FarcasterProvider)
if err := farcasterProvider.Init(farcaster.FarcasterProviderConf{
pm.AddProvider(new(farcaster.FarcasterProvider).Type(), farcaster.FarcasterProviderConf{
Endpoints: w3p,
DB: farcasterDB,
}); err != nil {
log.Fatal(err)
return
}
if err := hc.SetProviders(farcasterProvider); err != nil {
log.Fatal(err)
return
}
apiProviders[farcasterProvider.Type()] = farcasterProvider
})
}

// start the holder scanner with the database and the provider manager
hc := scanner.NewScanner(database, w3p, pm, config.scannerCoolDown)
// if the admin token is not defined, generate a random one
if config.adminToken != "" {
if _, err := uuid.Parse(config.adminToken); err != nil {
Expand All @@ -254,7 +213,7 @@ func main() {
DataDir: config.dataDir,
Web3Providers: w3p,
GroupKey: config.connectKey,
HolderProviders: apiProviders,
HolderProviders: pm.Providers(),
AdminToken: config.adminToken,
})
if err != nil {
Expand All @@ -267,7 +226,8 @@ func main() {
}
log.Info("initial tokens created, or at least tried to")
}()
go hc.Start(ctx)
// start the holder scanner
go hc.Start(ctx, config.scannerConcurrentTokens)

metrics.NewCounter(fmt.Sprintf("census3_info{version=%q,chains=%q}",
internal.Version, w3p.String())).Set(1)
Expand Down
9 changes: 2 additions & 7 deletions scanner/providers/farcaster/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,9 @@ func (p *FarcasterProvider) Init(iconf any) error {
}
p.contracts.lastBlock.Store(uint64(lastBlock))
// init the web3 client and contracts
currentEndpoint, exists := p.endpoints.EndpointByChainID(ChainID)
if !exists {
return errors.New("endpoint not found for the given chainID")
}
// connect to the endpoint and set the client
p.client, err = currentEndpoint.GetClient(web3.DefaultMaxWeb3ClientRetries)
p.client, err = p.endpoints.Client(ChainID)
if err != nil {
return errors.Join(web3.ErrConnectingToWeb3Client, fmt.Errorf("[FARCASTER]: %w", err))
return errors.Join(web3.ErrConnectingToWeb3Client, fmt.Errorf("[FARCASTER]: error getting web3 client: %w", err))
}
// parse the addresses and initialize the contracts
idRegistryAddress := common.HexToAddress(IdRegistryAddress)
Expand Down
7 changes: 3 additions & 4 deletions scanner/providers/farcaster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
fcir "github.com/vocdoni/census3/contracts/farcaster/idRegistry"
fckr "github.com/vocdoni/census3/contracts/farcaster/keyRegistry"
"github.com/vocdoni/census3/scanner/providers/web3"
)

type FarcasterProviderConf struct {
Endpoints web3.NetworkEndpoints
Endpoints *web3.Web3Pool
DB *DB
}

Expand All @@ -28,8 +27,8 @@ type FarcasterContracts struct {

type FarcasterProvider struct {
// web3
endpoints web3.NetworkEndpoints
client *ethclient.Client
endpoints *web3.Web3Pool
client *web3.Client
contracts FarcasterContracts
lastNetworkBlock atomic.Uint64
// db
Expand Down
93 changes: 93 additions & 0 deletions scanner/providers/manager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package manager

// package manager provides a manager for providers of different types
// and a way to add and get them by type concurrently safe. It initializes a new
// provider based on the type and the configuration provided every time that a
// provider is requested to avoid data races. It also provides a way to get all
// the provider types and all the providers initialized at once.

import (
"fmt"
"sync"

"github.com/vocdoni/census3/scanner/providers"
"github.com/vocdoni/census3/scanner/providers/farcaster"
"github.com/vocdoni/census3/scanner/providers/gitcoin"
"github.com/vocdoni/census3/scanner/providers/poap"
"github.com/vocdoni/census3/scanner/providers/web3"
)

type ProviderManager struct {
confs sync.Map
}

// NewProviderManager creates a new provider manager
func NewProviderManager() *ProviderManager {
return &ProviderManager{}
}

// AddProvider adds a new provider configuration to the manager assigned to the
// specific type provided
func (m *ProviderManager) AddProvider(providerType uint64, conf any) {
m.confs.Store(providerType, conf)
}

// GetProvider returns a provider based on the type provided. It initializes the
// provider based on the configuration stored in the manager. It initializes a
// new provider every time to avoid data races. It returns an error if the
// provider type is not found or if the provider cannot be initialized.
func (m *ProviderManager) GetProvider(providerType uint64) (providers.HolderProvider, error) {
// load the configuration for the provider type
conf, ok := m.confs.Load(providerType)
if !ok {
return nil, fmt.Errorf("provider type %d not found", providerType)
}
// initialize the provider based on the type
var provider providers.HolderProvider
switch providerType {
case providers.CONTRACT_TYPE_ERC20:
provider = &web3.ERC20HolderProvider{}
case providers.CONTRACT_TYPE_ERC721:
provider = &web3.ERC721HolderProvider{}
case providers.CONTRACT_TYPE_ERC777:
provider = &web3.ERC777HolderProvider{}
case providers.CONTRACT_TYPE_POAP:
provider = &poap.POAPHolderProvider{}
case providers.CONTRACT_TYPE_GITCOIN:
provider = &gitcoin.GitcoinPassport{}
case providers.CONTRACT_TYPE_FARCASTER:
provider = &farcaster.FarcasterProvider{}
default:
return nil, fmt.Errorf("provider type %d not found", providerType)
}
// initialize the provider with the specific configuration
if err := provider.Init(conf); err != nil {
return nil, err
}
return provider, nil
}

// GetProviderTypes returns all the provider types stored in the manager as a
// slice of uint64.
func (m *ProviderManager) GetProviderTypes() []uint64 {
types := []uint64{}
m.confs.Range(func(t, _ any) bool {
types = append(types, t.(uint64))
return true
})
return types
}

// Providers returns all the providers stored in the manager associated to their
// types as a map of uint64 to HolderProvider.
func (m *ProviderManager) Providers() map[uint64]providers.HolderProvider {
providers := make(map[uint64]providers.HolderProvider)
for _, t := range m.GetProviderTypes() {
provider, err := m.GetProvider(t)
if err != nil {
panic(err)
}
providers[t] = provider
}
return providers
}
10 changes: 5 additions & 5 deletions scanner/providers/web3/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ const (
)

const (
shortNameSourceUri = "https://chainid.network/chains_mini.json"
checkNetworkEndpointsTimeout = time.Second * 10
TimeLayout = "2006-01-02T15:04:05Z07:00"
shortNameSourceUri = "https://chainid.network/chains_mini.json"
checkWeb3EndpointsTimeout = time.Second * 10
TimeLayout = "2006-01-02T15:04:05Z07:00"
)

var DefaultNetworkEndpoint = &NetworkEndpoint{
var DefaultWeb3Endpoint = &Web3Endpoint{
ChainID: 11155111,
Name: "Sepolia",
ShortName: "sep",
URIs: []string{"https://rpc2.sepolia.org"},
URI: "https://rpc2.sepolia.org",
}

const (
Expand Down
Loading

0 comments on commit 4de4c85

Please sign in to comment.