Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beds 922/account dashboard backend #1111

Open
wants to merge 3 commits into
base: BEDS-526/fix-internal-tx
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions backend/cmd/store/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"errors"
"net/http"
"os"

"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
)

func main() {
args := os.Args[1:]
project := args[0]
instance := args[1]
table := args[2]

bt, err := database.NewBigTable(project, instance, nil)
if err != nil {
panic(err)
}
remote := database.NewRemote(database.Wrap(bt, table))
go func() {
log.Info("starting remote raw store on port 8087")
if err := http.ListenAndServe("0.0.0.0:8087", remote.Routes()); err != nil && !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
utils.WaitForCtrlC()
}
96 changes: 96 additions & 0 deletions backend/internal/e2e/data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package e2e

import (
"context"
"encoding/hex"
"fmt"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"

"github.com/gobitfly/beaconchain/internal/th"
"github.com/gobitfly/beaconchain/pkg/commons/db2/data"
"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
"github.com/gobitfly/beaconchain/pkg/commons/db2/databasetest"
"github.com/gobitfly/beaconchain/pkg/commons/indexer"
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
"github.com/gobitfly/beaconchain/pkg/commons/types"
)

func TestStoreWithBackend(t *testing.T) {
clientBT, adminBT := databasetest.NewBigTable(t)
bigtable, err := database.NewBigTableWithClient(context.Background(), clientBT, adminBT, data.Schema)
if err != nil {
t.Fatal(err)
}

store := data.NewStore(database.Wrap(bigtable, data.Table))
backend := th.NewBackend(t)
_, usdt := backend.DeployToken(t, "usdt", "usdt", backend.BankAccount.From)

transform := indexer.NewTransformer(indexer.NoopCache{})
indexer := indexer.New(store, transform.Tx, transform.ERC20)

client, err := rpc.NewErigonClient(backend.Endpoint)
if err != nil {
t.Fatal(err)
}

var addresses []common.Address
for i := 0; i < 10; i++ {
temp := th.CreateEOA(t)
addresses = append(addresses, temp.From)
for j := 0; j < 25; j++ {
if err := backend.Client().SendTransaction(context.Background(), backend.MakeTx(t, backend.BankAccount, &temp.From, big.NewInt(1), nil)); err != nil {
t.Fatal(err)
}
if _, err := usdt.Mint(backend.BankAccount.TransactOpts, temp.From, big.NewInt(1)); err != nil {
t.Fatal(i, j, err)
}
backend.Commit()
lastBlock, err := backend.Client().BlockNumber(context.Background())
if err != nil {
t.Fatal(err)
}
block, _, err := client.GetBlock(int64(lastBlock), "geth")
if err != nil {
t.Fatal(err)
}
if err := indexer.IndexBlocksWithTransformers(fmt.Sprintf("%d", backend.ChainID), []*types.Eth1Block{block}); err != nil {
t.Fatal(err)
}
}
}

t.Run("get interactions", func(t *testing.T) {
efficiencies := make(map[string]int64)
interactions, _, err := store.Get(addresses, nil, 50, data.WithDatabaseStats(func(msg string, args ...any) {
var efficiency int64
var rowRange string
for i := 0; i < len(args); i = i + 2 {
if args[i].(string) == database.KeyStatEfficiency {
efficiency = args[i+1].(int64)
}
if args[i].(string) == database.KeyStatRange {
rowRange = args[i+1].(string)
}
}
efficiencies[rowRange] = efficiency
}))
if err != nil {
t.Fatal(err)
}
for _, interaction := range interactions {
t.Log(interaction.Type, interaction.ChainID, "0x"+interaction.From, "0x"+interaction.To, "0x"+hex.EncodeToString(interaction.Hash), interaction.Time)
}
if got, want := len(efficiencies), len(addresses); got != want {
t.Errorf("got %d want %d", got, want)
}
for rowRange, efficiency := range efficiencies {
if got, want := efficiency, int64(1); got != want {
t.Errorf("efficiency for %s: got %d, want %d", rowRange, got, want)
}
}
})
}
209 changes: 209 additions & 0 deletions backend/pkg/commons/db2/data/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package data

import (
"encoding/hex"
"errors"
"fmt"
"sort"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
"github.com/gobitfly/beaconchain/pkg/commons/types"
)

type Store struct {
db database.Database
}

func NewStore(store database.Database) Store {
return Store{
db: store,
}
}

func (store Store) AddItems(items map[string][]database.Item) error {
return store.db.BulkAdd(items)
}

func (store Store) AddBlockERC20Transfers(chainID string, transactions []TransferWithIndexes) error {
items, err := BlockERC20TransfersToItemsV2(chainID, transactions)
if err != nil {
return err
}
return store.db.BulkAdd(items)
}

func (store Store) AddBlockTransactions(chainID string, transactions []*types.Eth1TransactionIndexed) error {
items, err := BlockTransactionsToItemsV2(chainID, transactions)
if err != nil {
return err
}
return store.db.BulkAdd(items)
}

func (store Store) Get(addresses []common.Address, prefixes map[string]string, limit int64, opts ...Option) ([]*Interaction, map[string]string, error) {
options := apply(opts)

filter, err := newQueryFilter(options)
if err != nil {
return nil, nil, err
}
databaseOptions := []database.Option{
database.WithLimit(limit),
database.WithOpenRange(true),
}
if options.statsReporter != nil {
databaseOptions = append(databaseOptions, database.WithStats(options.statsReporter))
}
interactions, err := store.getBy(addresses, prefixes, filter, databaseOptions)
if err != nil {
return nil, nil, err
}

sort.Sort(byTimeDesc(interactions))
if int64(len(interactions)) > limit {
interactions = interactions[:limit]
}

var res []*Interaction
if prefixes == nil {
prefixes = make(map[string]string)
}
for i := 0; i < len(interactions); i++ {
prefixes[interactions[i].root] = interactions[i].key
res = append(res, interactions[i].Interaction)
}
return res, prefixes, nil
}

func (store Store) getBy(addresses []common.Address, prefixes map[string]string, condition filter, databaseOptions []database.Option) ([]*interactionWithInfo, error) {
var interactions []*interactionWithInfo
for _, address := range addresses {
root := condition.get(address)
prefix := root
if prefixes != nil && prefixes[root] != "" {
prefix = prefixes[root]
}
upper := condition.limit(root)
indexRows, err := store.db.GetRowsRange(upper, prefix, databaseOptions...)
if err != nil {
if errors.Is(err, database.ErrNotFound) {
continue
}
return nil, err
}
txKeys := make(map[string]string)
for _, row := range indexRows {
for key := range row.Values {
txKey := strings.TrimPrefix(key, fmt.Sprintf("%s:", defaultFamily))
txKeys[txKey] = row.Key
}
}
txRows, err := store.db.GetRowsWithKeys(maps.Keys(txKeys))
if err != nil {
return nil, err
}
for _, row := range txRows {
parts := strings.Split(row.Key, ":")
unMarshal := unMarshalTx
if parts[0] == "ERC20" {
unMarshal = unMarshalTransfer
}
interaction, err := unMarshal(row.Values[fmt.Sprintf("%s:%s", defaultFamily, dataColumn)])
if err != nil {
return nil, err
}
interaction.ChainID = parts[1]
interactions = append(interactions, &interactionWithInfo{
Interaction: interaction,
chainID: parts[1],
root: root,
key: txKeys[row.Key],
})
}
}
return interactions, nil
}

type interactionWithInfo struct {
*Interaction
chainID string
root string
key string
}

type byTimeDesc []*interactionWithInfo

func (c byTimeDesc) Len() int { return len(c) }
func (c byTimeDesc) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c byTimeDesc) Less(i, j int) bool {
t1 := c[i].Interaction.Time
t2 := c[j].Interaction.Time
if t1.Equal(t2) {
return c[i].key < c[j].key
}
return t1.After(t2)
}

