Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rescan tokens to process missing transactions #202

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e620842
basic integration
lucasmenendez May 31, 2024
5805ce6
initial updater, no heavy tested and no comments yet
lucasmenendez Jun 3, 2024
e2dabf5
new filter abstraction to load and save them in filesystem
lucasmenendez Jun 4, 2024
ff2d01d
new endpoints to rescan a token
lucasmenendez Jun 4, 2024
f320e45
include comments
lucasmenendez Jun 4, 2024
54b6519
last fixes about filters
lucasmenendez Jun 5, 2024
b620177
solve linter issues
lucasmenendez Jun 5, 2024
cf7c29d
make updater request last block internal to avoid loop for ever
lucasmenendez Jun 5, 2024
b6b42c7
removing annoying logs
lucasmenendez Jun 5, 2024
803f80a
initial integration between updater and scanner, no concurrent
lucasmenendez Jun 12, 2024
df310ca
debug txs and filters
lucasmenendez Jun 17, 2024
ab5dc1a
concurrent updater
lucasmenendez Jun 18, 2024
08f70b2
mock filter
lucasmenendez Jun 19, 2024
bec68a6
Merge branch 'main' into f/rescan_tokens
lucasmenendez Jun 19, 2024
08b1e76
update dependencies
lucasmenendez Jun 19, 2024
276976b
linter fixes
lucasmenendez Jun 19, 2024
72d71b4
Merge branch 'main' into f/rescan_tokens
lucasmenendez Jun 20, 2024
b04d12e
Merge branch 'main' into f/rescan_tokens
lucasmenendez Jun 21, 2024
3979a7d
including arbo as key-value database to be used with updater filter t…
lucasmenendez Jun 21, 2024
09f8392
new 'next' method for updater to get the next token to update in each…
lucasmenendez Jun 23, 2024
4c4b318
filter logs partially and them update the filter in a single call
lucasmenendez Jun 23, 2024
5299dce
trying to fix annoying gitcoin test
lucasmenendez Jun 23, 2024
ef978c2
final minor changes
lucasmenendez Jun 24, 2024
1810575
Merge branch 'main' into f/rescan_tokens
lucasmenendez Jun 25, 2024
c77d165
Merge branch 'main' into f/rescan_tokens
lucasmenendez Jun 27, 2024
cb943ab
extend and fix treedb api, return the new logs ids from holder provid…
lucasmenendez Jun 27, 2024
f964eeb
Merge branch 'main' into f/rescan_tokens
lucasmenendez Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
run:
go: '1.20'
skip-files:
- scanner/providers/gitcoin/gitcoin_stamps.go
issues:
max-same-issues: 0
exclude-use-default: false
exclude-files:
- scanner/providers/gitcoin/gitcoin_stamps.go
- scripts/*
linters:
enable:
- misspell
Expand Down
7 changes: 7 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
queries "github.com/vocdoni/census3/db/sqlc"
"github.com/vocdoni/census3/helpers/queue"
"github.com/vocdoni/census3/helpers/web3"
"github.com/vocdoni/census3/scanner"
"github.com/vocdoni/census3/scanner/providers"
"github.com/vocdoni/census3/scanner/providers/manager"
web3provider "github.com/vocdoni/census3/scanner/providers/web3"
Expand All @@ -39,8 +40,10 @@ type Census3APIConf struct {
Hostname string
Port int
DataDir string
FiltersPath string
GroupKey string
Web3Providers *web3.Web3Pool
TokenUpdater *scanner.Updater
HolderProviders *manager.ProviderManager
AdminToken string
}
Expand All @@ -57,6 +60,8 @@ type census3API struct {
holderProviders *manager.ProviderManager
cache *lru.Cache[CacheKey, any]
router *httprouter.HTTProuter
tokenUpdater *scanner.Updater
filtersPath string
}

func Init(db *db.DB, conf Census3APIConf) (*census3API, error) {
Expand All @@ -70,8 +75,10 @@ func Init(db *db.DB, conf Census3APIConf) (*census3API, error) {
w3p: conf.Web3Providers,
queue: queue.NewBackgroundQueue(),
holderProviders: conf.HolderProviders,
tokenUpdater: conf.TokenUpdater,
cache: cache,
router: &httprouter.HTTProuter{},
filtersPath: conf.FiltersPath,
}
// get the current chainID
log.Infow("starting API", "web3Providers", conf.Web3Providers.String())
Expand Down
10 changes: 10 additions & 0 deletions api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ var (
HTTPstatus: apirest.HTTPstatusNotFound,
Err: fmt.Errorf("token holder not found for the token provided"),
}
ErrNoSyncedToken = apirest.APIerror{
Code: 4024,
HTTPstatus: apirest.HTTPstatusBadRequest,
Err: fmt.Errorf("token is not synced yet"),
}
ErrMalformedRescanQueueID = apirest.APIerror{
Code: 4025,
HTTPstatus: apirest.HTTPstatusBadRequest,
Err: fmt.Errorf("malformed queue ID"),
}
ErrCantCreateToken = apirest.APIerror{
Code: 5000,
HTTPstatus: apirest.HTTPstatusInternalErr,
Expand Down
104 changes: 104 additions & 0 deletions api/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
queries "github.com/vocdoni/census3/db/sqlc"
"github.com/vocdoni/census3/helpers/lexer"
"github.com/vocdoni/census3/metrics"
"github.com/vocdoni/census3/scanner"
"github.com/vocdoni/census3/scanner/providers"
"github.com/vocdoni/census3/scanner/providers/web3"
"go.vocdoni.io/dvote/httprouter"
api "go.vocdoni.io/dvote/httprouter/apirest"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/util"
)

func (capi *census3API) initTokenHandlers() error {
Expand All @@ -43,6 +45,14 @@ func (capi *census3API) initTokenHandlers() error {
api.MethodAccessTypePublic, capi.tokenStartBlock); err != nil {
return err
}
if err := capi.endpoint.RegisterMethod("/tokens/update/{tokenID}", "POST",
api.MethodAccessTypeAdmin, capi.rescanToken); err != nil {
return err
}
if err := capi.endpoint.RegisterMethod("/tokens/update/queue/{queueID}", "GET",
api.MethodAccessTypeAdmin, capi.checkRescanToken); err != nil {
return err
}
if err := capi.endpoint.RegisterMethod("/tokens/{tokenID}", "DELETE",
api.MethodAccessTypeAdmin, capi.launchDeleteToken); err != nil {
return err
Expand Down Expand Up @@ -607,6 +617,100 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
return ctx.Send(res, api.HTTPstatusOK)
}

// rescanToken function handler enqueues the rescan process for the token with
// the given ID. The token is scanned from the creation block to the last block
// stored in the database. It returns a 400 error if the provided ID is wrong or
// empty, a 404 error if the token is not found, a 500 error if something fails
// or a 200 response if the process is enqueued. It returns a queue ID to track
// the status of the process.
func (capi *census3API) rescanToken(msg *api.APIdata, ctx *httprouter.HTTPContext) error {
// get contract address from the tokenID query param and decode check if
// it is provided, if not return an error
strAddress := ctx.URLParam("tokenID")
if strAddress == "" {
return ErrMalformedToken.With("tokenID is required")
}
address := common.HexToAddress(strAddress)
// get chainID from query params and decode it as integer, if it's not
// provided or it's not a valid integer return an error
strChainID := ctx.Request.URL.Query().Get("chainID")
if strChainID == "" {
return ErrMalformedChainID.With("chainID is required")
}
chainID, err := strconv.Atoi(strChainID)
if err != nil {
return ErrMalformedChainID.WithErr(err)
} else if chainID < 0 {
return ErrMalformedChainID.With("chainID must be a positive number")
}
// get token information from the database
internalCtx, cancel := context.WithTimeout(ctx.Request.Context(), getTokenTimeout)
defer cancel()
tokenData, err := capi.db.QueriesRO.GetToken(internalCtx,
queries.GetTokenParams{
ID: address.Bytes(),
ChainID: uint64(chainID),
})
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundToken.WithErr(err)
}
return ErrCantGetToken.WithErr(err)
}
// only the tokens that are already synced can be rescanned
if !tokenData.Synced {
return ErrNoSyncedToken
}
// enqueue the rescan token process
id := util.RandomHex(4)
if err := capi.tokenUpdater.SetRequest(id, &scanner.UpdateRequest{
Address: address,
ChainID: uint64(chainID),
Type: tokenData.TypeID,
CreationBlock: uint64(tokenData.CreationBlock),
EndBlock: uint64(tokenData.LastBlock),
}); err != nil {
return ErrEncodeQueueItem.WithErr(err)
}
// encoding the result and response it
res, err := json.Marshal(QueueResponse{id})
if err != nil {
return ErrEncodeQueueItem.WithErr(err)
}
return ctx.Send(res, api.HTTPstatusOK)
}

// checkRescanToken function handler returns the status of the rescan process
// with the given queue ID. It returns a 400 error if the provided ID is wrong
// or empty, a 404 error if the token is not found in the queue or a 500 error
// if something fails. The response contains the address of the token, the chain
// ID, the status of the process, the number of logs scanned, the number of new
// logs found, and the number of duplicated logs.
func (capi *census3API) checkRescanToken(msg *api.APIdata, ctx *httprouter.HTTPContext) error {
queueID := ctx.URLParam("queueID")
if queueID == "" {
return ErrMalformedRescanQueueID
}
// get the rescan status from the updater
status := capi.tokenUpdater.RequestStatus(queueID, true)
if status == nil {
return ErrNotFoundToken.Withf("the ID %s does not exist in the queue", queueID)
}
// encoding the result and response it
response, err := json.Marshal(RescanTokenStatus{
Address: status.Address.String(),
ChainID: status.ChainID,
Done: status.Done,
LogsScanned: status.TotalLogs,
NewLogs: status.TotalNewLogs,
DuplicatedLogs: status.TotalAlreadyProcessedLogs,
})
if err != nil {
return ErrEncodeQueueItem.WithErr(err)
}
return ctx.Send(response, api.HTTPstatusOK)
}

func (capi *census3API) tokenStartBlock(msg *api.APIdata, ctx *httprouter.HTTPContext) error {
req := Token{}
if err := json.Unmarshal(msg.Data, &req); err != nil {
Expand Down
14 changes: 14 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,17 @@ type DeleteTokenQueueResponse struct {
Done bool `json:"done"`
Error error `json:"error"`
}

type RescanTokenResponse struct {
ID string `json:"ID"`
}

type RescanTokenStatus struct {
Address string `json:"address"`
ChainID uint64 `json:"chainID"`
Done bool `json:"done"`

LogsScanned uint64 `json:"logsScanned"`
NewLogs uint64 `json:"newLogs"`
DuplicatedLogs uint64 `json:"duplicatedLogs"`
}
23 changes: 21 additions & 2 deletions cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/vocdoni/census3/scanner/providers/manager"
"github.com/vocdoni/census3/scanner/providers/poap"
web3provider "github.com/vocdoni/census3/scanner/providers/web3"
dvotedb "go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/db/metadb"
"go.vocdoni.io/dvote/log"
)

Expand All @@ -39,6 +41,7 @@ type Census3Config struct {
adminToken string
initialTokens string
farcaster bool
filtersPath string
}

func main() {
Expand Down Expand Up @@ -135,6 +138,12 @@ func main() {
panic(err)
}
config.farcaster = pviper.GetBool("farcaster")
// set the filters path into the config, create the folder if it does not
// exitst yet
config.filtersPath = config.dataDir + "/filters"
if err := os.MkdirAll(config.filtersPath, os.ModePerm); err != nil {
log.Fatal(err)
}
// init logger
log.Init(config.logLevel, "stdout", nil)
// check if the web3 providers are defined
Expand Down Expand Up @@ -194,8 +203,15 @@ func main() {
DB: farcasterDB,
})
}
// init the filters database
filtersDB, err := metadb.New(dvotedb.TypePebble, config.filtersPath)
if err != nil {
log.Fatal(err)
}
// start the token updater with the database and the provider manager
updater := scanner.NewUpdater(database, w3p, pm, filtersDB, config.scannerCoolDown)
// start the holder scanner with the database and the provider manager
hc := scanner.NewScanner(database, w3p, pm, config.scannerCoolDown)
hc := scanner.NewScanner(database, updater, w3p, pm, config.scannerCoolDown)
// if the admin token is not defined, generate a random one
if config.adminToken != "" {
if _, err := uuid.Parse(config.adminToken); err != nil {
Expand All @@ -216,6 +232,7 @@ func main() {
GroupKey: config.connectKey,
HolderProviders: pm,
AdminToken: config.adminToken,
TokenUpdater: updater,
})
if err != nil {
log.Fatal(err)
Expand All @@ -228,7 +245,8 @@ func main() {
log.Info("initial tokens created, or at least tried to")
}()
// start the holder scanner
go hc.Start(ctx, config.scannerConcurrentTokens)
go hc.Start(ctx)
go updater.Start(ctx, config.scannerConcurrentTokens)

metrics.NewCounter(fmt.Sprintf("census3_info{version=%q,chains=%q}",
internal.Version, w3p.String())).Set(1)
Expand All @@ -243,6 +261,7 @@ func main() {
// closing database
go func() {
hc.Stop()
updater.Stop()
if err := apiService.Stop(); err != nil {
log.Fatal(err)
}
Expand Down
Loading
Loading