From 5a350708c6e2cf1dbc11aac7ebfb13fda5b96daf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lucas=20Men=C3=A9ndez?=
Date: Thu, 11 Apr 2024 11:09:58 +0200
Subject: [PATCH] token holder scan parallelized, holder providers simplified,
 fixing GetEndpoint in web3Pool

---
 cmd/census3/main.go                  |  93 +++++------------
 scanner/providers/manager/manager.go |  93 +++++++++++++++++
 scanner/providers/web3/web3_pool.go  |  76 ++++++++------
 scanner/scanner.go                   | 149 ++++++++++++---------------
 4 files changed, 227 insertions(+), 184 deletions(-)
 create mode 100644 scanner/providers/manager/manager.go

diff --git a/cmd/census3/main.go b/cmd/census3/main.go
index 709fd311..df824ff4 100644
--- a/cmd/census3/main.go
+++ b/cmd/census3/main.go
@@ -17,10 +17,10 @@ import (
 	"github.com/vocdoni/census3/db"
 	"github.com/vocdoni/census3/internal"
 	"github.com/vocdoni/census3/scanner"
-	"github.com/vocdoni/census3/scanner/providers"
 	"github.com/vocdoni/census3/scanner/providers/farcaster"
 	"github.com/vocdoni/census3/scanner/providers/gitcoin"
 	gitcoinDB "github.com/vocdoni/census3/scanner/providers/gitcoin/db"
+	"github.com/vocdoni/census3/scanner/providers/manager"
 	"github.com/vocdoni/census3/scanner/providers/poap"
 	"github.com/vocdoni/census3/scanner/providers/web3"
 	"go.vocdoni.io/dvote/log"
@@ -33,6 +33,7 @@ type Census3Config struct {
 	poapAPIEndpoint, poapAuthToken string
 	gitcoinEndpoint                string
 	gitcoinCooldown                time.Duration
+	scannerConcurrentTokens        int
 	scannerCoolDown                time.Duration
 	adminToken                     string
 	initialTokens                  string
@@ -60,6 +61,7 @@ func main() {
 	var strWeb3Providers string
 	flag.StringVar(&strWeb3Providers, "web3Providers", "", "the list of URL's of available web3 providers")
 	flag.DurationVar(&config.scannerCoolDown, "scannerCoolDown", 120*time.Second, "the time to wait before next scanner iteration")
+	flag.IntVar(&config.scannerConcurrentTokens, "scannerConcurrentTokens", 5, "the number of tokens to scan concurrently")
 	flag.StringVar(&config.adminToken, "adminToken", "", "the admin UUID token for the API")
 	flag.StringVar(&config.initialTokens, "initialTokens", "", "path of the initial tokens json file")
 	flag.BoolVar(&config.farcaster, "farcaster", false, "enables farcaster support")
@@ -112,6 +114,10 @@ func main() {
 		panic(err)
 	}
 	config.listOfWeb3Providers = strings.Split(pviper.GetString("web3Providers"), ",")
+	if err := pviper.BindPFlag("scannerConcurrentTokens", flag.Lookup("scannerConcurrentTokens")); err != nil {
+		panic(err)
+	}
+	config.scannerConcurrentTokens = pviper.GetInt("scannerConcurrentTokens")
 	if err := pviper.BindPFlag("scannerCoolDown", flag.Lookup("scannerCoolDown")); err != nil {
 		panic(err)
 	}
@@ -149,75 +155,31 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
-
-	// start the holder scanner with the database and the providers
-	hc := scanner.NewScanner(database, w3p, config.scannerCoolDown)
-
+	// init the provider manager
+	pm := manager.NewProviderManager()
 	// init the web3 token providers
-	erc20Provider := new(web3.ERC20HolderProvider)
-	if err := erc20Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
-		log.Fatal(err)
-		return
-	}
-	erc721Provider := new(web3.ERC721HolderProvider)
-	if err := erc721Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
-		log.Fatal(err)
-		return
-	}
-	erc777Provider := new(web3.ERC777HolderProvider)
-	if err := erc777Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
-		log.Fatal(err)
-		return
-	}
-
-	// set the providers in the scanner and the API
-	if err := hc.SetProviders(erc20Provider, erc721Provider,
-		erc777Provider); err != nil {
-		log.Fatal(err)
-		return
-	}
-	apiProviders := map[uint64]providers.HolderProvider{
-		erc20Provider.Type():  erc20Provider,
-		erc721Provider.Type(): erc721Provider,
-		erc777Provider.Type(): erc777Provider,
-	}
+	web3ProviderConf := web3.Web3ProviderConfig{Endpoints: w3p}
+	pm.AddProvider(new(web3.ERC20HolderProvider).Type(), web3ProviderConf)
+	pm.AddProvider(new(web3.ERC721HolderProvider).Type(), web3ProviderConf)
+	pm.AddProvider(new(web3.ERC777HolderProvider).Type(), web3ProviderConf)
 	// init POAP external provider
 	if config.poapAPIEndpoint != "" {
-		poapProvider := new(poap.POAPHolderProvider)
-		if err := poapProvider.Init(poap.POAPConfig{
+		pm.AddProvider(new(poap.POAPHolderProvider).Type(), poap.POAPConfig{
 			APIEndpoint: config.poapAPIEndpoint,
 			AccessToken: config.poapAuthToken,
-		}); err != nil {
-			log.Fatal(err)
-			return
-		}
-		if err := hc.SetProviders(poapProvider); err != nil {
-			log.Fatal(err)
-			return
-		}
-		apiProviders[poapProvider.Type()] = poapProvider
+		})
 	}
 	if config.gitcoinEndpoint != "" {
 		gitcoinDatabase, err := gitcoinDB.Init(config.dataDir, "gitcoinpassport.sql")
 		if err != nil {
 			log.Fatal(err)
 		}
-		// init Gitcoin external provider
-		gitcoinProvider := new(gitcoin.GitcoinPassport)
-		if err := gitcoinProvider.Init(gitcoin.GitcoinPassportConf{
+		pm.AddProvider(new(gitcoin.GitcoinPassport).Type(), gitcoin.GitcoinPassportConf{
 			APIEndpoint: config.gitcoinEndpoint,
 			Cooldown:    config.gitcoinCooldown,
 			DB:          gitcoinDatabase,
-		}); err != nil {
-			log.Fatal(err)
-			return
-		}
-		if err := hc.SetProviders(gitcoinProvider); err != nil {
-			log.Fatal(err)
-			return
-		}
-		apiProviders[gitcoinProvider.Type()] = gitcoinProvider
+		})
 	}
-
 	// if farcaster is enabled, init the farcaster database and the provider
 	var farcasterDB *farcaster.DB
 	if config.farcaster {
@@ -226,21 +188,13 @@ func main() {
 		if err != nil {
 			log.Fatal(err)
 		}
-		farcasterProvider := new(farcaster.FarcasterProvider)
-		if err := farcasterProvider.Init(farcaster.FarcasterProviderConf{
+		pm.AddProvider(new(farcaster.FarcasterProvider).Type(), farcaster.FarcasterProviderConf{
 			Endpoints: w3p,
 			DB:        farcasterDB,
-		}); err != nil {
-			log.Fatal(err)
-			return
-		}
-		if err := hc.SetProviders(farcasterProvider); err != nil {
-			log.Fatal(err)
-			return
-		}
-		apiProviders[farcasterProvider.Type()] = farcasterProvider
+		})
 	}
-
+	// start the holder scanner with the database and the provider manager
+	hc := scanner.NewScanner(database, w3p, pm, config.scannerCoolDown)
 	// if the admin token is not defined, generate a random one
 	if config.adminToken != "" {
 		if _, err := uuid.Parse(config.adminToken); err != nil {
@@ -259,7 +213,7 @@ func main() {
 		DataDir:         config.dataDir,
 		Web3Providers:   w3p,
 		GroupKey:        config.connectKey,
-		HolderProviders: apiProviders,
+		HolderProviders: pm.Providers(),
 		AdminToken:      config.adminToken,
 	})
 	if err != nil {
@@ -272,7 +226,8 @@ func main() {
 		}
 		log.Info("initial tokens created, or at least tried to")
 	}()
-	go hc.Start(ctx)
+	// start the holder scanner
+	go hc.Start(ctx, config.scannerConcurrentTokens)
 
 	metrics.NewCounter(fmt.Sprintf("census3_info{version=%q,chains=%q}",
 		internal.Version, w3p.String())).Set(1)

diff --git a/scanner/providers/manager/manager.go b/scanner/providers/manager/manager.go
new file mode 100644
index 00000000..85a95d57
--- /dev/null
+++ b/scanner/providers/manager/manager.go
@@ -0,0 +1,93 @@
+package manager
+
+// Package manager provides a manager for providers of different types and a
+// way to add and get them by type in a concurrency-safe way.
+// It initializes a new provider, based on the type and the configuration
+// provided, every time a provider is requested, to avoid data races. It also
+// provides a way to get all the provider types and all the initialized
+// providers at once.
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/vocdoni/census3/scanner/providers"
+	"github.com/vocdoni/census3/scanner/providers/farcaster"
+	"github.com/vocdoni/census3/scanner/providers/gitcoin"
+	"github.com/vocdoni/census3/scanner/providers/poap"
+	"github.com/vocdoni/census3/scanner/providers/web3"
+)
+
+type ProviderManager struct {
+	confs sync.Map
+}
+
+// NewProviderManager creates a new provider manager.
+func NewProviderManager() *ProviderManager {
+	return &ProviderManager{}
+}
+
+// AddProvider registers a provider configuration in the manager under the
+// provider type given.
+func (m *ProviderManager) AddProvider(providerType uint64, conf any) {
+	m.confs.Store(providerType, conf)
+}
+
+// GetProvider returns a provider of the type given, initialized with the
+// configuration stored in the manager. A new provider is initialized on every
+// call to avoid data races. It returns an error if the provider type is not
+// found or if the provider cannot be initialized.
+func (m *ProviderManager) GetProvider(providerType uint64) (providers.HolderProvider, error) {
+	// load the configuration for the provider type
+	conf, ok := m.confs.Load(providerType)
+	if !ok {
+		return nil, fmt.Errorf("provider type %d not found", providerType)
+	}
+	// initialize the provider based on the type
+	var provider providers.HolderProvider
+	switch providerType {
+	case providers.CONTRACT_TYPE_ERC20:
+		provider = &web3.ERC20HolderProvider{}
+	case providers.CONTRACT_TYPE_ERC721:
+		provider = &web3.ERC721HolderProvider{}
+	case providers.CONTRACT_TYPE_ERC777:
+		provider = &web3.ERC777HolderProvider{}
+	case providers.CONTRACT_TYPE_POAP:
+		provider = &poap.POAPHolderProvider{}
+	case providers.CONTRACT_TYPE_GITCOIN:
+		provider = &gitcoin.GitcoinPassport{}
+	case providers.CONTRACT_TYPE_FARCASTER:
+		provider = &farcaster.FarcasterProvider{}
+	default:
+		return nil, fmt.Errorf("provider type %d not found", providerType)
+	}
+	// initialize the provider with the specific configuration
+	if err := provider.Init(conf); err != nil {
+		return nil, err
+	}
+	return provider, nil
+}
+
+// GetProviderTypes returns all the provider types stored in the manager as a
+// slice of uint64.
+func (m *ProviderManager) GetProviderTypes() []uint64 {
+	types := []uint64{}
+	m.confs.Range(func(t, _ any) bool {
+		types = append(types, t.(uint64))
+		return true
+	})
+	return types
+}
+
+// Providers returns all the providers stored in the manager, associated with
+// their types, as a map of uint64 to HolderProvider.
+func (m *ProviderManager) Providers() map[uint64]providers.HolderProvider {
+	providers := make(map[uint64]providers.HolderProvider)
+	for _, t := range m.GetProviderTypes() {
+		provider, err := m.GetProvider(t)
+		if err != nil {
+			panic(err)
+		}
+		providers[t] = provider
+	}
+	return providers
+}
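The flow the new package enables, reduced to a minimal sketch (not part of the patch): it assumes the `Web3Pool` is built during startup as in `cmd/census3/main.go` above, and that the caller closes the provider it obtains, since every `GetProvider` call returns a fresh instance.

```go
package main

import (
	"log"

	"github.com/vocdoni/census3/scanner/providers"
	"github.com/vocdoni/census3/scanner/providers/manager"
	"github.com/vocdoni/census3/scanner/providers/web3"
)

func main() {
	// the pool construction is elided; in census3 it is built from the
	// -web3Providers flag during startup
	var w3p *web3.Web3Pool

	// register each provider configuration once...
	pm := manager.NewProviderManager()
	pm.AddProvider(new(web3.ERC20HolderProvider).Type(), web3.Web3ProviderConfig{Endpoints: w3p})

	// ...then request a fresh, independently initialized provider wherever
	// one is needed, so concurrent goroutines never share provider state
	erc20, err := pm.GetProvider(providers.CONTRACT_TYPE_ERC20)
	if err != nil {
		log.Fatal(err)
	}
	// assumption: providers obtained this way are closed by their caller
	defer func() {
		if err := erc20.Close(); err != nil {
			log.Println(err)
		}
	}()
}
```

The design trades a small per-request allocation for safety: configurations are stored once, and provider instances are never shared between goroutines.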
diff --git a/scanner/providers/web3/web3_pool.go b/scanner/providers/web3/web3_pool.go
index 2e3e595b..774fb225 100644
--- a/scanner/providers/web3/web3_pool.go
+++ b/scanner/providers/web3/web3_pool.go
@@ -153,55 +153,67 @@ func (nm *Web3Pool) DelEndoint(uri string) {
 // is found, it resets the available flag for all, resets the next available to
 // the first one and returns it.
 func (nm *Web3Pool) GetEndpoint(chainID uint64) (*Web3Endpoint, bool) {
+	// Cases:
+	//   - is there any available endpoint for the chainID?
+	//     - yes, continue
+	//     - no, reset the available flag for all the endpoints, return the
+	//       first one and set the second one as the next available (if there
+	//       is one)
+	//   - does the next available endpoint exist?
+	//     - yes, continue
+	//     - no, return the first one and set the second one as the next
+	//       available (if there is one)
+	//   - update the next available endpoint to the next one
+	//   - is the current endpoint available?
+	//     - yes, return it
+	//     - no, start again
 	nm.endpointsMtx.RLock()
 	defer nm.endpointsMtx.RUnlock()
-	next, ok := nm.nextAvailable.Load(chainID)
+	// check if there is any available endpoint for the chainID
+	unavailable, ok := nm.unavailable.Load(chainID)
+	if ok && len(unavailable.([]int)) == len(nm.endpoints[chainID]) {
+		// if all the endpoints are unavailable, reset the available flag for
+		// all the endpoints, set the second one as the next available (if
+		// there is one) and return the first one
+		nm.unavailable.Delete(chainID)
+		if len(nm.endpoints[chainID]) > 1 {
+			nm.nextAvailable.Store(chainID, 1)
+		} else {
+			nm.nextAvailable.Store(chainID, 0)
+		}
+		return nm.endpoints[chainID][0], true
+	}
+	// get the next available endpoint for the chainID
+	currentEndpointIdx, ok := nm.nextAvailable.Load(chainID)
 	if !ok {
-		if _, ok := nm.endpoints[chainID]; !ok {
+		// if there is no next available endpoint, set the second one as the
+		// next available (if there is one) and return the first one; if there
+		// are no endpoints at all, return false
+		if len(nm.endpoints[chainID]) == 0 {
 			return nil, false
 		}
-		endpoint := nm.endpoints[chainID][0]
-		if endpoint == nil {
-			return nil, false
+		if len(nm.endpoints[chainID]) > 1 {
+			nm.nextAvailable.Store(chainID, 1)
+		} else {
+			nm.nextAvailable.Store(chainID, 0)
 		}
-		// if no available endpoint is found, set all the endpoints as available
-		// and return the first one
-		nm.unavailable.Delete(chainID)
-		nm.nextAvailable.Store(chainID, 0)
 		return nm.endpoints[chainID][0], true
 	}
-	endpointIdx, ok := next.(int)
-	if !ok {
-		return nil, false
-	}
-	// use the next available endpoint with the following endpoint for chainID
-	// if there are no more endpoints, use the first one as the next available
-	if _, ok := nm.endpoints[chainID]; !ok {
-		nm.nextAvailable.Delete(chainID)
-		return nil, false
-	}
-	endpoint := nm.endpoints[chainID][endpointIdx]
-	if endpoint == nil {
-		nm.nextAvailable.Delete(chainID)
-		return nil, false
-	}
-	// if the endpoint is available, set the next available to the next one
-	nextAvailable := endpointIdx + 1
+	// update the next available endpoint to the next one
+	nextAvailable := currentEndpointIdx.(int) + 1
 	if nextAvailable >= len(nm.endpoints[chainID]) {
 		nextAvailable = 0
 	}
 	nm.nextAvailable.Store(chainID, nextAvailable)
-	// if the endpoint is not available, return call the method again to get the
-	// next available endpoint
-	if unavailable, ok := nm.unavailable.Load(chainID); ok {
+	// check if the current endpoint is available
+	if unavailable != nil {
 		for _, unavailableIdx := range unavailable.([]int) {
-			if unavailableIdx == endpointIdx {
+			if unavailableIdx == currentEndpointIdx.(int) {
 				return nm.GetEndpoint(chainID)
 			}
 		}
 	}
-	// if it is available, return it
-	return endpoint, true
+	// return the current endpoint
+	return nm.endpoints[chainID][currentEndpointIdx.(int)], true
 }
 
 // DisableEndpoint method sets the available flag to false for the URI provided
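Seen from the caller's side, the case analysis above amounts to a round-robin with failure tracking. A hypothetical retry helper (not part of the patch; it assumes `Web3Endpoint` exposes the URI it was registered with, which `DisableEndpoint` expects) sketches the intended contract: every `GetEndpoint` call advances the rotation, `DisableEndpoint` takes an endpoint out of it, and once every endpoint of a chain is disabled, the pool resets them all and starts over.

```go
package example

import (
	"fmt"

	"github.com/vocdoni/census3/scanner/providers/web3"
)

// callWithRotation retries fn across the pool endpoints of a chain, disabling
// every endpoint that fails so the next GetEndpoint call rotates past it.
func callWithRotation(w3p *web3.Web3Pool, chainID uint64,
	fn func(*web3.Web3Endpoint) error,
) error {
	var lastErr error
	for attempt := 0; attempt < 3; attempt++ { // arbitrary retry bound
		endpoint, ok := w3p.GetEndpoint(chainID)
		if !ok {
			return fmt.Errorf("no endpoints registered for chain %d", chainID)
		}
		if lastErr = fn(endpoint); lastErr == nil {
			return nil
		}
		// assumption: the endpoint carries the URI that DisableEndpoint expects
		w3p.DisableEndpoint(endpoint.URI)
	}
	return lastErr
}
```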
diff --git a/scanner/scanner.go b/scanner/scanner.go
index 1453e9a0..8b4356ac 100644
--- a/scanner/scanner.go
+++ b/scanner/scanner.go
@@ -9,13 +9,14 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/vocdoni/census3/db"
 	"github.com/vocdoni/census3/db/annotations"
 	queries "github.com/vocdoni/census3/db/sqlc"
-	"github.com/vocdoni/census3/scanner/providers"
+	"github.com/vocdoni/census3/scanner/providers/manager"
 	"github.com/vocdoni/census3/scanner/providers/web3"
 	"go.vocdoni.io/dvote/log"
 )
@@ -39,12 +40,12 @@ type ScannerToken struct {
 // holders of the tokens. It has a cool down time between iterations to avoid
 // overloading the providers.
 type Scanner struct {
-	ctx       context.Context
-	cancel    context.CancelFunc
-	db        *db.DB
-	networks  *web3.Web3Pool
-	providers map[uint64]providers.HolderProvider
-	coolDown  time.Duration
+	ctx             context.Context
+	cancel          context.CancelFunc
+	db              *db.DB
+	networks        *web3.Web3Pool
+	providerManager *manager.ProviderManager
+	coolDown        time.Duration
 
 	tokens    []*ScannerToken
 	tokensMtx sync.Mutex
@@ -55,11 +56,11 @@ type Scanner struct {
 
 // NewScanner returns a new scanner instance with the required parameters
 // initialized.
-func NewScanner(db *db.DB, networks *web3.Web3Pool, coolDown time.Duration) *Scanner {
+func NewScanner(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager, coolDown time.Duration) *Scanner {
 	return &Scanner{
 		db:              db,
 		networks:        networks,
-		providers:       make(map[uint64]providers.HolderProvider),
+		providerManager: pm,
 		coolDown:        coolDown,
 		tokens:          []*ScannerToken{},
 		tokensMtx:       sync.Mutex{},
@@ -69,41 +70,13 @@ func NewScanner(db *db.DB, networks *web3.Web3Pool, coolDown time.Duration) *Sca
 	}
 }
 
-// SetProviders sets the providers that the scanner will use to get the holders
-// of the tokens. It also creates the token types in the database if they do not
-// exist. It returns an error something goes wrong creating the token types in
-// the database.
-func (s *Scanner) SetProviders(newProviders ...providers.HolderProvider) error {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	// create a tx to use it in the following queries
-	for _, provider := range newProviders {
-		// try to create the token type in the database, if it already exists,
-		// update its name to ensure that it is correct according to the type id
-		if _, err := s.db.QueriesRW.CreateTokenType(ctx, queries.CreateTokenTypeParams{
-			ID:       provider.Type(),
-			TypeName: provider.TypeName(),
-		}); err != nil {
-			if !strings.Contains(err.Error(), "UNIQUE constraint failed") {
-				return err
-			}
-			if _, err := s.db.QueriesRW.UpdateTokenType(ctx, queries.UpdateTokenTypeParams{
-				ID:       provider.Type(),
-				TypeName: provider.TypeName(),
-			}); err != nil {
-				return err
-			}
-		}
-		// include the provider in the scanner
-		s.providers[provider.Type()] = provider
-	}
-	return nil
-}
-
 // Start starts the scanner. It starts a loop that scans the tokens in the
 // database and saves the holders in the database. It stops when the context is
 // cancelled.
-func (s *Scanner) Start(ctx context.Context) {
+func (s *Scanner) Start(ctx context.Context, concurrentTokens int) {
+	if concurrentTokens < 1 {
+		concurrentTokens = 1
+	}
 	s.ctx, s.cancel = context.WithCancel(ctx)
 	itCounter := 0
 	// keep the latest block numbers updated
@@ -127,50 +100,64 @@ func (s *Scanner) Start(ctx context.Context) {
 			log.Error(err)
 			continue
 		}
+		// create a semaphore to limit the number of tokens scanned concurrently
+		sem := make(chan struct{}, concurrentTokens)
+		defer close(sem)
 		// iterate over the tokens to scan
-		atSyncGlobal := true
+		var atSyncGlobal atomic.Bool
+		atSyncGlobal.Store(true)
 		for _, token := range tokens {
-			log.Infow("scanning token",
-				"address", token.Address.Hex(),
-				"chainID", token.ChainID,
-				"externalID", token.ExternalID,
-				"lastBlock", token.LastBlock,
-				"ready", token.Ready)
-			// scan the token
-			holders, newTransfers, lastBlock, synced, totalSupply, err := s.ScanHolders(ctx, token)
-			if err != nil {
-				atSyncGlobal = false
-				if errors.Is(err, context.Canceled) {
-					log.Info("scanner context cancelled, shutting down")
+			// acquire the semaphore
+			sem <- struct{}{}
+			go func(token ScannerToken) {
+				// release the semaphore when the goroutine finishes
+				defer func() {
+					<-sem
+				}()
+				log.Infow("scanning token",
+					"address", token.Address.Hex(),
+					"chainID", token.ChainID,
+					"externalID", token.ExternalID,
+					"lastBlock", token.LastBlock,
+					"ready", token.Ready)
+				// scan the token
+				holders, newTransfers, lastBlock, synced, totalSupply, err := s.ScanHolders(ctx, token)
+				if err != nil {
+					atSyncGlobal.Store(false)
+					if errors.Is(err, context.Canceled) {
+						log.Info("scanner context cancelled, shutting down")
+						return
+					}
+					log.Error(err)
 					return
 				}
-				log.Error(err)
-				continue
-			}
-			if !synced {
-				atSyncGlobal = false
-			}
-			// save the token holders in the database in a goroutine and
-			// continue with the next token
-			s.waiter.Add(1)
-			go func(t *ScannerToken, h map[common.Address]*big.Int, n, lb uint64, sy bool, ts *big.Int) {
-				defer s.waiter.Done()
-				if err = s.SaveHolders(ctx, t, h, n, lb, sy, ts); err != nil {
+				if !synced {
+					atSyncGlobal.Store(false)
+				}
+				// save the new token holders
+				if err = s.SaveHolders(ctx, token, holders, newTransfers, lastBlock, synced, totalSupply); err != nil {
+					if strings.Contains(err.Error(), "database is closed") {
+						return
+					}
					log.Warnw("error saving tokenholders",
-						"address", t.Address.Hex(),
-						"chainID", t.ChainID,
-						"externalID", t.ExternalID,
+						"address", token.Address.Hex(),
+						"chainID", token.ChainID,
+						"externalID", token.ExternalID,
 						"error", err)
 				}
-			}(token, holders, newTransfers, lastBlock, synced, totalSupply)
+			}(*token)
+		}
+		// wait for all the scan goroutines to finish by filling the semaphore
+		for i := 0; i < concurrentTokens; i++ {
+			sem <- struct{}{}
 		}
 		log.Infow("scan iteration finished",
 			"iteration", itCounter,
 			"duration", time.Since(startTime).Seconds(),
-			"atSync", atSyncGlobal)
+			"atSync", atSyncGlobal.Load())
 		// if all the tokens are synced, sleep the cool down time, else,
 		// sleep the scan sleep time
-		if atSyncGlobal {
+		if atSyncGlobal.Load() {
 			time.Sleep(s.coolDown)
 		} else {
 			time.Sleep(scanSleepTime)
@@ -183,11 +170,6 @@ func (s *Scanner) Start(ctx context.Context) {
 // finish. It also closes the providers.
 func (s *Scanner) Stop() {
 	s.cancel()
-	for _, provider := range s.providers {
-		if err := provider.Close(); err != nil {
-			log.Error(err)
-		}
-	}
 	s.waiter.Wait()
 }
@@ -309,15 +291,16 @@ func (s *Scanner) TokensToScan(ctx context.Context) ([]*ScannerToken, error) {
 // from the database, set them into the provider and get the new ones. It
 // returns the new holders, the last block scanned and if the token is synced
 // after the scan.
-func (s *Scanner) ScanHolders(ctx context.Context, token *ScannerToken) (
+func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
 	map[common.Address]*big.Int, uint64, uint64, bool, *big.Int, error,
 ) {
 	internalCtx, cancel := context.WithTimeout(ctx, SCAN_TIMEOUT)
 	defer cancel()
-	// get the correct token holder for the current token
-	provider, exists := s.providers[token.Type]
-	if !exists {
-		return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("token type %d not supported", token.Type)
+	// get the correct token holder provider for the current token
+	provider, err := s.providerManager.GetProvider(token.Type)
+	if err != nil {
+		return nil, 0, token.LastBlock, token.Synced, nil,
+			fmt.Errorf("token type %d not supported: %w", token.Type, err)
 	}
 	// create a tx to use it in the following queries
 	tx, err := s.db.RW.BeginTx(internalCtx, nil)
@@ -422,7 +405,7 @@ func (s *Scanner) ScanHolders(ctx context.Context, token *ScannerToken) (
 //     holders with negative balances.
 //  2. To get the correct balances you must use the contract methods to get
 //     the balances of the holders.
-func (s *Scanner) SaveHolders(ctx context.Context, token *ScannerToken,
+func (s *Scanner) SaveHolders(ctx context.Context, token ScannerToken,
 	holders map[common.Address]*big.Int, newTransfers, lastBlock uint64,
 	synced bool, totalSupply *big.Int,
 ) error {
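For reference, the bounded-concurrency pattern that the new `Start` uses, reduced to a self-contained toy (token names and prints are placeholders): a buffered channel acts as a counting semaphore, an `atomic.Bool` aggregates the per-token sync state, and filling the semaphore at the end waits for every in-flight goroutine.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	tokens := []string{"A", "B", "C", "D", "E"}
	concurrentTokens := 2
	// buffered channel used as a counting semaphore
	sem := make(chan struct{}, concurrentTokens)
	var atSyncGlobal atomic.Bool
	atSyncGlobal.Store(true)
	for _, token := range tokens {
		sem <- struct{}{} // acquire a slot; blocks once the limit is reached
		go func(token string) {
			defer func() { <-sem }() // release the slot
			fmt.Println("scanning", token)
			// a real scan would flip the flag on failure or when not synced:
			// atSyncGlobal.Store(false)
		}(token)
	}
	// filling the semaphore waits for every in-flight goroutine to release
	for i := 0; i < concurrentTokens; i++ {
		sem <- struct{}{}
	}
	fmt.Println("atSync:", atSyncGlobal.Load())
}
```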