Skip to content

Commit

Permalink
extend and fix treedb api, return the new logs ids from holder provid…
Browse files Browse the repository at this point in the history
…ers to add them to the token filter after save holders
  • Loading branch information
lucasmenendez committed Jun 27, 2024
1 parent c77d165 commit c6cf141
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 57 deletions.
103 changes: 82 additions & 21 deletions db/treedb/treedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const filterTreeLevels = 64
// ErrNotInitialized is returned when no tree is initialized in a TreeDB
// instance, which means that LoadTree has not been called and the tree is
// not ready to be used.
var ErrNotInitialized = fmt.Errorf("tree not initialized, call Load first")
var ErrNotInitialized = fmt.Errorf("tree not initialized, call LoadTree first")

// TokenFilter is a filter associated with a token.
type TreeDB struct {
Expand Down Expand Up @@ -56,6 +56,9 @@ func LoadTree(db db.Database, prefix string) (*TreeDB, error) {
}, wTx.Commit()
}

// Close closes the tree database. If the tree is not nil, it closes the
// underlying database. If the parent database is not nil, it closes it too.
// It returns an error if any of the databases cannot be closed.
func (tdb *TreeDB) Close() error {
if tdb.tree != nil {
if err := tdb.tree.DB().Close(); err != nil {
Expand All @@ -68,11 +71,11 @@ func (tdb *TreeDB) Close() error {
return nil
}

// DeleteTree deletes a tree from the database identified by current prefix.
// It iterates over all the keys in the tree and deletes them. If some key
// cannot be deleted, it logs a warning and continues with the next key. It
// commits the transaction at the end.
func (tdb *TreeDB) Delete() error {
// Purge deletes a tree from the database identified by current prefix. It
// iterates over all the keys in the tree and deletes them. If some key cannot
// be deleted, it logs a warning and continues with the next key. It commits the
// transaction at the end.
func (tdb *TreeDB) Purge() error {
treeDB := prefixeddb.NewPrefixedDatabase(tdb.parentDB, []byte(tdb.prefix))
wTx := treeDB.WriteTx()
if err := treeDB.Iterate(nil, func(k, _ []byte) bool {
Expand All @@ -86,36 +89,94 @@ func (tdb *TreeDB) Delete() error {
return wTx.Commit()
}

// Add adds a key to the tree.
func (tdb *TreeDB) Add(key, value []byte) error {
// Add inserts the given key-value pair into the tree. If no write transaction
// is provided (wtx is nil), a new one is created and committed before
// returning. If a transaction is provided, it is used as-is and the caller
// keeps the responsibility of committing or discarding it. It returns
// ErrNotInitialized if no tree has been loaded, or any error from the
// underlying tree or transaction.
func (tdb *TreeDB) Add(wtx db.WriteTx, key, value []byte) error {
	if tdb.tree == nil {
		return ErrNotInitialized
	}
	// caller-managed transaction: just add the pair, do not commit or discard
	if wtx != nil {
		return tdb.tree.Add(wtx, key, value)
	}
	// internally-managed transaction: create, add and commit (discard on error)
	tx := tdb.tree.DB().WriteTx()
	defer tx.Discard()
	if err := tdb.tree.Add(tx, key, value); err != nil {
		return err
	}
	return tx.Commit()
}

// Del removes a key from the tree. If no write transaction is provided (wtx
// is nil), a new one is created and committed before returning. If a
// transaction is provided, it is used as-is and the caller keeps the
// responsibility of committing or discarding it. It returns ErrNotInitialized
// if no tree has been loaded, or any error from the underlying tree or
// transaction.
func (tdb *TreeDB) Del(wtx db.WriteTx, key []byte) error {
	if tdb.tree == nil {
		return ErrNotInitialized
	}
	// caller-managed transaction: just delete the key, do not commit or discard
	if wtx != nil {
		return tdb.tree.Del(wtx, key)
	}
	// internally-managed transaction: create, delete and commit (discard on error)
	tx := tdb.tree.DB().WriteTx()
	defer tx.Discard()
	if err := tdb.tree.Del(tx, key); err != nil {
		return err
	}
	return tx.Commit()
}

// AddBatch adds a batch of keys and values to the tree. It is more efficient
// than calling Add for each key-value pair. It returns an error if the length
// of keys and values is different, if the tree is not initialized, if there
// is an error adding a key-value pair or committing the transaction. It uses
// a new write transaction to add all the keys and commits it at the end. If
// something goes wrong, it returns an error and discards the transaction.
func (tdb *TreeDB) AddBatch(keys, values [][]byte) error {
if tdb.tree == nil {
return ErrNotInitialized
}
if len(keys) != len(values) {
return fmt.Errorf("keys and values must have the same length")
}
wTx := tdb.tree.DB().WriteTx()
defer wTx.Discard()
if err := tdb.tree.Add(wTx, key, value); err != nil {
return err
for i := range keys {
if err := tdb.tree.Add(wTx, keys[i], values[i]); err != nil {
return err
}
}
return wTx.Commit()
}

// AddKey adds a key to the tree with nil value. It accepts variadic keys.
// AddKey adds a key to the tree with nil value. It accepts variadic keys. It
// uses a new write transaction to add all the keys and commits it at the end.
// If something goes wrong, it returns an error and discards the transaction.
func (tdb *TreeDB) AddKey(key ...[]byte) error {
if tdb.tree == nil {
return ErrNotInitialized
}
wTx := tdb.tree.DB().WriteTx()
defer wTx.Discard()
wtx := tdb.tree.DB().WriteTx()
defer wtx.Discard()
for _, k := range key {
if err := tdb.tree.Add(wTx, k, nil); err != nil {
if err := tdb.tree.Add(wtx, k, nil); err != nil {
return err
}
}
return wTx.Commit()
return wtx.Commit()
}

// TestKey checks if a key is in the tree.
func (tdb *TreeDB) TestKey(key []byte) (bool, error) {
// CheckKey checks if a key is in the tree.
func (tdb *TreeDB) CheckKey(key []byte) (bool, error) {
if tdb.tree == nil {
return false, ErrNotInitialized
}
Expand All @@ -129,10 +190,10 @@ func (tdb *TreeDB) TestKey(key []byte) (bool, error) {
return true, nil
}

// TestAndAddKey checks if a key is in the tree, if not, add it to the tree. It
// is the combination of Test and conditional Add.
func (tdb *TreeDB) TestAndAddKey(key []byte) (bool, error) {
exists, err := tdb.TestKey(key)
// CheckAndAddKey checks if a key is in the tree, if not, add it to the tree. It
// is the combination of CheckKey and conditional AddKey.
func (tdb *TreeDB) CheckAndAddKey(key []byte) (bool, error) {
exists, err := tdb.CheckKey(key)
if err != nil {
return false, err
}
Expand Down
12 changes: 9 additions & 3 deletions scanner/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ package scanner
import "time"

const (
	// READ_TIMEOUT is the timeout to get sorted tokens to scan from the database
	READ_TIMEOUT = time.Minute
	// SAVE_TIMEOUT is the timeout to save the scanned tokens to the database
	SAVE_TIMEOUT = 5 * time.Minute
	// PREPARE_TIMEOUT is the timeout to prepare the tokens to scan (calculate
	// the birth block number, etc.)
	PREPARE_TIMEOUT = 5 * time.Minute
	// UPDATE_TIMEOUT is the timeout to update the tokens from their holders
	// providers
	UPDATE_TIMEOUT = 15 * time.Minute
)

Expand Down
6 changes: 3 additions & 3 deletions scanner/providers/holders_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ type BlocksDelta struct {
Block uint64
Synced bool
TotalSupply *big.Int
NewLogs [][]byte
}

// Filter interface defines the basic methods to interact with a filter to
// store the processed transfers identifiers and avoid processing them again,
// for example, if a token is rescanned. It allows implementing different
// filters, such as in-memory, disk, merkle tree, etc.
type Filter interface {
	// AddKey adds one or more keys to the filter.
	AddKey(key ...[]byte) error
	// CheckKey reports whether the given key is already in the filter.
	CheckKey(key []byte) (bool, error)
	// CheckAndAddKey checks if the key is in the filter and, if it is not,
	// adds it. It returns whether the key was already present.
	CheckAndAddKey(key []byte) (bool, error)
}

// HolderProvider is the interface that wraps the basic methods to interact with
Expand Down
18 changes: 10 additions & 8 deletions scanner/providers/web3/erc20_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
balances := make(map[common.Address]*big.Int)
// iterate the logs and update the balances
log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs))
processedLogs := &partialProcessedLogs{}
processedLogs := &PartialProcessedLogs{}
for _, currentLog := range logs {
// skip the log if it has been removed
if currentLog.Removed {
Expand All @@ -177,6 +177,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
AlreadyProcessedLogsCount: alreadyProcessedLogs,
Synced: false,
TotalSupply: big.NewInt(0),
NewLogs: *processedLogs,
}, errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err))
}
// check if the log has been already processed and add it to the filter
Expand All @@ -190,6 +191,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
AlreadyProcessedLogsCount: alreadyProcessedLogs,
Synced: false,
TotalSupply: big.NewInt(0),
NewLogs: *processedLogs,
}, errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err))
}
// if it is the first scan, it will not check if the log has been
Expand All @@ -211,9 +213,6 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
balances[logData.From] = new(big.Int).Neg(logData.Value)
}
}
if err := p.filter.AddKey(processedLogs.ids...); err != nil {
return nil, nil, errors.Join(ErrAddingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err))
}
log.Infow("logs parsed",
"count", len(balances),
"new_logs", newTransfers,
Expand All @@ -230,6 +229,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
AlreadyProcessedLogsCount: alreadyProcessedLogs,
Synced: synced,
TotalSupply: big.NewInt(0),
NewLogs: *processedLogs,
}
if delta.TotalSupply, err = p.TotalSupply(nil); err != nil {
log.Warnw("error getting total supply, it will retry in the next iteration", "error", err)
Expand Down Expand Up @@ -396,7 +396,7 @@ func (p *ERC20HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map[
// number and log index. It returns true if the log has been already processed
// or false if it has not been processed yet. If some error occurs, it returns
// false and the error.
func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProcessedLogs) (bool, error) {
func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *PartialProcessedLogs) (bool, error) {
// if the filter is not defined, return false
if p.filter == nil {
return false, nil
Expand All @@ -410,20 +410,22 @@ func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProc
}
// check if the hash is in the filter
hID := hashFn.Sum(nil)[:8]
exists, err := p.filter.TestKey(hID)
exists, err := p.filter.CheckKey(hID)
if err != nil {
return false, err
}
if exists {
return true, nil
}
// if the hash is not in the filter, check if it is in the partial filter
for _, id := range pl.ids {
logs := *pl
for _, id := range logs {
if bytes.Equal(id, hID) {
return true, nil
}
}
// add the hash to the partial filter if it has not been processed and return
pl.ids = append(pl.ids, hID)
logs = append(logs, hID)
*pl = logs
return false, nil
}
18 changes: 10 additions & 8 deletions scanner/providers/web3/erc721_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
balances := make(map[common.Address]*big.Int)
// iterate the logs and update the balances
log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs))
processedLogs := &partialProcessedLogs{}
processedLogs := &PartialProcessedLogs{}
for _, currentLog := range logs {
// skip the log if it has been removed
if currentLog.Removed {
Expand All @@ -175,6 +175,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
AlreadyProcessedLogsCount: alreadyProcessedLogs,
Synced: false,
TotalSupply: big.NewInt(0),
NewLogs: *processedLogs,
}, errors.Join(ErrParsingTokenLogs, fmt.Errorf("[ERC721] %s: %w", p.address, err))
}
// check if the log has been already processed and add it to the filter
Expand All @@ -188,6 +189,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
AlreadyProcessedLogsCount: alreadyProcessedLogs,
Synced: false,
TotalSupply: big.NewInt(0),
NewLogs: *processedLogs,
}, errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC721] %s: %w", p.address, err))
}
// if it is the first scan, it will not check if the log has been
Expand All @@ -209,9 +211,6 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
balances[logData.From] = big.NewInt(-1)
}
}
if err := p.filter.AddKey(processedLogs.ids...); err != nil {
return nil, nil, errors.Join(ErrAddingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err))
}
log.Infow("logs parsed",
"count", len(balances),
"new_logs", newTransfers,
Expand All @@ -228,6 +227,7 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
AlreadyProcessedLogsCount: alreadyProcessedLogs,
Synced: synced,
TotalSupply: big.NewInt(0),
NewLogs: *processedLogs,
}
if delta.TotalSupply, err = p.TotalSupply(nil); err != nil {
log.Warnw("error getting total supply, it will retry in the next iteration", "error", err)
Expand Down Expand Up @@ -392,7 +392,7 @@ func (p *ERC721HolderProvider) CensusKeys(data map[common.Address]*big.Int) (map
// number and log index. It returns true if the log has been already processed
// or false if it has not been processed yet. If some error occurs, it returns
// false and the error.
func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialProcessedLogs) (bool, error) {
func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *PartialProcessedLogs) (bool, error) {
// if the filter is not defined, return false
if p.filter == nil {
return false, nil
Expand All @@ -406,20 +406,22 @@ func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log, pl *partialPro
}
// check if the hash is in the filter
hID := hashFn.Sum(nil)[:8]
exists, err := p.filter.TestKey(hID)
exists, err := p.filter.CheckKey(hID)
if err != nil {
return false, err
}
if exists {
return true, nil
}
// if the hash is not in the filter, check if it is in the partial filter
for _, id := range pl.ids {
logs := *pl
for _, id := range logs {
if bytes.Equal(id, hID) {
return true, nil
}
}
// add the hash to the partial filter if it has not been processed and return
pl.ids = append(pl.ids, hID)
logs = append(logs, hID)
*pl = logs
return false, nil
}
Loading

0 comments on commit c6cf141

Please sign in to comment.