Skip to content

Commit

Permalink
feat(data store): update bigtable index for better querying
Browse files Browse the repository at this point in the history
feat(data store): add with filter
  • Loading branch information
Tangui-Bitfly committed Nov 22, 2024
1 parent e119447 commit 41d0e9d
Show file tree
Hide file tree
Showing 23 changed files with 1,498 additions and 1,003 deletions.
94 changes: 94 additions & 0 deletions backend/internal/e2e/data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package e2e

import (
"context"
"encoding/hex"
"fmt"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"

"github.com/gobitfly/beaconchain/internal/th"
"github.com/gobitfly/beaconchain/pkg/commons/db2/data"
"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
"github.com/gobitfly/beaconchain/pkg/commons/db2/databasetest"
"github.com/gobitfly/beaconchain/pkg/commons/indexer"
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
"github.com/gobitfly/beaconchain/pkg/commons/types"
)

func TestStoreWithBackend(t *testing.T) {
clientBT, adminBT := databasetest.NewBigTable(t)
bigtable, err := database.NewBigTableWithClient(context.Background(), clientBT, adminBT, data.Schema)
if err != nil {
t.Fatal(err)
}

store := data.NewStore(database.Wrap(bigtable, data.Table))
backend := th.NewBackend(t)
_, usdt := backend.DeployToken(t, "usdt", "usdt", backend.BankAccount.From)

transform := indexer.NewTransformer(indexer.NoopCache{})
indexer := indexer.New(store, transform.Tx, transform.ERC20)

client, err := rpc.NewErigonClient(backend.Endpoint)
if err != nil {
t.Fatal(err)
}

var addresses []common.Address
for i := 0; i < 10; i++ {
temp := th.CreateEOA(t)
addresses = append(addresses, temp.From)
for j := 0; j < 25; j++ {
if err := backend.Client().SendTransaction(context.Background(), backend.MakeTx(t, backend.BankAccount, &temp.From, big.NewInt(1), nil)); err != nil {
t.Fatal(err)
}
if _, err := usdt.Mint(backend.BankAccount.TransactOpts, temp.From, big.NewInt(1)); err != nil {
t.Fatal(i, j, err)
}
backend.Commit()
lastBlock, err := backend.Client().BlockNumber(context.Background())
if err != nil {
t.Fatal(err)
}
block, _, err := client.GetBlock(int64(lastBlock), "geth")
if err != nil {
t.Fatal(err)
}
if err := indexer.IndexBlocksWithTransformers(fmt.Sprintf("%d", backend.ChainID), []*types.Eth1Block{block}); err != nil {
t.Fatal(err)
}
}
}

efficiencies := make(map[string]int64)
interactions, _, err := store.Get(addresses, nil, 25, data.WithDatabaseStats(func(msg string, args ...any) {
var efficiency int64
var rowRange string
for i := 0; i < len(args); i = i + 2 {
if args[i].(string) == database.KeyStatEfficiency {
efficiency = args[i+1].(int64)
}
if args[i].(string) == database.KeyStatRange {
rowRange = args[i+1].(string)
}
}
efficiencies[rowRange] = efficiency
}))
if err != nil {
t.Fatal(err)
}
for _, interaction := range interactions {
t.Log(interaction.Type, interaction.ChainID, "0x"+interaction.From, "0x"+interaction.To, "0x"+hex.EncodeToString(interaction.Hash), interaction.Time)
}
if got, want := len(efficiencies), len(addresses); got != want {
t.Errorf("got %d want %d", got, want)
}
for rowRange, efficiency := range efficiencies {
if got, want := efficiency, int64(1); got != want {
t.Errorf("efficiency for %s: got %d, want %d", rowRange, got, want)
}
}
}
185 changes: 58 additions & 127 deletions backend/pkg/commons/db2/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,172 +26,105 @@ func NewStore(store database.Database) Store {
}
}

type TransferWithIndexes struct {
Indexed *types.Eth1ERC20Indexed
TxIndex int
LogIndex int
}

