Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/unifiying-censusdb #115

Merged
merged 6 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package api

import (
"encoding/json"
"path/filepath"

"github.com/vocdoni/census3/census"
"github.com/vocdoni/census3/db"
"github.com/vocdoni/census3/queue"
"github.com/vocdoni/census3/service"
"github.com/vocdoni/census3/state"
"go.vocdoni.io/dvote/api/censusdb"
storagelayer "go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/data/downloader"
"go.vocdoni.io/dvote/data/ipfs"
"go.vocdoni.io/dvote/data/ipfs/ipfsconnect"
vocdoniDB "go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/db/metadb"
"go.vocdoni.io/dvote/httprouter"
api "go.vocdoni.io/dvote/httprouter/apirest"
"go.vocdoni.io/dvote/log"
Expand All @@ -30,7 +33,7 @@ type census3API struct {
conf Census3APIConf
db *db.DB
endpoint *api.API
censusDB *census.CensusDB
censusDB *censusdb.CensusDB
queue *queue.BackgroundQueue
w3p state.Web3Providers
storage storagelayer.Storage
Expand Down Expand Up @@ -73,10 +76,16 @@ func Init(db *db.DB, conf Census3APIConf) (*census3API, error) {
// init the downloader using the storage layer
newAPI.downloader = downloader.NewDownloader(newAPI.storage)
newAPI.downloader.Start()
// init the census DB using the storage layer
if newAPI.censusDB, err = census.NewCensusDB(conf.DataDir, newAPI.storage); err != nil {
// init the database for the census trees
censusesDB, err := metadb.New(vocdoniDB.TypePebble, filepath.Join(conf.DataDir, "censusdb"))
if err != nil {
return nil, err
}
// init the censusDB of the API
if newAPI.censusDB = censusdb.NewCensusDB(censusesDB); err != nil {
return nil, err
}

// init handlers
if err := newAPI.initAPIHandlers(); err != nil {
return nil, err
Expand Down
30 changes: 16 additions & 14 deletions api/censuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math/big"
"strconv"

"github.com/vocdoni/census3/census"
queries "github.com/vocdoni/census3/db/sqlc"
"go.vocdoni.io/dvote/httprouter"
api "go.vocdoni.io/dvote/httprouter/apirest"
Expand Down Expand Up @@ -61,7 +60,7 @@ func (capi *census3API) getCensus(msg *api.APIdata, ctx *httprouter.HTTPContext)
URI: capi.downloader.RemoteStorage.URIprefix() + currentCensus.Uri.String,
Size: currentCensus.Size,
Weight: new(big.Int).SetBytes(censusWeight).String(),
Anonymous: currentCensus.CensusType == uint64(census.AnonymousCensusType),
Anonymous: currentCensus.CensusType == uint64(anonymousCensusType),
})
if err != nil {
return ErrEncodeCensus.WithErr(err)
Expand Down Expand Up @@ -135,7 +134,7 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusRequest, qID str
return 0, ErrInvalidStrategyPredicate.With("empty predicate")
}
// init some variables to get computed in the following steps
strategyHolders, censusWeight, totalTokensBlockNumber, err := census.CalculateStrategyHolders(
strategyHolders, censusWeight, totalTokensBlockNumber, err := CalculateStrategyHolders(
internalCtx, capi.db.QueriesRO, capi.w3p, req.StrategyID, strategy.Predicate)
if err != nil {
return 0, ErrEvalStrategyPredicate.WithErr(err)
Expand All @@ -144,7 +143,7 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusRequest, qID str
return 0, ErrNoStrategyHolders
}
// compute the new censusId and censusType
newCensusID := census.InnerCensusID(totalTokensBlockNumber, req.StrategyID, req.Anonymous)
newCensusID := InnerCensusID(totalTokensBlockNumber, req.StrategyID, req.Anonymous)
// check if the census already exists
_, err = qtx.CensusByID(internalCtx, newCensusID)
if err != nil {
Expand All @@ -157,19 +156,22 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusRequest, qID str
return newCensusID, ErrCensusAlreadyExists
}
// check the censusType
censusType := census.DefaultCensusType
censusType := defaultCensusType
if req.Anonymous {
censusType = census.AnonymousCensusType
censusType = anonymousCensusType
}
// create a census tree and publish on IPFS
def := census.NewCensusDefinition(newCensusID, req.StrategyID, strategyHolders, req.Anonymous)
newCensus, err := capi.censusDB.CreateAndPublish(def)
root, uri, _, err := CreateAndPublishCensus(capi.censusDB, capi.storage, CensusOptions{
ID: newCensusID,
Type: censusType,
Holders: strategyHolders,
})
if err != nil {
return 0, ErrCantCreateCensus.WithErr(err)
}
// save the new census in the SQL database
sqlURI := &sql.NullString{}
if err := sqlURI.Scan(newCensus.URI); err != nil {
if err := sqlURI.Scan(uri); err != nil {
return 0, ErrCantCreateCensus.WithErr(err)
}
sqlCensusSize := &sql.NullInt64{}
Expand All @@ -181,10 +183,10 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusRequest, qID str
return 0, ErrCantCreateCensus.WithErr(err)
}
_, err = qtx.CreateCensus(internalCtx, queries.CreateCensusParams{
ID: newCensus.ID,
ID: newCensusID,
StrategyID: req.StrategyID,
CensusType: uint64(censusType),
MerkleRoot: newCensus.RootHash,
MerkleRoot: []byte(root),
Uri: *sqlURI,
Size: uint64(sqlCensusSize.Int64),
Weight: *sqlCensusWeight,
Expand All @@ -196,7 +198,7 @@ func (capi *census3API) createAndPublishCensus(req *CreateCensusRequest, qID str
if err := tx.Commit(); err != nil {
return 0, ErrCantCreateCensus.WithErr(err)
}
return newCensus.ID, nil
return newCensusID, nil
}

// enqueueCensus handler returns the current status of the queue item
Expand Down Expand Up @@ -250,7 +252,7 @@ func (capi *census3API) enqueueCensus(msg *api.APIdata, ctx *httprouter.HTTPCont
URI: capi.downloader.RemoteStorage.URIprefix() + currentCensus.Uri.String,
Size: currentCensus.Size,
Weight: censusWeight.String(),
Anonymous: currentCensus.CensusType == uint64(census.AnonymousCensusType),
Anonymous: currentCensus.CensusType == uint64(anonymousCensusType),
}
// remove the item from the queue
capi.queue.Dequeue(queueID)
Expand Down Expand Up @@ -300,7 +302,7 @@ func (capi *census3API) getStrategyCensuses(msg *api.APIdata, ctx *httprouter.HT
URI: capi.downloader.RemoteStorage.URIprefix() + censusInfo.Uri.String,
Size: censusInfo.Size,
Weight: censusWeight.String(),
Anonymous: censusInfo.CensusType == uint64(census.AnonymousCensusType),
Anonymous: censusInfo.CensusType == uint64(anonymousCensusType),
})
}
res, err := json.Marshal(censuses)
Expand Down
10 changes: 8 additions & 2 deletions api/const.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package api

import "time"
import (
"time"

"go.vocdoni.io/proto/build/go/models"
)

const (
// censuses
Expand All @@ -22,5 +26,7 @@ const (
)

const (
defaultPageSize = int32(10)
defaultPageSize = int32(10)
defaultCensusType = models.Census_ARBO_BLAKE2B
anonymousCensusType = models.Census_ARBO_POSEIDON
)
210 changes: 210 additions & 0 deletions api/helpers.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
package api

import (
"context"
"database/sql"
"encoding/binary"
"errors"
"fmt"
"math"
"math/big"
"strconv"
"time"

"github.com/ethereum/go-ethereum/common"
queries "github.com/vocdoni/census3/db/sqlc"
"github.com/vocdoni/census3/lexer"
"github.com/vocdoni/census3/state"
"github.com/vocdoni/census3/strategyoperators"
"go.vocdoni.io/dvote/api/censusdb"
"go.vocdoni.io/dvote/censustree"
storagelayer "go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/httprouter"
"go.vocdoni.io/dvote/types"
"go.vocdoni.io/proto/build/go/models"
)

// paginationFromCtx extracts from the request and returns the page size,
Expand Down Expand Up @@ -91,3 +108,196 @@ func paginationToRequest[T any](rows []T, dbPageSize int32, cursor string, goFor
}
return rows, nextCursor, prevCursor
}

// CensusOptions envolves the required parameters to create and publish a
// census merkle tree
type CensusOptions struct {
ID uint64
Type models.Census_Type
Holders map[common.Address]*big.Int
}

// CreateAndPublishCensus function creates a new census tree based on the
// options provided and publishes it to IPFS. It needs to persist it temporaly
// into a internal trees database. It returns the root of the tree, the IPFS
// URI and the tree dump.
func CreateAndPublishCensus(
lucasmenendez marked this conversation as resolved.
Show resolved Hide resolved
db *censusdb.CensusDB, storage storagelayer.Storage, opts CensusOptions,
) (types.HexBytes, string, []byte, error) {
bID := make([]byte, 8)
binary.LittleEndian.PutUint64(bID, opts.ID)
ref, err := db.New(bID, opts.Type, "", nil, censustree.DefaultMaxLevels)
if err != nil {
return nil, "", nil, err
}
// encode the holders
holdersAddresses, holdersValues := [][]byte{}, [][]byte{}
for addr, balance := range opts.Holders {
key := addr.Bytes()[:censustree.DefaultMaxKeyLen]
if opts.Type != anonymousCensusType {
if key, err = ref.Tree().Hash(addr.Bytes()); err != nil {
return nil, "", nil, err
}
}
holdersAddresses = append(holdersAddresses, key[:censustree.DefaultMaxKeyLen])
value := ref.Tree().BigIntToBytes(balance)
holdersValues = append(holdersValues, value)
}
// add the holders to the census tree
db.Lock()
if _, err := ref.Tree().AddBatch(holdersAddresses, holdersValues); err != nil {
return nil, "", nil, err
}
root, err := ref.Tree().Root()
if err != nil {
return nil, "", nil, err
}
data, err := ref.Tree().Dump()
if err != nil {
return nil, "", nil, err
}
db.Unlock()
lucasmenendez marked this conversation as resolved.
Show resolved Hide resolved
// generate the tree dump
dump, err := censusdb.BuildExportDump(root, data, opts.Type, censustree.DefaultMaxLevels)
if err != nil {
return nil, "", nil, err
}
// publish it on IPFS
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
lucasmenendez marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
uri, err := storage.Publish(ctx, dump)
if err != nil {
return nil, "", nil, err
}
db.Lock()
lucasmenendez marked this conversation as resolved.
Show resolved Hide resolved
if err := db.Del(bID); err != nil {
return nil, "", nil, err
}
db.Unlock()
lucasmenendez marked this conversation as resolved.
Show resolved Hide resolved
return root, uri, dump, nil
}

// InnerCensusID generates a unique identifier by concatenating the BlockNumber, StrategyID,
// and a numerical representation of the Anonymous flag from a CreateCensusRequest struct.
// The BlockNumber and StrategyID are concatenated as they are, and the Anonymous flag is
// represented as 1 for true and 0 for false. This concatenated string is then converted
// to a uint64 to create a unique identifier.
func InnerCensusID(blockNumber, strategyID uint64, anonymous bool) uint64 {
// Convert the boolean to a uint64: 1 for true, 0 for false
var anonymousUint uint64
if anonymous {
anonymousUint = 1
}
// Concatenate the three values as strings
concatenated := fmt.Sprintf("%d%d%d", blockNumber, strategyID, anonymousUint)
// Convert the concatenated string back to a uint64
result, err := strconv.ParseUint(concatenated, 10, 64)
if err != nil {
panic(err)
}
if result > math.MaxInt64 {
panic(err)
}
return result
}

// CalculateStrategyHolders function returns the holders of a strategy and the
// total weight of the census. It also returns the total block number of the
// census, which is the sum of the strategy block number or the last block
// number of every token chain id. To calculate the census holders, it uses the
// supplied predicate to filter the token holders using a lexer and evaluator.
// The evaluator uses the strategy operators to evaluate the predicate which
// uses the database queries to get the token holders and their balances, and
// combines them.
p4u marked this conversation as resolved.
Show resolved Hide resolved
func CalculateStrategyHolders(ctx context.Context, qdb *queries.Queries, w3p state.Web3Providers,
lucasmenendez marked this conversation as resolved.
Show resolved Hide resolved
id uint64, predicate string,
) (map[common.Address]*big.Int, *big.Int, uint64, error) {
// init some variables to get computed in the following steps
censusWeight := new(big.Int)
strategyHolders := map[common.Address]*big.Int{}
// parse the predicate
lx := lexer.NewLexer(strategyoperators.ValidOperatorsTags)
validPredicate, err := lx.Parse(predicate)
if err != nil {
return nil, nil, 0, err
}
// get strategy tokens from the database
strategyTokens, err := qdb.TokensByStrategyID(ctx, id)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil, 0, err
}
return nil, nil, 0, err
}
// any census strategy is identified by id created from the concatenation of
// the block number, the strategy id and the anonymous flag. The creation of
// censuses on specific block is not supported yet, so we need to get the
// last block of every token chain id to sum them and get the total block
// number, used to create the census id.
totalTokensBlockNumber := uint64(0)
for _, token := range strategyTokens {
w3uri, exists := w3p[token.ChainID]
if !exists {
return nil, nil, 0, err
}
w3 := state.Web3{}
lucasmenendez marked this conversation as resolved.
Show resolved Hide resolved
if err := w3.Init(ctx, w3uri.URI, common.BytesToAddress(token.ID), state.TokenType(token.TypeID)); err != nil {
return nil, nil, 0, err
}
currentBlockNumber, err := w3.LatestBlockNumber(ctx)
if err != nil {
return nil, nil, 0, err
}
totalTokensBlockNumber += currentBlockNumber
}
// if the current predicate is a literal, just query about its holders. If
// it is a complex predicate, create a evaluator and evaluate the predicate
if validPredicate.IsLiteral() {
// get the strategy holders from the database
holders, err := qdb.TokenHoldersByStrategyID(ctx, id)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil, totalTokensBlockNumber, nil
}
return nil, nil, totalTokensBlockNumber, err
}
// parse holders addresses and balances
for _, holder := range holders {
holderAddr := common.BytesToAddress(holder.HolderID)
holderBalance := new(big.Int).SetBytes(holder.Balance)
if _, exists := strategyHolders[holderAddr]; !exists {
strategyHolders[holderAddr] = holderBalance
censusWeight = new(big.Int).Add(censusWeight, holderBalance)
}
}
} else {
// parse token information
tokensInfo := map[string]*strategyoperators.TokenInformation{}
for _, token := range strategyTokens {
tokensInfo[token.Symbol] = &strategyoperators.TokenInformation{
ID: common.BytesToAddress(token.ID).String(),
ChainID: token.ChainID,
MinBalance: new(big.Int).SetBytes(token.MinBalance).String(),
Decimals: token.Decimals,
}
}
// init the operators and the predicate evaluator
operators := strategyoperators.InitOperators(qdb, tokensInfo)
eval := lexer.NewEval[*strategyoperators.StrategyIteration](operators.Map())
// execute the evaluation of the predicate
res, err := eval.EvalToken(validPredicate)
if err != nil {
return nil, nil, totalTokensBlockNumber, err
}
// parse the evaluation results
for address, value := range res.Data {
strategyHolders[common.HexToAddress(address)] = value
censusWeight = new(big.Int).Add(censusWeight, value)
}
}
// if no holders found, return an error
if len(strategyHolders) == 0 {
return nil, nil, totalTokensBlockNumber, nil
}
return strategyHolders, censusWeight, totalTokensBlockNumber, nil
}
Loading