diff --git a/api/api.go b/api/api.go index 707acc68..f7af3f4c 100644 --- a/api/api.go +++ b/api/api.go @@ -38,7 +38,7 @@ type Census3APIConf struct { Port int DataDir string GroupKey string - Web3Providers web3.NetworkEndpoints + Web3Providers *web3.Web3Pool HolderProviders map[uint64]providers.HolderProvider AdminToken string } @@ -49,7 +49,7 @@ type census3API struct { endpoint *api.API censusDB *censusdb.CensusDB queue *queue.BackgroundQueue - w3p web3.NetworkEndpoints + w3p *web3.Web3Pool storage storagelayer.Storage downloader *downloader.Downloader holderProviders map[uint64]providers.HolderProvider @@ -146,7 +146,7 @@ func (capi *census3API) getAPIInfo(msg *api.APIdata, ctx *httprouter.HTTPContext info := &APIInfo{ SupportedChains: []SupportedChain{}, } - for _, provider := range capi.w3p { + for _, provider := range capi.w3p.SupportedNetworks() { info.SupportedChains = append(info.SupportedChains, SupportedChain{ ChainID: provider.ChainID, ShortName: provider.ShortName, diff --git a/cmd/census3/main.go b/cmd/census3/main.go index 55fbba78..df824ff4 100644 --- a/cmd/census3/main.go +++ b/cmd/census3/main.go @@ -17,10 +17,10 @@ import ( "github.com/vocdoni/census3/db" "github.com/vocdoni/census3/internal" "github.com/vocdoni/census3/scanner" - "github.com/vocdoni/census3/scanner/providers" "github.com/vocdoni/census3/scanner/providers/farcaster" "github.com/vocdoni/census3/scanner/providers/gitcoin" gitcoinDB "github.com/vocdoni/census3/scanner/providers/gitcoin/db" + "github.com/vocdoni/census3/scanner/providers/manager" "github.com/vocdoni/census3/scanner/providers/poap" "github.com/vocdoni/census3/scanner/providers/web3" "go.vocdoni.io/dvote/log" @@ -33,6 +33,7 @@ type Census3Config struct { poapAPIEndpoint, poapAuthToken string gitcoinEndpoint string gitcoinCooldown time.Duration + scannerConcurrentTokens int scannerCoolDown time.Duration adminToken string initialTokens string @@ -60,6 +61,7 @@ func main() { var strWeb3Providers string flag.StringVar(&strWeb3Providers, "web3Providers", "", "the list of URL's of available web3 providers") flag.DurationVar(&config.scannerCoolDown, "scannerCoolDown", 120*time.Second, "the time to wait before next scanner iteration") + flag.IntVar(&config.scannerConcurrentTokens, "scannerConcurrentTokens", 5, "the number of tokens to scan concurrently") flag.StringVar(&config.adminToken, "adminToken", "", "the admin UUID token for the API") flag.StringVar(&config.initialTokens, "initialTokens", "", "path of the initial tokens json file") flag.BoolVar(&config.farcaster, "farcaster", false, "enables farcaster support") @@ -112,6 +114,10 @@ func main() { panic(err) } config.listOfWeb3Providers = strings.Split(pviper.GetString("web3Providers"), ",") + if err := pviper.BindPFlag("scannerConcurrentTokens", flag.Lookup("scannerConcurrentTokens")); err != nil { + panic(err) + } + config.scannerConcurrentTokens = pviper.GetInt("scannerConcurrentTokens") if err := pviper.BindPFlag("scannerCoolDown", flag.Lookup("scannerCoolDown")); err != nil { panic(err) } @@ -135,84 +141,45 @@ func main() { log.Fatal("no web3 providers defined") } // check if the web3 providers are valid - w3p, err := web3.InitNetworkEndpoints(config.listOfWeb3Providers) + w3p, err := web3.NewWeb3Pool() if err != nil { log.Fatal(err) } + for _, uri := range config.listOfWeb3Providers { + if err := w3p.AddEndpoint(uri); err != nil { + log.Fatal(err) + } + } // init the database database, err := db.Init(config.dataDir, "census3.sql") if err != nil { log.Fatal(err) } - - // start the holder scanner with the database and the providers - hc := scanner.NewScanner(database, w3p, config.scannerCoolDown) - + // init the provider manager + pm := manager.NewProviderManager() // init the web3 token providers - erc20Provider := new(web3.ERC20HolderProvider) - if err := erc20Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil { - log.Fatal(err) - return - } - erc721Provider := new(web3.ERC721HolderProvider) - if err := erc721Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil { - log.Fatal(err) - return - } - erc777Provider := new(web3.ERC777HolderProvider) - if err := erc777Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil { - log.Fatal(err) - return - } - - // set the providers in the scanner and the API - if err := hc.SetProviders(erc20Provider, erc721Provider, erc777Provider); err != nil { - log.Fatal(err) - return - } - apiProviders := map[uint64]providers.HolderProvider{ - erc20Provider.Type(): erc20Provider, - erc721Provider.Type(): erc721Provider, - erc777Provider.Type(): erc777Provider, - } + web3ProviderConf := web3.Web3ProviderConfig{Endpoints: w3p} + pm.AddProvider(new(web3.ERC20HolderProvider).Type(), web3ProviderConf) + pm.AddProvider(new(web3.ERC721HolderProvider).Type(), web3ProviderConf) + pm.AddProvider(new(web3.ERC777HolderProvider).Type(), web3ProviderConf) // init POAP external provider if config.poapAPIEndpoint != "" { - poapProvider := new(poap.POAPHolderProvider) - if err := poapProvider.Init(poap.POAPConfig{ + pm.AddProvider(new(poap.POAPHolderProvider).Type(), poap.POAPConfig{ APIEndpoint: config.poapAPIEndpoint, AccessToken: config.poapAuthToken, - }); err != nil { - log.Fatal(err) - return - } - if err := hc.SetProviders(poapProvider); err != nil { - log.Fatal(err) - return - } - apiProviders[poapProvider.Type()] = poapProvider + }) } if config.gitcoinEndpoint != "" { gitcoinDatabase, err := gitcoinDB.Init(config.dataDir, "gitcoinpassport.sql") if err != nil { log.Fatal(err) } - // init Gitcoin external provider - gitcoinProvider := new(gitcoin.GitcoinPassport) - if err := gitcoinProvider.Init(gitcoin.GitcoinPassportConf{ + pm.AddProvider(new(gitcoin.GitcoinPassport).Type(), gitcoin.GitcoinPassportConf{ APIEndpoint: config.gitcoinEndpoint, Cooldown: config.gitcoinCooldown, DB: gitcoinDatabase, - }); err != nil { - log.Fatal(err) - return - } - if err := hc.SetProviders(gitcoinProvider); err != nil { - log.Fatal(err) - return - } - apiProviders[gitcoinProvider.Type()] = gitcoinProvider + }) } - // if farcaster is enabled, init the farcaster database and the provider var farcasterDB *farcaster.DB if config.farcaster { @@ -221,21 +188,13 @@ func main() { if err != nil { log.Fatal(err) } - farcasterProvider := new(farcaster.FarcasterProvider) - if err := farcasterProvider.Init(farcaster.FarcasterProviderConf{ + pm.AddProvider(new(farcaster.FarcasterProvider).Type(), farcaster.FarcasterProviderConf{ Endpoints: w3p, DB: farcasterDB, - }); err != nil { - log.Fatal(err) - return - } - if err := hc.SetProviders(farcasterProvider); err != nil { - log.Fatal(err) - return - } - apiProviders[farcasterProvider.Type()] = farcasterProvider + }) } - + // start the holder scanner with the database and the provider manager + hc := scanner.NewScanner(database, w3p, pm, config.scannerCoolDown) // if the admin token is not defined, generate a random one if config.adminToken != "" { if _, err := uuid.Parse(config.adminToken); err != nil { @@ -254,7 +213,7 @@ func main() { DataDir: config.dataDir, Web3Providers: w3p, GroupKey: config.connectKey, - HolderProviders: apiProviders, + HolderProviders: pm.Providers(), AdminToken: config.adminToken, }) if err != nil { @@ -267,7 +226,8 @@ func main() { } log.Info("initial tokens created, or at least tried to") }() - go hc.Start(ctx) + // start the holder scanner + go hc.Start(ctx, config.scannerConcurrentTokens) metrics.NewCounter(fmt.Sprintf("census3_info{version=%q,chains=%q}", internal.Version, w3p.String())).Set(1) diff --git a/scanner/providers/farcaster/provider.go b/scanner/providers/farcaster/provider.go index 5ed646d8..80e71868 100644 --- a/scanner/providers/farcaster/provider.go +++ b/scanner/providers/farcaster/provider.go @@ -106,14 +106,9 @@ func (p *FarcasterProvider) Init(iconf any) error { } p.contracts.lastBlock.Store(uint64(lastBlock)) // init the web3 client and contracts - currentEndpoint, exists := p.endpoints.EndpointByChainID(ChainID) - if !exists { - return errors.New("endpoint not found for the given chainID") - } - // connect to the endpoint and set the client - p.client, err = currentEndpoint.GetClient(web3.DefaultMaxWeb3ClientRetries) + p.client, err = p.endpoints.Client(ChainID) if err != nil { - return errors.Join(web3.ErrConnectingToWeb3Client, fmt.Errorf("[FARCASTER]: %w", err)) + return errors.Join(web3.ErrConnectingToWeb3Client, fmt.Errorf("[FARCASTER]: error getting web3 client: %w", err)) } // parse the addresses and initialize the contracts idRegistryAddress := common.HexToAddress(IdRegistryAddress) diff --git a/scanner/providers/farcaster/types.go b/scanner/providers/farcaster/types.go index 99f73489..7c3040a6 100644 --- a/scanner/providers/farcaster/types.go +++ b/scanner/providers/farcaster/types.go @@ -7,14 +7,13 @@ import ( "sync/atomic" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" fcir "github.com/vocdoni/census3/contracts/farcaster/idRegistry" fckr "github.com/vocdoni/census3/contracts/farcaster/keyRegistry" "github.com/vocdoni/census3/scanner/providers/web3" ) type FarcasterProviderConf struct { - Endpoints web3.NetworkEndpoints + Endpoints *web3.Web3Pool DB *DB } @@ -28,8 +27,8 @@ type FarcasterContracts struct { type FarcasterProvider struct { // web3 - endpoints web3.NetworkEndpoints - client *ethclient.Client + endpoints *web3.Web3Pool + client *web3.Client contracts FarcasterContracts lastNetworkBlock atomic.Uint64 // db diff --git a/scanner/providers/manager/manager.go b/scanner/providers/manager/manager.go new file mode 100644 index 00000000..85a95d57 --- /dev/null +++ b/scanner/providers/manager/manager.go @@ -0,0 +1,93 @@ +package manager + +// package manager provides a manager for providers of different types +// and a way to add and get them by type concurrently safe. It initializes a new +// provider based on the type and the configuration provided every time that a +// provider is requested to avoid data races. It also provides a way to get all +// the provider types and all the providers initialized at once. + +import ( + "fmt" + "sync" + + "github.com/vocdoni/census3/scanner/providers" + "github.com/vocdoni/census3/scanner/providers/farcaster" + "github.com/vocdoni/census3/scanner/providers/gitcoin" + "github.com/vocdoni/census3/scanner/providers/poap" + "github.com/vocdoni/census3/scanner/providers/web3" +) + +type ProviderManager struct { + confs sync.Map +} + +// NewProviderManager creates a new provider manager +func NewProviderManager() *ProviderManager { + return &ProviderManager{} +} + +// AddProvider adds a new provider configuration to the manager assigned to the +// specific type provided +func (m *ProviderManager) AddProvider(providerType uint64, conf any) { + m.confs.Store(providerType, conf) +} + +// GetProvider returns a provider based on the type provided. It initializes the +// provider based on the configuration stored in the manager. It initializes a +// new provider every time to avoid data races. It returns an error if the +// provider type is not found or if the provider cannot be initialized. +func (m *ProviderManager) GetProvider(providerType uint64) (providers.HolderProvider, error) { + // load the configuration for the provider type + conf, ok := m.confs.Load(providerType) + if !ok { + return nil, fmt.Errorf("provider type %d not found", providerType) + } + // initialize the provider based on the type + var provider providers.HolderProvider + switch providerType { + case providers.CONTRACT_TYPE_ERC20: + provider = &web3.ERC20HolderProvider{} + case providers.CONTRACT_TYPE_ERC721: + provider = &web3.ERC721HolderProvider{} + case providers.CONTRACT_TYPE_ERC777: + provider = &web3.ERC777HolderProvider{} + case providers.CONTRACT_TYPE_POAP: + provider = &poap.POAPHolderProvider{} + case providers.CONTRACT_TYPE_GITCOIN: + provider = &gitcoin.GitcoinPassport{} + case providers.CONTRACT_TYPE_FARCASTER: + provider = &farcaster.FarcasterProvider{} + default: + return nil, fmt.Errorf("provider type %d not found", providerType) + } + // initialize the provider with the specific configuration + if err := provider.Init(conf); err != nil { + return nil, err + } + return provider, nil +} + +// GetProviderTypes returns all the provider types stored in the manager as a +// slice of uint64. +func (m *ProviderManager) GetProviderTypes() []uint64 { + types := []uint64{} + m.confs.Range(func(t, _ any) bool { + types = append(types, t.(uint64)) + return true + }) + return types +} + +// Providers returns all the providers stored in the manager associated to their +// types as a map of uint64 to HolderProvider. +func (m *ProviderManager) Providers() map[uint64]providers.HolderProvider { + providers := make(map[uint64]providers.HolderProvider) + for _, t := range m.GetProviderTypes() { + provider, err := m.GetProvider(t) + if err != nil { + panic(err) + } + providers[t] = provider + } + return providers +} diff --git a/scanner/providers/web3/const.go b/scanner/providers/web3/const.go index 0f66f218..541425dc 100644 --- a/scanner/providers/web3/const.go +++ b/scanner/providers/web3/const.go @@ -8,16 +8,16 @@ const ( ) const ( - shortNameSourceUri = "https://chainid.network/chains_mini.json" - checkNetworkEndpointsTimeout = time.Second * 10 - TimeLayout = "2006-01-02T15:04:05Z07:00" + shortNameSourceUri = "https://chainid.network/chains_mini.json" + checkWeb3EndpointsTimeout = time.Second * 10 + TimeLayout = "2006-01-02T15:04:05Z07:00" ) -var DefaultNetworkEndpoint = &NetworkEndpoint{ +var DefaultWeb3Endpoint = &Web3Endpoint{ ChainID: 11155111, Name: "Sepolia", ShortName: "sep", - URIs: []string{"https://rpc2.sepolia.org"}, + URI: "https://rpc2.sepolia.org", } const ( diff --git a/scanner/providers/web3/endpoint.go b/scanner/providers/web3/endpoint.go deleted file mode 100644 index 04614cfb..00000000 --- a/scanner/providers/web3/endpoint.go +++ /dev/null @@ -1,199 +0,0 @@ -package web3 - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "os" - - "github.com/ethereum/go-ethereum/ethclient" - "go.vocdoni.io/dvote/log" -) - -// NetworkEndpoint struct contains all the required information about a web3 -// provider based on its URI. It includes its chain ID, its name (and shortName) -// and the URI. -type NetworkEndpoint struct { - ChainID uint64 `json:"chainId"` - Name string `json:"name"` - ShortName string `json:"shortName"` - URIs []string -} - -// GetClient method returns a web3 client for the first URI that can be dialed. -func (n *NetworkEndpoint) GetClient(maxRetries int) (*ethclient.Client, error) { - for try := 0; try < maxRetries; try++ { - for _, uri := range n.URIs { - if cli, err := ethclient.Dial(uri); err == nil { - return cli, nil - } - } - } - return nil, fmt.Errorf("error dialing web3 provider uris") -} - -// GetChainIDByURI function returns the chainID of the web3 provider URI -// provided. It dials the URI and gets the chainID from the web3 endpoint, -// using the context provided and the GetClient method with the -// DefaultMaxWeb3ClientRetries value. -func GetChainIDByURI(ctx context.Context, uri string) (uint64, error) { - n := &NetworkEndpoint{URIs: []string{uri}} - cli, err := n.GetClient(DefaultMaxWeb3ClientRetries) - if err != nil { - return 0, err - } - defer cli.Close() - ctx, cancel := context.WithTimeout(ctx, checkNetworkEndpointsTimeout) - defer cancel() - chainID, err := cli.ChainID(ctx) - if err != nil { - return 0, err - } - n.ChainID = chainID.Uint64() - return n.ChainID, nil -} - -// NetworkEndpoints type envolves a map of uint64-NetworkEndpoint, used to index the -// configured web3 providers by the chainID. -type NetworkEndpoints map[uint64]*NetworkEndpoint - -// EndpointByChainID method returns the NetworkEndpoint configured for the -// chainID provided. -func (nps NetworkEndpoints) EndpointByChainID(chainID uint64) (*NetworkEndpoint, bool) { - provider, ok := nps[chainID] - return provider, ok -} - -// URIByChainID method returns the URI configured for the chainID provided. -func (nps NetworkEndpoints) URIsByChainID(chainID uint64) ([]string, bool) { - provider, ok := nps[chainID] - if !ok { - return nil, false - } - return provider.URIs, true -} - -// ChainIDByShortName method returns the chainID configured for the networkEndpoint -// short name provided. -func (nps NetworkEndpoints) ChainIDByShortName(shortName string) (uint64, bool) { - for _, provider := range nps { - if provider.ShortName == shortName { - return provider.ChainID, true - } - } - return 0, false -} - -// ChainAddress method returns a prefixed string of the hex address provided, -// with the short name of the networkEndpoint identified by the chain id provided. -// Read more here: https://eips.ethereum.org/EIPS/eip-3770 -func (nps NetworkEndpoints) ChainAddress(chainID uint64, hexAddress string) (string, bool) { - provider, ok := nps[chainID] - if !ok { - return "", false - } - return fmt.Sprintf("%s:%s", provider.ShortName, hexAddress), true -} - -// String method returns a string representation of the NetworkEndpoints list. -func (nps NetworkEndpoints) String() string { - var shortNames []string - for _, provider := range nps { - shortNames = append(shortNames, provider.ShortName) - } - return fmt.Sprintf("%v", shortNames) -} - -// CurrentBlockNumbers method returns a map of uint64-uint64, where the key is -// the chainID and the value is the current block number of the network. -func (nps NetworkEndpoints) CurrentBlockNumbers(ctx context.Context) (map[uint64]uint64, error) { - blockNumbers := make(map[uint64]uint64) - for _, endpoint := range nps { - cli, err := endpoint.GetClient(DefaultMaxWeb3ClientRetries) - if err != nil { - return blockNumbers, err - } - blockNumber, err := cli.BlockNumber(ctx) - if err != nil { - return blockNumbers, fmt.Errorf("error getting the block number from %s network: %w", endpoint.Name, err) - } - blockNumbers[endpoint.ChainID] = blockNumber - } - return blockNumbers, nil -} - -// InitNetworkEndpoints function initializes a NetworkEndpoints list checking -// the web3 enpoint URI's provided as argument. It checks if the URI's are -// valid, getting its chain ID's and then query to shortNameSourceURI endpoint -// to get the chain name and short name. If more than one URI is provided for -// the same chainID, the URIs are grouped in the same NetworkEndpoint. If no -// valid URIs are provided, an error is returned. -func InitNetworkEndpoints(providersURIs []string) (NetworkEndpoints, error) { - if len(providersURIs) == 0 { - return nil, fmt.Errorf("no URIs provided") - } - // get chains information from external source - res, err := http.Get(shortNameSourceUri) - if err != nil { - return nil, fmt.Errorf("error getting chains information from external source: %v", err) - } - chainsData := []*NetworkEndpoint{} - if err := json.NewDecoder(res.Body).Decode(&chainsData); err != nil { - return nil, fmt.Errorf("error decoding chains information from external source: %v", err) - } - providers := make(NetworkEndpoints) - for _, uri := range providersURIs { - ctx, cancel := context.WithTimeout(context.Background(), checkNetworkEndpointsTimeout) - defer cancel() - - cli, err := ethclient.DialContext(ctx, uri) - if err != nil { - log.Errorf("error dialing web3 provider uri '%s': %v", uri, err) - continue - } - // get the chainID from the web3 endpoint - bChainID, err := cli.ChainID(ctx) - if err != nil { - log.Errorf("error getting the chainID from the web3 provider '%s': %v", uri, err) - continue - } - chainID := bChainID.Uint64() - if provider, ok := providers[chainID]; ok { - provider.URIs = append(provider.URIs, uri) - providers[chainID] = provider - continue - } - // get chain shortName - for _, info := range chainsData { - if info.ChainID == chainID { - info.URIs = []string{uri} - providers[info.ChainID] = info - break - } - } - } - if len(providers) == 0 { - return nil, fmt.Errorf("no valid URIs provided") - } - return providers, nil -} - -// TestNetworkEndpoint function returns a NetworkEndpoint for testing purposes. -// It checks if the WEB3_URI environment variable is set, and if so, it uses -// its value to initialize a NetworkEndpoint. If not, it returns the -// DefaultNetworkEndpoint. -func TestNetworkEndpoint() (*NetworkEndpoint, error) { - if uri := os.Getenv("WEB3_URI"); uri != "" { - endpoints, err := InitNetworkEndpoints([]string{uri}) - if err != nil { - return nil, err - } - var chainID uint64 - if chainID, err = GetChainIDByURI(context.Background(), uri); err != nil { - return nil, err - } - return endpoints[chainID], nil - } - return DefaultNetworkEndpoint, nil -} diff --git a/scanner/providers/web3/endpoint_test.go b/scanner/providers/web3/endpoint_test.go deleted file mode 100644 index e3df66b9..00000000 --- a/scanner/providers/web3/endpoint_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package web3 - -import ( - "testing" - - qt "github.com/frankban/quicktest" -) - -func TestCheckWeb3Providers(t *testing.T) { - c := qt.New(t) - - testNetwork, err := InitNetworkEndpoints(DefaultNetworkEndpoint.URIs) - c.Assert(err, qt.IsNil) - c.Assert(testNetwork[DefaultNetworkEndpoint.ChainID].URIs, qt.ContentEquals, DefaultNetworkEndpoint.URIs) - c.Assert(testNetwork[DefaultNetworkEndpoint.ChainID].ShortName, qt.Equals, DefaultNetworkEndpoint.ShortName) - c.Assert(testNetwork[DefaultNetworkEndpoint.ChainID].Name, qt.Equals, DefaultNetworkEndpoint.Name) - - t.Run("URIByChainID", func(t *testing.T) { - _, ok := testNetwork.URIsByChainID(DefaultNetworkEndpoint.ChainID + 1) - c.Assert(ok, qt.IsFalse) - uri, ok := testNetwork.URIsByChainID(DefaultNetworkEndpoint.ChainID) - c.Assert(ok, qt.Equals, true) - c.Assert(uri, qt.ContentEquals, DefaultNetworkEndpoint.URIs) - }) - t.Run("ChainIDByShortName", func(t *testing.T) { - _, ok := testNetwork.ChainIDByShortName("UNKNOWN") - c.Assert(ok, qt.IsFalse) - chainID, ok := testNetwork.ChainIDByShortName(DefaultNetworkEndpoint.ShortName) - c.Assert(ok, qt.Equals, true) - c.Assert(chainID, qt.ContentEquals, DefaultNetworkEndpoint.ChainID) - }) - t.Run("ChainAddress", func(t *testing.T) { - _, ok := testNetwork.ChainAddress(DefaultNetworkEndpoint.ChainID+1, "0x1234567890") - c.Assert(ok, qt.IsFalse) - prefix, ok := testNetwork.ChainAddress(DefaultNetworkEndpoint.ChainID, "0x1234567890") - c.Assert(ok, qt.Equals, true) - c.Assert(prefix, qt.Equals, "sep:0x1234567890") - }) -} diff --git a/scanner/providers/web3/erc20_provider.go b/scanner/providers/web3/erc20_provider.go index e236c956..7a8580ec 100644 --- a/scanner/providers/web3/erc20_provider.go +++ b/scanner/providers/web3/erc20_provider.go @@ -9,15 +9,14 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" erc20 "github.com/vocdoni/census3/contracts/erc/erc20" "github.com/vocdoni/census3/scanner/providers" "go.vocdoni.io/dvote/log" ) type ERC20HolderProvider struct { - endpoints NetworkEndpoints - client *ethclient.Client + endpoints *Web3Pool + client *Client contract *erc20.ERC20Contract address common.Address @@ -54,25 +53,20 @@ func (p *ERC20HolderProvider) Init(iconf any) error { // to use. It connects to the endpoint and initializes the contract. func (p *ERC20HolderProvider) SetRef(iref any) error { if p.endpoints == nil { - return errors.New("endpoints not defined") + return fmt.Errorf("endpoints not defined") } ref, ok := iref.(Web3ProviderRef) if !ok { - return errors.New("invalid ref type, it must be Web3ProviderRef") + return fmt.Errorf("invalid ref type, it must be Web3ProviderRef") } - currentEndpoint, exists := p.endpoints.EndpointByChainID(ref.ChainID) - if !exists { - return errors.New("endpoint not found for the given chainID") - } - // connect to the endpoint - client, err := currentEndpoint.GetClient(DefaultMaxWeb3ClientRetries) + var err error + p.client, err = p.endpoints.Client(ref.ChainID) if err != nil { - return errors.Join(ErrConnectingToWeb3Client, fmt.Errorf("[ERC20] %s: %w", ref.HexAddress, err)) + return fmt.Errorf("error getting web3 client for the given chainID: %w", err) } // set the client, parse the address and initialize the contract - p.client = client p.address = common.HexToAddress(ref.HexAddress) - if p.contract, err = erc20.NewERC20Contract(p.address, client); err != nil { + if p.contract, err = erc20.NewERC20Contract(p.address, p.client); err != nil { return errors.Join(ErrInitializingContract, fmt.Errorf("[ERC20] %s: %w", p.address, err)) } // reset the current creation block, if a creation block is defined in the @@ -127,7 +121,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro // an error if fromBlock >= p.lastNetworkBlock && fromBlock == p.creationBlock { return nil, 0, fromBlock, false, big.NewInt(0), - errors.New("outdated last network block, it will retry in the next iteration") + fmt.Errorf("outdated last network block, it will retry in the next iteration") } // calculate the range of blocks to scan, by default take the last block // scanned and scan to the latest block, calculate the latest block if the diff --git a/scanner/providers/web3/erc721_provider.go b/scanner/providers/web3/erc721_provider.go index 7bbbe2e9..a1205a3d 100644 --- a/scanner/providers/web3/erc721_provider.go +++ b/scanner/providers/web3/erc721_provider.go @@ -9,15 +9,14 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" erc721 "github.com/vocdoni/census3/contracts/erc/erc721" "github.com/vocdoni/census3/scanner/providers" "go.vocdoni.io/dvote/log" ) type ERC721HolderProvider struct { - endpoints NetworkEndpoints - client *ethclient.Client + endpoints *Web3Pool + client *Client contract *erc721.ERC721Contract address common.Address @@ -60,19 +59,14 @@ func (p *ERC721HolderProvider) SetRef(iref any) error { if !ok { return errors.New("invalid ref type, it must be Web3ProviderRef") } - currentEndpoint, exists := p.endpoints.EndpointByChainID(ref.ChainID) - if !exists { - return errors.New("endpoint not found for the given chainID") - } - // connect to the endpoint - client, err := currentEndpoint.GetClient(DefaultMaxWeb3ClientRetries) + var err error + p.client, err = p.endpoints.Client(ref.ChainID) if err != nil { - return errors.Join(ErrConnectingToWeb3Client, fmt.Errorf("[ERC721] %s: %w", ref.HexAddress, err)) + return fmt.Errorf("error getting web3 client for the given chainID: %w", err) } // set the client, parse the address and initialize the contract - p.client = client address := common.HexToAddress(ref.HexAddress) - if p.contract, err = erc721.NewERC721Contract(address, client); err != nil { + if p.contract, err = erc721.NewERC721Contract(address, p.client); err != nil { return errors.Join(ErrInitializingContract, fmt.Errorf("[ERC721] %s: %w", p.address, err)) } if ref.CreationBlock > 0 { diff --git a/scanner/providers/web3/erc777_provider.go b/scanner/providers/web3/erc777_provider.go index 97dddefd..10248cfc 100644 --- a/scanner/providers/web3/erc777_provider.go +++ b/scanner/providers/web3/erc777_provider.go @@ -9,15 +9,14 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" erc777 "github.com/vocdoni/census3/contracts/erc/erc777" "github.com/vocdoni/census3/scanner/providers" "go.vocdoni.io/dvote/log" ) type ERC777HolderProvider struct { - endpoints NetworkEndpoints - client *ethclient.Client + endpoints *Web3Pool + client *Client contract *erc777.ERC777Contract address common.Address @@ -60,19 +59,14 @@ func (p *ERC777HolderProvider) SetRef(iref any) error { if !ok { return errors.New("invalid ref type, it must be Web3ProviderRef") } - currentEndpoint, exists := p.endpoints.EndpointByChainID(ref.ChainID) - if !exists { - return errors.New("endpoint not found for the given chainID") - } - // connect to the endpoint - client, err := currentEndpoint.GetClient(DefaultMaxWeb3ClientRetries) + var err error + p.client, err = p.endpoints.Client(ref.ChainID) if err != nil { - return errors.Join(ErrConnectingToWeb3Client, fmt.Errorf("[ERC777] %s: %w", ref.HexAddress, err)) + return fmt.Errorf("error getting web3 client for the given chainID: %w", err) } // set the client, parse the address and initialize the contract - p.client = client address := common.HexToAddress(ref.HexAddress) - if p.contract, err = erc777.NewERC777Contract(address, client); err != nil { + if p.contract, err = erc777.NewERC777Contract(address, p.client); err != nil { return errors.Join(ErrInitializingContract, fmt.Errorf("[ERC777] %s: %w", p.address, err)) } if ref.CreationBlock > 0 { diff --git a/scanner/providers/web3/web3_client.go b/scanner/providers/web3/web3_client.go new file mode 100644 index 00000000..ca0ca6f5 --- /dev/null +++ b/scanner/providers/web3/web3_client.go @@ -0,0 +1,331 @@ +package web3 + +import ( + "context" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +const defaultRetries = 3 + +var ( + defaultTimeout = 2 * time.Second + filterLogsTimeout = 3 * time.Second + retrySleep = 200 * time.Millisecond +) + +// Client struct implements bind.ContractBackend interface for a web3 pool with +// an specific chainID. It allows to interact with the blockchain using the +// methods provided by the interface balancing the load between the available +// endpoints in the pool for the chainID. +type Client struct { + w3p *Web3Pool + chainID uint64 +} + +// EthClient method returns the ethclient.Client for the chainID of the Client +// instance. It returns an error if the chainID is not found in the pool. +func (c *Client) EthClient() (*ethclient.Client, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + return endpoint.client, nil +} + +// CodeAt method wraps the CodeAt method from the ethclient.Client for the +// chainID of the Client instance. It returns an error if the chainID is not +// found in the pool or if the method fails. Required by the bind.ContractBackend +// interface. +func (c *Client) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.CodeAt(internalCtx, account, blockNumber) + }) + if err != nil { + return nil, err + } + return res.([]byte), err +} + +// CallContract method wraps the CallContract method from the ethclient.Client +// for the chainID of the Client instance. It returns an error if the chainID is +// not found in the pool or if the method fails. Required by the +// bind.ContractBackend interface. +func (c *Client) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.CallContract(internalCtx, call, blockNumber) + }) + if err != nil { + return nil, err + } + return res.([]byte), err +} + +// EstimateGas method wraps the EstimateGas method from the ethclient.Client for +// the chainID of the Client instance. It returns an error if the chainID is not +// found in the pool or if the method fails. Required by the bind.ContractBackend +// interface. +func (c *Client) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return 0, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.EstimateGas(internalCtx, msg) + }) + if err != nil { + return 0, err + } + return res.(uint64), err +} + +// FilterLogs method wraps the FilterLogs method from the ethclient.Client for +// the chainID of the Client instance. It returns an error if the chainID is not +// found in the pool or if the method fails. Required by the bind.ContractBackend +// interface. +func (c *Client) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, filterLogsTimeout) + defer cancel() + return endpoint.client.FilterLogs(internalCtx, query) + }) + if err != nil { + return nil, err + } + return res.([]types.Log), nil +} + +// HeaderByNumber method wraps the HeaderByNumber method from the ethclient.Client +// for the chainID of the Client instance. It returns an error if the chainID is +// not found in the pool or if the method fails. Required by the +// bind.ContractBackend interface. +func (c *Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.HeaderByNumber(internalCtx, number) + }) + if err != nil { + return nil, err + } + return res.(*types.Header), err +} + +// PendingNonceAt method wraps the PendingNonceAt method from the +// ethclient.Client for the chainID of the Client instance. It returns an error +// if the chainID is not found in the pool or if the method fails. Required by +// the bind.ContractBackend interface. +func (c *Client) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return 0, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.PendingNonceAt(internalCtx, account) + }) + if err != nil { + return 0, err + } + return res.(uint64), err +} + +// SuggestGasPrice method wraps the SuggestGasPrice method from the +// ethclient.Client for the chainID of the Client instance. It returns an error +// if the chainID is not found in the pool or if the method fails. Required by +// the bind.ContractBackend interface. +func (c *Client) SuggestGasPrice(ctx context.Context) (*big.Int, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.SuggestGasPrice(internalCtx) + }) + if err != nil { + return nil, err + } + return res.(*big.Int), err +} + +// SendTransaction method wraps the SendTransaction method from the ethclient.Client +// for the chainID of the Client instance. It returns an error if the chainID is +// not found in the pool or if the method fails. Required by the +// bind.ContractBackend interface. +func (c *Client) SendTransaction(ctx context.Context, tx *types.Transaction) error { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + _, err = c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return nil, endpoint.client.SendTransaction(internalCtx, tx) + }) + return err +} + +// PendingCodeAt method wraps the PendingCodeAt method from the ethclient.Client +// for the chainID of the Client instance. It returns an error if the chainID is +// not found in the pool or if the method fails. Required by the +// bind.ContractBackend interface. +func (c *Client) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.PendingCodeAt(internalCtx, account) + }) + if err != nil { + return nil, err + } + return res.([]byte), err +} + +// SubscribeFilterLogs method wraps the SubscribeFilterLogs method from the +// ethclient.Client for the chainID of the Client instance. It returns an error +// if the chainID is not found in the pool or if the method fails. Required by +// the bind.ContractBackend interface. +func (c *Client) SubscribeFilterLogs(ctx context.Context, + query ethereum.FilterQuery, ch chan<- types.Log, +) (ethereum.Subscription, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.SubscribeFilterLogs(internalCtx, query, ch) + }) + if err != nil { + return nil, err + } + return res.(ethereum.Subscription), err +} + +// SuggestGasTipCap method wraps the SuggestGasTipCap method from the +// ethclient.Client for the chainID of the Client instance. It returns an error +// if the chainID is not found in the pool or if the method fails. Required by +// the bind.ContractBackend interface. +func (c *Client) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.SuggestGasTipCap(internalCtx) + }) + if err != nil { + return nil, err + } + return res.(*big.Int), err +} + +// BalanceAt method wraps the BalanceAt method from the ethclient.Client for the +// chainID of the Client instance. It returns an error if the chainID is not +// found in the pool or if the method fails. This method is required by internal +// logic, it is not required by the bind.ContractBackend interface. +func (c *Client) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.BalanceAt(internalCtx, account, blockNumber) + }) + if err != nil { + return nil, err + } + return res.(*big.Int), err +} + +// BlockNumber method wraps the BlockNumber method from the ethclient.Client for +// the chainID of the Client instance. It returns an error if the chainID is not +// found in the pool or if the method fails. This method is required by internal +// logic, it is not required by the bind.ContractBackend interface. +func (c *Client) BlockNumber(ctx context.Context) (uint64, error) { + endpoint, err := c.w3p.Endpoint(c.chainID) + if err != nil { + return 0, fmt.Errorf("error getting endpoint for chainID %d: %w", c.chainID, err) + } + // retry the method in case of failure and get final result and error + res, err := c.retryAndCheckErr(endpoint.URI, func() (any, error) { + internalCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + return endpoint.client.BlockNumber(internalCtx) + }) + if err != nil { + return 0, err + } + return res.(uint64), err +} + +// retryAndCheckErr method retries a function call in case of error and checks +// the error after the retries. It returns the result of the function call and +// the error if the retries are exhausted. It is used to retry the methods of +// the ethclient.Client in case of failure. If the error is not nil after the +// retries, the endpoint is disabled in the pool and the error is returned. +func (c *Client) retryAndCheckErr(uri string, fn func() (any, error)) (any, error) { + var res any + var err error + for i := 0; i < defaultRetries; i++ { + res, err = fn() + if err == nil { + return res, nil + } + time.Sleep(retrySleep) + } + c.w3p.DisableEndpoint(c.chainID, uri) + return nil, fmt.Errorf("error after %d retries: %w", defaultRetries, err) +} diff --git a/scanner/providers/web3/web3_iter.go b/scanner/providers/web3/web3_iter.go new file mode 100644 index 00000000..94d77ac2 --- /dev/null +++ b/scanner/providers/web3/web3_iter.go @@ -0,0 +1,108 @@ +package web3 + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/ethclient" +) + +// Web3Endpoint struct contains all the required information about a web3 +// provider based on its URI. It includes its chain ID, its name (and shortName) +// and the URI. +type Web3Endpoint struct { + ChainID uint64 `json:"chainId"` + Name string `json:"name"` + ShortName string `json:"shortName"` + URI string + client *ethclient.Client +} + +// Web3Iterator struct is a pool of Web3Endpoint that allows to get the next +// available endpoint in a round-robin fashion. It also allows to disable an +// endpoint if it fails. It allows to manage multiple endpoints safely. +type Web3Iterator struct { + nextIndex int + available []*Web3Endpoint + disabled []*Web3Endpoint + mtx sync.Mutex +} + +// NewWeb3Iterator creates a new Web3Iterator with the given endpoints. +func NewWeb3Iterator(endpoints ...*Web3Endpoint) *Web3Iterator { + if endpoints == nil { + endpoints = make([]*Web3Endpoint, 0) + } + return &Web3Iterator{ + available: endpoints, + disabled: make([]*Web3Endpoint, 0), + } +} + +// Add adds a new endpoint to the pool, making it available for the next +// requests. +func (w3pp *Web3Iterator) Add(endpoint ...*Web3Endpoint) { + w3pp.mtx.Lock() + defer w3pp.mtx.Unlock() + w3pp.available = append(w3pp.available, endpoint...) +} + +// Next returns the next available endpoint in a round-robin fashion. If +// there are no registered endpoints, it will return an error. If there are no +// available endpoints, it will reset the disabled endpoints and return the +// first available endpoint. +func (w3pp *Web3Iterator) Next() (*Web3Endpoint, error) { + w3pp.mtx.Lock() + defer w3pp.mtx.Unlock() + l := len(w3pp.available) + if l == 0 { + return nil, fmt.Errorf("no registered endpoints") + } + // get the current endpoint. the next index can not be invalid at this + // point because the available list not empty, the next index is always a + // valid index since it is updated when an endpoint is disabled or when the + // resulting endpoint is resolved, so the endpoint can not be nil + currentEndpoint := w3pp.available[w3pp.nextIndex] + // calculate the following next endpoint index based on the current one + if w3pp.nextIndex++; w3pp.nextIndex >= l { + // if the next index is out of bounds, reset it to the first one + w3pp.nextIndex = 0 + } + // update the next index and return the current endpoint + return currentEndpoint, nil +} + +// Disable method disables an endpoint, moving it from the available list to the +// the disabled list. +func (w3pp *Web3Iterator) Disable(uri string) { + w3pp.mtx.Lock() + defer w3pp.mtx.Unlock() + // get the index of the endpoint to disable + var index int + for i, e := range w3pp.available { + if e.URI == uri { + index = i + break + } + } + // get the endpoint to disable and move it to the disabled list + disabledEndpoint := w3pp.available[index] + w3pp.available = append(w3pp.available[:index], w3pp.available[index+1:]...) + w3pp.disabled = append(w3pp.disabled, disabledEndpoint) + // if the next index is the one to disable, update it to the next one + if w3pp.nextIndex == index { + w3pp.nextIndex++ + } + // if there are no available endpoints, reset all the disabled ones to + // available ones and reset the next index to the first one + if l := len(w3pp.available); l == 0 { + // reset the next index and move the disabled endpoints to the available + w3pp.nextIndex = 0 + w3pp.available = append(w3pp.available, w3pp.disabled...) + w3pp.disabled = make([]*Web3Endpoint, 0) + } + // if the next index is out of bounds, reset it to the first one + if w3pp.nextIndex >= len(w3pp.available) { + w3pp.nextIndex = 0 + } +} diff --git a/scanner/providers/web3/web3_pool.go b/scanner/providers/web3/web3_pool.go new file mode 100644 index 00000000..afdc060a --- /dev/null +++ b/scanner/providers/web3/web3_pool.go @@ -0,0 +1,212 @@ +package web3 + +// This package contains the Web3Pool struct, which is a pool of Web3Endpoint +// instances. It allows to add, remove and get endpoints, as well as to get the +// chainID by short name. It also provides an implementation of the +// bind.ContractBackend interface for a web3 pool with an specific chainID. +// It allows to interact with the blockchain using the methods provided by the +// interface balancing the load between the available endpoints in the pool for +// every chainID. +// The pool balances the load between the available endpoints in the pool for +// every chainID, allowing to use the endpoints concurrently and switch between +// them flagging them as available if they fail to keep the pool healthy. If +// every endpoint fails for a chainID, the pool resets the available flag for +// all the endpoints and starts again. + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/ethereum/go-ethereum/ethclient" +) + +// Web3Pool struct contains a map of chainID-[]*Web3Endpoint, where +// the key is the chainID and the value is a list of Web3Endpoint. It also +// contains a list of all the Web3Endpoint metadata. It provides methods to +// add, remove and get endpoints, as well as to get the chainID by short name. +// It allows to support multiple endpoints for the same chainID and switch +// between them looking for the available one. +type Web3Pool struct { + endpoints map[uint64]*Web3Iterator + metadata []*Web3Endpoint +} + +// NewWeb3Pool method returns a new *Web3Pool instance, initialized +// with the metadata from the external source. It returns an error if the metadata +// cannot be retrieved or decoded. +func NewWeb3Pool() (*Web3Pool, error) { + // get chains information from external source + res, err := http.Get(shortNameSourceUri) + if err != nil { + return nil, fmt.Errorf("error getting chains information from external source: %v", err) + } + chainsData := []*Web3Endpoint{} + if err := json.NewDecoder(res.Body).Decode(&chainsData); err != nil { + return nil, fmt.Errorf("error decoding chains information from external source: %v", err) + } + return &Web3Pool{ + endpoints: make(map[uint64]*Web3Iterator), + metadata: chainsData, + }, nil +} + +// AddEndpoint method adds a new web3 provider URI to the *Web3Pool +// instance. It returns an error if the chain metadata is not found or if the +// web3 client cannot be initialized. +func (nm *Web3Pool) AddEndpoint(uri string) error { + ctx, cancel := context.WithTimeout(context.Background(), checkWeb3EndpointsTimeout) + defer cancel() + // init the web3 client + client, err := connect(ctx, uri) + if err != nil { + return fmt.Errorf("error dialing web3 provider uri '%s': %w", uri, err) + } + // get the chainID from the web3 endpoint + bChainID, err := client.ChainID(ctx) + if err != nil { + return fmt.Errorf("error getting the chainID from the web3 provider '%s': %w", uri, err) + } + chainID := bChainID.Uint64() + // get chain name and the shortName + var name, shortName string + for _, info := range nm.metadata { + if info.ChainID == chainID { + name = info.Name + shortName = info.ShortName + break + } + } + // check if the chain metadata was found, if not, return an error + if name == "" || shortName == "" { + return fmt.Errorf("no chain metadata found for chainID %d", chainID) + } + // add the endpoint to the pool + endpoint := &Web3Endpoint{ + ChainID: chainID, + Name: name, + ShortName: shortName, + URI: uri, + client: client, + } + if _, ok := nm.endpoints[chainID]; !ok { + nm.endpoints[chainID] = NewWeb3Iterator(endpoint) + } else { + nm.endpoints[chainID].Add(endpoint) + } + return nil +} + +// DelEndpoint method removes a web3 provider URI from the *Web3Pool +// instance. It closes the client and removes the endpoint from the list of +// endpoints for the chainID where it was found. +func (nm *Web3Pool) DelEndoint(uri string) { + for _, endpoints := range nm.endpoints { + endpoints.Disable(uri) + } +} + +// Endpoint method returns the Web3Endpoint configured for the chainID +// provided. It returns the first available endpoint. If no available endpoint +// is found, returns an error. +func (nm *Web3Pool) Endpoint(chainID uint64) (*Web3Endpoint, error) { + return nm.endpoints[chainID].Next() +} + +// DisableEndpoint method sets the available flag to false for the URI provided +// in the chainID provided. +func (nm *Web3Pool) DisableEndpoint(chainID uint64, uri string) { + if endpoints, ok := nm.endpoints[chainID]; ok { + endpoints.Disable(uri) + } +} + +// Client method returns a new *Client instance for the chainID provided. +// It returns an error if the endpoint is not found. +func (nm *Web3Pool) Client(chainID uint64) (*Client, error) { + if _, err := nm.Endpoint(chainID); err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", chainID, err) + } + return &Client{w3p: nm, chainID: chainID}, nil +} + +// ChainAddress method returns a prefixed string of the hex address provided, +// with the short name of the networkEndpoint identified by the chain id provided. +// Read more here: https://eips.ethereum.org/EIPS/eip-3770 +func (nps *Web3Pool) ChainAddress(chainID uint64, hexAddress string) (string, bool) { + for _, data := range nps.metadata { + if data.ChainID == chainID { + return fmt.Sprintf("%s:%s", data.ShortName, hexAddress), true + } + } + return "", false +} + +// String method returns a string representation of the *Web3Pool list. +func (nm *Web3Pool) String() string { + shortNames := map[string]bool{} + for chainID := range nm.endpoints { + for _, data := range nm.metadata { + if data.ChainID == chainID { + shortNames[data.ShortName] = true + } + } + } + var shortNamesSlice []string + for shortName := range shortNames { + shortNamesSlice = append(shortNamesSlice, shortName) + } + return fmt.Sprintf("%v", shortNamesSlice) +} + +// CurrentBlockNumbers method returns a map of uint64-uint64, where the key is +// the chainID and the value is the current block number of the network. +func (nm *Web3Pool) CurrentBlockNumbers(ctx context.Context) (map[uint64]uint64, error) { + blockNumbers := make(map[uint64]uint64) + for chainID := range nm.endpoints { + cli, err := nm.Endpoint(chainID) + if err != nil { + return nil, fmt.Errorf("error getting endpoint for chainID %d: %w", chainID, err) + } + blockNumber, err := cli.client.BlockNumber(ctx) + if err != nil { + return nil, fmt.Errorf("error getting block number for chainID %d: %w", chainID, err) + } + blockNumbers[chainID] = blockNumber + } + return blockNumbers, nil +} + +// SupportedNetworks method returns a list of all the supported Web3Endpoint +// metadata. It returns the chainID, name and shortName of unique supported +// chains. +func (nm *Web3Pool) SupportedNetworks() []*Web3Endpoint { + var supported []*Web3Endpoint + for chainID := range nm.endpoints { + for _, data := range nm.metadata { + if data.ChainID == chainID { + supported = append(supported, &Web3Endpoint{ + ChainID: chainID, + Name: data.Name, + ShortName: data.ShortName, + }) + break + } + } + } + return supported +} + +// connect method returns a new *ethclient.Client instance for the URI provided. +// It retries to connect to the web3 provider if it fails, up to the +// DefaultMaxWeb3ClientRetries times. +func connect(ctx context.Context, uri string) (client *ethclient.Client, err error) { + for i := 0; i < DefaultMaxWeb3ClientRetries; i++ { + if client, err = ethclient.DialContext(ctx, uri); err != nil { + continue + } + return + } + return nil, fmt.Errorf("error dialing web3 provider uri '%s': %w", uri, err) +} diff --git a/scanner/providers/web3/web3_provider.go b/scanner/providers/web3/web3_provider.go index 99115db7..535f4335 100644 --- a/scanner/providers/web3/web3_provider.go +++ b/scanner/providers/web3/web3_provider.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" "github.com/vocdoni/census3/scanner/providers" "go.vocdoni.io/dvote/db" "go.vocdoni.io/dvote/log" @@ -25,20 +24,24 @@ type Web3ProviderRef struct { type Web3ProviderConfig struct { Web3ProviderRef - Endpoints NetworkEndpoints + Endpoints *Web3Pool DB *db.Database } // creationBlock function returns the block number of the creation of a contract // address. It uses the `eth_getCode` method to get the contract code at the // block number provided. If the method is not supported, it returns 0 and nil. -func creationBlock(client *ethclient.Client, ctx context.Context, addr common.Address) (uint64, error) { +func creationBlock(client *Client, ctx context.Context, addr common.Address) (uint64, error) { // check if the current client supports `eth_getCode` method, if not, return // 1 and nil. It is assumed that the contract is created at block 1 to start // scanning from the first block. + ethClient, err := client.EthClient() + if err != nil { + return 0, err + } getCodeSupport := false for i := 0; i < DefaultMaxWeb3ClientRetries; i++ { - if getCodeSupport = providers.ClientSupportsGetCode(ctx, client, addr); getCodeSupport { + if getCodeSupport = providers.ClientSupportsGetCode(ctx, ethClient, addr); getCodeSupport { break } time.Sleep(RetryWeb3Cooldown) @@ -47,7 +50,6 @@ func creationBlock(client *ethclient.Client, ctx context.Context, addr common.Ad return 1, nil } // get the latest block number - var err error var lastBlock uint64 for i := 0; i < DefaultMaxWeb3ClientRetries; i++ { lastBlock, err = client.BlockNumber(ctx) @@ -72,7 +74,7 @@ func creationBlock(client *ethclient.Client, ctx context.Context, addr common.Ad // creationBlockInRange function finds the block number of a contract between // the bounds provided as start and end blocks. -func creationBlockInRange(client *ethclient.Client, ctx context.Context, addr common.Address, start, end uint64) (uint64, error) { +func creationBlockInRange(client *Client, ctx context.Context, addr common.Address, start, end uint64) (uint64, error) { // if both block numbers are equal, return its value as birthblock if start == end { return start, nil @@ -81,7 +83,8 @@ func creationBlockInRange(client *ethclient.Client, ctx context.Context, addr co // code at this block midBlock := (start + end) / 2 codeLen, err := sourceCodeLenAt(client, ctx, addr, midBlock) - if err != nil && !strings.Contains(err.Error(), fmt.Sprintf("No state available for block %d", midBlock)) { + if err != nil && !strings.Contains(err.Error(), fmt.Sprintf("No state available for block %d", midBlock)) && + !strings.Contains(err.Error(), "missing trie node") { return 0, err } // if any code is found, keep trying with the lower half of blocks until @@ -95,7 +98,7 @@ func creationBlockInRange(client *ethclient.Client, ctx context.Context, addr co // SourceCodeLenAt function returns the length of the current contract bytecode // at the block number provided. -func sourceCodeLenAt(client *ethclient.Client, ctx context.Context, addr common.Address, atBlockNumber uint64) (int, error) { +func sourceCodeLenAt(client *Client, ctx context.Context, addr common.Address, atBlockNumber uint64) (int, error) { blockNumber := new(big.Int).SetUint64(atBlockNumber) sourceCode, err := client.CodeAt(ctx, addr, blockNumber) return len(sourceCode), err @@ -105,7 +108,7 @@ func sourceCodeLenAt(client *ethclient.Client, ctx context.Context, addr common. // provided block numbers. It returns the logs, the last block scanned and an // error if any. It filters the logs by the topic hash and for the token // contract address provided. -func RangeOfLogs(ctx context.Context, client *ethclient.Client, addr common.Address, +func RangeOfLogs(ctx context.Context, client *Client, addr common.Address, fromBlock, lastBlock uint64, hexTopics ...string, ) ([]types.Log, uint64, bool, error) { // if the range is too big, scan only a part of it using the constant diff --git a/scanner/scanner.go b/scanner/scanner.go index b94ec525..8b4356ac 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -9,13 +9,14 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/vocdoni/census3/db" "github.com/vocdoni/census3/db/annotations" queries "github.com/vocdoni/census3/db/sqlc" - "github.com/vocdoni/census3/scanner/providers" + "github.com/vocdoni/census3/scanner/providers/manager" "github.com/vocdoni/census3/scanner/providers/web3" "go.vocdoni.io/dvote/log" ) @@ -39,12 +40,12 @@ type ScannerToken struct { // holders of the tokens. It has a cool down time between iterations to avoid // overloading the providers. type Scanner struct { - ctx context.Context - cancel context.CancelFunc - db *db.DB - networks web3.NetworkEndpoints - providers map[uint64]providers.HolderProvider - coolDown time.Duration + ctx context.Context + cancel context.CancelFunc + db *db.DB + networks *web3.Web3Pool + providerManager *manager.ProviderManager + coolDown time.Duration tokens []*ScannerToken tokensMtx sync.Mutex @@ -55,11 +56,11 @@ type Scanner struct { // NewScanner returns a new scanner instance with the required parameters // initialized. -func NewScanner(db *db.DB, networks web3.NetworkEndpoints, coolDown time.Duration) *Scanner { +func NewScanner(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager, coolDown time.Duration) *Scanner { return &Scanner{ db: db, networks: networks, - providers: make(map[uint64]providers.HolderProvider), + providerManager: pm, coolDown: coolDown, tokens: []*ScannerToken{}, tokensMtx: sync.Mutex{}, @@ -69,41 +70,13 @@ func NewScanner(db *db.DB, networks web3.NetworkEndpoints, coolDown time.Duratio } } -// SetProviders sets the providers that the scanner will use to get the holders -// of the tokens. It also creates the token types in the database if they do not -// exist. It returns an error something goes wrong creating the token types in -// the database. -func (s *Scanner) SetProviders(newProviders ...providers.HolderProvider) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // create a tx to use it in the following queries - for _, provider := range newProviders { - // try to create the token type in the database, if it already exists, - // update its name to ensure that it is correct according to the type id - if _, err := s.db.QueriesRW.CreateTokenType(ctx, queries.CreateTokenTypeParams{ - ID: provider.Type(), - TypeName: provider.TypeName(), - }); err != nil { - if !strings.Contains(err.Error(), "UNIQUE constraint failed") { - return err - } - if _, err := s.db.QueriesRW.UpdateTokenType(ctx, queries.UpdateTokenTypeParams{ - ID: provider.Type(), - TypeName: provider.TypeName(), - }); err != nil { - return err - } - } - // include the provider in the scanner - s.providers[provider.Type()] = provider - } - return nil -} - // Start starts the scanner. It starts a loop that scans the tokens in the // database and saves the holders in the database. It stops when the context is // cancelled. -func (s *Scanner) Start(ctx context.Context) { +func (s *Scanner) Start(ctx context.Context, concurrentTokens int) { + if concurrentTokens < 1 { + concurrentTokens = 1 + } s.ctx, s.cancel = context.WithCancel(ctx) itCounter := 0 // keep the latest block numbers updated @@ -127,50 +100,64 @@ func (s *Scanner) Start(ctx context.Context) { log.Error(err) continue } + // calculate number of batches + sem := make(chan struct{}, concurrentTokens) + defer close(sem) // iterate over the tokens to scan - atSyncGlobal := true + var atSyncGlobal atomic.Bool + atSyncGlobal.Store(true) for _, token := range tokens { - log.Infow("scanning token", - "address", token.Address.Hex(), - "chainID", token.ChainID, - "externalID", token.ExternalID, - "lastBlock", token.LastBlock, - "ready", token.Ready) - // scan the token - holders, newTransfers, lastBlock, synced, totalSupply, err := s.ScanHolders(ctx, token) - if err != nil { - atSyncGlobal = false - if errors.Is(err, context.Canceled) { - log.Info("scanner context cancelled, shutting down") + // get the semaphore + sem <- struct{}{} + go func(token ScannerToken) { + // release the semaphore when the goroutine finishes + defer func() { + <-sem + }() + log.Infow("scanning token", + "address", token.Address.Hex(), + "chainID", token.ChainID, + "externalID", token.ExternalID, + "lastBlock", token.LastBlock, + "ready", token.Ready) + // scan the token + holders, newTransfers, lastBlock, synced, totalSupply, err := s.ScanHolders(ctx, token) + if err != nil { + atSyncGlobal.Store(false) + if errors.Is(err, context.Canceled) { + log.Info("scanner context cancelled, shutting down") + return + } + log.Error(err) return } - log.Error(err) - continue - } - if !synced { - atSyncGlobal = false - } - // save the token holders in the database in a goroutine and - // continue with the next token - s.waiter.Add(1) - go func(t *ScannerToken, h map[common.Address]*big.Int, n, lb uint64, sy bool, ts *big.Int) { - defer s.waiter.Done() - if err = s.SaveHolders(ctx, t, h, n, lb, sy, ts); err != nil { + if !synced { + atSyncGlobal.Store(false) + } + // save the new token holders + if err = s.SaveHolders(ctx, token, holders, newTransfers, lastBlock, synced, totalSupply); err != nil { + if strings.Contains(err.Error(), "database is closed") { + return + } log.Warnw("error saving tokenholders", - "address", t.Address.Hex(), - "chainID", t.ChainID, - "externalID", t.ExternalID, + "address", token.Address.Hex(), + "chainID", token.ChainID, + "externalID", token.ExternalID, "error", err) } - }(token, holders, newTransfers, lastBlock, synced, totalSupply) + }(*token) + } + // wait for all the tokens to be scanned + for i := 0; i < concurrentTokens; i++ { + sem <- struct{}{} } log.Infow("scan iteration finished", "iteration", itCounter, "duration", time.Since(startTime).Seconds(), - "atSync", atSyncGlobal) + "atSync", atSyncGlobal.Load()) // if all the tokens are synced, sleep the cool down time, else, // sleep the scan sleep time - if atSyncGlobal { + if atSyncGlobal.Load() { time.Sleep(s.coolDown) } else { time.Sleep(scanSleepTime) @@ -183,11 +170,6 @@ func (s *Scanner) Start(ctx context.Context) { // finish. It also closes the providers. func (s *Scanner) Stop() { s.cancel() - for _, provider := range s.providers { - if err := provider.Close(); err != nil { - log.Error(err) - } - } s.waiter.Wait() } @@ -309,15 +291,16 @@ func (s *Scanner) TokensToScan(ctx context.Context) ([]*ScannerToken, error) { // from the database, set them into the provider and get the new ones. It // returns the new holders, the last block scanned and if the token is synced // after the scan. -func (s *Scanner) ScanHolders(ctx context.Context, token *ScannerToken) ( +func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) ( map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error, ) { internalCtx, cancel := context.WithTimeout(ctx, SCAN_TIMEOUT) defer cancel() - // get the correct token holder for the current token - provider, exists := s.providers[token.Type] - if !exists { - return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("token type %d not supported", token.Type) + // get the correct token holder provider for the current token + provider, err := s.providerManager.GetProvider(token.Type) + if err != nil { + return nil, 0, token.LastBlock, token.Synced, nil, + fmt.Errorf("token type %d not supported: %w", token.Type, err) } // create a tx to use it in the following queries tx, err := s.db.RW.BeginTx(internalCtx, nil) @@ -422,7 +405,7 @@ func (s *Scanner) ScanHolders(ctx context.Context, token *ScannerToken) ( // holders with negative balances. // 2. To get the correct balances you must use the contract methods to get // the balances of the holders. -func (s *Scanner) SaveHolders(ctx context.Context, token *ScannerToken, +func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken, holders map[common.Address]*big.Int, newTransfers, lastBlock uint64, synced bool, totalSupply *big.Int, ) error {