From cb943ab43891048751e63c550e2f0d55eacb2fe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Men=C3=A9ndez?= Date: Thu, 27 Jun 2024 17:05:53 +0200 Subject: [PATCH] extend and fix treedb api, return the new logs ids from holder providers to add them to the token filter after save holders --- db/treedb/treedb.go | 103 +++++++++++++++++----- scanner/const.go | 12 ++- scanner/providers/holders_provider.go | 6 +- scanner/providers/web3/erc20_provider.go | 18 ++-- scanner/providers/web3/erc721_provider.go | 18 ++-- scanner/providers/web3/erc777_provider.go | 18 ++-- scanner/providers/web3/web3_provider.go | 6 +- scanner/scanner.go | 2 +- scanner/updater.go | 9 +- 9 files changed, 135 insertions(+), 57 deletions(-) diff --git a/db/treedb/treedb.go b/db/treedb/treedb.go index 2544bf67..4804d825 100644 --- a/db/treedb/treedb.go +++ b/db/treedb/treedb.go @@ -23,7 +23,7 @@ const filterTreeLevels = 64 // ErrNotInitialized is returned when no tree is initialized in a TreeDB // instance, which means that LoadTree has not been called and the tree is // not ready to be used. -var ErrNotInitialized = fmt.Errorf("tree not initialized, call Load first") +var ErrNotInitialized = fmt.Errorf("tree not initialized, call LoadTree first") // TokenFilter is a filter associated with a token. type TreeDB struct { @@ -56,6 +56,9 @@ func LoadTree(db db.Database, prefix string) (*TreeDB, error) { }, wTx.Commit() } +// Close closes the tree database. If the tree is not nil, it closes the +// underlying database. If the parent database is not nil, it closes it too. +// It returns an error if any of the databases cannot be closed. func (tdb *TreeDB) Close() error { if tdb.tree != nil { if err := tdb.tree.DB().Close(); err != nil { @@ -68,11 +71,11 @@ func (tdb *TreeDB) Close() error { return nil } -// DeleteTree deletes a tree from the database identified by current prefix. -// It iterates over all the keys in the tree and deletes them. 
If some key -// cannot be deleted, it logs a warning and continues with the next key. It -// commits the transaction at the end. -func (tdb *TreeDB) Delete() error { +// Purge deletes a tree from the database identified by current prefix. It +// iterates over all the keys in the tree and deletes them. If some key cannot +// be deleted, it logs a warning and continues with the next key. It commits the +// transaction at the end. +func (tdb *TreeDB) Purge() error { treeDB := prefixeddb.NewPrefixedDatabase(tdb.parentDB, []byte(tdb.prefix)) wTx := treeDB.WriteTx() if err := treeDB.Iterate(nil, func(k, _ []byte) bool { @@ -86,36 +89,94 @@ return wTx.Commit() } -// Add adds a key to the tree. -func (tdb *TreeDB) Add(key, value []byte) error { +// Add adds a key to the tree. If no write transaction is provided, it creates +// a new one and commits it at the end. It returns an error if the tree is not +// initialized, if there is an error adding the key-value pair or committing +// the transaction if it was created. If a transaction is provided, it does +// not commit or discard it. +func (tdb *TreeDB) Add(wtx db.WriteTx, key, value []byte) error { if tdb.tree == nil { return ErrNotInitialized } + commitTx := wtx == nil + if commitTx { + wtx = tdb.tree.DB().WriteTx() + defer wtx.Discard() + } + if err := tdb.tree.Add(wtx, key, value); err != nil { + return err + } + if commitTx { + return wtx.Commit() + } + return nil +} + +// Del deletes a key from the tree. If no write transaction is provided, it +// creates a new one and commits it at the end. It returns an error if the tree +// is not initialized, if there is an error deleting the key-value pair or +// committing the transaction if it was created. If a transaction is provided, +// it does not commit or discard it. 
+func (tdb *TreeDB) Del(wtx db.WriteTx, key []byte) error { + if tdb.tree == nil { + return ErrNotInitialized + } + commitTx := wtx == nil + if commitTx { + wtx = tdb.tree.DB().WriteTx() + defer wtx.Discard() + } + if err := tdb.tree.Del(wtx, key); err != nil { + return err + } + if commitTx { + return wtx.Commit() + } + return nil +} + +// AddBatch adds a batch of keys and values to the tree. It is more efficient +// than calling Add for each key-value pair. It returns an error if the length +// of keys and values is different, if the tree is not initialized, if there +// is an error adding a key-value pair or committing the transaction. It uses +// a new write transaction to add all the keys and commits it at the end. If +// something goes wrong, it returns an error and discards the transaction. +func (tdb *TreeDB) AddBatch(keys, values [][]byte) error { + if tdb.tree == nil { + return ErrNotInitialized + } + if len(keys) != len(values) { + return fmt.Errorf("keys and values must have the same length") + } wTx := tdb.tree.DB().WriteTx() defer wTx.Discard() - if err := tdb.tree.Add(wTx, key, value); err != nil { - return err + for i := range keys { + if err := tdb.tree.Add(wTx, keys[i], values[i]); err != nil { + return err + } } return wTx.Commit() } -// AddKey adds a key to the tree with nil value. It accepts variadic keys. +// AddKey adds a key to the tree with nil value. It accepts variadic keys. It +// uses a new write transaction to add all the keys and commits it at the end. +// If something goes wrong, it returns an error and discards the transaction. 
func (tdb *TreeDB) AddKey(key ...[]byte) error { if tdb.tree == nil { return ErrNotInitialized } - wTx := tdb.tree.DB().WriteTx() - defer wTx.Discard() + wtx := tdb.tree.DB().WriteTx() + defer wtx.Discard() for _, k := range key { - if err := tdb.tree.Add(wTx, k, nil); err != nil { + if err := tdb.tree.Add(wtx, k, nil); err != nil { return err } } - return wTx.Commit() + return wtx.Commit() } -// TestKey checks if a key is in the tree. -func (tdb *TreeDB) TestKey(key []byte) (bool, error) { +// CheckKey checks if a key is in the tree. +func (tdb *TreeDB) CheckKey(key []byte) (bool, error) { if tdb.tree == nil { return false, ErrNotInitialized } @@ -129,10 +190,10 @@ return true, nil } -// TestAndAddKey checks if a key is in the tree, if not, add it to the tree. It -// is the combination of Test and conditional Add. -func (tdb *TreeDB) TestAndAddKey(key []byte) (bool, error) { - exists, err := tdb.TestKey(key) +// CheckAndAddKey checks if a key is in the tree, if not, add it to the tree. It +// is the combination of CheckKey and conditional AddKey. +func (tdb *TreeDB) CheckAndAddKey(key []byte) (bool, error) { + exists, err := tdb.CheckKey(key) if err != nil { return false, err } diff --git a/scanner/const.go b/scanner/const.go index b47f56fd..4b56ab5a 100644 --- a/scanner/const.go +++ b/scanner/const.go @@ -3,9 +3,15 @@ package scanner import "time" const ( - READ_TIMEOUT = time.Minute - SCAN_TIMEOUT = 5 * time.Minute - SAVE_TIMEOUT = 5 * time.Minute + // READ_TIMEOUT is the timeout to get sorted tokens to scan from the database + READ_TIMEOUT = time.Minute + // SAVE_TIMEOUT is the timeout to save the scanned tokens to the database + SAVE_TIMEOUT = 5 * time.Minute + // PREPARE_TIMEOUT is the timeout to prepare the tokens to scan (calculate + // the birth block number, etc.) 
+ PREPARE_TIMEOUT = 5 * time.Minute + // UPDATE_TIMEOUT is the timeout to update the tokens from their holders + // providers UPDATE_TIMEOUT = 15 * time.Minute ) diff --git a/scanner/providers/holders_provider.go b/scanner/providers/holders_provider.go index f8cde985..c031b62c 100644 --- a/scanner/providers/holders_provider.go +++ b/scanner/providers/holders_provider.go @@ -19,6 +19,7 @@ type BlocksDelta struct { Block uint64 Synced bool TotalSupply *big.Int + NewLogs [][]byte } // Filter interface defines the basic methods to interact with a filter to @@ -26,9 +27,8 @@ type BlocksDelta struct { // for example, if a token is rescanned. It allows to implement different // filters, such as in-memory, disk, merkle tree, etc. type Filter interface { - AddKey(key ...[]byte) error - TestKey(key []byte) (bool, error) - TestAndAddKey(key []byte) (bool, error) + CheckKey(key []byte) (bool, error) + CheckAndAddKey(key []byte) (bool, error) } // HolderProvider is the interface that wraps the basic methods to interact with diff --git a/scanner/providers/web3/erc20_provider.go b/scanner/providers/web3/erc20_provider.go index 3eb3c746..8d9765b0 100644 --- a/scanner/providers/web3/erc20_provider.go +++ b/scanner/providers/web3/erc20_provider.go @@ -161,7 +161,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro balances := make(map[common.Address]*big.Int) // iterate the logs and update the balances log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs)) - processedLogs := &partialProcessedLogs{} + processedLogs := &PartialProcessedLogs{} for _, currentLog := range logs { // skip the log if it has been removed if currentLog.Removed { @@ -177,6 +177,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: false, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, }, errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC20] %s: %w", 
p.address, err)) } // check if the log has been already processed and add it to the filter @@ -190,6 +191,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: false, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, }, errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err)) } // if it is the first scan, it will not check if the log has been @@ -211,9 +213,6 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro balances[logData.From] = new(big.Int).Neg(logData.Value) } } - if err := p.filter.AddKey(processedLogs.ids...); err != nil { - return nil, nil, errors.Join(ErrAddingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err)) - } log.Infow("logs parsed", "count", len(balances), "new_logs", newTransfers, @@ -230,6 +229,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: synced, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, } if delta.TotalSupply, err = p.TotalSupply(nil); err != nil { log.Warnw("error getting total supply, it will retry in the next iteration", "error", err) @@ -396,7 +396,7 @@ func (p *ERC20HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map[ // number and log index. It returns true if the log has been already processed // or false if it has not been processed yet. If some error occurs, it returns // false and the error. 
-func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProcessedLogs) (bool, error) { +func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *PartialProcessedLogs) (bool, error) { // if the filter is not defined, return false if p.filter == nil { return false, nil @@ -410,7 +410,7 @@ func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProc } // check if the hash is in the filter hID := hashFn.Sum(nil)[:8] - exists, err := p.filter.TestKey(hID) + exists, err := p.filter.CheckKey(hID) if err != nil { return false, err } @@ -418,12 +418,14 @@ func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProc return true, nil } // if the hash is not in the filter, check if it is in the partial filter - for _, id := range pl.ids { + logs := *pl + for _, id := range logs { if bytes.Equal(id, hID) { return true, nil } } // add the hash to the partial filter if it has not been processed and return - pl.ids = append(pl.ids, hID) + logs = append(logs, hID) + *pl = logs return false, nil } diff --git a/scanner/providers/web3/erc721_provider.go b/scanner/providers/web3/erc721_provider.go index 8e188ebb..d50cd270 100644 --- a/scanner/providers/web3/erc721_provider.go +++ b/scanner/providers/web3/erc721_provider.go @@ -159,7 +159,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr balances := make(map[common.Address]*big.Int) // iterate the logs and update the balances log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs)) - processedLogs := &partialProcessedLogs{} + processedLogs := &PartialProcessedLogs{} for _, currentLog := range logs { // skip the log if it has been removed if currentLog.Removed { @@ -175,6 +175,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: false, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, }, 
errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC721] %s: %w", p.address, err)) } // check if the log has been already processed and add it to the filter @@ -188,6 +189,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: false, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, }, errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC721] %s: %w", p.address, err)) } // if it is the first scan, it will not check if the log has been @@ -209,9 +211,6 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr balances[logData.From] = big.NewInt(-1) } } - if err := p.filter.AddKey(processedLogs.ids...); err != nil { - return nil, nil, errors.Join(ErrAddingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err)) - } log.Infow("logs parsed", "count", len(balances), "new_logs", newTransfers, @@ -228,6 +227,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: synced, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, } if delta.TotalSupply, err = p.TotalSupply(nil); err != nil { log.Warnw("error getting total supply, it will retry in the next iteration", "error", err) @@ -392,7 +392,7 @@ func (p *ERC721HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map // number and log index. It returns true if the log has been already processed // or false if it has not been processed yet. If some error occurs, it returns // false and the error. 
-func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProcessedLogs) (bool, error) { +func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *PartialProcessedLogs) (bool, error) { // if the filter is not defined, return false if p.filter == nil { return false, nil @@ -406,7 +406,7 @@ func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialPro } // check if the hash is in the filter hID := hashFn.Sum(nil)[:8] - exists, err := p.filter.TestKey(hID) + exists, err := p.filter.CheckKey(hID) if err != nil { return false, err } @@ -414,12 +414,14 @@ func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialPro return true, nil } // if the hash is not in the filter, check if it is in the partial filter - for _, id := range pl.ids { + logs := *pl + for _, id := range logs { if bytes.Equal(id, hID) { return true, nil } } // add the hash to the partial filter if it has not been processed and return - pl.ids = append(pl.ids, hID) + logs = append(logs, hID) + *pl = logs return false, nil } diff --git a/scanner/providers/web3/erc777_provider.go b/scanner/providers/web3/erc777_provider.go index 71bec291..a9d790a6 100644 --- a/scanner/providers/web3/erc777_provider.go +++ b/scanner/providers/web3/erc777_provider.go @@ -159,7 +159,7 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr balances := make(map[common.Address]*big.Int) // iterate the logs and update the balances log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs)) - processedLogs := &partialProcessedLogs{} + processedLogs := &PartialProcessedLogs{} for _, currentLog := range logs { // skip the log if it has been removed if currentLog.Removed { @@ -175,6 +175,7 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: false, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, }, 
errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC777] %s: %w", p.address, err)) } // check if the log has been already processed and add it to the filter @@ -188,6 +189,7 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: false, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, }, errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC777] %s: %w", p.address, err)) } // if it is the first scan, it will not check if the log has been @@ -209,9 +211,6 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr balances[logData.From] = big.NewInt(-1) } } - if err := p.filter.AddKey(processedLogs.ids...); err != nil { - return nil, nil, errors.Join(ErrAddingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err)) - } log.Infow("logs parsed", "count", len(balances), "new_logs", newTransfers, @@ -228,6 +227,7 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr AlreadyProcessedLogsCount: alreadyProcessedLogs, Synced: synced, TotalSupply: big.NewInt(0), + NewLogs: *processedLogs, } if delta.TotalSupply, err = p.TotalSupply(nil); err != nil { log.Warnw("error getting total supply, it will retry in the next iteration", "error", err) @@ -392,7 +392,7 @@ func (p *ERC777HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map // number and log index. It returns true if the log has been already processed // or false if it has not been processed yet. If some error occurs, it returns // false and the error. 
-func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProcessedLogs) (bool, error) { +func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log, pl *PartialProcessedLogs) (bool, error) { // if the filter is not defined, return false if p.filter == nil { return false, nil @@ -406,7 +406,7 @@ func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialPro } // check if the hash is in the filter hID := hashFn.Sum(nil)[:8] - exists, err := p.filter.TestKey(hID) + exists, err := p.filter.CheckKey(hID) if err != nil { return false, err } @@ -414,12 +414,14 @@ func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialPro return true, nil } // if the hash is not in the filter, check if it is in the partial filter - for _, id := range pl.ids { + logs := *pl + for _, id := range logs { if bytes.Equal(id, hID) { return true, nil } } // add the hash to the partial filter if it has not been processed and return - pl.ids = append(pl.ids, hID) + logs = append(logs, hID) + *pl = logs return false, nil } diff --git a/scanner/providers/web3/web3_provider.go b/scanner/providers/web3/web3_provider.go index 206bed30..4c338e66 100644 --- a/scanner/providers/web3/web3_provider.go +++ b/scanner/providers/web3/web3_provider.go @@ -30,13 +30,11 @@ type Web3ProviderConfig struct { DB *db.Database } -// partialProcessedLogs struct is used to store the logs that are partially +// PartialProcessedLogs struct is used to store the logs that are partially // processed by the provider. It is used to avoid to process the same logs // multiple times if the provider is rescanned and to store the logs that are // already processed in a single call to the token filter. -type partialProcessedLogs struct { - ids [][]byte -} +type PartialProcessedLogs [][]byte // creationBlock function returns the block number of the creation of a contract // address. 
It uses the `eth_getCode` method to get the contract code at the diff --git a/scanner/scanner.go b/scanner/scanner.go index aa8f2158..0b23d3fb 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -392,7 +392,7 @@ func (s *Scanner) updateInternalTokenStatus(token ScannerToken, lastBlock uint64 // if something fails in the process. It sets the last block of the token to // the creation block of the token to start scanning from the creation block. func (s *Scanner) prepareToken(token *ScannerToken) error { - ctx, cancel := context.WithTimeout(s.ctx, UPDATE_TIMEOUT) + ctx, cancel := context.WithTimeout(s.ctx, PREPARE_TIMEOUT) defer cancel() // get the provider by token type provider, err := s.providerManager.GetProvider(ctx, token.Type) diff --git a/scanner/updater.go b/scanner/updater.go index d0c7885f..70d64f1f 100644 --- a/scanner/updater.go +++ b/scanner/updater.go @@ -277,13 +277,14 @@ func (u *Updater) process(id string, req UpdateRequest) (UpdateRequest, error) { return req, fmt.Errorf("error getting provider for token: %v", err) } // if the token is a external token, return an error + var filter *treedb.TreeDB if !provider.IsExternal() { chainAddress, ok := u.networks.ChainAddress(req.ChainID, req.Address.Hex()) if !ok { return req, fmt.Errorf("error getting chain address for token: %v", err) } // load filter of the token from the database - filter, err := treedb.LoadTree(u.kvdb, chainAddress) + filter, err = treedb.LoadTree(u.kvdb, chainAddress) if err != nil { return req, err } @@ -362,6 +363,12 @@ func (u *Updater) process(id string, req UpdateRequest) (UpdateRequest, error) { if err != nil { return req, fmt.Errorf("error saving token holders balances: %v", err) } + // add the new keys to the filter if it is defined (not external token) + if filter != nil && delta.NewLogs != nil { + if err := filter.AddKey(delta.NewLogs...); err != nil { + return req, fmt.Errorf("error adding keys to filter: %v", err) + } + } log.Debugw("token holders balances 
updated", "token", req.Address.Hex(), "chainID", req.ChainID,