type Interaction struct {
ChainID string
Hash []byte
Method []byte
Time time.Time
Type string
Value []byte
Asset string
From string
To string
}

var erc20Transfer, _ = hex.DecodeString("a9059cbb")

func unMarshalTx(b []byte) (*Interaction, error) {
tx := &types.Eth1TransactionIndexed{}
if err := proto.Unmarshal(b, tx); err != nil {
return nil, err
}
return parseTx(tx), nil
}

func unMarshalTransfer(b []byte) (*Interaction, error) {
tx := &types.Eth1ERC20Indexed{}
if err := proto.Unmarshal(b, tx); err != nil {
return nil, err
}
return parseTransfer(tx), nil
}

func parseTransfer(transfer *types.Eth1ERC20Indexed) *Interaction {
return &Interaction{
ChainID: "",
Hash: transfer.ParentHash,
Method: erc20Transfer,
Time: transfer.Time.AsTime(),
Type: "ERC20",
Value: transfer.Value,
Asset: hex.EncodeToString(transfer.TokenAddress),
From: hex.EncodeToString(transfer.From),
To: hex.EncodeToString(transfer.To),
}
}

func parseTx(tx *types.Eth1TransactionIndexed) *Interaction {
return &Interaction{
ChainID: "",
Hash: tx.Hash,
Method: tx.MethodId,
Time: tx.Time.AsTime(),
Type: "Transaction",
Value: tx.Value,
Asset: "ETH",
From: hex.EncodeToString(tx.From),
To: hex.EncodeToString(tx.To),
}
}
Loading