func (store Store) BlockERC20TransfersToItems(chainID string, transfers []TransferWithIndexes) (map[string][]database.Item, error) {
items := make(map[string][]database.Item)
for _, transfer := range transfers {
b, err := proto.Marshal(transfer.Indexed)
if err != nil {
return nil, err
}
key := keyERC20(chainID, transfer.Indexed.ParentHash, transfer.LogIndex)
item := []database.Item{{Family: defaultFamily, Column: key}}
items[key] = []database.Item{{Family: defaultFamily, Column: dataColumn, Data: b}}

items[keyERC20Time(chainID, transfer.Indexed, transfer.Indexed.From, transfer.TxIndex, transfer.LogIndex)] = item
items[keyERC20Time(chainID, transfer.Indexed, transfer.Indexed.To, transfer.TxIndex, transfer.LogIndex)] = item

items[keyERC20ContractAllTime(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item
items[keyERC20ContractTime(chainID, transfer.Indexed, transfer.Indexed.From, transfer.TxIndex, transfer.LogIndex)] = item
items[keyERC20ContractTime(chainID, transfer.Indexed, transfer.Indexed.To, transfer.TxIndex, transfer.LogIndex)] = item

items[keyERC20To(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item
items[keyERC20From(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item
items[keyERC20Sent(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item
items[keyERC20Received(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item
}
return items, nil
func (store Store) AddItems(items map[string][]database.Item) error {
return store.db.BulkAdd(items)
}

func (store Store) AddBlockERC20Transfers(chainID string, transactions []TransferWithIndexes) error {
items, err := store.BlockERC20TransfersToItems(chainID, transactions)
items, err := BlockERC20TransfersToItemsV2(chainID, transactions)
if err != nil {
return err
}
return store.db.BulkAdd(items)
}

func (store Store) BlockTransactionsToItems(chainID string, transactions []*types.Eth1TransactionIndexed) (map[string][]database.Item, error) {
items := make(map[string][]database.Item)
for i, transaction := range transactions {
b, err := proto.Marshal(transaction)
if err != nil {
return nil, err
}
key := keyTx(chainID, transaction.GetHash())
item := []database.Item{{Family: defaultFamily, Column: key}}
items[key] = []database.Item{{Family: defaultFamily, Column: dataColumn, Data: b}}
items[keyTxSent(chainID, transaction, i)] = item
items[keyTxReceived(chainID, transaction, i)] = item

items[keyTxTime(chainID, transaction, transaction.To, i)] = item
items[keyTxBlock(chainID, transaction, transaction.To, i)] = item
items[keyTxMethod(chainID, transaction, transaction.To, i)] = item

items[keyTxTime(chainID, transaction, transaction.From, i)] = item
items[keyTxBlock(chainID, transaction, transaction.From, i)] = item
items[keyTxMethod(chainID, transaction, transaction.From, i)] = item

if transaction.ErrorMsg != "" {
items[keyTxError(chainID, transaction, transaction.To, i)] = item
items[keyTxError(chainID, transaction, transaction.From, i)] = item
}

if transaction.IsContractCreation {
items[keyTxContractCreation(chainID, transaction, transaction.To, i)] = item
items[keyTxContractCreation(chainID, transaction, transaction.From, i)] = item
}
}
return items, nil
}

func (store Store) AddBlockTransactions(chainID string, transactions []*types.Eth1TransactionIndexed) error {
items, err := store.BlockTransactionsToItems(chainID, transactions)
items, err := BlockTransactionsToItemsV2(chainID, transactions)
if err != nil {
return err
}
return store.db.BulkAdd(items)
}

func (store Store) Get(chainIDs []string, addresses []common.Address, prefixes map[string]map[string]string, limit int64, opts ...Option) ([]*Interaction, map[string]map[string]string, error) {
sources := map[formatType]unMarshalInteraction{
typeTx: unMarshalTx,
typeTransfer: unMarshalTransfer,
}
func (store Store) Get(addresses []common.Address, prefixes map[string]string, limit int64, opts ...Option) ([]*Interaction, map[string]string, error) {
options := apply(opts)
if options.ignoreTxs {
delete(sources, typeTx)

filter, err := newQueryFilter(options)
if err != nil {
return nil, nil, err
}
if options.ignoreTransfers {
delete(sources, typeTransfer)
databaseOptions := []database.Option{
database.WithLimit(limit),
database.WithOpenRange(true),
}
var interactions []*interactionWithInfo
for interactionType, unMarshalFunc := range sources {
filter, err := makeFilters(options, interactionType)
if err != nil {
return nil, nil, err
}
temp, err := store.getBy(unMarshalFunc, chainIDs, addresses, prefixes, limit, filter)
if err != nil {
return nil, nil, err
}
interactions = append(interactions, temp...)
if options.statsReporter != nil {
databaseOptions = append(databaseOptions, database.WithStats(options.statsReporter))
}
interactions, err := store.getBy(addresses, prefixes, filter, databaseOptions)
if err != nil {
return nil, nil, err
}

sort.Sort(byTimeDesc(interactions))
if int64(len(interactions)) > limit {
interactions = interactions[:limit]
}

var res []*Interaction
if prefixes == nil {
prefixes = make(map[string]map[string]string)
prefixes = make(map[string]string)
}
for i := 0; i < len(interactions); i++ {
if prefixes[interactions[i].chainID] == nil {
prefixes[interactions[i].chainID] = make(map[string]string)
}
prefixes[interactions[i].chainID][interactions[i].root] = interactions[i].key
prefixes[interactions[i].root] = interactions[i].key
res = append(res, interactions[i].Interaction)
}
return res, prefixes, nil
}

func (store Store) getBy(unMarshal unMarshalInteraction, chainIDs []string, addresses []common.Address, prefixes map[string]map[string]string, limit int64, condition filter) ([]*interactionWithInfo, error) {
func (store Store) getBy(addresses []common.Address, prefixes map[string]string, condition filter, databaseOptions []database.Option) ([]*interactionWithInfo, error) {
var interactions []*interactionWithInfo
for _, chainID := range chainIDs {
for _, address := range addresses {
root := condition.get(chainID, address)
prefix := root
if prefixes != nil && prefixes[chainID] != nil && prefixes[chainID][root] != "" {
prefix = prefixes[chainID][root]
for _, address := range addresses {
root := condition.get(address)
prefix := root
if prefixes != nil && prefixes[root] != "" {
prefix = prefixes[root]
}
upper := condition.limit(root)
indexRows, err := store.db.GetRowsRange(upper, prefix, databaseOptions...)
if err != nil {
if errors.Is(err, database.ErrNotFound) {
continue
}
upper := condition.limit(root)
indexRows, err := store.db.GetRowsRange(upper, prefix, database.WithLimit(limit), database.WithOpenRange(true))
if err != nil {
if errors.Is(err, database.ErrNotFound) {
continue
}
return nil, err
return nil, err
}
txKeys := make(map[string]string)
for _, row := range indexRows {
for key := range row.Values {
txKey := strings.TrimPrefix(key, fmt.Sprintf("%s:", defaultFamily))
txKeys[txKey] = row.Key
}
txKeys := make(map[string]string)
for _, row := range indexRows {
for key := range row.Values {
txKey := strings.TrimPrefix(key, fmt.Sprintf("%s:", defaultFamily))
txKeys[txKey] = row.Key
}
}
txRows, err := store.db.GetRowsWithKeys(maps.Keys(txKeys))
if err != nil {
return nil, err
}
for _, row := range txRows {
parts := strings.Split(row.Key, ":")
unMarshal := unMarshalTx
if parts[0] == "ERC20" {
unMarshal = unMarshalTransfer
}
txRows, err := store.db.GetRowsWithKeys(maps.Keys(txKeys))
interaction, err := unMarshal(row.Values[fmt.Sprintf("%s:%s", defaultFamily, dataColumn)])
if err != nil {
return nil, err
}
for _, row := range txRows {
interaction, err := unMarshal(row.Values[fmt.Sprintf("%s:%s", defaultFamily, dataColumn)])
if err != nil {
return nil, err
}
interaction.ChainID = chainID
interactions = append(interactions, &interactionWithInfo{
Interaction: interaction,
chainID: chainID,
root: root,
key: txKeys[row.Key],
})
}
interaction.ChainID = parts[1]
interactions = append(interactions, &interactionWithInfo{
Interaction: interaction,
chainID: parts[1],
root: root,
key: txKeys[row.Key],
})
}
}
return interactions, nil
Expand Down Expand Up @@ -231,8 +164,6 @@ type Interaction struct {

var erc20Transfer, _ = hex.DecodeString("a9059cbb")

type unMarshalInteraction func(b []byte) (*Interaction, error)

func unMarshalTx(b []byte) (*Interaction, error) {
tx := &types.Eth1TransactionIndexed{}
if err := proto.Unmarshal(b, tx); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions backend/pkg/commons/db2/data/data_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestStoreExternal(t *testing.T) {
chainIDs: chainIDs,
addresses: addresses,
opts: []data.Option{
data.IgnoreTransfers(),
data.OnlyTransactions(),
data.ByMethod("a9059cbb"),
},
},
Expand All @@ -87,15 +87,15 @@ func TestStoreExternal(t *testing.T) {
chainIDs: chainIDs,
addresses: addresses,
opts: []data.Option{
data.IgnoreTransactions(),
data.OnlyTransactions(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var lastPrefixes map[string]map[string]string
var lastPrefixes map[string]string
for i := 0; i < tt.scroll+1; i++ {
interactions, prefixes, err := store.Get(chainIDs, addresses, lastPrefixes, tt.limit, tt.opts...)
interactions, prefixes, err := store.Get(addresses, lastPrefixes, tt.limit, tt.opts...)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 41d0e9d

Please sign in to comment.