From 1b07463e9954d06584711e7657826bbac4e6583a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Men=C3=A9ndez?= Date: Fri, 12 Apr 2024 11:07:31 +0200 Subject: [PATCH] including an atomic var to handle multiple concurrent gitcoin providers, including global context into the providers to handle shutdown gracefully --- cmd/census3/main.go | 2 +- scanner/providers/farcaster/provider.go | 6 +-- scanner/providers/gitcoin/gitcoin_provider.go | 41 +++++++++++++++---- .../gitcoin/gitcoin_provider_test.go | 4 +- scanner/providers/holders_provider.go | 2 +- scanner/providers/manager/manager.go | 9 ++-- scanner/providers/poap/poap_provider.go | 9 ++-- scanner/providers/poap/poap_provider_test.go | 2 +- scanner/providers/web3/erc20_provider.go | 2 +- scanner/providers/web3/erc721_provider.go | 2 +- scanner/providers/web3/erc777_provider.go | 2 +- scanner/scanner.go | 2 +- 12 files changed, 55 insertions(+), 28 deletions(-) diff --git a/cmd/census3/main.go b/cmd/census3/main.go index 37ff1162..5c21fddf 100644 --- a/cmd/census3/main.go +++ b/cmd/census3/main.go @@ -213,7 +213,7 @@ func main() { DataDir: config.dataDir, Web3Providers: w3p, GroupKey: config.connectKey, - HolderProviders: pm.Providers(), + HolderProviders: pm.Providers(ctx), AdminToken: config.adminToken, }) if err != nil { diff --git a/scanner/providers/farcaster/provider.go b/scanner/providers/farcaster/provider.go index 80e71868..94a518c2 100644 --- a/scanner/providers/farcaster/provider.go +++ b/scanner/providers/farcaster/provider.go @@ -46,7 +46,7 @@ var ( IterationSyncedCooldown = 60 * time.Second ) -func (p *FarcasterProvider) Init(iconf any) error { +func (p *FarcasterProvider) Init(globalCtx context.Context, iconf any) error { // parse the config and set the endpoints conf, ok := iconf.(FarcasterProviderConf) if !ok { @@ -69,7 +69,7 @@ func (p *FarcasterProvider) Init(iconf any) error { // and the database. By default, the last block is the creation block of the // key registry, because in the gap between the creation of the ID and Key // registries, there are no logs to scan. - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + ctx, cancel := context.WithTimeout(globalCtx, 15*time.Second) defer cancel() idRegistryLastBlock, err := p.db.QueriesRO.LastBlock(ctx, IdRegistryAddress) if err != nil { @@ -122,7 +122,7 @@ func (p *FarcasterProvider) Init(iconf any) error { p.contracts.idRegistrySynced.Store(false) p.contracts.keyRegistrySynced.Store(false) // start the internal scanner - p.scannerCtx, p.cancelScanner = context.WithCancel(context.Background()) + p.scannerCtx, p.cancelScanner = context.WithCancel(globalCtx) go p.initInternalScanner() return nil } diff --git a/scanner/providers/gitcoin/gitcoin_provider.go b/scanner/providers/gitcoin/gitcoin_provider.go index 737c74d6..32d0a986 100644 --- a/scanner/providers/gitcoin/gitcoin_provider.go +++ b/scanner/providers/gitcoin/gitcoin_provider.go @@ -39,6 +39,12 @@ const ( metadataTimeout = time.Second * 5 ) +// since the scanne can scan multiple tokens concurrently, and every stamp +// is a token, we need to avoid multiple downloads at the same time. this +// variable is used to avoid multiple downloads at the same time using an +// atomic bool +var downloading atomic.Bool + type GitcoinPassport struct { // public endpoint to download the json apiEndpoint string @@ -67,7 +73,7 @@ type GitcoinPassportConf struct { // Init initializes the Gitcoin Passport provider with the given config. If the // config is not of type GitcoinPassportConf, or the API endpoint is missing, it // returns an error. If the cooldown is not set, it defaults to 6 hours. -func (g *GitcoinPassport) Init(iconf any) error { +func (g *GitcoinPassport) Init(globalCtx context.Context, iconf any) error { conf, ok := iconf.(GitcoinPassportConf) if !ok { return fmt.Errorf("invalid config type") @@ -85,7 +91,7 @@ func (g *GitcoinPassport) Init(iconf any) error { g.cooldown = conf.Cooldown g.db = conf.DB // init download variables - g.ctx, g.cancel = context.WithCancel(context.Background()) + g.ctx, g.cancel = context.WithCancel(globalCtx) g.scoresChan = make(chan *GitcoinScore) g.waiter = new(sync.WaitGroup) g.synced = atomic.Bool{} @@ -99,8 +105,11 @@ func (g *GitcoinPassport) Init(iconf any) error { if err == nil { g.lastSyncedTime.Store(lastSync) } - - g.startScoreUpdates() + // if there are other instances downloading, this one does not need to + // start the download process + if !downloading.Load() { + g.startScoreUpdates() + } return nil } @@ -134,7 +143,7 @@ func (g *GitcoinPassport) HoldersBalances(_ context.Context, stamp []byte, _ uin ) { // get the current scores from the db, handle the case when the stamp is // empty and when it is not to get the scores from the db - synced := g.synced.Load() + synced := g.isSynced(true) totalSupply := big.NewInt(0) currentScores := make(map[common.Address]*big.Int) if len(stamp) > 0 { @@ -190,9 +199,7 @@ func (g *GitcoinPassport) IsExternal() bool { // IsSynced returns true if the balances are not empty. func (g *GitcoinPassport) IsSynced(_ []byte) bool { - g.currentBalancesMtx.RLock() - defer g.currentBalancesMtx.RUnlock() - return len(g.currentBalances) > 0 + return g.isSynced(false) } // Address returns the address of the Gitcoin Passport contract. @@ -395,6 +402,22 @@ func (g *GitcoinPassport) updateLastSync(ctx context.Context) error { return nil } +func (g *GitcoinPassport) isSynced(update bool) bool { + if !update { + return g.synced.Load() + } + lastSync, err := g.loadLastSync(g.ctx) + if err != nil { + log.Warnw("error loading last sync time", "err", err) + return g.synced.Load() + } + g.lastSyncedTime.Store(lastSync) + tLastSync := time.Unix(lastSync, 0) + isSynced := time.Since(tLastSync) < g.cooldown + g.synced.Store(isSynced) + return isSynced +} + func (g *GitcoinPassport) startScoreUpdates() { log.Debug("starting Gitcoin Passport score updates") g.waiter.Add(1) @@ -431,6 +454,8 @@ func (g *GitcoinPassport) startScoreUpdates() { } func (g *GitcoinPassport) updateScores() error { + downloading.Store(true) + defer downloading.Store(false) // download de json from API endpoint req, err := http.NewRequestWithContext(g.ctx, http.MethodGet, g.apiEndpoint, nil) if err != nil { diff --git a/scanner/providers/gitcoin/gitcoin_provider_test.go b/scanner/providers/gitcoin/gitcoin_provider_test.go index c92e8e57..625ec9ad 100644 --- a/scanner/providers/gitcoin/gitcoin_provider_test.go +++ b/scanner/providers/gitcoin/gitcoin_provider_test.go @@ -42,7 +42,7 @@ func TestGitcoinPassport(t *testing.T) { }) // create the provider provider := new(GitcoinPassport) - c.Assert(provider.Init(GitcoinPassportConf{endpoints["/original"], time.Second * 2, testDB}), qt.IsNil) + c.Assert(provider.Init(ctx, GitcoinPassportConf{endpoints["/original"], time.Second * 2, testDB}), qt.IsNil) // start the first download emptyBalances, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0) c.Assert(err, qt.IsNil) @@ -69,7 +69,7 @@ func TestGitcoinPassport(t *testing.T) { testDB, err = db.Init(tempDBDir, "gitcoinpassport.sql") c.Assert(err, qt.IsNil) newProvider := new(GitcoinPassport) - c.Assert(newProvider.Init(GitcoinPassportConf{endpoints["/updated"], time.Second * 2, testDB}), qt.IsNil) + c.Assert(newProvider.Init(ctx, GitcoinPassportConf{endpoints["/updated"], time.Second * 2, testDB}), qt.IsNil) // new endpoint with one change time.Sleep(time.Second * 5) c.Assert(newProvider.SetLastBalances(context.TODO(), nil, holders, 0), qt.IsNil) diff --git a/scanner/providers/holders_provider.go b/scanner/providers/holders_provider.go index 648081d4..a87707d0 100644 --- a/scanner/providers/holders_provider.go +++ b/scanner/providers/holders_provider.go @@ -15,7 +15,7 @@ type HolderProvider interface { // Init initializes the provider and its internal structures. Initial // attributes values must be defined in the struct that implements this // interface before calling this method. - Init(conf any) error + Init(ctx context.Context, conf any) error // SetRef sets the reference to the provider. It is used to define the // required token information to interact with the provider. SetRef(ref any) error diff --git a/scanner/providers/manager/manager.go b/scanner/providers/manager/manager.go index 85a95d57..35776833 100644 --- a/scanner/providers/manager/manager.go +++ b/scanner/providers/manager/manager.go @@ -7,6 +7,7 @@ package manager // the provider types and all the providers initialized at once. import ( + "context" "fmt" "sync" @@ -36,7 +37,7 @@ func (m *ProviderManager) AddProvider(providerType uint64, conf any) { // provider based on the configuration stored in the manager. It initializes a // new provider every time to avoid data races. It returns an error if the // provider type is not found or if the provider cannot be initialized. -func (m *ProviderManager) GetProvider(providerType uint64) (providers.HolderProvider, error) { +func (m *ProviderManager) GetProvider(ctx context.Context, providerType uint64) (providers.HolderProvider, error) { // load the configuration for the provider type conf, ok := m.confs.Load(providerType) if !ok { @@ -61,7 +62,7 @@ func (m *ProviderManager) GetProvider(providerType uint64) (providers.HolderProv return nil, fmt.Errorf("provider type %d not found", providerType) } // initialize the provider with the specific configuration - if err := provider.Init(conf); err != nil { + if err := provider.Init(ctx, conf); err != nil { return nil, err } return provider, nil @@ -80,10 +81,10 @@ func (m *ProviderManager) GetProviderTypes() []uint64 { // Providers returns all the providers stored in the manager associated to their // types as a map of uint64 to HolderProvider. -func (m *ProviderManager) Providers() map[uint64]providers.HolderProvider { +func (m *ProviderManager) Providers(ctx context.Context) map[uint64]providers.HolderProvider { providers := make(map[uint64]providers.HolderProvider) for _, t := range m.GetProviderTypes() { - provider, err := m.GetProvider(t) + provider, err := m.GetProvider(ctx, t) if err != nil { panic(err) } diff --git a/scanner/providers/poap/poap_provider.go b/scanner/providers/poap/poap_provider.go index ad1c83ac..25b0fe52 100644 --- a/scanner/providers/poap/poap_provider.go +++ b/scanner/providers/poap/poap_provider.go @@ -61,6 +61,7 @@ type POAPSnapshot struct { // POAP API to get the list of POAPs for an event ID and calculate the balances // of the token holders from the last snapshot. type POAPHolderProvider struct { + ctx context.Context apiEndpoint string accessToken string snapshots map[string]*POAPSnapshot @@ -75,19 +76,19 @@ type POAPConfig struct { // Init initializes the POAP external provider with the database provided. // It returns an error if the POAP access token or api endpoint uri is not // defined. -func (p *POAPHolderProvider) Init(iconf any) error { +func (p *POAPHolderProvider) Init(globalCtx context.Context, iconf any) error { // parse config conf, ok := iconf.(POAPConfig) if !ok { return fmt.Errorf("bad config type, it must be a POAPConfig struct") } - if conf.APIEndpoint == "" { return fmt.Errorf("no POAP URI defined") } if conf.AccessToken == "" { return fmt.Errorf("no POAP access token defined") } + p.ctx = globalCtx p.apiEndpoint = conf.APIEndpoint p.accessToken = conf.AccessToken p.snapshots = make(map[string]*POAPSnapshot) @@ -349,7 +350,7 @@ func (p *POAPHolderProvider) holdersPage(eventID string, offset int) (*POAPAPIRe q.Add("offset", fmt.Sprint(offset)) endpoint.RawQuery = q.Encode() // create request and add headers - req, err := http.NewRequest("GET", endpoint.String(), nil) + req, err := http.NewRequestWithContext(p.ctx, http.MethodGet, endpoint.String(), nil) if err != nil { return nil, err } @@ -392,7 +393,7 @@ func (p *POAPHolderProvider) getEventInfo(eventID string) (*EventAPIResponse, er return nil, err } // create request and add headers - req, err := http.NewRequest("GET", endpoint.String(), nil) + req, err := http.NewRequestWithContext(p.ctx, http.MethodGet, endpoint.String(), nil) if err != nil { return nil, err } diff --git a/scanner/providers/poap/poap_provider_test.go b/scanner/providers/poap/poap_provider_test.go index 36d55407..adef6074 100644 --- a/scanner/providers/poap/poap_provider_test.go +++ b/scanner/providers/poap/poap_provider_test.go @@ -38,7 +38,7 @@ func TestPOAP(t *testing.T) { }) provider := new(POAPHolderProvider) - c.Assert(provider.Init(POAPConfig{endpoints["/original"], "no-token"}), qt.IsNil) + c.Assert(provider.Init(ctx, POAPConfig{endpoints["/original"], "no-token"}), qt.IsNil) holders, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0) c.Assert(err, qt.IsNil) c.Assert(len(holders), qt.Equals, len(expectedOriginalHolders)) diff --git a/scanner/providers/web3/erc20_provider.go b/scanner/providers/web3/erc20_provider.go index 7a8580ec..9b7db357 100644 --- a/scanner/providers/web3/erc20_provider.go +++ b/scanner/providers/web3/erc20_provider.go @@ -30,7 +30,7 @@ type ERC20HolderProvider struct { synced atomic.Bool } -func (p *ERC20HolderProvider) Init(iconf any) error { +func (p *ERC20HolderProvider) Init(_ context.Context, iconf any) error { // parse the config and set the endpoints conf, ok := iconf.(Web3ProviderConfig) if !ok { diff --git a/scanner/providers/web3/erc721_provider.go b/scanner/providers/web3/erc721_provider.go index a1205a3d..25bcb2a8 100644 --- a/scanner/providers/web3/erc721_provider.go +++ b/scanner/providers/web3/erc721_provider.go @@ -30,7 +30,7 @@ type ERC721HolderProvider struct { synced atomic.Bool } -func (p *ERC721HolderProvider) Init(iconf any) error { +func (p *ERC721HolderProvider) Init(_ context.Context, iconf any) error { // parse the config and set the endpoints conf, ok := iconf.(Web3ProviderConfig) if !ok { diff --git a/scanner/providers/web3/erc777_provider.go b/scanner/providers/web3/erc777_provider.go index 10248cfc..94c74e2d 100644 --- a/scanner/providers/web3/erc777_provider.go +++ b/scanner/providers/web3/erc777_provider.go @@ -30,7 +30,7 @@ type ERC777HolderProvider struct { synced atomic.Bool } -func (p *ERC777HolderProvider) Init(iconf any) error { +func (p *ERC777HolderProvider) Init(_ context.Context, iconf any) error { // parse the config and set the endpoints conf, ok := iconf.(Web3ProviderConfig) if !ok { diff --git a/scanner/scanner.go b/scanner/scanner.go index 8b4356ac..341d78bc 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -297,7 +297,7 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) ( internalCtx, cancel := context.WithTimeout(ctx, SCAN_TIMEOUT) defer cancel() // get the correct token holder provider for the current token - provider, err := s.providerManager.GetProvider(token.Type) + provider, err := s.providerManager.GetProvider(s.ctx, token.Type) if err != nil { return nil, 0, token.LastBlock, token.Synced, nil, fmt.Errorf("token type %d not supported: %w", token.Type, err)